Skip to main content

nautilus_okx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the OKX WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - RwLock is preferred over Mutex (many reads, few writes).
43
44use std::str::FromStr;
45
46use ahash::{AHashMap, AHashSet};
47use futures_util::StreamExt;
48use nautilus_common::{cache::quote::QuoteCache, live::get_runtime};
49use nautilus_core::{
50    UUID4, UnixNanos,
51    python::{call_python_threadsafe, to_pyruntime_err, to_pyvalue_err},
52    time::{AtomicTime, get_atomic_clock_realtime},
53};
54use nautilus_model::{
55    data::{BarType, Data, InstrumentStatus, OrderBookDeltas_API},
56    enums::{OrderSide, OrderType, PositionSide, TimeInForce},
57    events::{OrderAccepted, OrderCancelRejected, OrderModifyRejected, OrderRejected},
58    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
59    instruments::{Instrument, InstrumentAny},
60    python::{
61        data::data_to_pycapsule,
62        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
63    },
64    types::{Money, Price, Quantity},
65};
66use nautilus_network::websocket::TransportBackend;
67use pyo3::{IntoPyObjectExt, prelude::*, types::PyDict};
68use ustr::Ustr;
69
70use super::{extract_optional_string, extract_optional_trigger_type};
71use crate::{
72    common::{
73        consts::{OKX_FIELD_CLORDID, OKX_FIELD_SCODE, OKX_FIELD_SMSG, OKX_SUCCESS_CODE},
74        enums::{
75            OKXBookAction, OKXGreeksType, OKXInstrumentStatus, OKXInstrumentType, OKXTradeMode,
76            OKXVipLevel,
77        },
78        models::OKXInstrument,
79        parse::{
80            okx_status_to_market_action, parse_account_state, parse_instrument_any,
81            parse_millisecond_timestamp, parse_position_status_report, parse_price, parse_quantity,
82        },
83    },
84    http::models::{OKXAccount, OKXPosition},
85    websocket::{
86        OKXWebSocketClient,
87        enums::{OKXWsChannel, OKXWsOperation},
88        messages::{
89            ExecutionReport, NautilusWsMessage, OKXAlgoOrderMsg, OKXBookMsg, OKXOptionSummaryMsg,
90            OKXOrderMsg, OKXWebSocketError, OKXWsMessage, WsAttachAlgoOrdParams,
91            WsAttachAlgoOrdParamsBuilder,
92        },
93        parse::{
94            extract_fees_from_cached_instrument, parse_algo_order_msg, parse_book_msg_vec,
95            parse_index_price_msg_vec, parse_option_summary_greeks, parse_order_msg_vec,
96            parse_ws_message_data,
97        },
98    },
99};
100
101fn parse_attach_algo_ords(
102    py: Python<'_>,
103    attach_algo_ords: Option<Vec<Py<PyDict>>>,
104) -> PyResult<Option<Vec<WsAttachAlgoOrdParams>>> {
105    attach_algo_ords
106        .map(|items| {
107            items
108                .into_iter()
109                .map(|item| {
110                    let dict = item.bind(py);
111                    let mut builder = WsAttachAlgoOrdParamsBuilder::default();
112
113                    if let Some(value) = extract_optional_string(dict, "attach_algo_cl_ord_id")? {
114                        builder.attach_algo_cl_ord_id(value);
115                    }
116
117                    if let Some(value) = extract_optional_string(dict, "sl_trigger_px")? {
118                        builder.sl_trigger_px(value);
119                    }
120
121                    if let Some(value) = extract_optional_string(dict, "sl_ord_px")? {
122                        builder.sl_ord_px(value);
123                    }
124
125                    if let Some(value) = extract_optional_trigger_type(dict, "sl_trigger_px_type")?
126                    {
127                        builder.sl_trigger_px_type(value);
128                    }
129
130                    if let Some(value) = extract_optional_string(dict, "tp_trigger_px")? {
131                        builder.tp_trigger_px(value);
132                    }
133
134                    if let Some(value) = extract_optional_string(dict, "tp_ord_px")? {
135                        builder.tp_ord_px(value);
136                    }
137
138                    if let Some(value) = extract_optional_trigger_type(dict, "tp_trigger_px_type")?
139                    {
140                        builder.tp_trigger_px_type(value);
141                    }
142
143                    builder.build().map_err(to_pyvalue_err)
144                })
145                .collect::<PyResult<Vec<_>>>()
146        })
147        .transpose()
148}
149
#[pyo3::pymethods]
impl OKXWebSocketError {
    /// Returns the `code` field as a string slice.
    #[getter]
    pub fn code(&self) -> &str {
        &self.code
    }

    /// Returns the `message` field as a string slice.
    #[getter]
    pub fn message(&self) -> &str {
        &self.message
    }

    /// Returns the `conn_id` field, or `None` when it is not set.
    #[getter]
    pub fn conn_id(&self) -> Option<&str> {
        self.conn_id.as_deref()
    }

    /// Returns the `timestamp` field, exposed to Python under the name `ts_event`.
    #[getter]
    pub fn ts_event(&self) -> u64 {
        self.timestamp
    }

    /// Builds the Python `repr()` string from the code, message, connection ID and
    /// timestamp. The connection ID is rendered with `Debug` formatting.
    fn __repr__(&self) -> String {
        format!(
            "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
            self.code, self.message, self.conn_id, self.timestamp
        )
    }
}
179
180#[pymethods]
181#[pyo3_stub_gen::derive::gen_stub_pymethods]
182impl OKXWebSocketClient {
183    /// Provides a WebSocket client for connecting to [OKX](https://okx.com).
184    #[new]
185    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None, auth_timeout_secs=None, proxy_url=None))]
186    #[expect(clippy::too_many_arguments)]
187    fn py_new(
188        url: Option<String>,
189        api_key: Option<String>,
190        api_secret: Option<String>,
191        api_passphrase: Option<String>,
192        account_id: Option<AccountId>,
193        heartbeat: Option<u64>,
194        auth_timeout_secs: Option<u64>,
195        proxy_url: Option<String>,
196    ) -> PyResult<Self> {
197        Self::new(
198            url,
199            api_key,
200            api_secret,
201            api_passphrase,
202            account_id,
203            heartbeat,
204            auth_timeout_secs,
205            TransportBackend::default(),
206            proxy_url,
207        )
208        .map_err(to_pyvalue_err)
209    }
210
211    #[staticmethod]
212    #[pyo3(name = "with_credentials")]
213    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None, auth_timeout_secs=None, proxy_url=None))]
214    #[expect(clippy::too_many_arguments)]
215    fn py_with_credentials(
216        url: Option<String>,
217        api_key: Option<String>,
218        api_secret: Option<String>,
219        api_passphrase: Option<String>,
220        account_id: Option<AccountId>,
221        heartbeat: Option<u64>,
222        auth_timeout_secs: Option<u64>,
223        proxy_url: Option<String>,
224    ) -> PyResult<Self> {
225        Self::with_credentials(
226            url,
227            api_key,
228            api_secret,
229            api_passphrase,
230            account_id,
231            heartbeat,
232            auth_timeout_secs,
233            TransportBackend::default(),
234            proxy_url,
235        )
236        .map_err(to_pyvalue_err)
237    }
238
    /// Creates a client through `Self::from_env`.
    ///
    /// # Errors
    ///
    /// Returns the error from `Self::from_env`, converted with `to_pyvalue_err`.
    #[staticmethod]
    #[pyo3(name = "from_env")]
    fn py_from_env() -> PyResult<Self> {
        Self::from_env().map_err(to_pyvalue_err)
    }
244
    /// Returns the value of `url()`, exposed to Python as the `url` property.
    #[getter]
    #[pyo3(name = "url")]
    #[must_use]
    pub fn py_url(&self) -> &str {
        self.url()
    }
251
    /// Returns the value of `api_key()`, exposed to Python as the `api_key` property.
    ///
    /// Yields `None` when `api_key()` returns no key.
    #[getter]
    #[pyo3(name = "api_key")]
    #[must_use]
    pub fn py_api_key(&self) -> Option<&str> {
        self.api_key()
    }
258
    /// Returns the value of `api_key_masked()`, exposed to Python as the
    /// `api_key_masked` property.
    ///
    /// NOTE(review): presumably a redacted form of the API key that is safe to log;
    /// confirm against the core implementation.
    #[getter]
    #[pyo3(name = "api_key_masked")]
    #[must_use]
    pub fn py_api_key_masked(&self) -> Option<String> {
        self.api_key_masked()
    }
265
    /// Returns the result of `is_active()` for this client.
    #[pyo3(name = "is_active")]
    fn py_is_active(&mut self) -> bool {
        self.is_active()
    }
