1use std::{fmt::Debug, sync::Arc};
29
30use ahash::AHashMap;
31use futures_util::StreamExt;
32use nautilus_common::{cache::quote::QuoteCache, live::get_runtime};
33use nautilus_core::{
34 AtomicMap, UUID4, UnixNanos,
35 python::{call_python_threadsafe, to_pyruntime_err, to_pyvalue_err},
36 time::get_atomic_clock_realtime,
37};
38use nautilus_model::{
39 data::{Data, InstrumentStatus, bar::BarType},
40 enums::{MarketStatusAction, OrderSide, OrderType},
41 events::{OrderAccepted, OrderUpdated},
42 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
43 instruments::{Instrument, InstrumentAny},
44 python::{
45 data::data_to_pycapsule,
46 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
47 },
48 types::Price,
49};
50use nautilus_network::websocket::TransportBackend;
51use pyo3::{conversion::IntoPyObjectExt, prelude::*};
52use ustr::Ustr;
53
54use crate::{
55 common::{
56 enums::{
57 BitmexEnvironment, BitmexExecType, BitmexInstrumentState, BitmexOrderType,
58 BitmexPegPriceType,
59 },
60 parse::{
61 parse_contracts_quantity, parse_instrument_id, parse_optional_datetime_to_unix_nanos,
62 },
63 },
64 http::parse::{InstrumentParseResult, parse_instrument_any},
65 websocket::{
66 BitmexWebSocketClient,
67 dispatch::{OrderIdentity, WsDispatchState, fill_report_to_order_filled},
68 enums::{BitmexAction, BitmexWsTopic},
69 messages::{
70 BitmexExecutionMsg, BitmexInstrumentMsg, BitmexQuoteMsg, BitmexTableMessage,
71 BitmexWsMessage, OrderData,
72 },
73 parse::{
74 ParsedOrderEvent, parse_book_msg_vec, parse_book10_msg_vec, parse_execution_msg,
75 parse_funding_msg, parse_instrument_msg, parse_order_event, parse_order_msg,
76 parse_order_update_msg, parse_position_msg, parse_trade_bin_msg_vec,
77 parse_trade_msg_vec, parse_wallet_msg,
78 },
79 },
80};
81
/// Python-facing wrapper around [`BitmexWebSocketClient`].
///
/// Exposed to Python under the class name `BitmexWebSocketClient`.
#[pyclass(
    name = "BitmexWebSocketClient",
    module = "nautilus_trader.core.nautilus_pyo3.bitmex"
)]
#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bitmex")]
pub struct PyBitmexWebSocketClient {
    /// Wrapped Rust client that the Python methods delegate to.
    inner: BitmexWebSocketClient,
    /// Instruments keyed by symbol; shared with the message task started by `connect`.
    instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Dispatch state holding the registered order identities; shared with the message task.
    ws_dispatch_state: Arc<WsDispatchState>,
}
94
95impl Debug for PyBitmexWebSocketClient {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.debug_struct(stringify!(PyBitmexWebSocketClient))
98 .field("inner", &self.inner)
99 .finish_non_exhaustive()
100 }
101}
102
103#[pymethods]
104#[pyo3_stub_gen::derive::gen_stub_pymethods]
105impl PyBitmexWebSocketClient {
106 #[new]
107 #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=5, environment=BitmexEnvironment::Mainnet, proxy_url=None))]
108 fn py_new(
109 url: Option<String>,
110 api_key: Option<String>,
111 api_secret: Option<String>,
112 account_id: Option<AccountId>,
113 heartbeat: u64,
114 environment: BitmexEnvironment,
115 proxy_url: Option<String>,
116 ) -> PyResult<Self> {
117 let inner = BitmexWebSocketClient::new_with_env(
118 url,
119 api_key,
120 api_secret,
121 account_id,
122 heartbeat,
123 environment,
124 TransportBackend::default(),
125 proxy_url,
126 )
127 .map_err(to_pyvalue_err)?;
128 Ok(Self {
129 inner,
130 instruments_cache: Arc::new(AtomicMap::new()),
131 ws_dispatch_state: Arc::new(WsDispatchState::default()),
132 })
133 }
134
135 #[staticmethod]
136 #[pyo3(name = "from_env")]
137 fn py_from_env() -> PyResult<Self> {
138 let inner = BitmexWebSocketClient::from_env().map_err(to_pyvalue_err)?;
139 Ok(Self {
140 inner,
141 instruments_cache: Arc::new(AtomicMap::new()),
142 ws_dispatch_state: Arc::new(WsDispatchState::default()),
143 })
144 }
145
    /// Returns the URL reported by the wrapped client.
    #[getter]
    #[pyo3(name = "url")]
    #[must_use]
    fn py_url(&self) -> &str {
        self.inner.url()
    }
152
    /// Returns the API key reported by the wrapped client, if it has one.
    #[getter]
    #[pyo3(name = "api_key")]
    #[must_use]
    fn py_api_key(&self) -> Option<&str> {
        self.inner.api_key()
    }
159
    /// Returns the wrapped client's `api_key_masked()` value, if it has one.
    #[getter]
    #[pyo3(name = "api_key_masked")]
    #[must_use]
    fn py_api_key_masked(&self) -> Option<String> {
        self.inner.api_key_masked()
    }
166
    /// Returns the wrapped client's `is_active()` result.
    #[pyo3(name = "is_active")]
    fn py_is_active(&mut self) -> bool {
        self.inner.is_active()
    }
171
    /// Returns the wrapped client's `is_closed()` result.
    #[pyo3(name = "is_closed")]
    fn py_is_closed(&mut self) -> bool {
        self.inner.is_closed()
    }
176
    /// Returns the subscriptions the wrapped client reports for `instrument_id`.
    #[pyo3(name = "get_subscriptions")]
    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
        self.inner.get_subscriptions(instrument_id)
    }
181
    /// Sets the account ID on the wrapped client.
    #[pyo3(name = "set_account_id")]
    fn py_set_account_id(&mut self, account_id: AccountId) {
        self.inner.set_account_id(account_id);
    }
