1use std::str::FromStr;
45
46use ahash::{AHashMap, AHashSet};
47use futures_util::StreamExt;
48use nautilus_common::{cache::quote::QuoteCache, live::get_runtime};
49use nautilus_core::{
50 UUID4, UnixNanos,
51 python::{call_python_threadsafe, to_pyruntime_err, to_pyvalue_err},
52 time::{AtomicTime, get_atomic_clock_realtime},
53};
54use nautilus_model::{
55 data::{BarType, Data, InstrumentStatus, OrderBookDeltas_API},
56 enums::{OrderSide, OrderType, PositionSide, TimeInForce},
57 events::{OrderAccepted, OrderCancelRejected, OrderModifyRejected, OrderRejected},
58 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
59 instruments::{Instrument, InstrumentAny},
60 python::{
61 data::data_to_pycapsule,
62 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
63 },
64 types::{Money, Price, Quantity},
65};
66use nautilus_network::websocket::TransportBackend;
67use pyo3::{IntoPyObjectExt, prelude::*, types::PyDict};
68use ustr::Ustr;
69
70use super::{extract_optional_string, extract_optional_trigger_type};
71use crate::{
72 common::{
73 consts::{OKX_FIELD_CLORDID, OKX_FIELD_SCODE, OKX_FIELD_SMSG, OKX_SUCCESS_CODE},
74 enums::{
75 OKXBookAction, OKXGreeksType, OKXInstrumentStatus, OKXInstrumentType, OKXTradeMode,
76 OKXVipLevel,
77 },
78 models::OKXInstrument,
79 parse::{
80 okx_status_to_market_action, parse_account_state, parse_instrument_any,
81 parse_millisecond_timestamp, parse_position_status_report, parse_price, parse_quantity,
82 },
83 },
84 http::models::{OKXAccount, OKXPosition},
85 websocket::{
86 OKXWebSocketClient,
87 enums::{OKXWsChannel, OKXWsOperation},
88 messages::{
89 ExecutionReport, NautilusWsMessage, OKXAlgoOrderMsg, OKXBookMsg, OKXOptionSummaryMsg,
90 OKXOrderMsg, OKXWebSocketError, OKXWsMessage, WsAttachAlgoOrdParams,
91 WsAttachAlgoOrdParamsBuilder,
92 },
93 parse::{
94 extract_fees_from_cached_instrument, parse_algo_order_msg, parse_book_msg_vec,
95 parse_index_price_msg_vec, parse_option_summary_greeks, parse_order_msg_vec,
96 parse_ws_message_data,
97 },
98 },
99};
100
101fn parse_attach_algo_ords(
102 py: Python<'_>,
103 attach_algo_ords: Option<Vec<Py<PyDict>>>,
104) -> PyResult<Option<Vec<WsAttachAlgoOrdParams>>> {
105 attach_algo_ords
106 .map(|items| {
107 items
108 .into_iter()
109 .map(|item| {
110 let dict = item.bind(py);
111 let mut builder = WsAttachAlgoOrdParamsBuilder::default();
112
113 if let Some(value) = extract_optional_string(dict, "attach_algo_cl_ord_id")? {
114 builder.attach_algo_cl_ord_id(value);
115 }
116
117 if let Some(value) = extract_optional_string(dict, "sl_trigger_px")? {
118 builder.sl_trigger_px(value);
119 }
120
121 if let Some(value) = extract_optional_string(dict, "sl_ord_px")? {
122 builder.sl_ord_px(value);
123 }
124
125 if let Some(value) = extract_optional_trigger_type(dict, "sl_trigger_px_type")?
126 {
127 builder.sl_trigger_px_type(value);
128 }
129
130 if let Some(value) = extract_optional_string(dict, "tp_trigger_px")? {
131 builder.tp_trigger_px(value);
132 }
133
134 if let Some(value) = extract_optional_string(dict, "tp_ord_px")? {
135 builder.tp_ord_px(value);
136 }
137
138 if let Some(value) = extract_optional_trigger_type(dict, "tp_trigger_px_type")?
139 {
140 builder.tp_trigger_px_type(value);
141 }
142
143 builder.build().map_err(to_pyvalue_err)
144 })
145 .collect::<PyResult<Vec<_>>>()
146 })
147 .transpose()
148}
149
150#[pyo3::pymethods]
151impl OKXWebSocketError {
152 #[getter]
153 pub fn code(&self) -> &str {
154 &self.code
155 }
156
157 #[getter]
158 pub fn message(&self) -> &str {
159 &self.message
160 }
161
162 #[getter]
163 pub fn conn_id(&self) -> Option<&str> {
164 self.conn_id.as_deref()
165 }
166
167 #[getter]
168 pub fn ts_event(&self) -> u64 {
169 self.timestamp
170 }
171
172 fn __repr__(&self) -> String {
173 format!(
174 "OKXWebSocketError(code='{}', message='{}', conn_id={:?}, ts_event={})",
175 self.code, self.message, self.conn_id, self.timestamp
176 )
177 }
178}
179
180#[pymethods]
181#[pyo3_stub_gen::derive::gen_stub_pymethods]
182impl OKXWebSocketClient {
183 #[new]
185 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None, auth_timeout_secs=None, proxy_url=None))]
186 #[expect(clippy::too_many_arguments)]
187 fn py_new(
188 url: Option<String>,
189 api_key: Option<String>,
190 api_secret: Option<String>,
191 api_passphrase: Option<String>,
192 account_id: Option<AccountId>,
193 heartbeat: Option<u64>,
194 auth_timeout_secs: Option<u64>,
195 proxy_url: Option<String>,
196 ) -> PyResult<Self> {
197 Self::new(
198 url,
199 api_key,
200 api_secret,
201 api_passphrase,
202 account_id,
203 heartbeat,
204 auth_timeout_secs,
205 TransportBackend::default(),
206 proxy_url,
207 )
208 .map_err(to_pyvalue_err)
209 }
210
211 #[staticmethod]
212 #[pyo3(name = "with_credentials")]
213 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, account_id=None, heartbeat=None, auth_timeout_secs=None, proxy_url=None))]
214 #[expect(clippy::too_many_arguments)]
215 fn py_with_credentials(
216 url: Option<String>,
217 api_key: Option<String>,
218 api_secret: Option<String>,
219 api_passphrase: Option<String>,
220 account_id: Option<AccountId>,
221 heartbeat: Option<u64>,
222 auth_timeout_secs: Option<u64>,
223 proxy_url: Option<String>,
224 ) -> PyResult<Self> {
225 Self::with_credentials(
226 url,
227 api_key,
228 api_secret,
229 api_passphrase,
230 account_id,
231 heartbeat,
232 auth_timeout_secs,
233 TransportBackend::default(),
234 proxy_url,
235 )
236 .map_err(to_pyvalue_err)
237 }
238
239 #[staticmethod]
240 #[pyo3(name = "from_env")]
241 fn py_from_env() -> PyResult<Self> {
242 Self::from_env().map_err(to_pyvalue_err)
243 }
244
245 #[getter]
246 #[pyo3(name = "url")]
247 #[must_use]
248 pub fn py_url(&self) -> &str {
249 self.url()
250 }
251
252 #[getter]
253 #[pyo3(name = "api_key")]
254 #[must_use]
255 pub fn py_api_key(&self) -> Option<&str> {
256 self.api_key()
257 }
258
259 #[getter]
260 #[pyo3(name = "api_key_masked")]
261 #[must_use]
262 pub fn py_api_key_masked(&self) -> Option<String> {
263 self.api_key_masked()
264 }
265
266 #[pyo3(name = "is_active")]
267 fn py_is_active(&mut self) -> bool {
268 self.is_active()
269 }
270
271 #[pyo3(name = "is_closed")]
272 fn py_is_closed(&mut self) -> bool {
273 self.is_closed()
274 }
275
276 #[pyo3(name = "cancel_all_requests")]
277 pub fn py_cancel_all_requests(&self) {
278 self.cancel_all_requests();
279 }
280
281 #[pyo3(name = "get_subscriptions")]
282 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
283 let channels = self.get_subscriptions(instrument_id);
284
285 channels
287 .iter()
288 .map(|c| {
289 serde_json::to_value(c)
290 .ok()
291 .and_then(|v| v.as_str().map(String::from))
292 .unwrap_or_else(|| c.to_string())
293 })
294 .collect()
295 }
296
297 #[pyo3(name = "set_vip_level")]
301 fn py_set_vip_level(&self, vip_level: OKXVipLevel) {
302 self.set_vip_level(vip_level);
303 }
304
305 #[pyo3(name = "vip_level")]
307 #[getter]
308 fn py_vip_level(&self) -> OKXVipLevel {
309 self.vip_level()
310 }
311
312 #[pyo3(name = "connect")]
313 #[expect(clippy::needless_pass_by_value)]
314 fn py_connect<'py>(
315 &mut self,
316 py: Python<'py>,
317 loop_: Py<PyAny>,
318 instruments: Vec<Py<PyAny>>,
319 callback: Py<PyAny>,
320 ) -> PyResult<Bound<'py, PyAny>> {
321 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
322
323 let mut instruments_any = Vec::new();
324
325 for inst in instruments {
326 let inst_any = pyobject_to_instrument_any(py, inst)?;
327 instruments_any.push(inst_any);
328 }
329
330 self.cache_instruments(&instruments_any);
331
332 let mut client = self.clone();
333
334 pyo3_async_runtimes::tokio::future_into_py(py, async move {
335 client.connect().await.map_err(to_pyruntime_err)?;
336
337 let stream = client.stream();
338 let clock = get_atomic_clock_realtime();
339
340 get_runtime().spawn(async move {
341 let account_id = client.account_id;
342 let mut instruments_by_symbol = client.instruments_snapshot();
343 let mut quote_cache = QuoteCache::new();
344 let mut funding_cache: AHashMap<Ustr, (Ustr, u64)> = AHashMap::new();
345 let mut fee_cache: AHashMap<Ustr, Money> = AHashMap::new();
346 let mut filled_qty_cache: AHashMap<Ustr, Quantity> = AHashMap::new();
347 let option_greeks_subs_arc = client.option_greeks_subs().clone();
348 tokio::pin!(stream);
349
350 while let Some(msg) = stream.next().await {
351 match msg {
352 OKXWsMessage::BookData { arg, action, data } => {
353 handle_book_data(
354 arg.inst_id,
355 action,
356 data,
357 &instruments_by_symbol,
358 clock,
359 &call_soon,
360 &callback,
361 );
362 }
363 OKXWsMessage::ChannelData {
364 channel,
365 inst_id,
366 data,
367 } => {
368 let greeks_guard = option_greeks_subs_arc.load();
369 handle_channel_data(
370 &channel,
371 inst_id,
372 data,
373 &mut instruments_by_symbol,
374 &mut quote_cache,
375 &mut funding_cache,
376 &greeks_guard,
377 clock,
378 &call_soon,
379 &callback,
380 );
381 }
382 OKXWsMessage::Instruments(okx_instruments) => {
383 handle_instruments(
384 okx_instruments,
385 &mut instruments_by_symbol,
386 clock,
387 &call_soon,
388 &callback,
389 );
390 }
391 OKXWsMessage::Orders(order_msgs) => {
392 handle_orders(
393 &order_msgs,
394 account_id,
395 &instruments_by_symbol,
396 &mut fee_cache,
397 &mut filled_qty_cache,
398 clock,
399 &call_soon,
400 &callback,
401 );
402 }
403 OKXWsMessage::AlgoOrders(algo_msgs) => {
404 handle_algo_orders(
405 algo_msgs,
406 account_id,
407 &instruments_by_symbol,
408 clock,
409 &call_soon,
410 &callback,
411 );
412 }
413 OKXWsMessage::Account(data) => {
414 handle_account(data, account_id, clock, &call_soon, &callback);
415 }
416 OKXWsMessage::Positions(data) => {
417 handle_positions(
418 data,
419 account_id,
420 &instruments_by_symbol,
421 clock,
422 &call_soon,
423 &callback,
424 );
425 }
426 OKXWsMessage::OrderResponse {
427 id,
428 op,
429 code,
430 msg,
431 data,
432 } => {
433 handle_order_response(
434 id.as_deref(),
435 &op,
436 &code,
437 &msg,
438 &data,
439 &client,
440 account_id,
441 clock,
442 &call_soon,
443 &callback,
444 );
445 }
446 OKXWsMessage::SendFailed {
447 request_id,
448 client_order_id,
449 op,
450 error,
451 } => {
452 handle_send_failed(
453 &request_id,
454 client_order_id,
455 op.as_ref(),
456 &error,
457 &client,
458 account_id,
459 clock,
460 &call_soon,
461 &callback,
462 );
463 }
464 OKXWsMessage::Error(msg) => {
465 call_python_with_data(&call_soon, &callback, |py| msg.into_py_any(py));
466 }
467 OKXWsMessage::Reconnected => {
468 quote_cache.clear();
469 }
470 OKXWsMessage::Authenticated => {}
471 }
472 }
473 });
474
475 Ok(())
476 })
477 }
478
479 #[pyo3(name = "wait_until_active")]
480 fn py_wait_until_active<'py>(
481 &self,
482 py: Python<'py>,
483 timeout_secs: f64,
484 ) -> PyResult<Bound<'py, PyAny>> {
485 let client = self.clone();
486
487 pyo3_async_runtimes::tokio::future_into_py(py, async move {
488 client
489 .wait_until_active(timeout_secs)
490 .await
491 .map_err(to_pyruntime_err)?;
492 Ok(())
493 })
494 }
495
496 #[pyo3(name = "close")]
497 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
498 let mut client = self.clone();
499
500 pyo3_async_runtimes::tokio::future_into_py(py, async move {
501 if let Err(e) = client.close().await {
502 log::error!("Error on close: {e}");
503 }
504 Ok(())
505 })
506 }
507
508 #[pyo3(name = "subscribe_instruments")]
509 fn py_subscribe_instruments<'py>(
510 &self,
511 py: Python<'py>,
512 instrument_type: OKXInstrumentType,
513 ) -> PyResult<Bound<'py, PyAny>> {
514 let client = self.clone();
515
516 pyo3_async_runtimes::tokio::future_into_py(py, async move {
517 if let Err(e) = client.subscribe_instruments(instrument_type).await {
518 log::error!("Failed to subscribe to instruments '{instrument_type}': {e}");
519 }
520 Ok(())
521 })
522 }
523
524 #[pyo3(name = "subscribe_instrument")]
525 fn py_subscribe_instrument<'py>(
526 &self,
527 py: Python<'py>,
528 instrument_id: InstrumentId,
529 ) -> PyResult<Bound<'py, PyAny>> {
530 let client = self.clone();
531
532 pyo3_async_runtimes::tokio::future_into_py(py, async move {
533 if let Err(e) = client.subscribe_instrument(instrument_id).await {
534 log::error!("Failed to subscribe to instrument {instrument_id}: {e}");
535 }
536 Ok(())
537 })
538 }
539
540 #[pyo3(name = "subscribe_book")]
541 fn py_subscribe_book<'py>(
542 &self,
543 py: Python<'py>,
544 instrument_id: InstrumentId,
545 ) -> PyResult<Bound<'py, PyAny>> {
546 let client = self.clone();
547
548 pyo3_async_runtimes::tokio::future_into_py(py, async move {
549 client
550 .subscribe_book(instrument_id)
551 .await
552 .map_err(to_pyvalue_err)
553 })
554 }
555
556 #[pyo3(name = "subscribe_book50_l2_tbt")]
557 fn py_subscribe_book50_l2_tbt<'py>(
558 &self,
559 py: Python<'py>,
560 instrument_id: InstrumentId,
561 ) -> PyResult<Bound<'py, PyAny>> {
562 let client = self.clone();
563
564 pyo3_async_runtimes::tokio::future_into_py(py, async move {
565 if let Err(e) = client.subscribe_book50_l2_tbt(instrument_id).await {
566 log::error!("Failed to subscribe to book50_tbt: {e}");
567 }
568 Ok(())
569 })
570 }
571
572 #[pyo3(name = "subscribe_book_l2_tbt")]
573 fn py_subscribe_book_l2_tbt<'py>(
574 &self,
575 py: Python<'py>,
576 instrument_id: InstrumentId,
577 ) -> PyResult<Bound<'py, PyAny>> {
578 let client = self.clone();
579
580 pyo3_async_runtimes::tokio::future_into_py(py, async move {
581 if let Err(e) = client.subscribe_book_l2_tbt(instrument_id).await {
582 log::error!("Failed to subscribe to books_l2_tbt: {e}");
583 }
584 Ok(())
585 })
586 }
587
588 #[pyo3(name = "subscribe_book_with_depth")]
589 fn py_subscribe_book_with_depth<'py>(
590 &self,
591 py: Python<'py>,
592 instrument_id: InstrumentId,
593 depth: u16,
594 ) -> PyResult<Bound<'py, PyAny>> {
595 let client = self.clone();
596
597 pyo3_async_runtimes::tokio::future_into_py(py, async move {
598 client
599 .subscribe_book_with_depth(instrument_id, depth)
600 .await
601 .map_err(to_pyvalue_err)
602 })
603 }
604
605 #[pyo3(name = "subscribe_book_depth5")]
606 fn py_subscribe_book_depth5<'py>(
607 &self,
608 py: Python<'py>,
609 instrument_id: InstrumentId,
610 ) -> PyResult<Bound<'py, PyAny>> {
611 let client = self.clone();
612
613 pyo3_async_runtimes::tokio::future_into_py(py, async move {
614 if let Err(e) = client.subscribe_book_depth5(instrument_id).await {
615 log::error!("Failed to subscribe to books5: {e}");
616 }
617 Ok(())
618 })
619 }
620
621 #[pyo3(name = "subscribe_quotes")]
622 fn py_subscribe_quotes<'py>(
623 &self,
624 py: Python<'py>,
625 instrument_id: InstrumentId,
626 ) -> PyResult<Bound<'py, PyAny>> {
627 let client = self.clone();
628
629 pyo3_async_runtimes::tokio::future_into_py(py, async move {
630 if let Err(e) = client.subscribe_quotes(instrument_id).await {
631 log::error!("Failed to subscribe to quotes: {e}");
632 }
633 Ok(())
634 })
635 }
636
637 #[pyo3(name = "subscribe_trades")]
638 fn py_subscribe_trades<'py>(
639 &self,
640 py: Python<'py>,
641 instrument_id: InstrumentId,
642 aggregated: bool,
643 ) -> PyResult<Bound<'py, PyAny>> {
644 let client = self.clone();
645
646 pyo3_async_runtimes::tokio::future_into_py(py, async move {
647 if let Err(e) = client.subscribe_trades(instrument_id, aggregated).await {
648 log::error!("Failed to subscribe to trades: {e}");
649 }
650 Ok(())
651 })
652 }
653
654 #[pyo3(name = "subscribe_bars")]
655 fn py_subscribe_bars<'py>(
656 &self,
657 py: Python<'py>,
658 bar_type: BarType,
659 ) -> PyResult<Bound<'py, PyAny>> {
660 let client = self.clone();
661
662 pyo3_async_runtimes::tokio::future_into_py(py, async move {
663 if let Err(e) = client.subscribe_bars(bar_type).await {
664 log::error!("Failed to subscribe to bars: {e}");
665 }
666 Ok(())
667 })
668 }
669
670 #[pyo3(name = "unsubscribe_book")]
671 fn py_unsubscribe_book<'py>(
672 &self,
673 py: Python<'py>,
674 instrument_id: InstrumentId,
675 ) -> PyResult<Bound<'py, PyAny>> {
676 let client = self.clone();
677
678 pyo3_async_runtimes::tokio::future_into_py(py, async move {
679 if let Err(e) = client.unsubscribe_book(instrument_id).await {
680 log::error!("Failed to unsubscribe from order book: {e}");
681 }
682 Ok(())
683 })
684 }
685
686 #[pyo3(name = "unsubscribe_book_depth5")]
687 fn py_unsubscribe_book_depth5<'py>(
688 &self,
689 py: Python<'py>,
690 instrument_id: InstrumentId,
691 ) -> PyResult<Bound<'py, PyAny>> {
692 let client = self.clone();
693
694 pyo3_async_runtimes::tokio::future_into_py(py, async move {
695 if let Err(e) = client.unsubscribe_book_depth5(instrument_id).await {
696 log::error!("Failed to unsubscribe from books5: {e}");
697 }
698 Ok(())
699 })
700 }
701
702 #[pyo3(name = "unsubscribe_book50_l2_tbt")]
703 fn py_unsubscribe_book50_l2_tbt<'py>(
704 &self,
705 py: Python<'py>,
706 instrument_id: InstrumentId,
707 ) -> PyResult<Bound<'py, PyAny>> {
708 let client = self.clone();
709
710 pyo3_async_runtimes::tokio::future_into_py(py, async move {
711 if let Err(e) = client.unsubscribe_book50_l2_tbt(instrument_id).await {
712 log::error!("Failed to unsubscribe from books50_l2_tbt: {e}");
713 }
714 Ok(())
715 })
716 }
717
718 #[pyo3(name = "unsubscribe_book_l2_tbt")]
719 fn py_unsubscribe_book_l2_tbt<'py>(
720 &self,
721 py: Python<'py>,
722 instrument_id: InstrumentId,
723 ) -> PyResult<Bound<'py, PyAny>> {
724 let client = self.clone();
725
726 pyo3_async_runtimes::tokio::future_into_py(py, async move {
727 if let Err(e) = client.unsubscribe_book_l2_tbt(instrument_id).await {
728 log::error!("Failed to unsubscribe from books_l2_tbt: {e}");
729 }
730 Ok(())
731 })
732 }
733
734 #[pyo3(name = "unsubscribe_quotes")]
735 fn py_unsubscribe_quotes<'py>(
736 &self,
737 py: Python<'py>,
738 instrument_id: InstrumentId,
739 ) -> PyResult<Bound<'py, PyAny>> {
740 let client = self.clone();
741
742 pyo3_async_runtimes::tokio::future_into_py(py, async move {
743 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
744 log::error!("Failed to unsubscribe from quotes: {e}");
745 }
746 Ok(())
747 })
748 }
749
750 #[pyo3(name = "unsubscribe_trades")]
751 fn py_unsubscribe_trades<'py>(
752 &self,
753 py: Python<'py>,
754 instrument_id: InstrumentId,
755 aggregated: bool,
756 ) -> PyResult<Bound<'py, PyAny>> {
757 let client = self.clone();
758
759 pyo3_async_runtimes::tokio::future_into_py(py, async move {
760 if let Err(e) = client.unsubscribe_trades(instrument_id, aggregated).await {
761 log::error!("Failed to unsubscribe from trades: {e}");
762 }
763 Ok(())
764 })
765 }
766
767 #[pyo3(name = "unsubscribe_bars")]
768 fn py_unsubscribe_bars<'py>(
769 &self,
770 py: Python<'py>,
771 bar_type: BarType,
772 ) -> PyResult<Bound<'py, PyAny>> {
773 let client = self.clone();
774
775 pyo3_async_runtimes::tokio::future_into_py(py, async move {
776 if let Err(e) = client.unsubscribe_bars(bar_type).await {
777 log::error!("Failed to unsubscribe from bars: {e}");
778 }
779 Ok(())
780 })
781 }
782
783 #[pyo3(name = "subscribe_ticker")]
784 fn py_subscribe_ticker<'py>(
785 &self,
786 py: Python<'py>,
787 instrument_id: InstrumentId,
788 ) -> PyResult<Bound<'py, PyAny>> {
789 let client = self.clone();
790
791 pyo3_async_runtimes::tokio::future_into_py(py, async move {
792 if let Err(e) = client.subscribe_ticker(instrument_id).await {
793 log::error!("Failed to subscribe to ticker: {e}");
794 }
795 Ok(())
796 })
797 }
798
799 #[pyo3(name = "unsubscribe_ticker")]
800 fn py_unsubscribe_ticker<'py>(
801 &self,
802 py: Python<'py>,
803 instrument_id: InstrumentId,
804 ) -> PyResult<Bound<'py, PyAny>> {
805 let client = self.clone();
806
807 pyo3_async_runtimes::tokio::future_into_py(py, async move {
808 if let Err(e) = client.unsubscribe_ticker(instrument_id).await {
809 log::error!("Failed to unsubscribe from ticker: {e}");
810 }
811 Ok(())
812 })
813 }
814
815 #[pyo3(name = "subscribe_mark_prices")]
816 fn py_subscribe_mark_prices<'py>(
817 &self,
818 py: Python<'py>,
819 instrument_id: InstrumentId,
820 ) -> PyResult<Bound<'py, PyAny>> {
821 let client = self.clone();
822
823 pyo3_async_runtimes::tokio::future_into_py(py, async move {
824 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
825 log::error!("Failed to subscribe to mark prices: {e}");
826 }
827 Ok(())
828 })
829 }
830
831 #[pyo3(name = "unsubscribe_mark_prices")]
832 fn py_unsubscribe_mark_prices<'py>(
833 &self,
834 py: Python<'py>,
835 instrument_id: InstrumentId,
836 ) -> PyResult<Bound<'py, PyAny>> {
837 let client = self.clone();
838
839 pyo3_async_runtimes::tokio::future_into_py(py, async move {
840 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
841 log::error!("Failed to unsubscribe from mark prices: {e}");
842 }
843 Ok(())
844 })
845 }
846
847 #[pyo3(name = "subscribe_index_prices")]
848 fn py_subscribe_index_prices<'py>(
849 &self,
850 py: Python<'py>,
851 instrument_id: InstrumentId,
852 ) -> PyResult<Bound<'py, PyAny>> {
853 let client = self.clone();
854
855 pyo3_async_runtimes::tokio::future_into_py(py, async move {
856 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
857 log::error!("Failed to subscribe to index prices: {e}");
858 }
859 Ok(())
860 })
861 }
862
863 #[pyo3(name = "unsubscribe_index_prices")]
864 fn py_unsubscribe_index_prices<'py>(
865 &self,
866 py: Python<'py>,
867 instrument_id: InstrumentId,
868 ) -> PyResult<Bound<'py, PyAny>> {
869 let client = self.clone();
870
871 pyo3_async_runtimes::tokio::future_into_py(py, async move {
872 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
873 log::error!("Failed to unsubscribe from index prices: {e}");
874 }
875 Ok(())
876 })
877 }
878
879 #[pyo3(name = "add_option_greeks_sub")]
880 fn py_add_option_greeks_sub(&self, instrument_id: InstrumentId) {
881 self.add_option_greeks_sub(instrument_id);
882 }
883
884 #[pyo3(name = "add_option_greeks_sub_with_conventions")]
885 fn py_add_option_greeks_sub_with_conventions(
886 &self,
887 instrument_id: InstrumentId,
888 conventions: Vec<OKXGreeksType>,
889 ) {
890 self.add_option_greeks_sub_with_conventions(
891 instrument_id,
892 conventions.into_iter().collect(),
893 );
894 }
895
896 #[pyo3(name = "remove_option_greeks_sub")]
897 fn py_remove_option_greeks_sub(&self, instrument_id: InstrumentId) {
898 self.remove_option_greeks_sub(&instrument_id);
899 }
900
901 #[pyo3(name = "subscribe_option_summary")]
902 fn py_subscribe_option_summary<'py>(
903 &self,
904 py: Python<'py>,
905 inst_family: &str,
906 ) -> PyResult<Bound<'py, PyAny>> {
907 let client = self.clone();
908 let family = Ustr::from(inst_family);
909
910 pyo3_async_runtimes::tokio::future_into_py(py, async move {
911 if let Err(e) = client.subscribe_option_summary(family).await {
912 log::error!("Failed to subscribe to option summary: {e}");
913 }
914 Ok(())
915 })
916 }
917
918 #[pyo3(name = "unsubscribe_option_summary")]
919 fn py_unsubscribe_option_summary<'py>(
920 &self,
921 py: Python<'py>,
922 inst_family: &str,
923 ) -> PyResult<Bound<'py, PyAny>> {
924 let client = self.clone();
925 let family = Ustr::from(inst_family);
926
927 pyo3_async_runtimes::tokio::future_into_py(py, async move {
928 if let Err(e) = client.unsubscribe_option_summary(family).await {
929 log::error!("Failed to unsubscribe from option summary: {e}");
930 }
931 Ok(())
932 })
933 }
934
935 #[pyo3(name = "subscribe_funding_rates")]
936 fn py_subscribe_funding_rates<'py>(
937 &self,
938 py: Python<'py>,
939 instrument_id: InstrumentId,
940 ) -> PyResult<Bound<'py, PyAny>> {
941 let client = self.clone();
942
943 pyo3_async_runtimes::tokio::future_into_py(py, async move {
944 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
945 log::error!("Failed to subscribe to funding rates: {e}");
946 }
947 Ok(())
948 })
949 }
950
951 #[pyo3(name = "unsubscribe_funding_rates")]
952 fn py_unsubscribe_funding_rates<'py>(
953 &self,
954 py: Python<'py>,
955 instrument_id: InstrumentId,
956 ) -> PyResult<Bound<'py, PyAny>> {
957 let client = self.clone();
958
959 pyo3_async_runtimes::tokio::future_into_py(py, async move {
960 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
961 log::error!("Failed to unsubscribe from funding rates: {e}");
962 }
963 Ok(())
964 })
965 }
966
967 #[pyo3(name = "subscribe_orders")]
968 fn py_subscribe_orders<'py>(
969 &self,
970 py: Python<'py>,
971 instrument_type: OKXInstrumentType,
972 ) -> PyResult<Bound<'py, PyAny>> {
973 let client = self.clone();
974
975 pyo3_async_runtimes::tokio::future_into_py(py, async move {
976 if let Err(e) = client.subscribe_orders(instrument_type).await {
977 log::error!("Failed to subscribe to orders '{instrument_type}': {e}");
978 }
979 Ok(())
980 })
981 }
982
983 #[pyo3(name = "unsubscribe_orders")]
984 fn py_unsubscribe_orders<'py>(
985 &self,
986 py: Python<'py>,
987 instrument_type: OKXInstrumentType,
988 ) -> PyResult<Bound<'py, PyAny>> {
989 let client = self.clone();
990
991 pyo3_async_runtimes::tokio::future_into_py(py, async move {
992 if let Err(e) = client.unsubscribe_orders(instrument_type).await {
993 log::error!("Failed to unsubscribe from orders '{instrument_type}': {e}");
994 }
995 Ok(())
996 })
997 }
998
999 #[pyo3(name = "subscribe_orders_algo")]
1000 fn py_subscribe_orders_algo<'py>(
1001 &self,
1002 py: Python<'py>,
1003 instrument_type: OKXInstrumentType,
1004 ) -> PyResult<Bound<'py, PyAny>> {
1005 let client = self.clone();
1006
1007 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1008 if let Err(e) = client.subscribe_orders_algo(instrument_type).await {
1009 log::error!("Failed to subscribe to algo orders '{instrument_type}': {e}");
1010 }
1011 Ok(())
1012 })
1013 }
1014
1015 #[pyo3(name = "unsubscribe_orders_algo")]
1016 fn py_unsubscribe_orders_algo<'py>(
1017 &self,
1018 py: Python<'py>,
1019 instrument_type: OKXInstrumentType,
1020 ) -> PyResult<Bound<'py, PyAny>> {
1021 let client = self.clone();
1022
1023 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1024 if let Err(e) = client.unsubscribe_orders_algo(instrument_type).await {
1025 log::error!("Failed to unsubscribe from algo orders '{instrument_type}': {e}");
1026 }
1027 Ok(())
1028 })
1029 }
1030
1031 #[pyo3(name = "subscribe_algo_advance")]
1032 fn py_subscribe_algo_advance<'py>(
1033 &self,
1034 py: Python<'py>,
1035 instrument_type: OKXInstrumentType,
1036 ) -> PyResult<Bound<'py, PyAny>> {
1037 let client = self.clone();
1038
1039 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1040 if let Err(e) = client.subscribe_algo_advance(instrument_type).await {
1041 log::error!("Failed to subscribe to algo-advance '{instrument_type}': {e}");
1042 }
1043 Ok(())
1044 })
1045 }
1046
1047 #[pyo3(name = "unsubscribe_algo_advance")]
1048 fn py_unsubscribe_algo_advance<'py>(
1049 &self,
1050 py: Python<'py>,
1051 instrument_type: OKXInstrumentType,
1052 ) -> PyResult<Bound<'py, PyAny>> {
1053 let client = self.clone();
1054
1055 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1056 if let Err(e) = client.unsubscribe_algo_advance(instrument_type).await {
1057 log::error!("Failed to unsubscribe from algo-advance '{instrument_type}': {e}");
1058 }
1059 Ok(())
1060 })
1061 }
1062
1063 #[pyo3(name = "subscribe_fills")]
1064 fn py_subscribe_fills<'py>(
1065 &self,
1066 py: Python<'py>,
1067 instrument_type: OKXInstrumentType,
1068 ) -> PyResult<Bound<'py, PyAny>> {
1069 let client = self.clone();
1070
1071 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1072 if let Err(e) = client.subscribe_fills(instrument_type).await {
1073 log::error!("Failed to subscribe to fills '{instrument_type}': {e}");
1074 }
1075 Ok(())
1076 })
1077 }
1078
1079 #[pyo3(name = "unsubscribe_fills")]
1080 fn py_unsubscribe_fills<'py>(
1081 &self,
1082 py: Python<'py>,
1083 instrument_type: OKXInstrumentType,
1084 ) -> PyResult<Bound<'py, PyAny>> {
1085 let client = self.clone();
1086
1087 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1088 if let Err(e) = client.unsubscribe_fills(instrument_type).await {
1089 log::error!("Failed to unsubscribe from fills '{instrument_type}': {e}");
1090 }
1091 Ok(())
1092 })
1093 }
1094
1095 #[pyo3(name = "subscribe_account")]
1096 fn py_subscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1097 let client = self.clone();
1098
1099 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1100 if let Err(e) = client.subscribe_account().await {
1101 log::error!("Failed to subscribe to account: {e}");
1102 }
1103 Ok(())
1104 })
1105 }
1106
1107 #[pyo3(name = "unsubscribe_account")]
1108 fn py_unsubscribe_account<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1109 let client = self.clone();
1110
1111 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1112 if let Err(e) = client.unsubscribe_account().await {
1113 log::error!("Failed to unsubscribe from account: {e}");
1114 }
1115 Ok(())
1116 })
1117 }
1118
1119 #[pyo3(name = "submit_order")]
1120 #[pyo3(signature = (
1121 trader_id,
1122 strategy_id,
1123 instrument_id,
1124 td_mode,
1125 client_order_id,
1126 order_side,
1127 order_type,
1128 quantity,
1129 time_in_force=None,
1130 price=None,
1131 trigger_price=None,
1132 post_only=None,
1133 reduce_only=None,
1134 quote_quantity=None,
1135 position_side=None,
1136 attach_algo_ords=None,
1137 px_usd=None,
1138 px_vol=None,
1139 ))]
1140 #[expect(clippy::too_many_arguments)]
1141 fn py_submit_order<'py>(
1142 &self,
1143 py: Python<'py>,
1144 trader_id: TraderId,
1145 strategy_id: StrategyId,
1146 instrument_id: InstrumentId,
1147 td_mode: OKXTradeMode,
1148 client_order_id: ClientOrderId,
1149 order_side: OrderSide,
1150 order_type: OrderType,
1151 quantity: Quantity,
1152 time_in_force: Option<TimeInForce>,
1153 price: Option<Price>,
1154 trigger_price: Option<Price>,
1155 post_only: Option<bool>,
1156 reduce_only: Option<bool>,
1157 quote_quantity: Option<bool>,
1158 position_side: Option<PositionSide>,
1159 attach_algo_ords: Option<Vec<Py<PyDict>>>,
1160 px_usd: Option<String>,
1161 px_vol: Option<String>,
1162 ) -> PyResult<Bound<'py, PyAny>> {
1163 let attach_algo_ords = parse_attach_algo_ords(py, attach_algo_ords)?;
1164 let client = self.clone();
1165
1166 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1167 client
1168 .submit_order(
1169 trader_id,
1170 strategy_id,
1171 instrument_id,
1172 td_mode,
1173 client_order_id,
1174 order_side,
1175 order_type,
1176 quantity,
1177 time_in_force,
1178 price,
1179 trigger_price,
1180 post_only,
1181 reduce_only,
1182 quote_quantity,
1183 position_side,
1184 attach_algo_ords,
1185 px_usd,
1186 px_vol,
1187 )
1188 .await
1189 .map_err(to_pyvalue_err)
1190 })
1191 }
1192
1193 #[pyo3(name = "cancel_order", signature = (
1194 trader_id,
1195 strategy_id,
1196 instrument_id,
1197 client_order_id=None,
1198 venue_order_id=None,
1199 ))]
1200 fn py_cancel_order<'py>(
1201 &self,
1202 py: Python<'py>,
1203 trader_id: TraderId,
1204 strategy_id: StrategyId,
1205 instrument_id: InstrumentId,
1206 client_order_id: Option<ClientOrderId>,
1207 venue_order_id: Option<VenueOrderId>,
1208 ) -> PyResult<Bound<'py, PyAny>> {
1209 let client = self.clone();
1210
1211 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1212 client
1213 .cancel_order(
1214 trader_id,
1215 strategy_id,
1216 instrument_id,
1217 client_order_id,
1218 venue_order_id,
1219 )
1220 .await
1221 .map_err(to_pyvalue_err)
1222 })
1223 }
1224
1225 #[pyo3(name = "modify_order")]
1226 #[pyo3(signature = (
1227 trader_id,
1228 strategy_id,
1229 instrument_id,
1230 client_order_id=None,
1231 venue_order_id=None,
1232 price=None,
1233 quantity=None,
1234 new_px_usd=None,
1235 new_px_vol=None,
1236 ))]
1237 #[expect(clippy::too_many_arguments)]
1238 fn py_modify_order<'py>(
1239 &self,
1240 py: Python<'py>,
1241 trader_id: TraderId,
1242 strategy_id: StrategyId,
1243 instrument_id: InstrumentId,
1244 client_order_id: Option<ClientOrderId>,
1245 venue_order_id: Option<VenueOrderId>,
1246 price: Option<Price>,
1247 quantity: Option<Quantity>,
1248 new_px_usd: Option<String>,
1249 new_px_vol: Option<String>,
1250 ) -> PyResult<Bound<'py, PyAny>> {
1251 let client = self.clone();
1252
1253 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1254 client
1255 .modify_order(
1256 trader_id,
1257 strategy_id,
1258 instrument_id,
1259 client_order_id,
1260 price,
1261 quantity,
1262 venue_order_id,
1263 new_px_usd,
1264 new_px_vol,
1265 )
1266 .await
1267 .map_err(to_pyvalue_err)
1268 })
1269 }
1270
1271 #[expect(clippy::type_complexity)]
1272 #[pyo3(name = "batch_submit_orders")]
1273 fn py_batch_submit_orders<'py>(
1274 &self,
1275 py: Python<'py>,
1276 orders: Vec<Py<PyAny>>,
1277 ) -> PyResult<Bound<'py, PyAny>> {
1278 let mut domain_orders = Vec::with_capacity(orders.len());
1279
1280 for obj in orders {
1281 let (
1282 instrument_type,
1283 instrument_id,
1284 td_mode,
1285 client_order_id,
1286 order_side,
1287 order_type,
1288 quantity,
1289 position_side,
1290 price,
1291 trigger_price,
1292 post_only,
1293 reduce_only,
1294 ): (
1295 OKXInstrumentType,
1296 InstrumentId,
1297 OKXTradeMode,
1298 ClientOrderId,
1299 OrderSide,
1300 OrderType,
1301 Quantity,
1302 Option<PositionSide>,
1303 Option<Price>,
1304 Option<Price>,
1305 Option<bool>,
1306 Option<bool>,
1307 ) = obj.extract(py).map_err(to_pyruntime_err)?;
1308
1309 domain_orders.push((
1310 instrument_type,
1311 instrument_id,
1312 td_mode,
1313 client_order_id,
1314 order_side,
1315 position_side,
1316 order_type,
1317 quantity,
1318 price,
1319 trigger_price,
1320 post_only,
1321 reduce_only,
1322 ));
1323 }
1324
1325 let client = self.clone();
1326
1327 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1328 client
1329 .batch_submit_orders(domain_orders)
1330 .await
1331 .map_err(to_pyvalue_err)
1332 })
1333 }
1334
1335 #[pyo3(name = "batch_cancel_orders")]
1337 fn py_batch_cancel_orders<'py>(
1338 &self,
1339 py: Python<'py>,
1340 cancels: Vec<Py<PyAny>>,
1341 ) -> PyResult<Bound<'py, PyAny>> {
1342 let mut batched_cancels = Vec::with_capacity(cancels.len());
1343
1344 for obj in cancels {
1345 let (instrument_id, client_order_id, order_id): (
1346 InstrumentId,
1347 Option<ClientOrderId>,
1348 Option<VenueOrderId>,
1349 ) = obj.extract(py).map_err(to_pyruntime_err)?;
1350 batched_cancels.push((instrument_id, client_order_id, order_id));
1351 }
1352
1353 let client = self.clone();
1354
1355 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1356 client
1357 .batch_cancel_orders(batched_cancels)
1358 .await
1359 .map_err(to_pyvalue_err)
1360 })
1361 }
1362
1363 #[pyo3(name = "batch_modify_orders")]
1364 fn py_batch_modify_orders<'py>(
1365 &self,
1366 py: Python<'py>,
1367 orders: Vec<Py<PyAny>>,
1368 ) -> PyResult<Bound<'py, PyAny>> {
1369 let mut domain_orders = Vec::with_capacity(orders.len());
1370
1371 for obj in orders {
1372 let (
1373 instrument_type,
1374 instrument_id,
1375 client_order_id,
1376 new_client_order_id,
1377 price,
1378 quantity,
1379 ): (
1380 String,
1381 InstrumentId,
1382 ClientOrderId,
1383 ClientOrderId,
1384 Option<Price>,
1385 Option<Quantity>,
1386 ) = obj.extract(py).map_err(to_pyruntime_err)?;
1387 let inst_type =
1388 OKXInstrumentType::from_str(&instrument_type).map_err(to_pyvalue_err)?;
1389 domain_orders.push((
1390 inst_type,
1391 instrument_id,
1392 client_order_id,
1393 new_client_order_id,
1394 price,
1395 quantity,
1396 ));
1397 }
1398
1399 let client = self.clone();
1400
1401 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1402 client
1403 .batch_modify_orders(domain_orders)
1404 .await
1405 .map_err(to_pyvalue_err)
1406 })
1407 }
1408
1409 #[pyo3(name = "mass_cancel_orders")]
1410 fn py_mass_cancel_orders<'py>(
1411 &self,
1412 py: Python<'py>,
1413 instrument_id: InstrumentId,
1414 ) -> PyResult<Bound<'py, PyAny>> {
1415 let client = self.clone();
1416
1417 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1418 client
1419 .mass_cancel_orders(instrument_id)
1420 .await
1421 .map_err(to_pyvalue_err)
1422 })
1423 }
1424
1425 #[pyo3(name = "cache_instruments")]
1426 fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
1427 let instruments: Result<Vec<_>, _> = instruments
1428 .into_iter()
1429 .map(|inst| pyobject_to_instrument_any(py, inst))
1430 .collect();
1431 self.cache_instruments(&instruments?);
1432 Ok(())
1433 }
1434
1435 #[pyo3(name = "cache_instrument")]
1436 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
1437 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
1438 Ok(())
1439 }
1440
1441 #[pyo3(name = "cache_inst_id_codes")]
1442 fn py_cache_inst_id_codes(&self, mappings: Vec<(String, u64)>) {
1443 let ustr_mappings = mappings
1444 .into_iter()
1445 .map(|(inst_id, code)| (Ustr::from(&inst_id), code));
1446 self.cache_inst_id_codes(ustr_mappings);
1447 }
1448}
1449
1450fn handle_book_data(
1451 inst_id: Option<Ustr>,
1452 action: OKXBookAction,
1453 data: Vec<OKXBookMsg>,
1454 instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
1455 clock: &AtomicTime,
1456 call_soon: &Py<PyAny>,
1457 callback: &Py<PyAny>,
1458) {
1459 let Some(inst_id) = inst_id else { return };
1460 let Some(instrument) = instruments_by_symbol.get(&inst_id) else {
1461 log::warn!("No cached instrument for book data: {inst_id}");
1462 return;
1463 };
1464 let ts_init = clock.get_time_ns();
1465
1466 match parse_book_msg_vec(
1467 data,
1468 &instrument.id(),
1469 instrument.price_precision(),
1470 instrument.size_precision(),
1471 action,
1472 ts_init,
1473 ) {
1474 Ok(data_vec) => Python::attach(|py| {
1475 for d in data_vec {
1476 let py_obj = data_to_pycapsule(py, d);
1477 call_python_threadsafe(py, call_soon, callback, py_obj);
1478 }
1479 }),
1480 Err(e) => log::error!("Failed to parse book data: {e}"),
1481 }
1482}
1483
1484#[expect(clippy::too_many_arguments)]
1485fn handle_channel_data(
1486 channel: &OKXWsChannel,
1487 inst_id: Option<Ustr>,
1488 data: serde_json::Value,
1489 instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
1490 quote_cache: &mut QuoteCache,
1491 funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
1492 option_greeks_subs: &AHashMap<InstrumentId, AHashSet<OKXGreeksType>>,
1493 clock: &AtomicTime,
1494 call_soon: &Py<PyAny>,
1495 callback: &Py<PyAny>,
1496) {
1497 if matches!(channel, OKXWsChannel::OptionSummary) {
1498 let ts_init = clock.get_time_ns();
1499
1500 match serde_json::from_value::<Vec<OKXOptionSummaryMsg>>(data) {
1501 Ok(msgs) => {
1502 for msg in &msgs {
1503 let Some(instrument) = instruments_by_symbol.get(&msg.inst_id) else {
1504 continue;
1505 };
1506 let instrument_id = instrument.id();
1507 let Some(conventions) = option_greeks_subs.get(&instrument_id) else {
1508 continue;
1509 };
1510
1511 for greeks_type in conventions {
1512 match parse_option_summary_greeks(
1513 msg,
1514 &instrument_id,
1515 *greeks_type,
1516 ts_init,
1517 ) {
1518 Ok(greeks) => {
1519 Python::attach(|py| match greeks.into_py_any(py) {
1520 Ok(py_obj) => {
1521 call_python_threadsafe(py, call_soon, callback, py_obj);
1522 }
1523 Err(e) => {
1524 log::error!(
1525 "Failed to convert OptionGreeks to Python: {e}"
1526 );
1527 }
1528 });
1529 }
1530 Err(e) => {
1531 log::error!(
1532 "Failed to parse option summary for {} ({greeks_type:?}): {e}",
1533 msg.inst_id
1534 );
1535 }
1536 }
1537 }
1538 }
1539 }
1540 Err(e) => log::error!("Failed to deserialize option summary data: {e}"),
1541 }
1542 return;
1543 }
1544
1545 let Some(inst_id) = inst_id else { return };
1546
1547 if matches!(channel, OKXWsChannel::IndexTickers) {
1548 let ts_init = clock.get_time_ns();
1549 let prefix = format!("{inst_id}-");
1550 let matching: Vec<_> = instruments_by_symbol
1551 .values()
1552 .filter(|i| {
1553 let s = i.symbol().inner();
1554 s == inst_id || s.as_str().starts_with(&prefix)
1555 })
1556 .collect();
1557
1558 for instrument in matching {
1559 if let Ok(data_vec) = parse_index_price_msg_vec(
1560 data.clone(),
1561 &instrument.id(),
1562 instrument.price_precision(),
1563 ts_init,
1564 ) {
1565 Python::attach(|py| {
1566 for d in data_vec {
1567 let py_obj = data_to_pycapsule(py, d);
1568 call_python_threadsafe(py, call_soon, callback, py_obj);
1569 }
1570 });
1571 }
1572 }
1573 return;
1574 }
1575
1576 let Some(instrument) = instruments_by_symbol.get(&inst_id) else {
1577 log::warn!("No cached instrument for {channel:?}: {inst_id}");
1578 return;
1579 };
1580 let instrument_id = instrument.id();
1581 let price_precision = instrument.price_precision();
1582 let size_precision = instrument.size_precision();
1583 let ts_init = clock.get_time_ns();
1584
1585 if matches!(channel, OKXWsChannel::BboTbt) {
1586 handle_bbo_tbt(
1587 data,
1588 instrument_id,
1589 price_precision,
1590 size_precision,
1591 ts_init,
1592 quote_cache,
1593 call_soon,
1594 callback,
1595 );
1596 return;
1597 }
1598
1599 match parse_ws_message_data(
1600 channel,
1601 data,
1602 &instrument_id,
1603 price_precision,
1604 size_precision,
1605 ts_init,
1606 funding_cache,
1607 instruments_by_symbol,
1608 ) {
1609 Ok(Some(ws_msg)) => {
1610 dispatch_nautilus_ws_msg_to_python(ws_msg, call_soon, callback, instruments_by_symbol);
1611 }
1612 Ok(None) => {}
1613 Err(e) => {
1614 log::error!("Failed to parse {channel:?} data: {e}");
1615 }
1616 }
1617}
1618
1619#[expect(clippy::too_many_arguments)]
1620fn handle_bbo_tbt(
1621 data: serde_json::Value,
1622 instrument_id: InstrumentId,
1623 price_precision: u8,
1624 size_precision: u8,
1625 ts_init: UnixNanos,
1626 quote_cache: &mut QuoteCache,
1627 call_soon: &Py<PyAny>,
1628 callback: &Py<PyAny>,
1629) {
1630 let msgs: Vec<OKXBookMsg> = match serde_json::from_value(data) {
1631 Ok(msgs) => msgs,
1632 Err(e) => {
1633 log::error!("Failed to deserialize BboTbt data: {e}");
1634 return;
1635 }
1636 };
1637
1638 for msg in &msgs {
1639 let bid = msg.bids.first();
1640 let ask = msg.asks.first();
1641
1642 let bid_price = bid.and_then(|e| parse_price(&e.price, price_precision).ok());
1643 let bid_size = bid.and_then(|e| parse_quantity(&e.size, size_precision).ok());
1644 let ask_price = ask.and_then(|e| parse_price(&e.price, price_precision).ok());
1645 let ask_size = ask.and_then(|e| parse_quantity(&e.size, size_precision).ok());
1646 let ts_event = parse_millisecond_timestamp(msg.ts);
1647
1648 match quote_cache.process(
1649 instrument_id,
1650 bid_price,
1651 ask_price,
1652 bid_size,
1653 ask_size,
1654 ts_event,
1655 ts_init,
1656 ) {
1657 Ok(quote) => {
1658 Python::attach(|py| {
1659 let py_obj = data_to_pycapsule(py, Data::Quote(quote));
1660 call_python_threadsafe(py, call_soon, callback, py_obj);
1661 });
1662 }
1663 Err(e) => {
1664 log::debug!("Skipping partial BboTbt for {instrument_id}: {e}");
1665 }
1666 }
1667 }
1668}
1669
1670fn handle_instruments(
1671 okx_instruments: Vec<OKXInstrument>,
1672 instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
1673 clock: &AtomicTime,
1674 call_soon: &Py<PyAny>,
1675 callback: &Py<PyAny>,
1676) {
1677 let ts_init = clock.get_time_ns();
1678
1679 for okx_inst in okx_instruments {
1680 let inst_key = Ustr::from(&okx_inst.inst_id);
1681 let (margin_init, margin_maint, maker_fee, taker_fee) =
1682 instruments_by_symbol.get(&inst_key).map_or(
1683 (None, None, None, None),
1684 extract_fees_from_cached_instrument,
1685 );
1686 let status_action = okx_status_to_market_action(okx_inst.state);
1687 let is_live = matches!(okx_inst.state, OKXInstrumentStatus::Live);
1688
1689 if let Ok(Some(inst_any)) = parse_instrument_any(
1690 &okx_inst,
1691 margin_init,
1692 margin_maint,
1693 maker_fee,
1694 taker_fee,
1695 ts_init,
1696 ) {
1697 let instrument_id = inst_any.id();
1698 instruments_by_symbol.insert(inst_any.symbol().inner(), inst_any.clone());
1699 call_python_with_data(call_soon, callback, |py| {
1700 instrument_any_to_pyobject(py, inst_any)
1701 });
1702 let status = InstrumentStatus::new(
1703 instrument_id,
1704 status_action,
1705 ts_init,
1706 ts_init,
1707 None,
1708 None,
1709 Some(is_live),
1710 None,
1711 None,
1712 );
1713 call_python_with_data(call_soon, callback, |py| status.into_py_any(py));
1714 }
1715 }
1716}
1717
1718#[expect(clippy::too_many_arguments)]
1719fn handle_orders(
1720 order_msgs: &[OKXOrderMsg],
1721 account_id: AccountId,
1722 instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
1723 fee_cache: &mut AHashMap<Ustr, Money>,
1724 filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
1725 clock: &AtomicTime,
1726 call_soon: &Py<PyAny>,
1727 callback: &Py<PyAny>,
1728) {
1729 let ts_init = clock.get_time_ns();
1730
1731 match parse_order_msg_vec(
1732 order_msgs,
1733 account_id,
1734 instruments_by_symbol,
1735 fee_cache,
1736 filled_qty_cache,
1737 ts_init,
1738 ) {
1739 Ok(reports) => {
1740 dispatch_execution_reports_to_python(reports, call_soon, callback);
1741 }
1742 Err(e) => {
1743 log::error!("Failed to parse order messages: {e}");
1744 }
1745 }
1746}
1747
1748fn handle_algo_orders(
1749 algo_msgs: Vec<OKXAlgoOrderMsg>,
1750 account_id: AccountId,
1751 instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
1752 clock: &AtomicTime,
1753 call_soon: &Py<PyAny>,
1754 callback: &Py<PyAny>,
1755) {
1756 let ts_init = clock.get_time_ns();
1757 for algo_msg in algo_msgs {
1758 match parse_algo_order_msg(&algo_msg, account_id, instruments_by_symbol, ts_init) {
1759 Ok(Some(report)) => {
1760 dispatch_execution_reports_to_python(vec![report], call_soon, callback);
1761 }
1762 Ok(None) => {}
1763 Err(e) => {
1764 log::error!("Failed to parse algo order: {e}");
1765 }
1766 }
1767 }
1768}
1769
1770fn handle_account(
1771 data: serde_json::Value,
1772 account_id: AccountId,
1773 clock: &AtomicTime,
1774 call_soon: &Py<PyAny>,
1775 callback: &Py<PyAny>,
1776) {
1777 if let Ok(accounts) = serde_json::from_value::<Vec<OKXAccount>>(data) {
1778 let ts_init = clock.get_time_ns();
1779 for account in &accounts {
1780 if let Ok(account_state) = parse_account_state(account, account_id, ts_init) {
1781 call_python_with_data(call_soon, callback, |py| account_state.into_py_any(py));
1782 }
1783 }
1784 }
1785}
1786
1787fn handle_positions(
1788 data: serde_json::Value,
1789 account_id: AccountId,
1790 instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
1791 clock: &AtomicTime,
1792 call_soon: &Py<PyAny>,
1793 callback: &Py<PyAny>,
1794) {
1795 if let Ok(positions) = serde_json::from_value::<Vec<OKXPosition>>(data) {
1796 let ts_init = clock.get_time_ns();
1797
1798 for position in positions {
1799 let inst_key = Ustr::from(&position.inst_id);
1800 if let Some(instrument) = instruments_by_symbol.get(&inst_key) {
1801 match parse_position_status_report(
1802 &position,
1803 account_id,
1804 instrument.id(),
1805 instrument.size_precision(),
1806 ts_init,
1807 ) {
1808 Ok(report) => {
1809 call_python_with_data(call_soon, callback, |py| report.into_py_any(py));
1810 }
1811 Err(e) => {
1812 log::error!("Failed to parse position: {e}");
1813 }
1814 }
1815 }
1816 }
1817 }
1818}
1819
1820#[expect(clippy::too_many_arguments)]
1821fn handle_order_response(
1822 id: Option<&str>,
1823 op: &OKXWsOperation,
1824 code: &str,
1825 msg: &str,
1826 data: &[serde_json::Value],
1827 client: &OKXWebSocketClient,
1828 account_id: AccountId,
1829 clock: &AtomicTime,
1830 call_soon: &Py<PyAny>,
1831 callback: &Py<PyAny>,
1832) {
1833 for item in data {
1834 let s_code = item
1835 .get(OKX_FIELD_SCODE)
1836 .and_then(|v| v.as_str())
1837 .unwrap_or("");
1838 let s_msg = item
1839 .get(OKX_FIELD_SMSG)
1840 .and_then(|v| v.as_str())
1841 .unwrap_or("");
1842 let cl_ord_id = item
1843 .get(OKX_FIELD_CLORDID)
1844 .and_then(|v| v.as_str())
1845 .unwrap_or("");
1846
1847 if s_code == OKX_SUCCESS_CODE {
1848 log::debug!("Order response ok: op={op:?} cl_ord_id={cl_ord_id}");
1849 match op {
1850 OKXWsOperation::Order | OKXWsOperation::BatchOrders => {
1851 if let Some((_, info)) = client.pending_orders.remove(cl_ord_id) {
1852 let venue_order_id = item
1853 .get("ordId")
1854 .and_then(|v| v.as_str())
1855 .filter(|s| !s.is_empty());
1856
1857 if let Some(ord_id) = venue_order_id {
1858 let ts_init = clock.get_time_ns();
1859 let accepted = OrderAccepted::new(
1860 info.trader_id,
1861 info.strategy_id,
1862 info.instrument_id,
1863 ClientOrderId::from(cl_ord_id),
1864 VenueOrderId::new(ord_id),
1865 account_id,
1866 UUID4::new(),
1867 ts_init,
1868 ts_init,
1869 false,
1870 );
1871 call_python_with_data(call_soon, callback, |py| {
1872 accepted.into_py_any(py)
1873 });
1874 } else {
1875 log::error!(
1876 "No venue_order_id for accepted order: cl_ord_id={cl_ord_id}"
1877 );
1878 }
1879 }
1880 }
1881 OKXWsOperation::OrderAlgo => {
1882 client.pending_orders.remove(cl_ord_id);
1883 log::debug!("Algo order placement confirmed: cl_ord_id={cl_ord_id}");
1884 }
1885 OKXWsOperation::CancelOrder
1886 | OKXWsOperation::BatchCancelOrders
1887 | OKXWsOperation::MassCancel
1888 | OKXWsOperation::CancelAlgos => {
1889 client.pending_cancels.remove(cl_ord_id);
1890 }
1891 OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders => {
1892 client.pending_amends.remove(cl_ord_id);
1893 }
1894 _ => {}
1895 }
1896 } else if !cl_ord_id.is_empty() {
1897 log::warn!(
1898 "Order response rejected: op={op:?} cl_ord_id={cl_ord_id} \
1899 s_code={s_code} s_msg={s_msg}"
1900 );
1901 let ts_init = clock.get_time_ns();
1902 let client_order_id = ClientOrderId::from(cl_ord_id);
1903 let venue_order_id = item
1904 .get("ordId")
1905 .and_then(|v| v.as_str())
1906 .filter(|s| !s.is_empty())
1907 .map(VenueOrderId::new);
1908
1909 match op {
1910 OKXWsOperation::Order | OKXWsOperation::BatchOrders | OKXWsOperation::OrderAlgo => {
1911 if let Some((_, info)) = client.pending_orders.remove(cl_ord_id) {
1912 let rejected = OrderRejected::new(
1913 info.trader_id,
1914 info.strategy_id,
1915 info.instrument_id,
1916 client_order_id,
1917 account_id,
1918 Ustr::from(s_msg),
1919 UUID4::new(),
1920 ts_init,
1921 ts_init,
1922 false,
1923 false,
1924 );
1925 call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
1926 }
1927 }
1928 OKXWsOperation::CancelOrder
1929 | OKXWsOperation::BatchCancelOrders
1930 | OKXWsOperation::MassCancel
1931 | OKXWsOperation::CancelAlgos => {
1932 if let Some((_, info)) = client.pending_cancels.remove(cl_ord_id) {
1933 let rejected = OrderCancelRejected::new(
1934 info.trader_id,
1935 info.strategy_id,
1936 info.instrument_id,
1937 client_order_id,
1938 Ustr::from(s_msg),
1939 UUID4::new(),
1940 ts_init,
1941 ts_init,
1942 false,
1943 venue_order_id,
1944 Some(account_id),
1945 );
1946 call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
1947 }
1948 }
1949 OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders => {
1950 if let Some((_, info)) = client.pending_amends.remove(cl_ord_id) {
1951 let rejected = OrderModifyRejected::new(
1952 info.trader_id,
1953 info.strategy_id,
1954 info.instrument_id,
1955 client_order_id,
1956 Ustr::from(s_msg),
1957 UUID4::new(),
1958 ts_init,
1959 ts_init,
1960 false,
1961 venue_order_id,
1962 Some(account_id),
1963 );
1964 call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
1965 }
1966 }
1967 _ => {}
1968 }
1969 }
1970 }
1971
1972 if code != "0" && data.is_empty() {
1973 log::warn!("Order response error (no data): id={id:?} op={op:?} code={code} msg={msg}");
1974 }
1975}
1976
1977#[expect(clippy::too_many_arguments)]
1978fn handle_send_failed(
1979 request_id: &str,
1980 client_order_id: Option<ClientOrderId>,
1981 op: Option<&OKXWsOperation>,
1982 error: &str,
1983 client: &OKXWebSocketClient,
1984 account_id: AccountId,
1985 clock: &AtomicTime,
1986 call_soon: &Py<PyAny>,
1987 callback: &Py<PyAny>,
1988) {
1989 log::error!("WebSocket send failed: request_id={request_id} error={error}");
1990
1991 let Some(client_order_id) = client_order_id else {
1992 return;
1993 };
1994 let cl_ord_str = client_order_id.to_string();
1995 let ts_init = clock.get_time_ns();
1996
1997 match op {
1998 Some(OKXWsOperation::Order | OKXWsOperation::BatchOrders | OKXWsOperation::OrderAlgo) => {
1999 if let Some((_, info)) = client.pending_orders.remove(&cl_ord_str) {
2000 let rejected = OrderRejected::new(
2001 info.trader_id,
2002 info.strategy_id,
2003 info.instrument_id,
2004 client_order_id,
2005 account_id,
2006 Ustr::from(error),
2007 UUID4::new(),
2008 ts_init,
2009 ts_init,
2010 false,
2011 false,
2012 );
2013 call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
2014 }
2015 }
2016 Some(
2017 OKXWsOperation::CancelOrder
2018 | OKXWsOperation::BatchCancelOrders
2019 | OKXWsOperation::MassCancel
2020 | OKXWsOperation::CancelAlgos,
2021 ) => {
2022 if let Some((_, info)) = client.pending_cancels.remove(&cl_ord_str) {
2023 let rejected = OrderCancelRejected::new(
2024 info.trader_id,
2025 info.strategy_id,
2026 info.instrument_id,
2027 client_order_id,
2028 Ustr::from(error),
2029 UUID4::new(),
2030 ts_init,
2031 ts_init,
2032 false,
2033 None,
2034 Some(account_id),
2035 );
2036 call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
2037 }
2038 }
2039 Some(OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders) => {
2040 if let Some((_, info)) = client.pending_amends.remove(&cl_ord_str) {
2041 let rejected = OrderModifyRejected::new(
2042 info.trader_id,
2043 info.strategy_id,
2044 info.instrument_id,
2045 client_order_id,
2046 Ustr::from(error),
2047 UUID4::new(),
2048 ts_init,
2049 ts_init,
2050 false,
2051 None,
2052 Some(account_id),
2053 );
2054 call_python_with_data(call_soon, callback, |py| rejected.into_py_any(py));
2055 }
2056 }
2057 _ => {
2058 log::warn!("SendFailed for {client_order_id} with unknown op, cannot emit rejection");
2059 }
2060 }
2061}
2062
2063fn call_python_with_data<F>(call_soon: &Py<PyAny>, callback: &Py<PyAny>, data_converter: F)
2064where
2065 F: FnOnce(Python) -> PyResult<Py<PyAny>>,
2066{
2067 Python::attach(|py| match data_converter(py) {
2068 Ok(py_obj) => call_python_threadsafe(py, call_soon, callback, py_obj),
2069 Err(e) => log::error!("Failed to convert data to Python object: {e}"),
2070 });
2071}
2072
2073fn dispatch_nautilus_ws_msg_to_python(
2074 msg: NautilusWsMessage,
2075 call_soon: &Py<PyAny>,
2076 callback: &Py<PyAny>,
2077 instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
2078) {
2079 match msg {
2080 NautilusWsMessage::Data(payloads) => Python::attach(|py| {
2081 for data in payloads {
2082 let py_obj = data_to_pycapsule(py, data);
2083 call_python_threadsafe(py, call_soon, callback, py_obj);
2084 }
2085 }),
2086 NautilusWsMessage::Deltas(deltas) => Python::attach(|py| {
2087 let py_obj = data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(deltas)));
2088 call_python_threadsafe(py, call_soon, callback, py_obj);
2089 }),
2090 NautilusWsMessage::FundingRates(updates) => {
2091 for data in updates {
2092 call_python_with_data(call_soon, callback, |py| data.into_py_any(py));
2093 }
2094 }
2095 NautilusWsMessage::Instrument(instrument, status) => {
2096 instruments_by_symbol.insert(instrument.symbol().inner(), (*instrument).clone());
2097 call_python_with_data(call_soon, callback, |py| {
2098 instrument_any_to_pyobject(py, *instrument)
2099 });
2100
2101 if let Some(status) = status {
2102 call_python_with_data(call_soon, callback, |py| status.into_py_any(py));
2103 }
2104 }
2105 _ => {}
2106 }
2107}
2108
2109fn dispatch_execution_reports_to_python(
2110 reports: Vec<ExecutionReport>,
2111 call_soon: &Py<PyAny>,
2112 callback: &Py<PyAny>,
2113) {
2114 for report in reports {
2115 match report {
2116 ExecutionReport::Order(report) => {
2117 call_python_with_data(call_soon, callback, |py| report.into_py_any(py));
2118 }
2119 ExecutionReport::Fill(report) => {
2120 call_python_with_data(call_soon, callback, |py| report.into_py_any(py));
2121 }
2122 }
2123 }
2124}