270
    /// Returns the result of `is_closed()` for this client.
    #[pyo3(name = "is_closed")]
    fn py_is_closed(&mut self) -> bool {
        self.is_closed()
    }
275
    /// Calls `cancel_all_requests()` on this client; nothing is returned to Python.
    #[pyo3(name = "cancel_all_requests")]
    pub fn py_cancel_all_requests(&self) {
        self.cancel_all_requests();
    }
280
281    #[pyo3(name = "get_subscriptions")]
282    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
283        let channels = self.get_subscriptions(instrument_id);
284
285        // Convert to OKX channel names
286        channels
287            .iter()
288            .map(|c| {
289                serde_json::to_value(c)
290                    .ok()
291                    .and_then(|v| v.as_str().map(String::from))
292                    .unwrap_or_else(|| c.to_string())
293            })
294            .collect()
295    }
296
    /// Sets the VIP level for this client by delegating to `set_vip_level`.
    ///
    /// The VIP level determines which WebSocket channels are available.
    #[pyo3(name = "set_vip_level")]
    fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
        self.set_vip_level(vip_level);
    }
304
    /// Returns the current VIP level, exposed to Python as the `vip_level` property.
    #[pyo3(name = "vip_level")]
    #[getter]
    fn py_vip_level(&self) -> OKXVipLevel {
        self.vip_level()
    }
311
    /// Connects the client and starts dispatching WebSocket messages to `callback`.
    ///
    /// The given instruments are converted and cached on `self` before the client is
    /// cloned. The returned awaitable connects the clone and spawns a background task
    /// that reads the message stream and passes each message to the matching handler,
    /// together with `callback` and the loop's `call_soon_threadsafe`.
    ///
    /// The awaitable completes once the task has been spawned; it does not wait for
    /// the stream to end.
    ///
    /// # Errors
    ///
    /// Returns an error if `call_soon_threadsafe` cannot be looked up on `loop_` or if
    /// an instrument fails conversion. The awaitable fails if `connect` fails, with the
    /// error converted by `to_pyruntime_err`.
    #[pyo3(name = "connect")]
    #[expect(clippy::needless_pass_by_value)]
    fn py_connect<'py>(
        &mut self,
        py: Python<'py>,
        loop_: Py<PyAny>,
        instruments: Vec<Py<PyAny>>,
        callback: Py<PyAny>,
    ) -> PyResult<Bound<'py, PyAny>> {
        // Resolve the scheduling function up front, while `py` is available.
        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;

        let mut instruments_any = Vec::new();

        // Convert every Python instrument; the first conversion failure aborts the call.
        for inst in instruments {
            let inst_any = pyobject_to_instrument_any(py, inst)?;
            instruments_any.push(inst_any);
        }

        // Cache on `self` before cloning, so the clone used below is created afterwards.
        self.cache_instruments(&instruments_any);

        let mut client = self.clone();

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client.connect().await.map_err(to_pyruntime_err)?;

            let stream = client.stream();
            let clock = get_atomic_clock_realtime();

            // The join handle is dropped, so the task runs detached from this future.
            get_runtime().spawn(async move {
                // State owned by this task for the lifetime of the stream.
                let account_id = client.account_id;
                let mut instruments_by_symbol = client.instruments_snapshot();
                let mut quote_cache = QuoteCache::new();
                let mut funding_cache: AHashMap<Ustr, (Ustr, u64)> = AHashMap::new();
                let mut fee_cache: AHashMap<Ustr, Money> = AHashMap::new();
                let mut filled_qty_cache: AHashMap<Ustr, Quantity> = AHashMap::new();
                let option_greeks_subs_arc = client.option_greeks_subs().clone();
                tokio::pin!(stream);

                // Runs until the stream yields `None`.
                while let Some(msg) = stream.next().await {
                    match msg {
                        OKXWsMessage::BookData { arg, action, data } => {
                            handle_book_data(
                                arg.inst_id,
                                action,
                                data,
                                &instruments_by_symbol,
                                clock,
                                &call_soon,
                                &callback,
                            );
                        }
                        OKXWsMessage::ChannelData {
                            channel,
                            inst_id,
                            data,
                        } => {
                            // Load the option greeks subscriptions afresh for each message.
                            let greeks_guard = option_greeks_subs_arc.load();
                            handle_channel_data(
                                &channel,
                                inst_id,
                                data,
                                &mut instruments_by_symbol,
                                &mut quote_cache,
                                &mut funding_cache,
                                &greeks_guard,
                                clock,
                                &call_soon,
                                &callback,
                            );
                        }
                        OKXWsMessage::Instruments(okx_instruments) => {
                            handle_instruments(
                                okx_instruments,
                                &mut instruments_by_symbol,
                                clock,
                                &call_soon,
                                &callback,
                            );
                        }
                        OKXWsMessage::Orders(order_msgs) => {
                            handle_orders(
                                &order_msgs,
                                account_id,
                                &instruments_by_symbol,
                                &mut fee_cache,
                                &mut filled_qty_cache,
                                clock,
                                &call_soon,
                                &callback,
                            );
                        }
                        OKXWsMessage::AlgoOrders(algo_msgs) => {
                            handle_algo_orders(
                                algo_msgs,
                                account_id,
                                &instruments_by_symbol,
                                clock,
                                &call_soon,
                                &callback,
                            );
                        }
                        OKXWsMessage::Account(data) => {
                            handle_account(data, account_id, clock, &call_soon, &callback);
                        }
                        OKXWsMessage::Positions(data) => {
                            handle_positions(
                                data,
                                account_id,
                                &instruments_by_symbol,
                                clock,
                                &call_soon,
                                &callback,
                            );
                        }
                        OKXWsMessage::OrderResponse {
                            id,
                            op,
                            code,
                            msg,
                            data,
                        } => {
                            handle_order_response(
                                id.as_deref(),
                                &op,
                                &code,
                                &msg,
                                &data,
                                &client,
                                account_id,
                                clock,
                                &call_soon,
                                &callback,
                            );
                        }
                        OKXWsMessage::SendFailed {
                            request_id,
                            client_order_id,
                            op,
                            error,
                        } => {
                            handle_send_failed(
                                &request_id,
                                client_order_id,
                                op.as_ref(),
                                &error,
                                &client,
                                account_id,
                                clock,
                                &call_soon,
                                &callback,
                            );
                        }
                        // Errors are converted to a Python object and passed to the callback.
                        OKXWsMessage::Error(msg) => {
                            call_python_with_data(&call_soon, &callback, |py| msg.into_py_any(py));
                        }
                        // Only the quote cache is cleared on reconnect; the other caches are kept.
                        OKXWsMessage::Reconnected => {
                            quote_cache.clear();
                        }
                        // Nothing is forwarded to Python for this message.
                        OKXWsMessage::Authenticated => {}
                    }
                }
            });

            Ok(())
        })
    }
