Skip to main content

nautilus_bitmex/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the BitMEX WebSocket client.
17//!
18//! [`PyBitmexWebSocketClient`] wraps the Rust [`BitmexWebSocketClient`] and adds an
19//! instrument cache at the Python boundary. The inner client is a pure network component
20//! that emits venue-specific types; this wrapper parses them into Nautilus domain objects
21//! before passing them to Python callbacks.
22//!
23//! The instrument cache is shared via `Arc<AtomicMap>` so that:
24//! - Python can inject instruments at any time via `cache_instrument`.
25//! - The spawned stream task reads from the same cache for parsing.
26//! - Instrument table messages from the venue update the cache automatically.
27
28use std::{fmt::Debug, sync::Arc};
29
30use ahash::AHashMap;
31use futures_util::StreamExt;
32use nautilus_common::{cache::quote::QuoteCache, live::get_runtime};
33use nautilus_core::{
34    AtomicMap, UUID4, UnixNanos,
35    python::{call_python_threadsafe, to_pyruntime_err, to_pyvalue_err},
36    time::get_atomic_clock_realtime,
37};
38use nautilus_model::{
39    data::{Data, InstrumentStatus, bar::BarType},
40    enums::{MarketStatusAction, OrderSide, OrderType},
41    events::{OrderAccepted, OrderUpdated},
42    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
43    instruments::{Instrument, InstrumentAny},
44    python::{
45        data::data_to_pycapsule,
46        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
47    },
48    types::Price,
49};
50use nautilus_network::websocket::TransportBackend;
51use pyo3::{conversion::IntoPyObjectExt, prelude::*};
52use ustr::Ustr;
53
54use crate::{
55    common::{
56        enums::{
57            BitmexEnvironment, BitmexExecType, BitmexInstrumentState, BitmexOrderType,
58            BitmexPegPriceType,
59        },
60        parse::{
61            parse_contracts_quantity, parse_instrument_id, parse_optional_datetime_to_unix_nanos,
62        },
63    },
64    http::parse::{InstrumentParseResult, parse_instrument_any},
65    websocket::{
66        BitmexWebSocketClient,
67        dispatch::{OrderIdentity, WsDispatchState, fill_report_to_order_filled},
68        enums::{BitmexAction, BitmexWsTopic},
69        messages::{
70            BitmexExecutionMsg, BitmexInstrumentMsg, BitmexQuoteMsg, BitmexTableMessage,
71            BitmexWsMessage, OrderData,
72        },
73        parse::{
74            ParsedOrderEvent, parse_book_msg_vec, parse_book10_msg_vec, parse_execution_msg,
75            parse_funding_msg, parse_instrument_msg, parse_order_event, parse_order_msg,
76            parse_order_update_msg, parse_position_msg, parse_trade_bin_msg_vec,
77            parse_trade_msg_vec, parse_wallet_msg,
78        },
79    },
80};
81
82/// Python wrapper around [`BitmexWebSocketClient`] that holds an instrument cache
83/// at the Python boundary for parsing venue messages into Nautilus domain types.
84#[pyclass(
85    name = "BitmexWebSocketClient",
86    module = "nautilus_trader.core.nautilus_pyo3.bitmex"
87)]
88#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bitmex")]
89pub struct PyBitmexWebSocketClient {
90    inner: BitmexWebSocketClient,
91    instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
92    ws_dispatch_state: Arc<WsDispatchState>,
93}
94
95impl Debug for PyBitmexWebSocketClient {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.debug_struct(stringify!(PyBitmexWebSocketClient))
98            .field("inner", &self.inner)
99            .finish_non_exhaustive()
100    }
101}
102
103#[pymethods]
104#[pyo3_stub_gen::derive::gen_stub_pymethods]
105impl PyBitmexWebSocketClient {
106    #[new]
107    #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=5, environment=BitmexEnvironment::Mainnet, proxy_url=None))]
108    fn py_new(
109        url: Option<String>,
110        api_key: Option<String>,
111        api_secret: Option<String>,
112        account_id: Option<AccountId>,
113        heartbeat: u64,
114        environment: BitmexEnvironment,
115        proxy_url: Option<String>,
116    ) -> PyResult<Self> {
117        let inner = BitmexWebSocketClient::new_with_env(
118            url,
119            api_key,
120            api_secret,
121            account_id,
122            heartbeat,
123            environment,
124            TransportBackend::default(),
125            proxy_url,
126        )
127        .map_err(to_pyvalue_err)?;
128        Ok(Self {
129            inner,
130            instruments_cache: Arc::new(AtomicMap::new()),
131            ws_dispatch_state: Arc::new(WsDispatchState::default()),
132        })
133    }
134
135    #[staticmethod]
136    #[pyo3(name = "from_env")]
137    fn py_from_env() -> PyResult<Self> {
138        let inner = BitmexWebSocketClient::from_env().map_err(to_pyvalue_err)?;
139        Ok(Self {
140            inner,
141            instruments_cache: Arc::new(AtomicMap::new()),
142            ws_dispatch_state: Arc::new(WsDispatchState::default()),
143        })
144    }
145
146    #[getter]
147    #[pyo3(name = "url")]
148    #[must_use]
149    fn py_url(&self) -> &str {
150        self.inner.url()
151    }
152
153    #[getter]
154    #[pyo3(name = "api_key")]
155    #[must_use]
156    fn py_api_key(&self) -> Option<&str> {
157        self.inner.api_key()
158    }
159
160    #[getter]
161    #[pyo3(name = "api_key_masked")]
162    #[must_use]
163    fn py_api_key_masked(&self) -> Option<String> {
164        self.inner.api_key_masked()
165    }
166
167    #[pyo3(name = "is_active")]
168    fn py_is_active(&mut self) -> bool {
169        self.inner.is_active()
170    }
171
172    #[pyo3(name = "is_closed")]
173    fn py_is_closed(&mut self) -> bool {
174        self.inner.is_closed()
175    }
176
177    #[pyo3(name = "get_subscriptions")]
178    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
179        self.inner.get_subscriptions(instrument_id)
180    }
181
182    #[pyo3(name = "set_account_id")]
183    fn py_set_account_id(&mut self, account_id: AccountId) {
184        self.inner.set_account_id(account_id);
185    }
186
187    #[pyo3(name = "register_order_identity")]
188    fn py_register_order_identity(
189        &self,
190        client_order_id: ClientOrderId,
191        instrument_id: InstrumentId,
192        strategy_id: StrategyId,
193        order_side: OrderSide,
194        order_type: OrderType,
195    ) {
196        self.ws_dispatch_state.order_identities.insert(
197            client_order_id,
198            OrderIdentity {
199                instrument_id,
200                strategy_id,
201                order_side,
202                order_type,
203            },
204        );
205    }
206
207    #[pyo3(name = "remove_order_identity")]
208    fn py_remove_order_identity(&self, client_order_id: ClientOrderId) {
209        self.ws_dispatch_state
210            .order_identities
211            .remove(&client_order_id);
212    }
213
214    #[pyo3(name = "cache_instrument")]
215    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
216        let inst = pyobject_to_instrument_any(py, instrument)?;
217        let symbol = inst.symbol().inner();
218        self.instruments_cache.insert(symbol, inst);
219        Ok(())
220    }
221
222    #[pyo3(name = "connect")]
223    #[pyo3(signature = (loop_, instruments, callback, trader_id=None))]
224    #[expect(clippy::needless_pass_by_value)]
225    fn py_connect<'py>(
226        &mut self,
227        py: Python<'py>,
228        loop_: Py<PyAny>,
229        instruments: Vec<Py<PyAny>>,
230        callback: Py<PyAny>,
231        trader_id: Option<TraderId>,
232    ) -> PyResult<Bound<'py, PyAny>> {
233        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
234
235        let cache = Arc::clone(&self.instruments_cache);
236        {
237            let mut initial: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
238
239            for inst_py in instruments {
240                let inst = pyobject_to_instrument_any(py, inst_py)?;
241                initial.insert(inst.symbol().inner(), inst);
242            }
243            cache.rcu(|m| {
244                for (k, v) in &initial {
245                    m.insert(*k, v.clone());
246                }
247            });
248        }
249
250        let clock = get_atomic_clock_realtime();
251        let mut client = self.inner.clone();
252        let account_id = self.inner.account_id();
253        let dispatch_state = Arc::clone(&self.ws_dispatch_state);
254        let trader_id = trader_id.unwrap_or(TraderId::from("TRADER-000"));
255
256        pyo3_async_runtimes::tokio::future_into_py(py, async move {
257            client.connect().await.map_err(to_pyruntime_err)?;
258
259            let stream = client.stream();
260
261            get_runtime().spawn(async move {
262                let _client = client; // Keep client alive for the entire duration
263                tokio::pin!(stream);
264
265                let mut quote_cache = QuoteCache::new();
266                let mut order_type_cache: AHashMap<ClientOrderId, OrderType> = AHashMap::new();
267                let mut order_symbol_cache: AHashMap<ClientOrderId, Ustr> = AHashMap::new();
268
269                while let Some(msg) = stream.next().await {
270                    let ts_init = clock.get_time_ns();
271
272                    match msg {
273                        BitmexWsMessage::Table(table_msg) => {
274                            handle_table_message(
275                                table_msg,
276                                &cache,
277                                &mut quote_cache,
278                                &mut order_type_cache,
279                                &mut order_symbol_cache,
280                                &dispatch_state,
281                                trader_id,
282                                account_id,
283                                ts_init,
284                                &call_soon,
285                                &callback,
286                            );
287                        }
288                        BitmexWsMessage::Reconnected => {
289                            quote_cache.clear();
290                            order_type_cache.clear();
291                            order_symbol_cache.clear();
292                        }
293                        BitmexWsMessage::Authenticated => {}
294                    }
295                }
296            });
297
298            Ok(())
299        })
300    }
301
302    #[pyo3(name = "wait_until_active")]
303    fn py_wait_until_active<'py>(
304        &self,
305        py: Python<'py>,
306        timeout_secs: f64,
307    ) -> PyResult<Bound<'py, PyAny>> {
308        let client = self.inner.clone();
309
310        pyo3_async_runtimes::tokio::future_into_py(py, async move {
311            client
312                .wait_until_active(timeout_secs)
313                .await
314                .map_err(to_pyruntime_err)?;
315            Ok(())
316        })
317    }
318
319    #[pyo3(name = "close")]
320    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
321        let mut client = self.inner.clone();
322
323        pyo3_async_runtimes::tokio::future_into_py(py, async move {
324            if let Err(e) = client.close().await {
325                log::error!("Error on close: {e}");
326            }
327            Ok(())
328        })
329    }
330
331    #[pyo3(name = "subscribe_instruments")]
332    fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
333        let client = self.inner.clone();
334
335        pyo3_async_runtimes::tokio::future_into_py(py, async move {
336            if let Err(e) = client.subscribe_instruments().await {
337                log::error!("Failed to subscribe to instruments: {e}");
338            }
339            Ok(())
340        })
341    }
342
343    #[pyo3(name = "subscribe_instrument")]
344    fn py_subscribe_instrument<'py>(
345        &self,
346        py: Python<'py>,
347        instrument_id: InstrumentId,
348    ) -> PyResult<Bound<'py, PyAny>> {
349        let client = self.inner.clone();
350
351        pyo3_async_runtimes::tokio::future_into_py(py, async move {
352            if let Err(e) = client.subscribe_instrument(instrument_id).await {
353                log::error!("Failed to subscribe to instrument: {e}");
354            }
355            Ok(())
356        })
357    }
358
359    #[pyo3(name = "subscribe_book")]
360    fn py_subscribe_book<'py>(
361        &self,
362        py: Python<'py>,
363        instrument_id: InstrumentId,
364    ) -> PyResult<Bound<'py, PyAny>> {
365        let client = self.inner.clone();
366
367        pyo3_async_runtimes::tokio::future_into_py(py, async move {
368            if let Err(e) = client.subscribe_book(instrument_id).await {
369                log::error!("Failed to subscribe to order book: {e}");
370            }
371            Ok(())
372        })
373    }
374
375    #[pyo3(name = "subscribe_book_25")]
376    fn py_subscribe_book_25<'py>(
377        &self,
378        py: Python<'py>,
379        instrument_id: InstrumentId,
380    ) -> PyResult<Bound<'py, PyAny>> {
381        let client = self.inner.clone();
382
383        pyo3_async_runtimes::tokio::future_into_py(py, async move {
384            if let Err(e) = client.subscribe_book_25(instrument_id).await {
385                log::error!("Failed to subscribe to order book 25: {e}");
386            }
387            Ok(())
388        })
389    }
390
391    #[pyo3(name = "subscribe_book_depth10")]
392    fn py_subscribe_book_depth10<'py>(
393        &self,
394        py: Python<'py>,
395        instrument_id: InstrumentId,
396    ) -> PyResult<Bound<'py, PyAny>> {
397        let client = self.inner.clone();
398
399        pyo3_async_runtimes::tokio::future_into_py(py, async move {
400            if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
401                log::error!("Failed to subscribe to order book depth 10: {e}");
402            }
403            Ok(())
404        })
405    }
406
407    #[pyo3(name = "subscribe_quotes")]
408    fn py_subscribe_quotes<'py>(
409        &self,
410        py: Python<'py>,
411        instrument_id: InstrumentId,
412    ) -> PyResult<Bound<'py, PyAny>> {
413        let client = self.inner.clone();
414
415        pyo3_async_runtimes::tokio::future_into_py(py, async move {
416            if let Err(e) = client.subscribe_quotes(instrument_id).await {
417                log::error!("Failed to subscribe to quotes: {e}");
418            }
419            Ok(())
420        })
421    }
422
423    #[pyo3(name = "subscribe_trades")]
424    fn py_subscribe_trades<'py>(
425        &self,
426        py: Python<'py>,
427        instrument_id: InstrumentId,
428    ) -> PyResult<Bound<'py, PyAny>> {
429        let client = self.inner.clone();
430
431        pyo3_async_runtimes::tokio::future_into_py(py, async move {
432            if let Err(e) = client.subscribe_trades(instrument_id).await {
433                log::error!("Failed to subscribe to trades: {e}");
434            }
435            Ok(())
436        })
437    }
438
439    #[pyo3(name = "subscribe_mark_prices")]
440    fn py_subscribe_mark_prices<'py>(
441        &self,
442        py: Python<'py>,
443        instrument_id: InstrumentId,
444    ) -> PyResult<Bound<'py, PyAny>> {
445        let client = self.inner.clone();
446
447        pyo3_async_runtimes::tokio::future_into_py(py, async move {
448            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
449                log::error!("Failed to subscribe to mark prices: {e}");
450            }
451            Ok(())
452        })
453    }
454
455    #[pyo3(name = "subscribe_index_prices")]
456    fn py_subscribe_index_prices<'py>(
457        &self,
458        py: Python<'py>,
459        instrument_id: InstrumentId,
460    ) -> PyResult<Bound<'py, PyAny>> {
461        let client = self.inner.clone();
462
463        pyo3_async_runtimes::tokio::future_into_py(py, async move {
464            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
465                log::error!("Failed to subscribe to index prices: {e}");
466            }
467            Ok(())
468        })
469    }
470
471    #[pyo3(name = "subscribe_funding_rates")]
472    fn py_subscribe_funding_rates<'py>(
473        &self,
474        py: Python<'py>,
475        instrument_id: InstrumentId,
476    ) -> PyResult<Bound<'py, PyAny>> {
477        let client = self.inner.clone();
478
479        pyo3_async_runtimes::tokio::future_into_py(py, async move {
480            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
481                log::error!("Failed to subscribe to funding: {e}");
482            }
483            Ok(())
484        })
485    }
486
487    #[pyo3(name = "subscribe_bars")]
488    fn py_subscribe_bars<'py>(
489        &self,
490        py: Python<'py>,
491        bar_type: BarType,
492    ) -> PyResult<Bound<'py, PyAny>> {
493        let client = self.inner.clone();
494
495        pyo3_async_runtimes::tokio::future_into_py(py, async move {
496            if let Err(e) = client.subscribe_bars(bar_type).await {
497                log::error!("Failed to subscribe to bars: {e}");
498            }
499            Ok(())
500        })
501    }
502
503    #[pyo3(name = "unsubscribe_instruments")]
504    fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
505        let client = self.inner.clone();
506
507        pyo3_async_runtimes::tokio::future_into_py(py, async move {
508            if let Err(e) = client.unsubscribe_instruments().await {
509                log::error!("Failed to unsubscribe from instruments: {e}");
510            }
511            Ok(())
512        })
513    }
514
515    #[pyo3(name = "unsubscribe_instrument")]
516    fn py_unsubscribe_instrument<'py>(
517        &self,
518        py: Python<'py>,
519        instrument_id: InstrumentId,
520    ) -> PyResult<Bound<'py, PyAny>> {
521        let client = self.inner.clone();
522
523        pyo3_async_runtimes::tokio::future_into_py(py, async move {
524            if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
525                log::error!("Failed to unsubscribe from instrument: {e}");
526            }
527            Ok(())
528        })
529    }
530
531    #[pyo3(name = "unsubscribe_book")]
532    fn py_unsubscribe_book<'py>(
533        &self,
534        py: Python<'py>,
535        instrument_id: InstrumentId,
536    ) -> PyResult<Bound<'py, PyAny>> {
537        let client = self.inner.clone();
538
539        pyo3_async_runtimes::tokio::future_into_py(py, async move {
540            if let Err(e) = client.unsubscribe_book(instrument_id).await {
541                log::error!("Failed to unsubscribe from order book: {e}");
542            }
543            Ok(())
544        })
545    }
546
547    #[pyo3(name = "unsubscribe_book_25")]
548    fn py_unsubscribe_book_25<'py>(
549        &self,
550        py: Python<'py>,
551        instrument_id: InstrumentId,
552    ) -> PyResult<Bound<'py, PyAny>> {
553        let client = self.inner.clone();
554
555        pyo3_async_runtimes::tokio::future_into_py(py, async move {
556            if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
557                log::error!("Failed to unsubscribe from order book 25: {e}");
558            }
559            Ok(())
560        })
561    }
562
563    #[pyo3(name = "unsubscribe_book_depth10")]
564    fn py_unsubscribe_book_depth10<'py>(
565        &self,
566        py: Python<'py>,
567        instrument_id: InstrumentId,
568    ) -> PyResult<Bound<'py, PyAny>> {
569        let client = self.inner.clone();
570
571        pyo3_async_runtimes::tokio::future_into_py(py, async move {
572            if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
573                log::error!("Failed to unsubscribe from order book depth 10: {e}");
574            }
575            Ok(())
576        })
577    }
578
579    #[pyo3(name = "unsubscribe_quotes")]
580    fn py_unsubscribe_quotes<'py>(
581        &self,
582        py: Python<'py>,
583        instrument_id: InstrumentId,
584    ) -> PyResult<Bound<'py, PyAny>> {
585        let client = self.inner.clone();
586
587        pyo3_async_runtimes::tokio::future_into_py(py, async move {
588            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
589                log::error!("Failed to unsubscribe from quotes: {e}");
590            }
591            Ok(())
592        })
593    }
594
595    #[pyo3(name = "unsubscribe_trades")]
596    fn py_unsubscribe_trades<'py>(
597        &self,
598        py: Python<'py>,
599        instrument_id: InstrumentId,
600    ) -> PyResult<Bound<'py, PyAny>> {
601        let client = self.inner.clone();
602
603        pyo3_async_runtimes::tokio::future_into_py(py, async move {
604            if let Err(e) = client.unsubscribe_trades(instrument_id).await {
605                log::error!("Failed to unsubscribe from trades: {e}");
606            }
607            Ok(())
608        })
609    }
610
611    #[pyo3(name = "unsubscribe_mark_prices")]
612    fn py_unsubscribe_mark_prices<'py>(
613        &self,
614        py: Python<'py>,
615        instrument_id: InstrumentId,
616    ) -> PyResult<Bound<'py, PyAny>> {
617        let client = self.inner.clone();
618
619        pyo3_async_runtimes::tokio::future_into_py(py, async move {
620            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
621                log::error!("Failed to unsubscribe from mark prices: {e}");
622            }
623            Ok(())
624        })
625    }
626
627    #[pyo3(name = "unsubscribe_index_prices")]
628    fn py_unsubscribe_index_prices<'py>(
629        &self,
630        py: Python<'py>,
631        instrument_id: InstrumentId,
632    ) -> PyResult<Bound<'py, PyAny>> {
633        let client = self.inner.clone();
634
635        pyo3_async_runtimes::tokio::future_into_py(py, async move {
636            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
637                log::error!("Failed to unsubscribe from index prices: {e}");
638            }
639            Ok(())
640        })
641    }
642
643    #[pyo3(name = "unsubscribe_funding_rates")]
644    fn py_unsubscribe_funding_rates<'py>(
645        &self,
646        py: Python<'py>,
647        instrument_id: InstrumentId,
648    ) -> PyResult<Bound<'py, PyAny>> {
649        let client = self.inner.clone();
650        pyo3_async_runtimes::tokio::future_into_py(py, async move {
651            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
652                log::error!("Failed to unsubscribe from funding rates: {e}");
653            }
654            Ok(())
655        })
656    }
657
658    #[pyo3(name = "unsubscribe_bars")]
659    fn py_unsubscribe_bars<'py>(
660        &self,
661        py: Python<'py>,
662        bar_type: BarType,
663    ) -> PyResult<Bound<'py, PyAny>> {
664        let client = self.inner.clone();
665
666        pyo3_async_runtimes::tokio::future_into_py(py, async move {
667            if let Err(e) = client.unsubscribe_bars(bar_type).await {
668                log::error!("Failed to unsubscribe from bars: {e}");
669            }
670            Ok(())
671        })
672    }
673
674    #[pyo3(name = "subscribe_orders")]
675    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
676        let client = self.inner.clone();
677
678        pyo3_async_runtimes::tokio::future_into_py(py, async move {
679            if let Err(e) = client.subscribe_orders().await {
680                log::error!("Failed to subscribe to orders: {e}");
681            }
682            Ok(())
683        })
684    }
685
686    #[pyo3(name = "subscribe_executions")]
687    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
688        let client = self.inner.clone();
689
690        pyo3_async_runtimes::tokio::future_into_py(py, async move {
691            if let Err(e) = client.subscribe_executions().await {
692                log::error!("Failed to subscribe to executions: {e}");
693            }
694            Ok(())
695        })
696    }
697
698    #[pyo3(name = "subscribe_positions")]
699    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
700        let client = self.inner.clone();
701
702        pyo3_async_runtimes::tokio::future_into_py(py, async move {
703            if let Err(e) = client.subscribe_positions().await {
704                log::error!("Failed to subscribe to positions: {e}");
705            }
706            Ok(())
707        })
708    }
709
710    #[pyo3(name = "subscribe_margin")]
711    fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
712        let client = self.inner.clone();
713
714        pyo3_async_runtimes::tokio::future_into_py(py, async move {
715            if let Err(e) = client.subscribe_margin().await {
716                log::error!("Failed to subscribe to margin: {e}");
717            }
718            Ok(())
719        })
720    }
721
722    #[pyo3(name = "subscribe_wallet")]
723    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
724        let client = self.inner.clone();
725
726        pyo3_async_runtimes::tokio::future_into_py(py, async move {
727            if let Err(e) = client.subscribe_wallet().await {
728                log::error!("Failed to subscribe to wallet: {e}");
729            }
730            Ok(())
731        })
732    }
733
734    #[pyo3(name = "unsubscribe_orders")]
735    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
736        let client = self.inner.clone();
737
738        pyo3_async_runtimes::tokio::future_into_py(py, async move {
739            if let Err(e) = client.unsubscribe_orders().await {
740                log::error!("Failed to unsubscribe from orders: {e}");
741            }
742            Ok(())
743        })
744    }
745
746    #[pyo3(name = "unsubscribe_executions")]
747    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
748        let client = self.inner.clone();
749
750        pyo3_async_runtimes::tokio::future_into_py(py, async move {
751            if let Err(e) = client.unsubscribe_executions().await {
752                log::error!("Failed to unsubscribe from executions: {e}");
753            }
754            Ok(())
755        })
756    }
757
758    #[pyo3(name = "unsubscribe_positions")]
759    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
760        let client = self.inner.clone();
761
762        pyo3_async_runtimes::tokio::future_into_py(py, async move {
763            if let Err(e) = client.unsubscribe_positions().await {
764                log::error!("Failed to unsubscribe from positions: {e}");
765            }
766            Ok(())
767        })
768    }
769
770    #[pyo3(name = "unsubscribe_margin")]
771    fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
772        let client = self.inner.clone();
773
774        pyo3_async_runtimes::tokio::future_into_py(py, async move {
775            if let Err(e) = client.unsubscribe_margin().await {
776                log::error!("Failed to unsubscribe from margin: {e}");
777            }
778            Ok(())
779        })
780    }
781
782    #[pyo3(name = "unsubscribe_wallet")]
783    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
784        let client = self.inner.clone();
785
786        pyo3_async_runtimes::tokio::future_into_py(py, async move {
787            if let Err(e) = client.unsubscribe_wallet().await {
788                log::error!("Failed to unsubscribe from wallet: {e}");
789            }
790            Ok(())
791        })
792    }
793}
794
795#[expect(clippy::too_many_arguments)]
796fn handle_table_message(
797    table_msg: BitmexTableMessage,
798    instruments_cache: &Arc<AtomicMap<Ustr, InstrumentAny>>,
799    quote_cache: &mut QuoteCache,
800    order_type_cache: &mut AHashMap<ClientOrderId, OrderType>,
801    order_symbol_cache: &mut AHashMap<ClientOrderId, Ustr>,
802    dispatch_state: &WsDispatchState,
803    trader_id: TraderId,
804    account_id: AccountId,
805    ts_init: UnixNanos,
806    call_soon: &Py<PyAny>,
807    callback: &Py<PyAny>,
808) {
809    if let BitmexTableMessage::Instrument { action, data } = table_msg {
810        handle_instrument_messages(
811            action,
812            data,
813            instruments_cache,
814            ts_init,
815            call_soon,
816            callback,
817        );
818        return;
819    }
820
821    let instruments = instruments_cache.load();
822
823    match table_msg {
824        BitmexTableMessage::OrderBookL2 { action, data }
825        | BitmexTableMessage::OrderBookL2_25 { action, data } => {
826            if !data.is_empty() {
827                for d in parse_book_msg_vec(data, action, &instruments, ts_init) {
828                    send_data_to_python(d, call_soon, callback);
829                }
830            }
831        }
832        BitmexTableMessage::OrderBook10 { data, .. } => {
833            if !data.is_empty() {
834                for d in parse_book10_msg_vec(data, &instruments, ts_init) {
835                    send_data_to_python(d, call_soon, callback);
836                }
837            }
838        }
839        BitmexTableMessage::Quote { data, .. } => {
840            handle_quote_messages(
841                data,
842                &instruments,
843                quote_cache,
844                ts_init,
845                call_soon,
846                callback,
847            );
848        }
849        BitmexTableMessage::Trade { data, .. } => {
850            if !data.is_empty() {
851                for d in parse_trade_msg_vec(data, &instruments, ts_init) {
852                    send_data_to_python(d, call_soon, callback);
853                }
854            }
855        }
856        BitmexTableMessage::TradeBin1m { action, data } => {
857            if action != BitmexAction::Partial && !data.is_empty() {
858                for d in
859                    parse_trade_bin_msg_vec(data, &BitmexWsTopic::TradeBin1m, &instruments, ts_init)
860                {
861                    send_data_to_python(d, call_soon, callback);
862                }
863            }
864        }
865        BitmexTableMessage::TradeBin5m { action, data } => {
866            if action != BitmexAction::Partial && !data.is_empty() {
867                for d in
868                    parse_trade_bin_msg_vec(data, &BitmexWsTopic::TradeBin5m, &instruments, ts_init)
869                {
870                    send_data_to_python(d, call_soon, callback);
871                }
872            }
873        }
874        BitmexTableMessage::TradeBin1h { action, data } => {
875            if action != BitmexAction::Partial && !data.is_empty() {
876                for d in
877                    parse_trade_bin_msg_vec(data, &BitmexWsTopic::TradeBin1h, &instruments, ts_init)
878                {
879                    send_data_to_python(d, call_soon, callback);
880                }
881            }
882        }
883        BitmexTableMessage::TradeBin1d { action, data } => {
884            if action != BitmexAction::Partial && !data.is_empty() {
885                for d in
886                    parse_trade_bin_msg_vec(data, &BitmexWsTopic::TradeBin1d, &instruments, ts_init)
887                {
888                    send_data_to_python(d, call_soon, callback);
889                }
890            }
891        }
892        BitmexTableMessage::Funding { data, .. } => {
893            for msg in data {
894                send_to_python(parse_funding_msg(&msg, ts_init), call_soon, callback);
895            }
896        }
897        BitmexTableMessage::Order { data, .. } => {
898            handle_order_messages(
899                data,
900                &instruments,
901                order_type_cache,
902                order_symbol_cache,
903                dispatch_state,
904                trader_id,
905                account_id,
906                ts_init,
907                call_soon,
908                callback,
909            );
910        }
911        BitmexTableMessage::Execution { data, .. } => {
912            handle_execution_messages(
913                data,
914                &instruments,
915                order_symbol_cache,
916                dispatch_state,
917                trader_id,
918                ts_init,
919                call_soon,
920                callback,
921            );
922        }
923        BitmexTableMessage::Position { data, .. } => {
924            for msg in data {
925                let Some(instrument) = instruments.get(&msg.symbol) else {
926                    log::warn!("Instrument cache miss for position symbol={}", msg.symbol);
927                    continue;
928                };
929
930                send_to_python(
931                    parse_position_msg(&msg, instrument, ts_init),
932                    call_soon,
933                    callback,
934                );
935            }
936        }
937        BitmexTableMessage::Wallet { data, .. } => {
938            for msg in data {
939                send_to_python(parse_wallet_msg(&msg, ts_init), call_soon, callback);
940            }
941        }
942        BitmexTableMessage::Margin { .. } => {}
943        _ => {
944            log::debug!("Unhandled table message type in Python WebSocket client");
945        }
946    }
947}
948
949fn handle_quote_messages(
950    data: Vec<BitmexQuoteMsg>,
951    instruments: &AHashMap<Ustr, InstrumentAny>,
952    quote_cache: &mut QuoteCache,
953    ts_init: UnixNanos,
954    call_soon: &Py<PyAny>,
955    callback: &Py<PyAny>,
956) {
957    for msg in data {
958        let Some(instrument) = instruments.get(&msg.symbol) else {
959            log::error!(
960                "Instrument cache miss: quote dropped for symbol={}",
961                msg.symbol,
962            );
963            continue;
964        };
965
966        let instrument_id = instrument.id();
967        let price_precision = instrument.price_precision();
968
969        let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
970        let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
971        let bid_size = msg
972            .bid_size
973            .map(|s| parse_contracts_quantity(s, instrument));
974        let ask_size = msg
975            .ask_size
976            .map(|s| parse_contracts_quantity(s, instrument));
977        let ts_event = UnixNanos::from(msg.timestamp);
978
979        match quote_cache.process(
980            instrument_id,
981            bid_price,
982            ask_price,
983            bid_size,
984            ask_size,
985            ts_event,
986            ts_init,
987        ) {
988            Ok(quote) => send_data_to_python(Data::Quote(quote), call_soon, callback),
989            Err(e) => {
990                log::warn!("Failed to process quote for {}: {e}", msg.symbol);
991            }
992        }
993    }
994}
995
996fn handle_instrument_messages(
997    action: BitmexAction,
998    data: Vec<BitmexInstrumentMsg>,
999    instruments_cache: &Arc<AtomicMap<Ustr, InstrumentAny>>,
1000    ts_init: UnixNanos,
1001    call_soon: &Py<PyAny>,
1002    callback: &Py<PyAny>,
1003) {
1004    if action == BitmexAction::Partial || action == BitmexAction::Insert {
1005        let data_for_prices = data.clone();
1006
1007        let mut new_instruments: Vec<(Ustr, InstrumentAny)> = Vec::new();
1008
1009        for msg in data {
1010            match msg.try_into() {
1011                Ok(http_inst) => match parse_instrument_any(&http_inst, ts_init) {
1012                    InstrumentParseResult::Ok(boxed) => {
1013                        let inst = *boxed;
1014                        let symbol = inst.symbol().inner();
1015                        new_instruments.push((symbol, inst));
1016                    }
1017                    InstrumentParseResult::Unsupported { .. }
1018                    | InstrumentParseResult::Inactive { .. } => {}
1019                    InstrumentParseResult::Failed { symbol, error, .. } => {
1020                        log::warn!("Failed to parse instrument {symbol}: {error}");
1021                    }
1022                },
1023                Err(e) => {
1024                    log::debug!("Skipping instrument (missing required fields): {e}");
1025                }
1026            }
1027        }
1028
1029        instruments_cache.rcu(|m| {
1030            for (symbol, inst) in &new_instruments {
1031                m.insert(*symbol, inst.clone());
1032            }
1033        });
1034
1035        for (_, inst) in &new_instruments {
1036            Python::attach(|py| {
1037                if let Ok(py_obj) = instrument_any_to_pyobject(py, inst.clone()) {
1038                    call_python_threadsafe(py, call_soon, callback, py_obj);
1039                }
1040            });
1041        }
1042
1043        let cache = instruments_cache.load();
1044        for msg in data_for_prices {
1045            for d in parse_instrument_msg(&msg, &cache, ts_init) {
1046                send_data_to_python(d, call_soon, callback);
1047            }
1048        }
1049    } else {
1050        for msg in &data {
1051            if let Some(state_str) = &msg.state
1052                && let Ok(state) =
1053                    serde_json::from_str::<BitmexInstrumentState>(&format!("\"{state_str}\""))
1054            {
1055                let instrument_id = parse_instrument_id(msg.symbol);
1056                let action = MarketStatusAction::from(&state);
1057                let is_trading = Some(state == BitmexInstrumentState::Open);
1058                let ts_event =
1059                    parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
1060                let status = InstrumentStatus::new(
1061                    instrument_id,
1062                    action,
1063                    ts_event,
1064                    ts_init,
1065                    None,
1066                    None,
1067                    is_trading,
1068                    None,
1069                    None,
1070                );
1071                send_to_python(status, call_soon, callback);
1072            }
1073        }
1074
1075        let cache = instruments_cache.load();
1076        for msg in data {
1077            for d in parse_instrument_msg(&msg, &cache, ts_init) {
1078                send_data_to_python(d, call_soon, callback);
1079            }
1080        }
1081    }
1082}
1083
1084#[expect(clippy::too_many_arguments)]
1085fn handle_order_messages(
1086    data: Vec<OrderData>,
1087    instruments: &AHashMap<Ustr, InstrumentAny>,
1088    order_type_cache: &mut AHashMap<ClientOrderId, OrderType>,
1089    order_symbol_cache: &mut AHashMap<ClientOrderId, Ustr>,
1090    dispatch_state: &WsDispatchState,
1091    trader_id: TraderId,
1092    account_id: AccountId,
1093    ts_init: UnixNanos,
1094    call_soon: &Py<PyAny>,
1095    callback: &Py<PyAny>,
1096) {
1097    for order_data in data {
1098        match order_data {
1099            OrderData::Full(order_msg) => {
1100                let Some(instrument) = instruments.get(&order_msg.symbol) else {
1101                    log::warn!(
1102                        "Instrument cache miss for order symbol={}",
1103                        order_msg.symbol
1104                    );
1105                    continue;
1106                };
1107
1108                let client_order_id = order_msg.cl_ord_id.map(ClientOrderId::new);
1109
1110                if let Some(ref cid) = client_order_id {
1111                    if let Some(ord_type) = &order_msg.ord_type {
1112                        let order_type: OrderType = if *ord_type == BitmexOrderType::Pegged
1113                            && order_msg.peg_price_type == Some(BitmexPegPriceType::TrailingStopPeg)
1114                        {
1115                            if order_msg.price.is_some() {
1116                                OrderType::TrailingStopLimit
1117                            } else {
1118                                OrderType::TrailingStopMarket
1119                            }
1120                        } else {
1121                            (*ord_type).into()
1122                        };
1123                        order_type_cache.insert(*cid, order_type);
1124                    }
1125                    order_symbol_cache.insert(*cid, order_msg.symbol);
1126                }
1127
1128                let identity = client_order_id.and_then(|cid| {
1129                    dispatch_state
1130                        .order_identities
1131                        .get(&cid)
1132                        .map(|r| (cid, r.clone()))
1133                });
1134
1135                if let Some((cid, ident)) = identity {
1136                    if let Some(event) = parse_order_event(
1137                        &order_msg,
1138                        cid,
1139                        account_id,
1140                        trader_id,
1141                        ident.strategy_id,
1142                        ts_init,
1143                    ) {
1144                        let venue_order_id = VenueOrderId::new(order_msg.order_id.to_string());
1145                        dispatch_order_event_to_python(
1146                            event,
1147                            cid,
1148                            account_id,
1149                            venue_order_id,
1150                            &ident,
1151                            dispatch_state,
1152                            trader_id,
1153                            ts_init,
1154                            call_soon,
1155                            callback,
1156                        );
1157                    }
1158
1159                    if order_msg.ord_status.is_terminal() {
1160                        order_type_cache.remove(&cid);
1161                        order_symbol_cache.remove(&cid);
1162                    }
1163                } else {
1164                    match parse_order_msg(&order_msg, instrument, order_type_cache, ts_init) {
1165                        Ok(report) => {
1166                            if report.order_status.is_closed()
1167                                && let Some(cid) = report.client_order_id
1168                            {
1169                                order_type_cache.remove(&cid);
1170                                order_symbol_cache.remove(&cid);
1171                            }
1172                            send_to_python(report, call_soon, callback);
1173                        }
1174                        Err(e) => log::error!("Failed to parse order message: {e}"),
1175                    }
1176                }
1177            }
1178            OrderData::Update(msg) => {
1179                if let Some(cl_ord_id) = &msg.cl_ord_id {
1180                    let cid = ClientOrderId::new(cl_ord_id);
1181                    order_symbol_cache.insert(cid, msg.symbol);
1182                }
1183
1184                let Some(instrument) = instruments.get(&msg.symbol) else {
1185                    log::warn!(
1186                        "Instrument cache miss for order update symbol={}",
1187                        msg.symbol,
1188                    );
1189                    continue;
1190                };
1191
1192                let identity = msg.cl_ord_id.as_ref().and_then(|cl| {
1193                    let cid = ClientOrderId::new(cl);
1194                    dispatch_state
1195                        .order_identities
1196                        .get(&cid)
1197                        .map(|r| (cid, r.clone()))
1198                });
1199
1200                if let Some((cid, ident)) = identity {
1201                    if let Some(event) =
1202                        parse_order_update_msg(&msg, instrument, account_id, ts_init)
1203                    {
1204                        let enriched = OrderUpdated::new(
1205                            trader_id,
1206                            ident.strategy_id,
1207                            event.instrument_id,
1208                            cid,
1209                            event.quantity,
1210                            event.event_id,
1211                            event.ts_event,
1212                            event.ts_init,
1213                            false,
1214                            event.venue_order_id,
1215                            Some(account_id),
1216                            event.price,
1217                            event.trigger_price,
1218                            event.protection_price,
1219                            false, // is_quote_quantity
1220                        );
1221                        let venue_order_id = enriched
1222                            .venue_order_id
1223                            .unwrap_or_else(|| VenueOrderId::new(msg.order_id.to_string()));
1224                        ensure_accepted_to_python(
1225                            cid,
1226                            account_id,
1227                            venue_order_id,
1228                            &ident,
1229                            dispatch_state,
1230                            trader_id,
1231                            ts_init,
1232                            call_soon,
1233                            callback,
1234                        );
1235                        send_to_python(enriched, call_soon, callback);
1236                    }
1237                } else {
1238                    log::debug!(
1239                        "Skipping order update for untracked order: order_id={}",
1240                        msg.order_id,
1241                    );
1242                }
1243            }
1244        }
1245    }
1246}
1247
1248#[expect(clippy::too_many_arguments)]
1249fn handle_execution_messages(
1250    data: Vec<BitmexExecutionMsg>,
1251    instruments: &AHashMap<Ustr, InstrumentAny>,
1252    order_symbol_cache: &AHashMap<ClientOrderId, Ustr>,
1253    dispatch_state: &WsDispatchState,
1254    trader_id: TraderId,
1255    ts_init: UnixNanos,
1256    call_soon: &Py<PyAny>,
1257    callback: &Py<PyAny>,
1258) {
1259    for exec_msg in data {
1260        let symbol = exec_msg.symbol.or_else(|| {
1261            exec_msg
1262                .cl_ord_id
1263                .map(ClientOrderId::new)
1264                .and_then(|cid| order_symbol_cache.get(&cid).copied())
1265        });
1266
1267        let Some(symbol) = symbol else {
1268            if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
1269                if exec_msg.exec_type == Some(BitmexExecType::Trade) {
1270                    log::warn!(
1271                        "Execution missing symbol and not in cache: \
1272                        cl_ord_id={cl_ord_id}, exec_id={:?}",
1273                        exec_msg.exec_id,
1274                    );
1275                } else {
1276                    log::debug!(
1277                        "Execution missing symbol and not in cache: \
1278                        cl_ord_id={cl_ord_id}, exec_type={:?}",
1279                        exec_msg.exec_type,
1280                    );
1281                }
1282            } else if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
1283                log::debug!(
1284                    "CancelReject missing symbol/clOrdID: exec_id={:?}, order_id={:?}",
1285                    exec_msg.exec_id,
1286                    exec_msg.order_id,
1287                );
1288            } else {
1289                log::warn!(
1290                    "Execution missing both symbol and clOrdID: \
1291                    exec_id={:?}, order_id={:?}, exec_type={:?}",
1292                    exec_msg.exec_id,
1293                    exec_msg.order_id,
1294                    exec_msg.exec_type,
1295                );
1296            }
1297            continue;
1298        };
1299
1300        let Some(instrument) = instruments.get(&symbol) else {
1301            log::warn!("Instrument cache miss for execution symbol={symbol}");
1302            continue;
1303        };
1304
1305        let Some(fill) = parse_execution_msg(exec_msg, instrument, ts_init) else {
1306            continue;
1307        };
1308
1309        let identity = fill.client_order_id.and_then(|cid| {
1310            dispatch_state
1311                .order_identities
1312                .get(&cid)
1313                .map(|r| (cid, r.clone()))
1314        });
1315
1316        if let Some((cid, ident)) = identity {
1317            let venue_order_id = fill.venue_order_id;
1318            ensure_accepted_to_python(
1319                cid,
1320                fill.account_id,
1321                venue_order_id,
1322                &ident,
1323                dispatch_state,
1324                trader_id,
1325                ts_init,
1326                call_soon,
1327                callback,
1328            );
1329            dispatch_state.insert_filled(cid);
1330            dispatch_state.remove_triggered(&cid);
1331            let filled =
1332                fill_report_to_order_filled(&fill, trader_id, &ident, instrument.quote_currency());
1333            send_to_python(filled, call_soon, callback);
1334        } else {
1335            send_to_python(fill, call_soon, callback);
1336        }
1337    }
1338}
1339
1340/// Dispatches a parsed order event to Python with lifecycle synthesis and deduplication.
1341#[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
1342fn dispatch_order_event_to_python(
1343    event: ParsedOrderEvent,
1344    client_order_id: ClientOrderId,
1345    account_id: AccountId,
1346    venue_order_id: VenueOrderId,
1347    identity: &OrderIdentity,
1348    state: &WsDispatchState,
1349    trader_id: TraderId,
1350    ts_init: UnixNanos,
1351    call_soon: &Py<PyAny>,
1352    callback: &Py<PyAny>,
1353) {
1354    let is_terminal;
1355
1356    match event {
1357        ParsedOrderEvent::Accepted(e) => {
1358            if state.accepted_contains(&client_order_id)
1359                || state.filled_contains(&client_order_id)
1360                || state.triggered_contains(&client_order_id)
1361            {
1362                log::debug!("Skipping duplicate Accepted for {client_order_id}");
1363                return;
1364            }
1365            state.insert_accepted(client_order_id);
1366            is_terminal = false;
1367            send_to_python(e, call_soon, callback);
1368        }
1369        ParsedOrderEvent::Triggered(e) => {
1370            if state.filled_contains(&client_order_id) {
1371                log::debug!("Skipping stale Triggered for {client_order_id} (already filled)");
1372                return;
1373            }
1374            ensure_accepted_to_python(
1375                client_order_id,
1376                account_id,
1377                venue_order_id,
1378                identity,
1379                state,
1380                trader_id,
1381                ts_init,
1382                call_soon,
1383                callback,
1384            );
1385            state.insert_triggered(client_order_id);
1386            is_terminal = false;
1387            send_to_python(e, call_soon, callback);
1388        }
1389        ParsedOrderEvent::Canceled(e) => {
1390            ensure_accepted_to_python(
1391                client_order_id,
1392                account_id,
1393                venue_order_id,
1394                identity,
1395                state,
1396                trader_id,
1397                ts_init,
1398                call_soon,
1399                callback,
1400            );
1401            state.remove_triggered(&client_order_id);
1402            state.remove_filled(&client_order_id);
1403            is_terminal = true;
1404            send_to_python(e, call_soon, callback);
1405        }
1406        ParsedOrderEvent::Expired(e) => {
1407            ensure_accepted_to_python(
1408                client_order_id,
1409                account_id,
1410                venue_order_id,
1411                identity,
1412                state,
1413                trader_id,
1414                ts_init,
1415                call_soon,
1416                callback,
1417            );
1418            state.remove_triggered(&client_order_id);
1419            state.remove_filled(&client_order_id);
1420            is_terminal = true;
1421            send_to_python(e, call_soon, callback);
1422        }
1423        ParsedOrderEvent::Rejected(e) => {
1424            state.remove_triggered(&client_order_id);
1425            state.remove_filled(&client_order_id);
1426            is_terminal = true;
1427            send_to_python(e, call_soon, callback);
1428        }
1429    }
1430
1431    if is_terminal {
1432        state.order_identities.remove(&client_order_id);
1433        state.remove_accepted(&client_order_id);
1434    }
1435}
1436
1437/// Synthesizes and sends `OrderAccepted` to Python if one has not yet been emitted.
1438#[expect(clippy::too_many_arguments)]
1439fn ensure_accepted_to_python(
1440    client_order_id: ClientOrderId,
1441    account_id: AccountId,
1442    venue_order_id: VenueOrderId,
1443    identity: &OrderIdentity,
1444    state: &WsDispatchState,
1445    trader_id: TraderId,
1446    ts_init: UnixNanos,
1447    call_soon: &Py<PyAny>,
1448    callback: &Py<PyAny>,
1449) {
1450    if state.accepted_contains(&client_order_id) {
1451        return;
1452    }
1453    state.insert_accepted(client_order_id);
1454    let accepted = OrderAccepted::new(
1455        trader_id,
1456        identity.strategy_id,
1457        identity.instrument_id,
1458        client_order_id,
1459        venue_order_id,
1460        account_id,
1461        UUID4::new(),
1462        ts_init,
1463        ts_init,
1464        false,
1465    );
1466    send_to_python(accepted, call_soon, callback);
1467}
1468
1469fn send_data_to_python(data: Data, call_soon: &Py<PyAny>, callback: &Py<PyAny>) {
1470    Python::attach(|py| {
1471        let py_obj = data_to_pycapsule(py, data);
1472        call_python_threadsafe(py, call_soon, callback, py_obj);
1473    });
1474}
1475
1476fn send_to_python<T: for<'py> IntoPyObjectExt<'py>>(
1477    value: T,
1478    call_soon: &Py<PyAny>,
1479    callback: &Py<PyAny>,
1480) {
1481    Python::attach(|py| {
1482        if let Ok(py_obj) = value.into_py_any(py) {
1483            call_python_threadsafe(py, call_soon, callback, py_obj);
1484        }
1485    });
1486}