186
187 #[pyo3(name = "register_order_identity")]
188 fn py_register_order_identity(
189 &self,
190 client_order_id: ClientOrderId,
191 instrument_id: InstrumentId,
192 strategy_id: StrategyId,
193 order_side: OrderSide,
194 order_type: OrderType,
195 ) {
196 self.ws_dispatch_state.order_identities.insert(
197 client_order_id,
198 OrderIdentity {
199 instrument_id,
200 strategy_id,
201 order_side,
202 order_type,
203 },
204 );
205 }
206
207 #[pyo3(name = "remove_order_identity")]
208 fn py_remove_order_identity(&self, client_order_id: ClientOrderId) {
209 self.ws_dispatch_state
210 .order_identities
211 .remove(&client_order_id);
212 }
213
214 #[pyo3(name = "cache_instrument")]
215 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
216 let inst = pyobject_to_instrument_any(py, instrument)?;
217 let symbol = inst.symbol().inner();
218 self.instruments_cache.insert(symbol, inst);
219 Ok(())
220 }
221
222 #[pyo3(name = "connect")]
223 #[pyo3(signature = (loop_, instruments, callback, trader_id=None))]
224 #[expect(clippy::needless_pass_by_value)]
225 fn py_connect<'py>(
226 &mut self,
227 py: Python<'py>,
228 loop_: Py<PyAny>,
229 instruments: Vec<Py<PyAny>>,
230 callback: Py<PyAny>,
231 trader_id: Option<TraderId>,
232 ) -> PyResult<Bound<'py, PyAny>> {
233 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
234
235 let cache = Arc::clone(&self.instruments_cache);
236 {
237 let mut initial: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
238
239 for inst_py in instruments {
240 let inst = pyobject_to_instrument_any(py, inst_py)?;
241 initial.insert(inst.symbol().inner(), inst);
242 }
243 cache.rcu(|m| {
244 for (k, v) in &initial {
245 m.insert(*k, v.clone());
246 }
247 });
248 }
249
250 let clock = get_atomic_clock_realtime();
251 let mut client = self.inner.clone();
252 let account_id = self.inner.account_id();
253 let dispatch_state = Arc::clone(&self.ws_dispatch_state);
254 let trader_id = trader_id.unwrap_or(TraderId::from("TRADER-000"));
255
256 pyo3_async_runtimes::tokio::future_into_py(py, async move {
257 client.connect().await.map_err(to_pyruntime_err)?;
258
259 let stream = client.stream();
260
261 get_runtime().spawn(async move {
262 let _client = client; tokio::pin!(stream);
264
265 let mut quote_cache = QuoteCache::new();
266 let mut order_type_cache: AHashMap<ClientOrderId, OrderType> = AHashMap::new();
267 let mut order_symbol_cache: AHashMap<ClientOrderId, Ustr> = AHashMap::new();
268
269 while let Some(msg) = stream.next().await {
270 let ts_init = clock.get_time_ns();
271
272 match msg {
273 BitmexWsMessage::Table(table_msg) => {
274 handle_table_message(
275 table_msg,
276 &cache,
277 &mut quote_cache,
278 &mut order_type_cache,
279 &mut order_symbol_cache,
280 &dispatch_state,
281 trader_id,
282 account_id,
283 ts_init,
284 &call_soon,
285 &callback,
286 );
287 }
288 BitmexWsMessage::Reconnected => {
289 quote_cache.clear();
290 order_type_cache.clear();
291 order_symbol_cache.clear();
292 }
293 BitmexWsMessage::Authenticated => {}
294 }
295 }
296 });
297
298 Ok(())
299 })
300 }
301
302 #[pyo3(name = "wait_until_active")]
303 fn py_wait_until_active<'py>(
304 &self,
305 py: Python<'py>,
306 timeout_secs: f64,
307 ) -> PyResult<Bound<'py, PyAny>> {
308 let client = self.inner.clone();
309
310 pyo3_async_runtimes::tokio::future_into_py(py, async move {
311 client
312 .wait_until_active(timeout_secs)
313 .await
314 .map_err(to_pyruntime_err)?;
315 Ok(())
316 })
317 }
318
319 #[pyo3(name = "close")]
320 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
321 let mut client = self.inner.clone();
322
323 pyo3_async_runtimes::tokio::future_into_py(py, async move {
324 if let Err(e) = client.close().await {
325 log::error!("Error on close: {e}");
326 }
327 Ok(())
328 })
329 }
330
331 #[pyo3(name = "subscribe_instruments")]
332 fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
333 let client = self.inner.clone();
334
335 pyo3_async_runtimes::tokio::future_into_py(py, async move {
336 if let Err(e) = client.subscribe_instruments().await {
337 log::error!("Failed to subscribe to instruments: {e}");
338 }
339 Ok(())
340 })
341 }
342
343 #[pyo3(name = "subscribe_instrument")]
344 fn py_subscribe_instrument<'py>(
345 &self,
346 py: Python<'py>,
347 instrument_id: InstrumentId,
348 ) -> PyResult<Bound<'py, PyAny>> {
349 let client = self.inner.clone();
350
351 pyo3_async_runtimes::tokio::future_into_py(py, async move {
352 if let Err(e) = client.subscribe_instrument(instrument_id).await {
353 log::error!("Failed to subscribe to instrument: {e}");
354 }
355 Ok(())
356 })
357 }
358
359 #[pyo3(name = "subscribe_book")]
360 fn py_subscribe_book<'py>(
361 &self,
362 py: Python<'py>,
363 instrument_id: InstrumentId,
364 ) -> PyResult<Bound<'py, PyAny>> {
365 let client = self.inner.clone();
366
367 pyo3_async_runtimes::tokio::future_into_py(py, async move {
368 if let Err(e) = client.subscribe_book(instrument_id).await {
369 log::error!("Failed to subscribe to order book: {e}");
370 }
371 Ok(())
372 })
373 }
374
375 #[pyo3(name = "subscribe_book_25")]
376 fn py_subscribe_book_25<'py>(
377 &self,
378 py: Python<'py>,
379 instrument_id: InstrumentId,
380 ) -> PyResult<Bound<'py, PyAny>> {
381 let client = self.inner.clone();
382
383 pyo3_async_runtimes::tokio::future_into_py(py, async move {
384 if let Err(e) = client.subscribe_book_25(instrument_id).await {
385 log::error!("Failed to subscribe to order book 25: {e}");
386 }
387 Ok(())
388 })
389 }
390
391 #[pyo3(name = "subscribe_book_depth10")]
392 fn py_subscribe_book_depth10<'py>(
393 &self,
394 py: Python<'py>,
395 instrument_id: InstrumentId,
396 ) -> PyResult<Bound<'py, PyAny>> {
397 let client = self.inner.clone();
398
399 pyo3_async_runtimes::tokio::future_into_py(py, async move {
400 if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
401 log::error!("Failed to subscribe to order book depth 10: {e}");
402 }
403 Ok(())
404 })
405 }
406
407 #[pyo3(name = "subscribe_quotes")]
408 fn py_subscribe_quotes<'py>(
409 &self,
410 py: Python<'py>,
411 instrument_id: InstrumentId,
412 ) -> PyResult<Bound<'py, PyAny>> {
413 let client = self.inner.clone();
414
415 pyo3_async_runtimes::tokio::future_into_py(py, async move {
416 if let Err(e) = client.subscribe_quotes(instrument_id).await {
417 log::error!("Failed to subscribe to quotes: {e}");
418 }
419 Ok(())
420 })
421 }
422
423 #[pyo3(name = "subscribe_trades")]
424 fn py_subscribe_trades<'py>(
425 &self,
426 py: Python<'py>,
427 instrument_id: InstrumentId,
428 ) -> PyResult<Bound<'py, PyAny>> {
429 let client = self.inner.clone();
430
431 pyo3_async_runtimes::tokio::future_into_py(py, async move {
432 if let Err(e) = client.subscribe_trades(instrument_id).await {
433 log::error!("Failed to subscribe to trades: {e}");
434 }
435 Ok(())
436 })
437 }
438
439 #[pyo3(name = "subscribe_mark_prices")]
440 fn py_subscribe_mark_prices<'py>(
441 &self,
442 py: Python<'py>,
443 instrument_id: InstrumentId,
444 ) -> PyResult<Bound<'py, PyAny>> {
445 let client = self.inner.clone();
446
447 pyo3_async_runtimes::tokio::future_into_py(py, async move {
448 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
449 log::error!("Failed to subscribe to mark prices: {e}");
450 }
451 Ok(())
452 })
453 }
454
455 #[pyo3(name = "subscribe_index_prices")]
456 fn py_subscribe_index_prices<'py>(
457 &self,
458 py: Python<'py>,
459 instrument_id: InstrumentId,
460 ) -> PyResult<Bound<'py, PyAny>> {
461 let client = self.inner.clone();
462
463 pyo3_async_runtimes::tokio::future_into_py(py, async move {
464 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
465 log::error!("Failed to subscribe to index prices: {e}");
466 }
467 Ok(())
468 })
469 }
470
471 #[pyo3(name = "subscribe_funding_rates")]
472 fn py_subscribe_funding_rates<'py>(
473 &self,
474 py: Python<'py>,
475 instrument_id: InstrumentId,
476 ) -> PyResult<Bound<'py, PyAny>> {
477 let client = self.inner.clone();
478
479 pyo3_async_runtimes::tokio::future_into_py(py, async move {
480 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
481 log::error!("Failed to subscribe to funding: {e}");
482 }
483 Ok(())
484 })
485 }
486
487 #[pyo3(name = "subscribe_bars")]
488 fn py_subscribe_bars<'py>(
489 &self,
490 py: Python<'py>,
491 bar_type: BarType,
492 ) -> PyResult<Bound<'py, PyAny>> {
493 let client = self.inner.clone();
494
495 pyo3_async_runtimes::tokio::future_into_py(py, async move {
496 if let Err(e) = client.subscribe_bars(bar_type).await {
497 log::error!("Failed to subscribe to bars: {e}");
498 }
499 Ok(())
500 })
501 }
502
503 #[pyo3(name = "unsubscribe_instruments")]
504 fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
505 let client = self.inner.clone();
506
507 pyo3_async_runtimes::tokio::future_into_py(py, async move {
508 if let Err(e) = client.unsubscribe_instruments().await {
509 log::error!("Failed to unsubscribe from instruments: {e}");
510 }
511 Ok(())
512 })
513 }
514
515 #[pyo3(name = "unsubscribe_instrument")]
516 fn py_unsubscribe_instrument<'py>(
517 &self,
518 py: Python<'py>,
519 instrument_id: InstrumentId,
520 ) -> PyResult<Bound<'py, PyAny>> {
521 let client = self.inner.clone();
522
523 pyo3_async_runtimes::tokio::future_into_py(py, async move {
524 if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
525 log::error!("Failed to unsubscribe from instrument: {e}");
526 }
527 Ok(())
528 })
529 }
530
531 #[pyo3(name = "unsubscribe_book")]
532 fn py_unsubscribe_book<'py>(
533 &self,
534 py: Python<'py>,
535 instrument_id: InstrumentId,
536 ) -> PyResult<Bound<'py, PyAny>> {
537 let client = self.inner.clone();
538
539 pyo3_async_runtimes::tokio::future_into_py(py, async move {
540 if let Err(e) = client.unsubscribe_book(instrument_id).await {
541 log::error!("Failed to unsubscribe from order book: {e}");
542 }
543 Ok(())
544 })
545 }
546
547 #[pyo3(name = "unsubscribe_book_25")]
548 fn py_unsubscribe_book_25<'py>(
549 &self,
550 py: Python<'py>,
551 instrument_id: InstrumentId,
552 ) -> PyResult<Bound<'py, PyAny>> {
553 let client = self.inner.clone();
554
555 pyo3_async_runtimes::tokio::future_into_py(py, async move {
556 if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
557 log::error!("Failed to unsubscribe from order book 25: {e}");
558 }
559 Ok(())
560 })
561 }
562
563 #[pyo3(name = "unsubscribe_book_depth10")]
564 fn py_unsubscribe_book_depth10<'py>(
565 &self,
566 py: Python<'py>,
567 instrument_id: InstrumentId,
568 ) -> PyResult<Bound<'py, PyAny>> {
569 let client = self.inner.clone();
570
571 pyo3_async_runtimes::tokio::future_into_py(py, async move {
572 if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
573 log::error!("Failed to unsubscribe from order book depth 10: {e}");
574 }
575 Ok(())
576 })
577 }
578
579 #[pyo3(name = "unsubscribe_quotes")]
580 fn py_unsubscribe_quotes<'py>(
581 &self,
582 py: Python<'py>,
583 instrument_id: InstrumentId,
584 ) -> PyResult<Bound<'py, PyAny>> {
585 let client = self.inner.clone();
586
587 pyo3_async_runtimes::tokio::future_into_py(py, async move {
588 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
589 log::error!("Failed to unsubscribe from quotes: {e}");
590 }
591 Ok(())
592 })
593 }
594
595 #[pyo3(name = "unsubscribe_trades")]
596 fn py_unsubscribe_trades<'py>(
597 &self,
598 py: Python<'py>,
599 instrument_id: InstrumentId,
600 ) -> PyResult<Bound<'py, PyAny>> {
601 let client = self.inner.clone();
602
603 pyo3_async_runtimes::tokio::future_into_py(py, async move {
604 if let Err(e) = client.unsubscribe_trades(instrument_id).await {
605 log::error!("Failed to unsubscribe from trades: {e}");
606 }
607 Ok(())
608 })
609 }
610
611 #[pyo3(name = "unsubscribe_mark_prices")]
612 fn py_unsubscribe_mark_prices<'py>(
613 &self,
614 py: Python<'py>,
615 instrument_id: InstrumentId,
616 ) -> PyResult<Bound<'py, PyAny>> {
617 let client = self.inner.clone();
618
619 pyo3_async_runtimes::tokio::future_into_py(py, async move {
620 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
621 log::error!("Failed to unsubscribe from mark prices: {e}");
622 }
623 Ok(())
624 })
625 }
626
627 #[pyo3(name = "unsubscribe_index_prices")]
628 fn py_unsubscribe_index_prices<'py>(
629 &self,
630 py: Python<'py>,
631 instrument_id: InstrumentId,
632 ) -> PyResult<Bound<'py, PyAny>> {
633 let client = self.inner.clone();
634
635 pyo3_async_runtimes::tokio::future_into_py(py, async move {
636 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
637 log::error!("Failed to unsubscribe from index prices: {e}");
638 }
639 Ok(())
640 })
641 }
642
643 #[pyo3(name = "unsubscribe_funding_rates")]
644 fn py_unsubscribe_funding_rates<'py>(
645 &self,
646 py: Python<'py>,
647 instrument_id: InstrumentId,
648 ) -> PyResult<Bound<'py, PyAny>> {
649 let client = self.inner.clone();
650 pyo3_async_runtimes::tokio::future_into_py(py, async move {
651 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
652 log::error!("Failed to unsubscribe from funding rates: {e}");
653 }
654 Ok(())
655 })
656 }
657
658 #[pyo3(name = "unsubscribe_bars")]
659 fn py_unsubscribe_bars<'py>(
660 &self,
661 py: Python<'py>,
662 bar_type: BarType,
663 ) -> PyResult<Bound<'py, PyAny>> {
664 let client = self.inner.clone();
665
666 pyo3_async_runtimes::tokio::future_into_py(py, async move {
667 if let Err(e) = client.unsubscribe_bars(bar_type).await {
668 log::error!("Failed to unsubscribe from bars: {e}");
669 }
670 Ok(())
671 })
672 }
673
674 #[pyo3(name = "subscribe_orders")]
675 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
676 let client = self.inner.clone();
677
678 pyo3_async_runtimes::tokio::future_into_py(py, async move {
679 if let Err(e) = client.subscribe_orders().await {
680 log::error!("Failed to subscribe to orders: {e}");
681 }
682 Ok(())
683 })
684 }
685
686 #[pyo3(name = "subscribe_executions")]
687 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
688 let client = self.inner.clone();
689
690 pyo3_async_runtimes::tokio::future_into_py(py, async move {
691 if let Err(e) = client.subscribe_executions().await {
692 log::error!("Failed to subscribe to executions: {e}");
693 }
694 Ok(())
695 })
696 }
697
698 #[pyo3(name = "subscribe_positions")]
699 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
700 let client = self.inner.clone();
701
702 pyo3_async_runtimes::tokio::future_into_py(py, async move {
703 if let Err(e) = client.subscribe_positions().await {
704 log::error!("Failed to subscribe to positions: {e}");
705 }
706 Ok(())
707 })
708 }
709
710 #[pyo3(name = "subscribe_margin")]
711 fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
712 let client = self.inner.clone();
713
714 pyo3_async_runtimes::tokio::future_into_py(py, async move {
715 if let Err(e) = client.subscribe_margin().await {
716 log::error!("Failed to subscribe to margin: {e}");
717 }
718 Ok(())
719 })
720 }
721
722 #[pyo3(name = "subscribe_wallet")]
723 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
724 let client = self.inner.clone();
725
726 pyo3_async_runtimes::tokio::future_into_py(py, async move {
727 if let Err(e) = client.subscribe_wallet().await {
728 log::error!("Failed to subscribe to wallet: {e}");
729 }
730 Ok(())
731 })
732 }
733
734 #[pyo3(name = "unsubscribe_orders")]
735 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
736 let client = self.inner.clone();
737
738 pyo3_async_runtimes::tokio::future_into_py(py, async move {
739 if let Err(e) = client.unsubscribe_orders().await {
740 log::error!("Failed to unsubscribe from orders: {e}");
741 }
742 Ok(())
743 })
744 }
745
746 #[pyo3(name = "unsubscribe_executions")]
747 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
748 let client = self.inner.clone();
749
750 pyo3_async_runtimes::tokio::future_into_py(py, async move {
751 if let Err(e) = client.unsubscribe_executions().await {
752 log::error!("Failed to unsubscribe from executions: {e}");
753 }
754 Ok(())
755 })
756 }
757
758 #[pyo3(name = "unsubscribe_positions")]
759 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
760 let client = self.inner.clone();
761
762 pyo3_async_runtimes::tokio::future_into_py(py, async move {
763 if let Err(e) = client.unsubscribe_positions().await {
764 log::error!("Failed to unsubscribe from positions: {e}");
765 }
766 Ok(())
767 })
768 }
769
770 #[pyo3(name = "unsubscribe_margin")]
771 fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
772 let client = self.inner.clone();
773
774 pyo3_async_runtimes::tokio::future_into_py(py, async move {
775 if let Err(e) = client.unsubscribe_margin().await {
776 log::error!("Failed to unsubscribe from margin: {e}");
777 }
778 Ok(())
779 })
780 }
781
782 #[pyo3(name = "unsubscribe_wallet")]
783 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
784 let client = self.inner.clone();
785
786 pyo3_async_runtimes::tokio::future_into_py(py, async move {
787 if let Err(e) = client.unsubscribe_wallet().await {
788 log::error!("Failed to unsubscribe from wallet: {e}");
789 }
790 Ok(())
791 })
792 }
793}
794
795#[expect(clippy::too_many_arguments)]
796fn handle_table_message(
797 table_msg: BitmexTableMessage,
798 instruments_cache: &Arc<AtomicMap<Ustr, InstrumentAny>>,
799 quote_cache: &mut QuoteCache,
800 order_type_cache: &mut AHashMap<ClientOrderId, OrderType>,
801 order_symbol_cache: &mut AHashMap<ClientOrderId, Ustr>,
802 dispatch_state: &WsDispatchState,
803 trader_id: TraderId,
804 account_id: AccountId,
805 ts_init: UnixNanos,
806 call_soon: &Py<PyAny>,
807 callback: &Py<PyAny>,
808) {
809 if let BitmexTableMessage::Instrument { action, data } = table_msg {
810 handle_instrument_messages(
811 action,
812 data,
813 instruments_cache,
814 ts_init,
815 call_soon,
816 callback,
817 );
818 return;
819 }
820
821 let instruments = instruments_cache.load();
822
823 match table_msg {
824 BitmexTableMessage::OrderBookL2 { action, data }
825 | BitmexTableMessage::OrderBookL2_25 { action, data } => {
826 if !data.is_empty() {
827 for d in parse_book_msg_vec(data, action, &instruments, ts_init) {
828 send_data_to_python(d, call_soon, callback);
829 }
830 }
831 }
832 BitmexTableMessage::OrderBook10 { data, .. } => {
833 if !data.is_empty() {
834 for d in parse_book10_msg_vec(data, &instruments, ts_init) {
835 send_data_to_python(d, call_soon, callback);
836 }
837 }
838 }
839 BitmexTableMessage::Quote { data, .. } => {
840 handle_quote_messages(
841 data,
842 &instruments,
843 quote_cache,
844 ts_init,
845 call_soon,
846 callback,
847 );
848 }
849 BitmexTableMessage::Trade { data, .. } => {
850 if !data.is_empty() {
851 for d in parse_trade_msg_vec(data, &instruments, ts_init) {
852 send_data_to_python(d, call_soon, callback);
853 }
854 }
855 }
856 BitmexTableMessage::TradeBin1m { action, data } => {
857 if action != BitmexAction::Partial && !data.is_empty() {
858 for d in
859 parse_trade_bin_msg_vec(data, &BitmexWsTopic::TradeBin1m, &instruments, ts_init)
860 {
861 send_data_to_python(d, call_soon, callback);
862 }
863 }
864 }
865 BitmexTableMessage::TradeBin5m { action, data } => {
866 if action != BitmexAction::Partial && !data.is_empty() {
867 for d in
868 parse_trade_bin_msg_vec(data, &BitmexWsTopic::TradeBin5m, &instruments, ts_init)
869 {
870 send_data_to_python(d, call_soon, callback);
871 }
872 }
873 }
874 BitmexTableMessage::TradeBin1h { action, data } => {
875 if action != BitmexAction::Partial && !data.is_empty() {
876 for d in
877 parse_trade_bin_msg_vec(data, &BitmexWsTopic::TradeBin1h, &instruments, ts_init)
878 {
879 send_data_to_python(d, call_soon, callback);
880 }
881 }
882 }
883 BitmexTableMessage::TradeBin1d { action, data } => {
884 if action != BitmexAction::Partial && !data.is_empty() {
885 for d in
886 parse_trade_bin_msg_vec(data, &BitmexWsTopic::TradeBin1d, &instruments, ts_init)
887 {
888 send_data_to_python(d, call_soon, callback);
889 }
890 }
891 }
892 BitmexTableMessage::Funding { data, .. } => {
893 for msg in data {
894 send_to_python(parse_funding_msg(&msg, ts_init), call_soon, callback);
895 }
896 }
897 BitmexTableMessage::Order { data, .. } => {
898 handle_order_messages(
899 data,
900 &instruments,
901 order_type_cache,
902 order_symbol_cache,
903 dispatch_state,
904 trader_id,
905 account_id,
906 ts_init,
907 call_soon,
908 callback,
909 );
910 }
911 BitmexTableMessage::Execution { data, .. } => {
912 handle_execution_messages(
913 data,
914 &instruments,
915 order_symbol_cache,
916 dispatch_state,
917 trader_id,
918 ts_init,
919 call_soon,
920 callback,
921 );
922 }
923 BitmexTableMessage::Position { data, .. } => {
924 for msg in data {
925 let Some(instrument) = instruments.get(&msg.symbol) else {
926 log::warn!("Instrument cache miss for position symbol={}", msg.symbol);
927 continue;
928 };
929
930 send_to_python(
931 parse_position_msg(&msg, instrument, ts_init),
932 call_soon,
933 callback,
934 );
935 }
936 }
937 BitmexTableMessage::Wallet { data, .. } => {
938 for msg in data {
939 send_to_python(parse_wallet_msg(&msg, ts_init), call_soon, callback);
940 }
941 }
942 BitmexTableMessage::Margin { .. } => {}
943 _ => {
944 log::debug!("Unhandled table message type in Python WebSocket client");
945 }
946 }
947}
948
949fn handle_quote_messages(
950 data: Vec<BitmexQuoteMsg>,
951 instruments: &AHashMap<Ustr, InstrumentAny>,
952 quote_cache: &mut QuoteCache,
953 ts_init: UnixNanos,
954 call_soon: &Py<PyAny>,
955 callback: &Py<PyAny>,
956) {
957 for msg in data {
958 let Some(instrument) = instruments.get(&msg.symbol) else {
959 log::error!(
960 "Instrument cache miss: quote dropped for symbol={}",
961 msg.symbol,
962 );
963 continue;
964 };
965
966 let instrument_id = instrument.id();
967 let price_precision = instrument.price_precision();
968
969 let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
970 let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
971 let bid_size = msg
972 .bid_size
973 .map(|s| parse_contracts_quantity(s, instrument));
974 let ask_size = msg
975 .ask_size
976 .map(|s| parse_contracts_quantity(s, instrument));
977 let ts_event = UnixNanos::from(msg.timestamp);
978
979 match quote_cache.process(
980 instrument_id,
981 bid_price,
982 ask_price,
983 bid_size,
984 ask_size,
985 ts_event,
986 ts_init,
987 ) {
988 Ok(quote) => send_data_to_python(Data::Quote(quote), call_soon, callback),
989 Err(e) => {
990 log::warn!("Failed to process quote for {}: {e}", msg.symbol);
991 }
992 }
993 }
994}
995
996fn handle_instrument_messages(
997 action: BitmexAction,
998 data: Vec<BitmexInstrumentMsg>,
999 instruments_cache: &Arc<AtomicMap<Ustr, InstrumentAny>>,
1000 ts_init: UnixNanos,
1001 call_soon: &Py<PyAny>,
1002 callback: &Py<PyAny>,
1003) {
1004 if action == BitmexAction::Partial || action == BitmexAction::Insert {
1005 let data_for_prices = data.clone();
1006
1007 let mut new_instruments: Vec<(Ustr, InstrumentAny)> = Vec::new();
1008
1009 for msg in data {
1010 match msg.try_into() {
1011 Ok(http_inst) => match parse_instrument_any(&http_inst, ts_init) {
1012 InstrumentParseResult::Ok(boxed) => {
1013 let inst = *boxed;
1014 let symbol = inst.symbol().inner();
1015 new_instruments.push((symbol, inst));
1016 }
1017 InstrumentParseResult::Unsupported { .. }
1018 | InstrumentParseResult::Inactive { .. } => {}
1019 InstrumentParseResult::Failed { symbol, error, .. } => {
1020 log::warn!("Failed to parse instrument {symbol}: {error}");
1021 }
1022 },
1023 Err(e) => {
1024 log::debug!("Skipping instrument (missing required fields): {e}");
1025 }
1026 }
1027 }
1028
1029 instruments_cache.rcu(|m| {
1030 for (symbol, inst) in &new_instruments {
1031 m.insert(*symbol, inst.clone());
1032 }
1033 });
1034
1035 for (_, inst) in &new_instruments {
1036 Python::attach(|py| {
1037 if let Ok(py_obj) = instrument_any_to_pyobject(py, inst.clone()) {
1038 call_python_threadsafe(py, call_soon, callback, py_obj);
1039 }
1040 });
1041 }
1042
1043 let cache = instruments_cache.load();
1044 for msg in data_for_prices {
1045 for d in parse_instrument_msg(&msg, &cache, ts_init) {
1046 send_data_to_python(d, call_soon, callback);
1047 }
1048 }
1049 } else {
1050 for msg in &data {
1051 if let Some(state_str) = &msg.state
1052 && let Ok(state) =
1053 serde_json::from_str::<BitmexInstrumentState>(&format!("\"{state_str}\""))
1054 {
1055 let instrument_id = parse_instrument_id(msg.symbol);
1056 let action = MarketStatusAction::from(&state);
1057 let is_trading = Some(state == BitmexInstrumentState::Open);
1058 let ts_event =
1059 parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
1060 let status = InstrumentStatus::new(
1061 instrument_id,
1062 action,
1063 ts_event,
1064 ts_init,
1065 None,
1066 None,
1067 is_trading,
1068 None,
1069 None,
1070 );
1071 send_to_python(status, call_soon, callback);
1072 }
1073 }
1074
1075 let cache = instruments_cache.load();
1076 for msg in data {
1077 for d in parse_instrument_msg(&msg, &cache, ts_init) {
1078 send_data_to_python(d, call_soon, callback);
1079 }
1080 }
1081 }
1082}
1083
1084#[expect(clippy::too_many_arguments)]
1085fn handle_order_messages(
1086 data: Vec<OrderData>,
1087 instruments: &AHashMap<Ustr, InstrumentAny>,
1088 order_type_cache: &mut AHashMap<ClientOrderId, OrderType>,
1089 order_symbol_cache: &mut AHashMap<ClientOrderId, Ustr>,
1090 dispatch_state: &WsDispatchState,
1091 trader_id: TraderId,
1092 account_id: AccountId,
1093 ts_init: UnixNanos,
1094 call_soon: &Py<PyAny>,
1095 callback: &Py<PyAny>,
1096) {
1097 for order_data in data {
1098 match order_data {
1099 OrderData::Full(order_msg) => {
1100 let Some(instrument) = instruments.get(&order_msg.symbol) else {
1101 log::warn!(
1102 "Instrument cache miss for order symbol={}",
1103 order_msg.symbol
1104 );
1105 continue;
1106 };
1107
1108 let client_order_id = order_msg.cl_ord_id.map(ClientOrderId::new);
1109
1110 if let Some(ref cid) = client_order_id {
1111 if let Some(ord_type) = &order_msg.ord_type {
1112 let order_type: OrderType = if *ord_type == BitmexOrderType::Pegged
1113 && order_msg.peg_price_type == Some(BitmexPegPriceType::TrailingStopPeg)
1114 {
1115 if order_msg.price.is_some() {
1116 OrderType::TrailingStopLimit
1117 } else {
1118 OrderType::TrailingStopMarket
1119 }
1120 } else {
1121 (*ord_type).into()
1122 };
1123 order_type_cache.insert(*cid, order_type);
1124 }
1125 order_symbol_cache.insert(*cid, order_msg.symbol);
1126 }
1127
1128 let identity = client_order_id.and_then(|cid| {
1129 dispatch_state
1130 .order_identities
1131 .get(&cid)
1132 .map(|r| (cid, r.clone()))
1133 });
1134
1135 if let Some((cid, ident)) = identity {
1136 if let Some(event) = parse_order_event(
1137 &order_msg,
1138 cid,
1139 account_id,
1140 trader_id,
1141 ident.strategy_id,
1142 ts_init,
1143 ) {
1144 let venue_order_id = VenueOrderId::new(order_msg.order_id.to_string());
1145 dispatch_order_event_to_python(
1146 event,
1147 cid,
1148 account_id,
1149 venue_order_id,
1150 &ident,
1151 dispatch_state,
1152 trader_id,
1153 ts_init,
1154 call_soon,
1155 callback,
1156 );
1157 }
1158
1159 if order_msg.ord_status.is_terminal() {
1160 order_type_cache.remove(&cid);
1161 order_symbol_cache.remove(&cid);
1162 }
1163 } else {
1164 match parse_order_msg(&order_msg, instrument, order_type_cache, ts_init) {
1165 Ok(report) => {
1166 if report.order_status.is_closed()
1167 && let Some(cid) = report.client_order_id
1168 {
1169 order_type_cache.remove(&cid);
1170 order_symbol_cache.remove(&cid);
1171 }
1172 send_to_python(report, call_soon, callback);
1173 }
1174 Err(e) => log::error!("Failed to parse order message: {e}"),
1175 }
1176 }
1177 }
1178 OrderData::Update(msg) => {
1179 if let Some(cl_ord_id) = &msg.cl_ord_id {
1180 let cid = ClientOrderId::new(cl_ord_id);
1181 order_symbol_cache.insert(cid, msg.symbol);
1182 }
1183
1184 let Some(instrument) = instruments.get(&msg.symbol) else {
1185 log::warn!(
1186 "Instrument cache miss for order update symbol={}",
1187 msg.symbol,
1188 );
1189 continue;
1190 };
1191
1192 let identity = msg.cl_ord_id.as_ref().and_then(|cl| {
1193 let cid = ClientOrderId::new(cl);
1194 dispatch_state
1195 .order_identities
1196 .get(&cid)
1197 .map(|r| (cid, r.clone()))
1198 });
1199
1200 if let Some((cid, ident)) = identity {
1201 if let Some(event) =
1202 parse_order_update_msg(&msg, instrument, account_id, ts_init)
1203 {
1204 let enriched = OrderUpdated::new(
1205 trader_id,
1206 ident.strategy_id,
1207 event.instrument_id,
1208 cid,
1209 event.quantity,
1210 event.event_id,
1211 event.ts_event,
1212 event.ts_init,
1213 false,
1214 event.venue_order_id,
1215 Some(account_id),
1216 event.price,
1217 event.trigger_price,
1218 event.protection_price,
1219 false, );
1221 let venue_order_id = enriched
1222 .venue_order_id
1223 .unwrap_or_else(|| VenueOrderId::new(msg.order_id.to_string()));
1224 ensure_accepted_to_python(
1225 cid,
1226 account_id,
1227 venue_order_id,
1228 &ident,
1229 dispatch_state,
1230 trader_id,
1231 ts_init,
1232 call_soon,
1233 callback,
1234 );
1235 send_to_python(enriched, call_soon, callback);
1236 }
1237 } else {
1238 log::debug!(
1239 "Skipping order update for untracked order: order_id={}",
1240 msg.order_id,
1241 );
1242 }
1243 }
1244 }
1245 }
1246}
1247
1248#[expect(clippy::too_many_arguments)]
1249fn handle_execution_messages(
1250 data: Vec<BitmexExecutionMsg>,
1251 instruments: &AHashMap<Ustr, InstrumentAny>,
1252 order_symbol_cache: &AHashMap<ClientOrderId, Ustr>,
1253 dispatch_state: &WsDispatchState,
1254 trader_id: TraderId,
1255 ts_init: UnixNanos,
1256 call_soon: &Py<PyAny>,
1257 callback: &Py<PyAny>,
1258) {
1259 for exec_msg in data {
1260 let symbol = exec_msg.symbol.or_else(|| {
1261 exec_msg
1262 .cl_ord_id
1263 .map(ClientOrderId::new)
1264 .and_then(|cid| order_symbol_cache.get(&cid).copied())
1265 });
1266
1267 let Some(symbol) = symbol else {
1268 if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
1269 if exec_msg.exec_type == Some(BitmexExecType::Trade) {
1270 log::warn!(
1271 "Execution missing symbol and not in cache: \
1272 cl_ord_id={cl_ord_id}, exec_id={:?}",
1273 exec_msg.exec_id,
1274 );
1275 } else {
1276 log::debug!(
1277 "Execution missing symbol and not in cache: \
1278 cl_ord_id={cl_ord_id}, exec_type={:?}",
1279 exec_msg.exec_type,
1280 );
1281 }
1282 } else if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
1283 log::debug!(
1284 "CancelReject missing symbol/clOrdID: exec_id={:?}, order_id={:?}",
1285 exec_msg.exec_id,
1286 exec_msg.order_id,
1287 );
1288 } else {
1289 log::warn!(
1290 "Execution missing both symbol and clOrdID: \
1291 exec_id={:?}, order_id={:?}, exec_type={:?}",
1292 exec_msg.exec_id,
1293 exec_msg.order_id,
1294 exec_msg.exec_type,
1295 );
1296 }
1297 continue;
1298 };
1299
1300 let Some(instrument) = instruments.get(&symbol) else {
1301 log::warn!("Instrument cache miss for execution symbol={symbol}");
1302 continue;
1303 };
1304
1305 let Some(fill) = parse_execution_msg(exec_msg, instrument, ts_init) else {
1306 continue;
1307 };
1308
1309 let identity = fill.client_order_id.and_then(|cid| {
1310 dispatch_state
1311 .order_identities
1312 .get(&cid)
1313 .map(|r| (cid, r.clone()))
1314 });
1315
1316 if let Some((cid, ident)) = identity {
1317 let venue_order_id = fill.venue_order_id;
1318 ensure_accepted_to_python(
1319 cid,
1320 fill.account_id,
1321 venue_order_id,
1322 &ident,
1323 dispatch_state,
1324 trader_id,
1325 ts_init,
1326 call_soon,
1327 callback,
1328 );
1329 dispatch_state.insert_filled(cid);
1330 dispatch_state.remove_triggered(&cid);
1331 let filled =
1332 fill_report_to_order_filled(&fill, trader_id, &ident, instrument.quote_currency());
1333 send_to_python(filled, call_soon, callback);
1334 } else {
1335 send_to_python(fill, call_soon, callback);
1336 }
1337 }
1338}
1339
1340#[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
1342fn dispatch_order_event_to_python(
1343 event: ParsedOrderEvent,
1344 client_order_id: ClientOrderId,
1345 account_id: AccountId,
1346 venue_order_id: VenueOrderId,
1347 identity: &OrderIdentity,
1348 state: &WsDispatchState,
1349 trader_id: TraderId,
1350 ts_init: UnixNanos,
1351 call_soon: &Py<PyAny>,
1352 callback: &Py<PyAny>,
1353) {
1354 let is_terminal;
1355
1356 match event {
1357 ParsedOrderEvent::Accepted(e) => {
1358 if state.accepted_contains(&client_order_id)
1359 || state.filled_contains(&client_order_id)
1360 || state.triggered_contains(&client_order_id)
1361 {
1362 log::debug!("Skipping duplicate Accepted for {client_order_id}");
1363 return;
1364 }
1365 state.insert_accepted(client_order_id);
1366 is_terminal = false;
1367 send_to_python(e, call_soon, callback);
1368 }
1369 ParsedOrderEvent::Triggered(e) => {
1370 if state.filled_contains(&client_order_id) {
1371 log::debug!("Skipping stale Triggered for {client_order_id} (already filled)");
1372 return;
1373 }
1374 ensure_accepted_to_python(
1375 client_order_id,
1376 account_id,
1377 venue_order_id,
1378 identity,
1379 state,
1380 trader_id,
1381 ts_init,
1382 call_soon,
1383 callback,
1384 );
1385 state.insert_triggered(client_order_id);
1386 is_terminal = false;
1387 send_to_python(e, call_soon, callback);
1388 }
1389 ParsedOrderEvent::Canceled(e) => {
1390 ensure_accepted_to_python(
1391 client_order_id,
1392 account_id,
1393 venue_order_id,
1394 identity,
1395 state,
1396 trader_id,
1397 ts_init,
1398 call_soon,
1399 callback,
1400 );
1401 state.remove_triggered(&client_order_id);
1402 state.remove_filled(&client_order_id);
1403 is_terminal = true;
1404 send_to_python(e, call_soon, callback);
1405 }
1406 ParsedOrderEvent::Expired(e) => {
1407 ensure_accepted_to_python(
1408 client_order_id,
1409 account_id,
1410 venue_order_id,
1411 identity,
1412 state,
1413 trader_id,
1414 ts_init,
1415 call_soon,
1416 callback,
1417 );
1418 state.remove_triggered(&client_order_id);
1419 state.remove_filled(&client_order_id);
1420 is_terminal = true;
1421 send_to_python(e, call_soon, callback);
1422 }
1423 ParsedOrderEvent::Rejected(e) => {
1424 state.remove_triggered(&client_order_id);
1425 state.remove_filled(&client_order_id);
1426 is_terminal = true;
1427 send_to_python(e, call_soon, callback);
1428 }
1429 }
1430
1431 if is_terminal {
1432 state.order_identities.remove(&client_order_id);
1433 state.remove_accepted(&client_order_id);
1434 }
1435}
1436
1437#[expect(clippy::too_many_arguments)]
1439fn ensure_accepted_to_python(
1440 client_order_id: ClientOrderId,
1441 account_id: AccountId,
1442 venue_order_id: VenueOrderId,
1443 identity: &OrderIdentity,
1444 state: &WsDispatchState,
1445 trader_id: TraderId,
1446 ts_init: UnixNanos,
1447 call_soon: &Py<PyAny>,
1448 callback: &Py<PyAny>,
1449) {
1450 if state.accepted_contains(&client_order_id) {
1451 return;
1452 }
1453 state.insert_accepted(client_order_id);
1454 let accepted = OrderAccepted::new(
1455 trader_id,
1456 identity.strategy_id,
1457 identity.instrument_id,
1458 client_order_id,
1459 venue_order_id,
1460 account_id,
1461 UUID4::new(),
1462 ts_init,
1463 ts_init,
1464 false,
1465 );
1466 send_to_python(accepted, call_soon, callback);
1467}
1468
1469fn send_data_to_python(data: Data, call_soon: &Py<PyAny>, callback: &Py<PyAny>) {
1470 Python::attach(|py| {
1471 let py_obj = data_to_pycapsule(py, data);
1472 call_python_threadsafe(py, call_soon, callback, py_obj);
1473 });
1474}
1475
1476fn send_to_python<T: for<'py> IntoPyObjectExt<'py>>(
1477 value: T,
1478 call_soon: &Py<PyAny>,
1479 callback: &Py<PyAny>,
1480) {
1481 Python::attach(|py| {
1482 if let Ok(py_obj) = value.into_py_any(py) {
1483 call_python_threadsafe(py, call_soon, callback, py_obj);
1484 }
1485 });
1486}