478
479    #[pyo3(name = "wait_until_active")]
480    fn py_wait_until_active<'py>(
481        &self,
482        py: Python<'py>,
483        timeout_secs: f64,
484    ) -> PyResult<Bound<'py, PyAny>> {
485        let client = self.clone();
486
487        pyo3_async_runtimes::tokio::future_into_py(py, async move {
488            client
489                .wait_until_active(timeout_secs)
490                .await
491                .map_err(to_pyruntime_err)?;
492            Ok(())
493        })
494    }
495
496    #[pyo3(name = "close")]
497    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
498        let mut client = self.clone();
499
500        pyo3_async_runtimes::tokio::future_into_py(py, async move {
501            if let Err(e) = client.close().await {
502                log::error!("Error on close: {e}");
503            }
504            Ok(())
505        })
506    }
507
508    #[pyo3(name = "subscribe_instruments")]
509    fn py_subscribe_instruments<'py>(
510        &self,
511        py: Python<'py>,
512        instrument_type: OKXInstrumentType,
513    ) -> PyResult<Bound<'py, PyAny>> {
514        let client = self.clone();
515
516        pyo3_async_runtimes::tokio::future_into_py(py, async move {
517            if let Err(e) = client.subscribe_instruments(instrument_type).await {
518                log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
519            }
520            Ok(())
521        })
522    }
523
524    #[pyo3(name = "subscribe_instrument")]
525    fn py_subscribe_instrument<'py>(
526        &self,
527        py: Python<'py>,
528        instrument_id: InstrumentId,
529    ) -> PyResult<Bound<'py, PyAny>> {
530        let client = self.clone();
531
532        pyo3_async_runtimes::tokio::future_into_py(py, async move {
533            if let Err(e) = client.subscribe_instrument(instrument_id).await {
534                log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
535            }
536            Ok(())
537        })
538    }
539
540    #[pyo3(name = "subscribe_book")]
541    fn py_subscribe_book<'py>(
542        &self,
543        py: Python<'py>,
544        instrument_id: InstrumentId,
545    ) -> PyResult<Bound<'py, PyAny>> {
546        let client = self.clone();
547
548        pyo3_async_runtimes::tokio::future_into_py(py, async move {
549            client
550                .subscribe_book(instrument_id)
551                .await
552                .map_err(to_pyvalue_err)
553        })
554    }
555
556    #[pyo3(name = "subscribe_book50_l2_tbt")]
557    fn py_subscribe_book50_l2_tbt<'py>(
558        &self,
559        py: Python<'py>,
560        instrument_id: InstrumentId,
561    ) -> PyResult<Bound<'py, PyAny>> {
562        let client = self.clone();
563
564        pyo3_async_runtimes::tokio::future_into_py(py, async move {
565            if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
566                log::error!("Failed to subscribe to book50_tbt: {e}");
567            }
568            Ok(())
569        })
570    }
571
572    #[pyo3(name = "subscribe_book_l2_tbt")]
573    fn py_subscribe_book_l2_tbt<'py>(
574        &self,
575        py: Python<'py>,
576        instrument_id: InstrumentId,
577    ) -> PyResult<Bound<'py, PyAny>> {
578        let client = self.clone();
579
580        pyo3_async_runtimes::tokio::future_into_py(py, async move {
581            if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
582                log::error!("Failed to subscribe to books_l2_tbt: {e}");
583            }
584            Ok(())
585        })
586    }
587
588    #[pyo3(name = "subscribe_book_with_depth")]
589    fn py_subscribe_book_with_depth<'py>(
590        &self,
591        py: Python<'py>,
592        instrument_id: InstrumentId,
593        depth: u16,
594    ) -> PyResult<Bound<'py, PyAny>> {
595        let client = self.clone();
596
597        pyo3_async_runtimes::tokio::future_into_py(py, async move {
598            client
599                .subscribe_book_with_depth(instrument_id, depth)
600                .await
601                .map_err(to_pyvalue_err)
602        })
603    }
604
605    #[pyo3(name = "subscribe_book_depth5")]
606    fn py_subscribe_book_depth5<'py>(
607        &self,
608        py: Python<'py>,
609        instrument_id: InstrumentId,
610    ) -> PyResult<Bound<'py, PyAny>> {
611        let client = self.clone();
612
613        pyo3_async_runtimes::tokio::future_into_py(py, async move {
614            if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
615                log::error!("Failed to subscribe to books5: {e}");
616            }
617            Ok(())
618        })
619    }
620
621    #[pyo3(name = "subscribe_quotes")]
622    fn py_subscribe_quotes<'py>(
623        &self,
624        py: Python<'py>,
625        instrument_id: InstrumentId,
626    ) -> PyResult<Bound<'py, PyAny>> {
627        let client = self.clone();
628
629        pyo3_async_runtimes::tokio::future_into_py(py, async move {
630            if let Err(e) = client.subscribe_quotes(instrument_id).await {
631                log::error!("Failed to subscribe to quotes: {e}");
632            }
633            Ok(())
634        })
635    }
636
637    #[pyo3(name = "subscribe_trades")]
638    fn py_subscribe_trades<'py>(
639        &self,
640        py: Python<'py>,
641        instrument_id: InstrumentId,
642        aggregated: bool,
643    ) -> PyResult<Bound<'py, PyAny>> {
644        let client = self.clone();
645
646        pyo3_async_runtimes::tokio::future_into_py(py, async move {
647            if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
648                log::error!("Failed to subscribe to trades: {e}");
649            }
650            Ok(())
651        })
652    }
653
654    #[pyo3(name = "subscribe_bars")]
655    fn py_subscribe_bars<'py>(
656        &self,
657        py: Python<'py>,
658        bar_type: BarType,
659    ) -> PyResult<Bound<'py, PyAny>> {
660        let client = self.clone();
661
662        pyo3_async_runtimes::tokio::future_into_py(py, async move {
663            if let Err(e) = client.subscribe_bars(bar_type).await {
664                log::error!("Failed to subscribe to bars: {e}");
665            }
666            Ok(())
667        })
668    }
669
670    #[pyo3(name = "unsubscribe_book")]
671    fn py_unsubscribe_book<'py>(
672        &self,
673        py: Python<'py>,
674        instrument_id: InstrumentId,
675    ) -> PyResult<Bound<'py, PyAny>> {
676        let client = self.clone();
677
678        pyo3_async_runtimes::tokio::future_into_py(py, async move {
679            if let Err(e) = client.unsubscribe_book(instrument_id).await {
680                log::error!("Failed to unsubscribe from order book: {e}");
681            }
682            Ok(())
683        })
684    }
685
686    #[pyo3(name = "unsubscribe_book_depth5")]
687    fn py_unsubscribe_book_depth5<'py>(
688        &self,
689        py: Python<'py>,
690        instrument_id: InstrumentId,
691    ) -> PyResult<Bound<'py, PyAny>> {
692        let client = self.clone();
693
694        pyo3_async_runtimes::tokio::future_into_py(py, async move {
695            if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
696                log::error!("Failed to unsubscribe from books5: {e}");
697            }
698            Ok(())
699        })
700    }
701
702    #[pyo3(name = "unsubscribe_book50_l2_tbt")]
703    fn py_unsubscribe_book50_l2_tbt<'py>(
704        &self,
705        py: Python<'py>,
706        instrument_id: InstrumentId,
707    ) -> PyResult<Bound<'py, PyAny>> {
708        let client = self.clone();
709
710        pyo3_async_runtimes::tokio::future_into_py(py, async move {
711            if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
712                log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
713            }
714            Ok(())
715        })
716    }
717
718    #[pyo3(name = "unsubscribe_book_l2_tbt")]
719    fn py_unsubscribe_book_l2_tbt<'py>(
720        &self,
721        py: Python<'py>,
722        instrument_id: InstrumentId,
723    ) -> PyResult<Bound<'py, PyAny>> {
724        let client = self.clone();
725
726        pyo3_async_runtimes::tokio::future_into_py(py, async move {
727            if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
728                log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
729            }
730            Ok(())
731        })
732    }
733
734    #[pyo3(name = "unsubscribe_quotes")]
735    fn py_unsubscribe_quotes<'py>(
736        &self,
737        py: Python<'py>,
738        instrument_id: InstrumentId,
739    ) -> PyResult<Bound<'py, PyAny>> {
740        let client = self.clone();
741
742        pyo3_async_runtimes::tokio::future_into_py(py, async move {
743            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
744                log::error!("Failed to unsubscribe from quotes: {e}");
745            }
746            Ok(())
747        })
748    }
749
750    #[pyo3(name = "unsubscribe_trades")]
751    fn py_unsubscribe_trades<'py>(
752        &self,
753        py: Python<'py>,
754        instrument_id: InstrumentId,
755        aggregated: bool,
756    ) -> PyResult<Bound<'py, PyAny>> {
757        let client = self.clone();
758
759        pyo3_async_runtimes::tokio::future_into_py(py, async move {
760            if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
761                log::error!("Failed to unsubscribe from trades: {e}");
762            }
763            Ok(())
764        })
765    }
766
767    #[pyo3(name = "unsubscribe_bars")]
768    fn py_unsubscribe_bars<'py>(
769        &self,
770        py: Python<'py>,
771        bar_type: BarType,
772    ) -> PyResult<Bound<'py, PyAny>> {
773        let client = self.clone();
774
775        pyo3_async_runtimes::tokio::future_into_py(py, async move {
776            if let Err(e) = client.unsubscribe_bars(bar_type).await {
777                log::error!("Failed to unsubscribe from bars: {e}");
778            }
779            Ok(())
780        })
781    }
782
783    #[pyo3(name = "subscribe_ticker")]
784    fn py_subscribe_ticker<'py>(
785        &self,
786        py: Python<'py>,
787        instrument_id: InstrumentId,
788    ) -> PyResult<Bound<'py, PyAny>> {
789        let client = self.clone();
790
791        pyo3_async_runtimes::tokio::future_into_py(py, async move {
792            if let Err(e) = client.subscribe_ticker(instrument_id).await {
793                log::error!("Failed to subscribe to ticker: {e}");
794            }
795            Ok(())
796        })
797    }
798
799    #[pyo3(name = "unsubscribe_ticker")]
800    fn py_unsubscribe_ticker<'py>(
801        &self,
802        py: Python<'py>,
803        instrument_id: InstrumentId,
804    ) -> PyResult<Bound<'py, PyAny>> {
805        let client = self.clone();
806
807        pyo3_async_runtimes::tokio::future_into_py(py, async move {
808            if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
809                log::error!("Failed to unsubscribe from ticker: {e}");
810            }
811            Ok(())
812        })
813    }
814
815    #[pyo3(name = "subscribe_mark_prices")]
816    fn py_subscribe_mark_prices<'py>(
817        &self,
818        py: Python<'py>,
819        instrument_id: InstrumentId,
820    ) -> PyResult<Bound<'py, PyAny>> {
821        let client = self.clone();
822
823        pyo3_async_runtimes::tokio::future_into_py(py, async move {
824            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
825                log::error!("Failed to subscribe to mark prices: {e}");
826            }
827            Ok(())
828        })
829    }
830
831    #[pyo3(name = "unsubscribe_mark_prices")]
832    fn py_unsubscribe_mark_prices<'py>(
833        &self,
834        py: Python<'py>,
835        instrument_id: InstrumentId,
836    ) -> PyResult<Bound<'py, PyAny>> {
837        let client = self.clone();
838
839        pyo3_async_runtimes::tokio::future_into_py(py, async move {
840            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
841                log::error!("Failed to unsubscribe from mark prices: {e}");
842            }
843            Ok(())
844        })
845    }
846
847    #[pyo3(name = "subscribe_index_prices")]
848    fn py_subscribe_index_prices<'py>(
849        &self,
850        py: Python<'py>,
851        instrument_id: InstrumentId,
852    ) -> PyResult<Bound<'py, PyAny>> {
853        let client = self.clone();
854
855        pyo3_async_runtimes::tokio::future_into_py(py, async move {
856            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
857                log::error!("Failed to subscribe to index prices: {e}");
858            }
859            Ok(())
860        })
861    }
862
863    #[pyo3(name = "unsubscribe_index_prices")]
864    fn py_unsubscribe_index_prices<'py>(
865        &self,
866        py: Python<'py>,
867        instrument_id: InstrumentId,
868    ) -> PyResult<Bound<'py, PyAny>> {
869        let client = self.clone();
870
871        pyo3_async_runtimes::tokio::future_into_py(py, async move {
872            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
873                log::error!("Failed to unsubscribe from index prices: {e}");
874            }
875            Ok(())
876        })
877    }
878
879    #[pyo3(name = "add_option_greeks_sub")]
880    fn py_add_option_greeks_sub(&self, instrument_id: InstrumentId) {
881        self.add_option_greeks_sub(instrument_id);
882    }
883
884    #[pyo3(name = "add_option_greeks_sub_with_conventions")]
885    fn py_add_option_greeks_sub_with_conventions(
886        &self,
887        instrument_id: InstrumentId,
888        conventions: Vec<OKXGreeksType>,
889    ) {
890        self.add_option_greeks_sub_with_conventions(
891            instrument_id,
892            conventions.into_iter().collect(),
893        );
894    }
895
896    #[pyo3(name = "remove_option_greeks_sub")]
897    fn py_remove_option_greeks_sub(&self, instrument_id: InstrumentId) {
898        self.remove_option_greeks_sub(&instrument_id);
899    }
900
901    #[pyo3(name = "subscribe_option_summary")]
902    fn py_subscribe_option_summary<'py>(
903        &self,
904        py: Python<'py>,
905        inst_family: &str,
906    ) -> PyResult<Bound<'py, PyAny>> {
907        let client = self.clone();
908        let family = Ustr::from(inst_family);
909
910        pyo3_async_runtimes::tokio::future_into_py(py, async move {
911            if let Err(e) = client.subscribe_option_summary(family).await {
912                log::error!("Failed to subscribe to option summary: {e}");
913            }
914            Ok(())
915        })
916    }
917
918    #[pyo3(name = "unsubscribe_option_summary")]
919    fn py_unsubscribe_option_summary<'py>(
920        &self,
921        py: Python<'py>,
922        inst_family: &str,
923    ) -> PyResult<Bound<'py, PyAny>> {
924        let client = self.clone();
925        let family = Ustr::from(inst_family);
926
927        pyo3_async_runtimes::tokio::future_into_py(py, async move {
928            if let Err(e) = client.unsubscribe_option_summary(family).await {
929                log::error!("Failed to unsubscribe from option summary: {e}");
930            }
931            Ok(())
932        })
933    }
934
935    #[pyo3(name = "subscribe_funding_rates")]
936    fn py_subscribe_funding_rates<'py>(
937        &self,
938        py: Python<'py>,
939        instrument_id: InstrumentId,
940    ) -> PyResult<Bound<'py, PyAny>> {
941        let client = self.clone();
942
943        pyo3_async_runtimes::tokio::future_into_py(py, async move {
944            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
945                log::error!("Failed to subscribe to funding rates: {e}");
946            }
947            Ok(())
948        })
949    }
950
951    #[pyo3(name = "unsubscribe_funding_rates")]
952    fn py_unsubscribe_funding_rates<'py>(
953        &self,
954        py: Python<'py>,
955        instrument_id: InstrumentId,
956    ) -> PyResult<Bound<'py, PyAny>> {
957        let client = self.clone();
958
959        pyo3_async_runtimes::tokio::future_into_py(py, async move {
960            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
961                log::error!("Failed to unsubscribe from funding rates: {e}");
962            }
963            Ok(())
964        })
965    }
966
967    #[pyo3(name = "subscribe_orders")]
968    fn py_subscribe_orders<'py>(
969        &self,
970        py: Python<'py>,
971        instrument_type: OKXInstrumentType,
972    ) -> PyResult<Bound<'py, PyAny>> {
973        let client = self.clone();
974
975        pyo3_async_runtimes::tokio::future_into_py(py, async move {
976            if let Err(e) = client.subscribe_orders(instrument_type).await {
977                log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
978            }
979            Ok(())
980        })
981    }
982
983    #[pyo3(name = "unsubscribe_orders")]
984    fn py_unsubscribe_orders<'py>(
985        &self,
986        py: Python<'py>,
987        instrument_type: OKXInstrumentType,
988    ) -> PyResult<Bound<'py, PyAny>> {
989        let client = self.clone();
990
991        pyo3_async_runtimes::tokio::future_into_py(py, async move {
992            if let Err(e) = client.unsubscribe_orders(instrument_type).await {
993                log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
994            }
995            Ok(())
996        })
997    }
998
999    #[pyo3(name = "subscribe_orders_algo")]
1000    fn py_subscribe_orders_algo<'py>(
1001        &self,
1002        py: Python<'py>,
1003        instrument_type: OKXInstrumentType,
1004    ) -> PyResult<Bound<'py, PyAny>> {
1005        let client = self.clone();
1006
1007        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1008            if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
1009                log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
1010            }
1011            Ok(())
1012        })
1013    }
1014
1015    #[pyo3(name = "unsubscribe_orders_algo")]
1016    fn py_unsubscribe_orders_algo<'py>(
1017        &self,
1018        py: Python<'py>,
1019        instrument_type: OKXInstrumentType,
1020    ) -> PyResult<Bound<'py, PyAny>> {
1021        let client = self.clone();
1022
1023        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1024            if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
1025                log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
1026            }
1027            Ok(())
1028        })
1029    }
1030
1031    #[pyo3(name = "subscribe_algo_advance")]
1032    fn py_subscribe_algo_advance<'py>(
1033        &self,
1034        py: Python<'py>,
1035        instrument_type: OKXInstrumentType,
1036    ) -> PyResult<Bound<'py, PyAny>> {
1037        let client = self.clone();
1038
1039        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1040            if let Err(e) = client.subscribe_algo_advance(instrument_type).await {
1041                log::error!("Failed to subscribe to algo-advance '{instrument_type}': {e}");
1042            }
1043            Ok(())
1044        })
1045    }
1046
1047    #[pyo3(name = "unsubscribe_algo_advance")]
1048    fn py_unsubscribe_algo_advance<'py>(
1049        &self,
1050        py: Python<'py>,
1051        instrument_type: OKXInstrumentType,
1052    ) -> PyResult<Bound<'py, PyAny>> {
1053        let client = self.clone();
1054
1055        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1056            if let Err(e) = client.unsubscribe_algo_advance(instrument_type).await {
1057                log::error!("Failed to unsubscribe from algo-advance '{instrument_type}': {e}");
1058            }
1059            Ok(())
1060        })
1061    }
1062
1063    #[pyo3(name = "subscribe_fills")]
1064    fn py_subscribe_fills<'py>(
1065        &self,
1066        py: Python<'py>,
1067        instrument_type: OKXInstrumentType,
1068    ) -> PyResult<Bound<'py, PyAny>> {
1069        let client = self.clone();
1070
1071        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1072            if let Err(e) = client.subscribe_fills(instrument_type).await {
1073                log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
1074            }
1075            Ok(())
1076        })
1077    }
1078
1079    #[pyo3(name = "unsubscribe_fills")]
1080    fn py_unsubscribe_fills<'py>(
1081        &self,
1082        py: Python<'py>,
1083        instrument_type: OKXInstrumentType,
1084    ) -> PyResult<Bound<'py, PyAny>> {
1085        let client = self.clone();
1086
1087        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1088            if let Err(e) = client.unsubscribe_fills(instrument_type).await {
1089                log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
1090            }
1091            Ok(())
1092        })
1093    }
1094
1095    #[pyo3(name = "subscribe_account")]
1096    fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1097        let client = self.clone();
1098
1099        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1100            if let Err(e) = client.subscribe_account().await {
1101                log::error!("Failed to subscribe to account: {e}");
1102            }
1103            Ok(())
1104        })
1105    }
1106
1107    #[pyo3(name = "unsubscribe_account")]
1108    fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1109        let client = self.clone();
1110
1111        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1112            if let Err(e) = client.unsubscribe_account().await {
1113                log::error!("Failed to unsubscribe from account: {e}");
1114            }
1115            Ok(())
1116        })
1117    }
1118
1119    #[pyo3(name = "submit_order")]
1120    #[pyo3(signature = (
1121        trader_id,
1122        strategy_id,
1123        instrument_id,
1124        td_mode,
1125        client_order_id,
1126        order_side,
1127        order_type,
1128        quantity,
1129        time_in_force=None,
1130        price=None,
1131        trigger_price=None,
1132        post_only=None,
1133        reduce_only=None,
1134        quote_quantity=None,
1135        position_side=None,
1136        attach_algo_ords=None,
1137        px_usd=None,
1138        px_vol=None,
1139    ))]
1140    #[expect(clippy::too_many_arguments)]
1141    fn py_submit_order<'py>(
1142        &self,
1143        py: Python<'py>,
1144        trader_id: TraderId,
1145        strategy_id: StrategyId,
1146        instrument_id: InstrumentId,
1147        td_mode: OKXTradeMode,
1148        client_order_id: ClientOrderId,
1149        order_side: OrderSide,
1150        order_type: OrderType,
1151        quantity: Quantity,
1152        time_in_force: Option<TimeInForce>,
1153        price: Option<Price>,
1154        trigger_price: Option<Price>,
1155        post_only: Option<bool>,
1156        reduce_only: Option<bool>,
1157        quote_quantity: Option<bool>,
1158        position_side: Option<PositionSide>,
1159        attach_algo_ords: Option<Vec<Py<PyDict>>>,
1160        px_usd: Option<String>,
1161        px_vol: Option<String>,
1162    ) -> PyResult<Bound<'py, PyAny>> {
1163        let attach_algo_ords = parse_attach_algo_ords(py, attach_algo_ords)?;
1164        let client = self.clone();
1165
1166        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1167            client
1168                .submit_order(
1169                    trader_id,
1170                    strategy_id,
1171                    instrument_id,
1172                    td_mode,
1173                    client_order_id,
1174                    order_side,
1175                    order_type,
1176                    quantity,
1177                    time_in_force,
1178                    price,
1179                    trigger_price,
1180                    post_only,
1181                    reduce_only,
1182                    quote_quantity,
1183                    position_side,
1184                    attach_algo_ords,
1185                    px_usd,
1186                    px_vol,
1187                )
1188                .await
1189                .map_err(to_pyvalue_err)
1190        })
1191    }
1192
1193    #[pyo3(name = "cancel_order", signature = (
1194        trader_id,
1195        strategy_id,
1196        instrument_id,
1197        client_order_id=None,
1198        venue_order_id=None,
1199    ))]
1200    fn py_cancel_order<'py>(
1201        &self,
1202        py: Python<'py>,
1203        trader_id: TraderId,
1204        strategy_id: StrategyId,
1205        instrument_id: InstrumentId,
1206        client_order_id: Option<ClientOrderId>,
1207        venue_order_id: Option<VenueOrderId>,
1208    ) -> PyResult<Bound<'py, PyAny>> {
1209        let client = self.clone();
1210
1211        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1212            client
1213                .cancel_order(
1214                    trader_id,
1215                    strategy_id,
1216                    instrument_id,
1217                    client_order_id,
1218                    venue_order_id,
1219                )
1220                .await
1221                .map_err(to_pyvalue_err)
1222        })
1223    }
1224
1225    #[pyo3(name = "modify_order")]
1226    #[pyo3(signature = (
1227        trader_id,
1228        strategy_id,
1229        instrument_id,
1230        client_order_id=None,
1231        venue_order_id=None,
1232        price=None,
1233        quantity=None,
1234        new_px_usd=None,
1235        new_px_vol=None,
1236    ))]
1237    #[expect(clippy::too_many_arguments)]
1238    fn py_modify_order<'py>(
1239        &self,
1240        py: Python<'py>,
1241        trader_id: TraderId,
1242        strategy_id: StrategyId,
1243        instrument_id: InstrumentId,
1244        client_order_id: Option<ClientOrderId>,
1245        venue_order_id: Option<VenueOrderId>,
1246        price: Option<Price>,
1247        quantity: Option<Quantity>,
1248        new_px_usd: Option<String>,
1249        new_px_vol: Option<String>,
1250    ) -> PyResult<Bound<'py, PyAny>> {
1251        let client = self.clone();
1252
1253        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1254            client
1255                .modify_order(
1256                    trader_id,
1257                    strategy_id,
1258                    instrument_id,
1259                    client_order_id,
1260                    price,
1261                    quantity,
1262                    venue_order_id,
1263                    new_px_usd,
1264                    new_px_vol,
1265                )
1266                .await
1267                .map_err(to_pyvalue_err)
1268        })
1269    }
1270
1271    #[expect(clippy::type_complexity)]
1272    #[pyo3(name = "batch_submit_orders")]
1273    fn py_batch_submit_orders<'py>(
1274        &self,
1275        py: Python<'py>,
1276        orders: Vec<Py<PyAny>>,
1277    ) -> PyResult<Bound<'py, PyAny>> {
1278        let mut domain_orders = Vec::with_capacity(orders.len());
1279
1280        for obj in orders {
1281            let (
1282                instrument_type,
1283                instrument_id,
1284                td_mode,
1285                client_order_id,
1286                order_side,
1287                order_type,
1288                quantity,
1289                position_side,
1290                price,
1291                trigger_price,
1292                post_only,
1293                reduce_only,
1294            ): (
1295                OKXInstrumentType,
1296                InstrumentId,
1297                OKXTradeMode,
1298                ClientOrderId,
1299                OrderSide,
1300                OrderType,
1301                Quantity,
1302                Option<PositionSide>,
1303                Option<Price>,
1304                Option<Price>,
1305                Option<bool>,
1306                Option<bool>,
1307            ) = obj.extract(py).map_err(to_pyruntime_err)?;
1308
1309            domain_orders.push((
1310                instrument_type,
1311                instrument_id,
1312                td_mode,
1313                client_order_id,
1314                order_side,
1315                position_side,
1316                order_type,
1317                quantity,
1318                price,
1319                trigger_price,
1320                post_only,
1321                reduce_only,
1322            ));
1323        }
1324
1325        let client = self.clone();
1326
1327        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1328            client
1329                .batch_submit_orders(domain_orders)
1330                .await
1331                .map_err(to_pyvalue_err)
1332        })
1333    }
1334
1335    /// Cancels multiple orders via WebSocket.
1336    #[pyo3(name = "batch_cancel_orders")]
1337    fn py_batch_cancel_orders<'py>(
1338        &self,
1339        py: Python<'py>,
1340        cancels: Vec<Py<PyAny>>,
1341    ) -> PyResult<Bound<'py, PyAny>> {
1342        let mut batched_cancels = Vec::with_capacity(cancels.len());
1343
1344        for obj in cancels {
1345            let (instrument_id, client_order_id, order_id): (
1346                InstrumentId,
1347                Option<ClientOrderId>,
1348                Option<VenueOrderId>,
1349            ) = obj.extract(py).map_err(to_pyruntime_err)?;
1350            batched_cancels.push((instrument_id, client_order_id, order_id));
1351        }
1352
1353        let client = self.clone();
1354
1355        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1356            client
1357                .batch_cancel_orders(batched_cancels)
1358                .await
1359                .map_err(to_pyvalue_err)
1360        })
1361    }
1362
1363    #[pyo3(name = "batch_modify_orders")]
1364    fn py_batch_modify_orders<'py>(
1365        &self,
1366        py: Python<'py>,
1367        orders: Vec<Py<PyAny>>,
1368    ) -> PyResult<Bound<'py, PyAny>> {
1369        let mut domain_orders = Vec::with_capacity(orders.len());
1370
1371        for obj in orders {
1372            let (
1373                instrument_type,
1374                instrument_id,
1375                client_order_id,
1376                new_client_order_id,
1377                price,
1378                quantity,
1379            ): (
1380                String,
1381                InstrumentId,
1382                ClientOrderId,
1383                ClientOrderId,
1384                Option<Price>,
1385                Option<Quantity>,
1386            ) = obj.extract(py).map_err(to_pyruntime_err)?;
1387            let inst_type =
1388                OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1389            domain_orders.push((
1390                inst_type,
1391                instrument_id,
1392                client_order_id,
1393                new_client_order_id,
1394                price,
1395                quantity,
1396            ));
1397        }
1398
1399        let client = self.clone();
1400
1401        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1402            client
1403                .batch_modify_orders(domain_orders)
1404                .await
1405                .map_err(to_pyvalue_err)
1406        })
1407    }
1408
1409    #[pyo3(name = "mass_cancel_orders")]
1410    fn py_mass_cancel_orders<'py>(
1411        &self,
1412        py: Python<'py>,
1413        instrument_id: InstrumentId,
1414    ) -> PyResult<Bound<'py, PyAny>> {
1415        let client = self.clone();
1416
1417        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1418            client
1419                .mass_cancel_orders(instrument_id)
1420                .await
1421                .map_err(to_pyvalue_err)
1422        })
1423    }
1424
1425    #[pyo3(name = "cache_instruments")]
1426    fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
1427        let instruments: Result<Vec<_>, _> = instruments
1428            .into_iter()
1429            .map(|inst| pyobject_to_instrument_any(py, inst))
1430            .collect();
1431        self.cache_instruments(&instruments?);
1432        Ok(())
1433    }
1434
1435    #[pyo3(name = "cache_instrument")]
1436    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
1437        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
1438        Ok(())
1439    }
1440
1441    #[pyo3(name = "cache_inst_id_codes")]
1442    fn py_cache_inst_id_codes(&self, mappings: Vec<(String, u64)>) {
1443        let ustr_mappings = mappings
1444            .into_iter()
1445            .map(|(inst_id, code)| (Ustr::from(&inst_id), code));
1446        self.cache_inst_id_codes(ustr_mappings);
1447    }
1448}
1449
1450fn handle_book_data(
1451    inst_id: Option<Ustr>,
1452    action: OKXBookAction,
1453    data: Vec<OKXBookMsg>,
1454    instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
1455    clock: &AtomicTime,
1456    call_soon: &Py<PyAny>,
1457    callback: &Py<PyAny>,
1458) {
1459    let Some(inst_id) = inst_id else { return };
1460    let Some(instrument) = instruments_by_symbol.get(&inst_id) else {
1461        log::warn!("No cached instrument for book data: {inst_id}");
1462        return;
1463    };
1464    let ts_init = clock.get_time_ns();
1465
1466    match parse_book_msg_vec(
1467        data,
1468        &instrument.id(),
1469        instrument.price_precision(),
1470        instrument.size_precision(),
1471        action,
1472        ts_init,
1473    ) {
1474        Ok(data_vec) => Python::attach(|py| {
1475            for d in data_vec {
1476                let py_obj = data_to_pycapsule(py, d);
1477                call_python_threadsafe(py, call_soon, callback, py_obj);
1478            }
1479        }),
1480        Err(e) => log::error!("Failed to parse book data: {e}"),
1481    }
1482}
1483
1484#[expect(clippy::too_many_arguments)]
1485fn handle_channel_data(
1486    channel: &OKXWsChannel,
1487    inst_id: Option<Ustr>,
1488    data: serde_json::Value,
1489    instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
1490    quote_cache: &mut QuoteCache,
1491    funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
1492    option_greeks_subs: &AHashMap<InstrumentId, AHashSet<OKXGreeksType>>,
1493    clock: &AtomicTime,
1494    call_soon: &Py<PyAny>,
1495    callback: &Py<PyAny>,
1496) {
1497    if matches!(channel, OKXWsChannel::OptionSummary) {
1498        let ts_init = clock.get_time_ns();
1499
1500        match serde_json::from_value::<Vec<OKXOptionSummaryMsg>>(data) {
1501            Ok(msgs) => {
1502                for msg in &msgs {
1503                    let Some(instrument) = instruments_by_symbol.get(&msg.inst_id) else {
1504                        continue;
1505                    };
1506                    let instrument_id = instrument.id();
1507                    let Some(conventions) = option_greeks_subs.get(&instrument_id) else {
1508                        continue;
1509                    };
1510
1511                    for greeks_type in conventions {
1512                        match parse_option_summary_greeks(
1513                            msg,
1514                            &instrument_id,
1515                            *greeks_type,
1516                            ts_init,
1517                        ) {
1518                            Ok(greeks) => {
1519                                Python::attach(|py| match greeks.into_py_any(py) {
1520                                    Ok(py_obj) => {
1521                                        call_python_threadsafe(py, call_soon, callback, py_obj);
1522                                    }
1523                                    Err(e) => {
1524                                        log::error!(
1525                                            "Failed to convert OptionGreeks to Python: {e}"
1526                                        );
1527                                    }
1528                                });
1529                            }
1530                            Err(e) => {
1531                                log::error!(
1532                                    "Failed to parse option summary for {} ({greeks_type:?}): {e}",
1533                                    msg.inst_id
1534                                );
1535                            }
1536                        }
1537                    }
1538                }
1539            }
1540            Err(e) => log::error!("Failed to deserialize option summary data: {e}"),
1541        }
1542        return;
1543    }
1544
1545    let Some(inst_id) = inst_id else { return };
1546
1547    if matches!(channel, OKXWsChannel::IndexTickers) {
1548        let ts_init = clock.get_time_ns();
1549        let prefix = format!("{inst_id}-");
1550        let matching: Vec<_> = instruments_by_symbol
1551            .values()
1552            .filter(|i| {
1553                let s = i.symbol().inner();
1554                s == inst_id || s.as_str().starts_with(&prefix)
1555            })
1556            .collect();
1557
1558        for instrument in matching {
1559            if let Ok(data_vec) = parse_index_price_msg_vec(
1560                data.clone(),
1561                &instrument.id(),
1562                instrument.price_precision(),
1563                ts_init,
1564            ) {
1565                Python::attach(|py| {
1566                    for d in data_vec {
1567                        let py_obj = data_to_pycapsule(py, d);
1568                        call_python_threadsafe(py, call_soon, callback, py_obj);
1569                    }
1570                });
1571            }
1572        }
1573        return;
1574    }
1575
1576    let Some(instrument) = instruments_by_symbol.get(&inst_id) else {
1577        log::warn!("No cached instrument for {channel:?}: {inst_id}");
1578        return;
1579    };
1580    let instrument_id = instrument.id();
1581    let price_precision = instrument.price_precision();
1582    let size_precision = instrument.size_precision();
1583    let ts_init = clock.get_time_ns();
1584
1585    if matches!(channel, OKXWsChannel::BboTbt) {
1586        handle_bbo_tbt(
1587            data,
1588            instrument_id,
1589            price_precision,
1590            size_precision,
1591            ts_init,
1592            quote_cache,
1593            call_soon,
1594            callback,
1595        );
1596        return;
1597    }
1598
1599    match parse_ws_message_data(
1600        channel,
1601        data,
1602        &instrument_id,
1603        price_precision,
1604        size_precision,
1605        ts_init,
1606        funding_cache,
1607        instruments_by_symbol,
1608    ) {
1609        Ok(Some(ws_msg)) => {
1610            dispatch_nautilus_ws_msg_to_python(ws_msg, call_soon, callback, instruments_by_symbol);
1611        }
1612        Ok(None) => {}
1613        Err(e) => {
1614            log::error!("Failed to parse {channel:?} data: {e}");
1615        }
1616    }
1617}
1618
1619#[expect(clippy::too_many_arguments)]
1620fn handle_bbo_tbt(
1621    data: serde_json::Value,
1622    instrument_id: InstrumentId,
1623    price_precision: u8,
1624    size_precision: u8,
1625    ts_init: UnixNanos,
1626    quote_cache: &mut QuoteCache,
1627    call_soon: &Py<PyAny>,
1628    callback: &Py<PyAny>,
1629) {
1630    let msgs: Vec<OKXBookMsg> = match serde_json::from_value(data) {
1631        Ok(msgs) => msgs,
1632        Err(e) => {
1633            log::error!("Failed to deserialize BboTbt data: {e}");
1634            return;
1635        }
1636    };
1637
1638    for msg in &msgs {
1639        let bid = msg.bids.first();
1640        let ask = msg.asks.first();
1641
1642        let bid_price = bid.and_then(|e| parse_price(&e.price, price_precision).ok());
1643        let bid_size = bid.and_then(|e| parse_quantity(&e.size, size_precision).ok());
1644        let ask_price = ask.and_then(|e| parse_price(&e.price, price_precision).ok());
1645        let ask_size = ask.and_then(|e| parse_quantity(&e.size, size_precision).ok());
1646        let ts_event = parse_millisecond_timestamp(msg.ts);
1647
1648        match quote_cache.process(
1649            instrument_id,
1650            bid_price,
1651            ask_price,
1652            bid_size,
1653            ask_size,
1654            ts_event,
1655            ts_init,
1656        ) {
1657            Ok(quote) => {
1658                Python::attach(|py| {
1659                    let py_obj = data_to_pycapsule(py, Data::Quote(quote));
1660                    call_python_threadsafe(py, call_soon, callback, py_obj);
1661                });
1662            }
1663            Err(e) => {
1664                log::debug!("Skipping partial BboTbt for {instrument_id}: {e}");
1665            }
1666        }
1667    }
1668}
1669
1670fn handle_instruments(
1671    okx_instruments: Vec<OKXInstrument>,
1672    instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
1673    clock: &AtomicTime,
1674    call_soon: &Py<PyAny>,
1675    callback: &Py<PyAny>,
1676) {
1677    let ts_init = clock.get_time_ns();
1678
1679    for okx_inst in okx_instruments {
1680        let inst_key = Ustr::from(&okx_inst.inst_id);
1681        let (margin_init, margin_maint, maker_fee, taker_fee) =
1682            instruments_by_symbol.get(&inst_key).map_or(
1683                (None, None, None, None),
1684                extract_fees_from_cached_instrument,
1685            );
1686        let status_action = okx_status_to_market_action(okx_inst.state);
1687        let is_live = matches!(okx_inst.state, OKXInstrumentStatus::Live);
1688
1689        if let Ok(Some(inst_any)) = parse_instrument_any(
1690            &okx_inst,
1691            margin_init,
1692            margin_maint,
1693            maker_fee,
1694            taker_fee,
1695            ts_init,
1696        ) {
1697            let instrument_id = inst_any.id();
1698            instruments_by_symbol.insert(inst_any.symbol().inner(), inst_any.clone());
1699            call_python_with_data(call_soon, callback, |py| {
1700                instrument_any_to_pyobject(py, inst_any)
1701            });
1702            let status = InstrumentStatus::new(
1703                instrument_id,
1704                status_action,
1705                ts_init,
1706                ts_init,
1707                None,
1708                None,
1709                Some(is_live),
1710                None,
1711                None,
1712            );
1713            call_python_with_data(call_soon, callback, |py| status.into_py_any(py));
1714        }
1715    }
1716}
1717
1718#[expect(clippy::too_many_arguments)]
1719fn handle_orders(
1720    order_msgs: &[OKXOrderMsg],
1721    account_id: AccountId,
1722    instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
1723    fee_cache: &mut AHashMap<Ustr, Money>,
1724    filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
1725    clock: &AtomicTime,
1726    call_soon: &Py<PyAny>,
1727    callback: &Py<PyAny>,
1728) {
1729    let ts_init = clock.get_time_ns();
1730
1731    match parse_order_msg_vec(
1732        order_msgs,
1733        account_id,
1734        instruments_by_symbol,
1735        fee_cache,
1736        filled_qty_cache,
1737        ts_init,
1738    ) {
1739        Ok(reports) => {
1740            dispatch_execution_reports_to_python(reports, call_soon, callback);
1741        }
1742        Err(e) => {
1743            log::error!("Failed to parse order messages: {e}");
1744        }
1745    }
1746}
1747
1748fn handle_algo_orders(
1749    algo_msgs: Vec<OKXAlgoOrderMsg>,
1750    account_id: AccountId,
1751    instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
1752    clock: &AtomicTime,
1753    call_soon: &Py<PyAny>,
1754    callback: &Py<PyAny>,
1755) {
1756    let ts_init = clock.get_time_ns();
1757    for algo_msg in algo_msgs {
1758        match parse_algo_order_msg(&algo_msg, account_id, instruments_by_symbol, ts_init) {
1759            Ok(Some(report)) => {
1760                dispatch_execution_reports_to_python(vec![report], call_soon, callback);
1761            }
1762            Ok(None) => {}
1763            Err(e) => {
1764                log::error!("Failed to parse algo order: {e}");
1765            }
1766        }
1767    }
1768}
1769
1770fn handle_account(
1771    data: serde_json::Value,
1772    account_id: AccountId,
1773    clock: &AtomicTime,
1774    call_soon: &Py<PyAny>,
1775    callback: &Py<PyAny>,
1776) {
1777    if let Ok(accounts) = serde_json::from_value::<Vec<OKXAccount>>(data) {
1778        let ts_init = clock.get_time_ns();
1779        for account in &accounts {
1780            if let Ok(account_state) = parse_account_state(account, account_id, ts_init) {
1781                call_python_with_data(call_soon, callback, |py| account_state.into_py_any(py));
1782            }
1783        }
1784    }
1785}
1786
1787fn handle_positions(
1788    data: serde_json::Value,
1789    account_id: AccountId,
1790    instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
1791    clock: &AtomicTime,
1792    call_soon: &Py<PyAny>,
1793    callback: &Py<PyAny>,
1794) {
1795    if let Ok(positions) = serde_json::from_value::<Vec<OKXPosition>>(data) {
1796        let ts_init = clock.get_time_ns();
1797
1798        for position in positions {
1799            let inst_key = Ustr::from(&position.inst_id);
1800            if let Some(instrument) = instruments_by_symbol.get(&inst_key) {
1801                match parse_position_status_report(
1802                    &position,
1803                    account_id,
1804                    instrument.id(),
1805                    instrument.size_precision(),
1806                    ts_init,
1807                ) {
1808                    Ok(report) => {
1809                        call_python_with_data(call_soon, callback, |py| report.into_py_any(py));
1810                    }
1811                    Err(e) => {
1812                        log::error!("Failed to parse position: {e}");
1813                    }
1814                }
1815            }
1816        }
1817    }
1818}
1819
1820#[expect(clippy::too_many_arguments)]
1821fn handle_order_response(
1822    id: Option<&str>,
1823    op: &OKXWsOperation,
1824    code: &str,
1825    msg: &str,
1826    data: &[serde_json::Value],
1827    client: &OKXWebSocketClient,
1828    account_id: AccountId,
1829    clock: &AtomicTime,
1830    call_soon: &Py<PyAny>,
1831    callback: &Py<PyAny>,
1832) {
1833    for item in data {
1834        let s_code = item
1835            .get(OKX_FIELD_SCODE)
1836            .and_then(|v| v.as_str())
1837            .unwrap_or("");
1838        let s_msg = item
1839            .get(OKX_FIELD_SMSG)
1840            .and_then(|v| v.as_str())
1841            .unwrap_or("");
1842        let cl_ord_id = item
1843            .get(OKX_FIELD_CLORDID)
1844            .and_then(|v| v.as_str())
1845            .unwrap_or("");
1846
1847        if s_code == OKX_SUCCESS_CODE {
1848            log::debug!("Order response ok: op={op:?} cl_ord_id={cl_ord_id}");
1849            match op {
1850                OKXWsOperation::Order | OKXWsOperation::BatchOrders => {
1851                    if let Some((_, info)) = client.pending_orders.remove(cl_ord_id) {
1852                        let venue_order_id = item
1853                            .get("ordId")
1854                            .and_then(|v| v.as_str())
1855                            .filter(|s| !s.is_empty());
1856
1857                        if let Some(ord_id) = venue_order_id {
1858                            let ts_init = clock.get_time_ns();
1859                            let accepted = OrderAccepted::new(
1860                                info.trader_id,
1861                                info.strategy_id,
1862                                info.instrument_id,
1863                                ClientOrderId::from(cl_ord_id),
1864                                VenueOrderId::new(ord_id),
1865                                account_id,
1866                                UUID4::new(),
1867                                ts_init,
1868                                ts_init,
1869                                false,
1870                            );
1871                            call_python_with_data(call_soon, callback, |py| {
1872                                accepted.into_py_any(py)
1873                            });
1874                        } else {
1875                            log::error!(
1876                                "No venue_order_id for accepted order: cl_ord_id={cl_ord_id}"
1877                            );
1878                        }
1879                    }
1880                }
1881                OKXWsOperation::OrderAlgo => {
1882                    client.pending_orders.remove(cl_ord_id);
1883                    log::debug!("Algo order placement confirmed: cl_ord_id={cl_ord_id}");
1884                }
1885                OKXWsOperation::CancelOrder
1886                | OKXWsOperation::BatchCancelOrders
1887                | OKXWsOperation::MassCancel
1888                | OKXWsOperation::CancelAlgos => {
1889                    client.pending_cancels.remove(cl_ord_id);
1890                }
1891                OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders => {
1892                    client.pending_amends.remove(cl_ord_id);
1893                }
1894                _ => {}
1895            }
1896        } else if !cl_ord_id.is_empty() {
1897            log::warn!(
1898                "Order response rejected: op={op:?} cl_ord_id={cl_ord_id} \
1899                 s_code={s_code} s_msg={s_msg}"
1900            );
1901            let ts_init = clock.get_time_ns();
1902            let client_order_id = ClientOrderId::from(cl_ord_id);
1903            let venue_order_id = item
1904                .get("ordId")
1905                .and_then(|v| v.as_str())
1906                .filter(|s| !s.is_empty())
1907                .map(VenueOrderId::new);
1908
1909            match op {
1910                OKXWsOperation::Order | OKXWsOperation::BatchOrders | OKXWsOperation::OrderAlgo => {
1911                    if let Some((_, info)) = client.pending_orders.remove(cl_ord_id) {
1912                        let rejected = OrderRejected::new(
1913                            info.trader_id,
1914                            info.strategy_id,
1915                            info.instrument_id,
1916                            client_order_id,
1917                            account_id,
1918                            Ustr::from(s_msg),
1919                            UUID4::new(),
1920                            ts_init,
1921                            ts_init,
1922                            false,
1923                            false,
1924                        );
1925                        call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
1926                    }
1927                }
1928                OKXWsOperation::CancelOrder
1929                | OKXWsOperation::BatchCancelOrders
1930                | OKXWsOperation::MassCancel
1931                | OKXWsOperation::CancelAlgos => {
1932                    if let Some((_, info)) = client.pending_cancels.remove(cl_ord_id) {
1933                        let rejected = OrderCancelRejected::new(
1934                            info.trader_id,
1935                            info.strategy_id,
1936                            info.instrument_id,
1937                            client_order_id,
1938                            Ustr::from(s_msg),
1939                            UUID4::new(),
1940                            ts_init,
1941                            ts_init,
1942                            false,
1943                            venue_order_id,
1944                            Some(account_id),
1945                        );
1946                        call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
1947                    }
1948                }
1949                OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders => {
1950                    if let Some((_, info)) = client.pending_amends.remove(cl_ord_id) {
1951                        let rejected = OrderModifyRejected::new(
1952                            info.trader_id,
1953                            info.strategy_id,
1954                            info.instrument_id,
1955                            client_order_id,
1956                            Ustr::from(s_msg),
1957                            UUID4::new(),
1958                            ts_init,
1959                            ts_init,
1960                            false,
1961                            venue_order_id,
1962                            Some(account_id),
1963                        );
1964                        call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
1965                    }
1966                }
1967                _ => {}
1968            }
1969        }
1970    }
1971
1972    if code != "0" && data.is_empty() {
1973        log::warn!("Order response error (no data): id={id:?} op={op:?} code={code} msg={msg}");
1974    }
1975}
1976
1977#[expect(clippy::too_many_arguments)]
1978fn handle_send_failed(
1979    request_id: &str,
1980    client_order_id: Option<ClientOrderId>,
1981    op: Option<&OKXWsOperation>,
1982    error: &str,
1983    client: &OKXWebSocketClient,
1984    account_id: AccountId,
1985    clock: &AtomicTime,
1986    call_soon: &Py<PyAny>,
1987    callback: &Py<PyAny>,
1988) {
1989    log::error!("WebSocket send failed: request_id={request_id} error={error}");
1990
1991    let Some(client_order_id) = client_order_id else {
1992        return;
1993    };
1994    let cl_ord_str = client_order_id.to_string();
1995    let ts_init = clock.get_time_ns();
1996
1997    match op {
1998        Some(OKXWsOperation::Order | OKXWsOperation::BatchOrders | OKXWsOperation::OrderAlgo) => {
1999            if let Some((_, info)) = client.pending_orders.remove(&cl_ord_str) {
2000                let rejected = OrderRejected::new(
2001                    info.trader_id,
2002                    info.strategy_id,
2003                    info.instrument_id,
2004                    client_order_id,
2005                    account_id,
2006                    Ustr::from(error),
2007                    UUID4::new(),
2008                    ts_init,
2009                    ts_init,
2010                    false,
2011                    false,
2012                );
2013                call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
2014            }
2015        }
2016        Some(
2017            OKXWsOperation::CancelOrder
2018            | OKXWsOperation::BatchCancelOrders
2019            | OKXWsOperation::MassCancel
2020            | OKXWsOperation::CancelAlgos,
2021        ) => {
2022            if let Some((_, info)) = client.pending_cancels.remove(&cl_ord_str) {
2023                let rejected = OrderCancelRejected::new(
2024                    info.trader_id,
2025                    info.strategy_id,
2026                    info.instrument_id,
2027                    client_order_id,
2028                    Ustr::from(error),
2029                    UUID4::new(),
2030                    ts_init,
2031                    ts_init,
2032                    false,
2033                    None,
2034                    Some(account_id),
2035                );
2036                call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
2037            }
2038        }
2039        Some(OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders) => {
2040            if let Some((_, info)) = client.pending_amends.remove(&cl_ord_str) {
2041                let rejected = OrderModifyRejected::new(
2042                    info.trader_id,
2043                    info.strategy_id,
2044                    info.instrument_id,
2045                    client_order_id,
2046                    Ustr::from(error),
2047                    UUID4::new(),
2048                    ts_init,
2049                    ts_init,
2050                    false,
2051                    None,
2052                    Some(account_id),
2053                );
2054                call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
2055            }
2056        }
2057        _ => {
2058            log::warn!("SendFailed for {client_order_id} with unknown op, cannot emit rejection");
2059        }
2060    }
2061}
2062
2063fn call_python_with_data<F>(call_soon: &Py<PyAny>, callback: &Py<PyAny>, data_converter: F)
2064where
2065    F: FnOnce(Python) -> PyResult<Py<PyAny>>,
2066{
2067    Python::attach(|py| match data_converter(py) {
2068        Ok(py_obj) => call_python_threadsafe(py, call_soon, callback, py_obj),
2069        Err(e) => log::error!("Failed to convert data to Python object: {e}"),
2070    });
2071}
2072
2073fn dispatch_nautilus_ws_msg_to_python(
2074    msg: NautilusWsMessage,
2075    call_soon: &Py<PyAny>,
2076    callback: &Py<PyAny>,
2077    instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
2078) {
2079    match msg {
2080        NautilusWsMessage::Data(payloads) => Python::attach(|py| {
2081            for data in payloads {
2082                let py_obj = data_to_pycapsule(py, data);
2083                call_python_threadsafe(py, call_soon, callback, py_obj);
2084            }
2085        }),
2086        NautilusWsMessage::Deltas(deltas) => Python::attach(|py| {
2087            let py_obj = data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(deltas)));
2088            call_python_threadsafe(py, call_soon, callback, py_obj);
2089        }),
2090        NautilusWsMessage::FundingRates(updates) => {
2091            for data in updates {
2092                call_python_with_data(call_soon, callback, |py| data.into_py_any(py));
2093            }
2094        }
2095        NautilusWsMessage::Instrument(instrument, status) => {
2096            instruments_by_symbol.insert(instrument.symbol().inner(), (*instrument).clone());
2097            call_python_with_data(call_soon, callback, |py| {
2098                instrument_any_to_pyobject(py, *instrument)
2099            });
2100
2101            if let Some(status) = status {
2102                call_python_with_data(call_soon, callback, |py| status.into_py_any(py));
2103            }
2104        }
2105        _ => {}
2106    }
2107}
2108
2109fn dispatch_execution_reports_to_python(
2110    reports: Vec<ExecutionReport>,
2111    call_soon: &Py<PyAny>,
2112    callback: &Py<PyAny>,
2113) {
2114    for report in reports {
2115        match report {
2116            ExecutionReport::Order(report) => {
2117                call_python_with_data(call_soon, callback, |py| report.into_py_any(py));
2118            }
2119            ExecutionReport::Fill(report) => {
2120                call_python_with_data(call_soon, callback, |py| report.into_py_any(py));
2121            }
2122        }
2123    }
2124}