Skip to main content

nautilus_kraken/python/
websocket_spot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Kraken WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc`:
23//!
24//! - `ws_client: Option<Arc<WebSocketClient>>` - The WebSocket connection.
25//! - `subscriptions: Arc<DashMap<String, KrakenWsChannel>>` - Subscription tracking.
26//!
27//! Without shared state, clones would be independent, causing:
28//! - Lost WebSocket messages.
29//! - Missing subscription data.
30//! - Connection state desynchronization.
31//!
32//! ## Connection Flow
33//!
34//! 1. Clone the client for async operation.
35//! 2. Connect and populate shared state on the clone.
36//! 3. Spawn stream handler as background task.
37//! 4. Return immediately (non-blocking).
38//!
39//! ## Important Notes
40//!
41//! - Never use `block_on()` - it blocks the runtime.
42//! - Always clone before async blocks for lifetime requirements.
43
44use std::sync::{
45    Arc,
46    atomic::{AtomicU64, Ordering},
47};
48
49use futures_util::StreamExt;
50use nautilus_common::live::get_runtime;
51use nautilus_core::{
52    AtomicMap,
53    python::{call_python_threadsafe, to_pyruntime_err},
54    time::get_atomic_clock_realtime,
55};
56use nautilus_model::{
57    data::{BarType, Data, OrderBookDeltas, OrderBookDeltas_API},
58    identifiers::{
59        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
60    },
61    instruments::{Instrument, InstrumentAny},
62    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
63    reports::{FillReport, OrderStatusReport},
64};
65use pyo3::{IntoPyObjectExt, prelude::*};
66use tokio_util::sync::CancellationToken;
67
68use crate::{
69    common::{
70        consts::KRAKEN_VENUE,
71        enums::{KrakenEnvironment, KrakenProductType},
72        urls::get_kraken_ws_private_url,
73    },
74    config::KrakenDataClientConfig,
75    websocket::spot_v2::{
76        client::KrakenSpotWebSocketClient,
77        messages::KrakenSpotWsMessage,
78        parse::{
79            parse_book_deltas, parse_quote_tick, parse_trade_tick, parse_ws_bar,
80            parse_ws_fill_report, parse_ws_order_status_report,
81        },
82    },
83};
84
85#[pymethods]
86#[pyo3_stub_gen::derive::gen_stub_pymethods]
87impl KrakenSpotWebSocketClient {
88    /// WebSocket client for the Kraken Spot v2 streaming API.
89    #[new]
90    #[pyo3(signature = (environment=None, private=false, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None, proxy_url=None))]
91    fn py_new(
92        environment: Option<KrakenEnvironment>,
93        private: bool,
94        base_url: Option<String>,
95        heartbeat_secs: Option<u64>,
96        api_key: Option<String>,
97        api_secret: Option<String>,
98        proxy_url: Option<String>,
99    ) -> Self {
100        let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
101
102        let (resolved_api_key, resolved_api_secret) =
103            crate::common::credential::KrakenCredential::resolve_spot(api_key, api_secret)
104                .map(|c| c.into_parts())
105                .map_or((None, None), |(k, s)| (Some(k), Some(s)));
106
107        let (ws_public_url, ws_private_url) = if private {
108            // Use provided URL or default to the private endpoint
109            let private_url = base_url.unwrap_or_else(|| {
110                get_kraken_ws_private_url(KrakenProductType::Spot, env).to_string()
111            });
112            (None, Some(private_url))
113        } else {
114            (base_url, None)
115        };
116
117        let config = KrakenDataClientConfig {
118            environment: env,
119            ws_public_url,
120            ws_private_url,
121            heartbeat_interval_secs: heartbeat_secs
122                .unwrap_or(KrakenDataClientConfig::default().heartbeat_interval_secs),
123            api_key: resolved_api_key,
124            api_secret: resolved_api_secret,
125            proxy_url: proxy_url.clone(),
126            ..Default::default()
127        };
128
129        let token = CancellationToken::new();
130
131        Self::new(config, token, proxy_url)
132    }
133
134    /// Returns the WebSocket URL.
    ///
    /// Exposed to Python as the `url` property getter; the value comes straight from
    /// the client's `url()` accessor.
135    #[getter]
136    #[pyo3(name = "url")]
137    #[must_use]
138    pub fn py_url(&self) -> &str {
139        self.url()
140    }
141
142    /// Returns true if connected (not closed).
    ///
    /// Exposed to Python as `is_connected()`; forwards to the client's `is_connected`.
143    #[pyo3(name = "is_connected")]
144    fn py_is_connected(&self) -> bool {
145        self.is_connected()
146    }
147
148    /// Returns true if the connection is active.
    ///
    /// Exposed to Python as `is_active()`; forwards to the client's `is_active`.
149    #[pyo3(name = "is_active")]
150    fn py_is_active(&self) -> bool {
151        self.is_active()
152    }
153
154    /// Returns true if the connection is closed.
    ///
    /// Exposed to Python as `is_closed()`; forwards to the client's `is_closed`.
155    #[pyo3(name = "is_closed")]
156    fn py_is_closed(&self) -> bool {
157        self.is_closed()
158    }
159
160    /// Returns all active subscriptions.
    ///
    /// Exposed to Python as `get_subscriptions()`. The returned strings are exactly
    /// what the client's `get_subscriptions` produces; their format is defined there,
    /// not in this binding.
161    #[pyo3(name = "get_subscriptions")]
162    fn py_get_subscriptions(&self) -> Vec<String> {
163        self.get_subscriptions()
164    }
165
166    /// Cancels all pending requests.
    ///
    /// Exposed to Python as `cancel_all_requests()`; forwards to the client's
    /// `cancel_all_requests` and returns nothing.
167    #[pyo3(name = "cancel_all_requests")]
168    fn py_cancel_all_requests(&self) {
169        self.cancel_all_requests();
170    }
171
172    /// Connects to the WebSocket server.
173    #[pyo3(name = "connect")]
174    #[expect(clippy::needless_pass_by_value)]
175    fn py_connect<'py>(
176        &mut self,
177        py: Python<'py>,
178        loop_: Py<PyAny>,
179        instruments: Vec<Py<PyAny>>,
180        callback: Py<PyAny>,
181    ) -> PyResult<Bound<'py, PyAny>> {
182        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
183
184        let instruments_map = Arc::new(AtomicMap::<InstrumentId, InstrumentAny>::new());
185
186        for inst in instruments {
187            let inst_any = pyobject_to_instrument_any(py, inst)?;
188            instruments_map.insert(inst_any.id(), inst_any);
189        }
190
191        let account_id = self.account_id_shared().clone();
192        let truncated_id_map = self.truncated_id_map().clone();
193        let mut client = self.clone();
194
195        pyo3_async_runtimes::tokio::future_into_py(py, async move {
196            client.connect().await.map_err(to_pyruntime_err)?;
197
198            let stream = client.stream().map_err(to_pyruntime_err)?;
199            let clock = get_atomic_clock_realtime();
200            let book_sequence = Arc::new(AtomicU64::new(0));
201
202            get_runtime().spawn(async move {
203                tokio::pin!(stream);
204                let order_qty_cache: Arc<AtomicMap<String, f64>> =
205                    Arc::new(AtomicMap::new());
206
207                while let Some(msg) = stream.next().await {
208                    let ts_init = clock.get_time_ns();
209
210                    match msg {
211                        KrakenSpotWsMessage::Ticker(tickers) => {
212                            for ticker in &tickers {
213                                let instrument_id = InstrumentId::new(
214                                    Symbol::new(ticker.symbol.as_str()),
215                                    *KRAKEN_VENUE,
216                                );
217                                let instrument =
218                                    instruments_map.load().get(&instrument_id).cloned();
219
220                                if let Some(ref inst) = instrument {
221                                    match parse_quote_tick(ticker, inst, ts_init) {
222                                        Ok(quote) => {
223                                            Python::attach(|py| {
224                                                let py_obj =
225                                                    data_to_pycapsule(py, Data::Quote(quote));
226                                                call_python_threadsafe(
227                                                    py, &call_soon, &callback, py_obj,
228                                                );
229                                            });
230                                        }
231                                        Err(e) => {
232                                            log::error!("Failed to parse quote tick: {e}");
233                                        }
234                                    }
235                                }
236                            }
237                        }
238                        KrakenSpotWsMessage::Trade(trades) => {
239                            for trade in &trades {
240                                let instrument_id = InstrumentId::new(
241                                    Symbol::new(trade.symbol.as_str()),
242                                    *KRAKEN_VENUE,
243                                );
244                                let instrument =
245                                    instruments_map.load().get(&instrument_id).cloned();
246
247                                if let Some(ref inst) = instrument {
248                                    match parse_trade_tick(trade, inst, ts_init) {
249                                        Ok(tick) => {
250                                            Python::attach(|py| {
251                                                let py_obj =
252                                                    data_to_pycapsule(py, Data::Trade(tick));
253                                                call_python_threadsafe(
254                                                    py, &call_soon, &callback, py_obj,
255                                                );
256                                            });
257                                        }
258                                        Err(e) => {
259                                            log::error!("Failed to parse trade tick: {e}");
260                                        }
261                                    }
262                                }
263                            }
264                        }
265                        KrakenSpotWsMessage::Book {
266                            data,
267                            is_snapshot: _,
268                        } => {
269                            for book in &data {
270                                let instrument_id = InstrumentId::new(
271                                    Symbol::new(book.symbol.as_str()),
272                                    *KRAKEN_VENUE,
273                                );
274                                let instrument =
275                                    instruments_map.load().get(&instrument_id).cloned();
276
277                                if let Some(ref inst) = instrument {
278                                    let sequence = book_sequence.fetch_add(1, Ordering::Relaxed);
279                                    match parse_book_deltas(book, inst, sequence, ts_init) {
280                                        Ok(delta_vec) => {
281                                            if delta_vec.is_empty() {
282                                                continue;
283                                            }
284                                            let deltas = OrderBookDeltas::new(inst.id(), delta_vec);
285                                            Python::attach(|py| {
286                                                let py_obj = data_to_pycapsule(
287                                                    py,
288                                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
289                                                );
290                                                call_python_threadsafe(
291                                                    py, &call_soon, &callback, py_obj,
292                                                );
293                                            });
294                                        }
295                                        Err(e) => {
296                                            log::error!("Failed to parse book deltas: {e}");
297                                        }
298                                    }
299                                }
300                            }
301                        }
302                        KrakenSpotWsMessage::Ohlc(ohlc_data) => {
303                            for ohlc in &ohlc_data {
304                                let instrument_id = InstrumentId::new(
305                                    Symbol::new(ohlc.symbol.as_str()),
306                                    *KRAKEN_VENUE,
307                                );
308                                let instrument =
309                                    instruments_map.load().get(&instrument_id).cloned();
310
311                                if let Some(ref inst) = instrument {
312                                    match parse_ws_bar(ohlc, inst, ts_init) {
313                                        Ok(bar) => {
314                                            Python::attach(|py| {
315                                                let py_obj = data_to_pycapsule(py, Data::Bar(bar));
316                                                call_python_threadsafe(
317                                                    py, &call_soon, &callback, py_obj,
318                                                );
319                                            });
320                                        }
321                                        Err(e) => {
322                                            log::error!("Failed to parse bar: {e}");
323                                        }
324                                    }
325                                }
326                            }
327                        }
328                        KrakenSpotWsMessage::Execution(executions) => {
329                            let acct_id = account_id.read().ok().and_then(|g| *g);
330                            let Some(acct_id) = acct_id else {
331                                log::trace!(
332                                    "Execution message received but no account_id set (data-only client)"
333                                );
334                                continue;
335                            };
336
337                            for exec in &executions {
338                                let symbol = match &exec.symbol {
339                                    Some(s) => s.as_str(),
340                                    None => {
341                                        log::debug!(
342                                            "Execution without symbol: exec_type={:?}, order_id={}",
343                                            exec.exec_type,
344                                            exec.order_id
345                                        );
346                                        continue;
347                                    }
348                                };
349
350                                let instrument_id = InstrumentId::new(
351                                    Symbol::new(symbol),
352                                    *KRAKEN_VENUE,
353                                );
354                                let instrument =
355                                    instruments_map.load().get(&instrument_id).cloned();
356
357                                let Some(ref inst) = instrument else {
358                                    log::warn!("No instrument for symbol: {symbol}");
359                                    continue;
360                                };
361
362                                let cached_qty = exec.cl_ord_id.as_ref().and_then(|id| {
363                                    order_qty_cache.load().get(id).copied()
364                                });
365
366                                if let (Some(qty), Some(cl_ord_id)) =
367                                    (exec.order_qty, &exec.cl_ord_id)
368                                {
369                                    order_qty_cache.insert(cl_ord_id.clone(), qty);
370                                }
371
372                                match parse_ws_order_status_report(
373                                    exec, inst, acct_id, cached_qty, ts_init,
374                                ) {
375                                    Ok(mut report) => {
376                                        if let Some(ref cl_ord_id) = exec.cl_ord_id {
377                                            let full_id = truncated_id_map
378                                                .load()
379                                                .get(cl_ord_id)
380                                                .copied()
381                                                .unwrap_or_else(|| ClientOrderId::new(cl_ord_id));
382                                            report = report.with_client_order_id(full_id);
383                                        }
384                                        dispatch_order_status_report(
385                                            report, &call_soon, &callback,
386                                        );
387                                    }
388                                    Err(e) => {
389                                        log::error!("Failed to parse order status report: {e}");
390                                    }
391                                }
392
393                                if exec.exec_id.is_some() {
394                                    match parse_ws_fill_report(exec, inst, acct_id, ts_init) {
395                                        Ok(mut report) => {
396                                            if let Some(ref cl_ord_id) = exec.cl_ord_id {
397                                                let full_id = truncated_id_map
398                                                    .load()
399                                                    .get(cl_ord_id)
400                                                    .copied()
401                                                    .unwrap_or_else(|| {
402                                                        ClientOrderId::new(cl_ord_id)
403                                                    });
404                                                report.client_order_id = Some(full_id);
405                                            }
406                                            dispatch_fill_report(report, &call_soon, &callback);
407                                        }
408                                        Err(e) => {
409                                            log::error!("Failed to parse fill report: {e}");
410                                        }
411                                    }
412                                }
413                            }
414                        }
415                        KrakenSpotWsMessage::Reconnected => {
416                            log::info!("WebSocket reconnected");
417                        }
418                    }
419                }
420            });
421
422            Ok(())
423        })
424    }
425
426    /// Waits until the connection is active or timeout.
427    #[pyo3(name = "wait_until_active")]
428    fn py_wait_until_active<'py>(
429        &self,
430        py: Python<'py>,
431        timeout_secs: f64,
432    ) -> PyResult<Bound<'py, PyAny>> {
433        let client = self.clone();
434
435        pyo3_async_runtimes::tokio::future_into_py(py, async move {
436            client
437                .wait_until_active(timeout_secs)
438                .await
439                .map_err(to_pyruntime_err)?;
440            Ok(())
441        })
442    }
443
444    /// Authenticates with the Kraken API to enable private subscriptions.
445    #[pyo3(name = "authenticate")]
446    fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
447        let client = self.clone();
448
449        pyo3_async_runtimes::tokio::future_into_py(py, async move {
450            client.authenticate().await.map_err(to_pyruntime_err)?;
451            Ok(())
452        })
453    }
454
455    /// Returns true if the WebSocket is authenticated for private subscriptions.
    ///
    /// Exposed to Python as `is_authenticated()`; forwards to the client's
    /// `is_authenticated`.
456    #[pyo3(name = "is_authenticated")]
457    fn py_is_authenticated(&self) -> bool {
458        self.is_authenticated()
459    }
460
461    /// Waits until the WebSocket is authenticated or the timeout elapses.
462    ///
463    /// Returns an error on timeout or explicit auth failure.
464    #[pyo3(name = "wait_until_authenticated")]
465    fn py_wait_until_authenticated<'py>(
466        &self,
467        py: Python<'py>,
468        timeout_secs: f64,
469    ) -> PyResult<Bound<'py, PyAny>> {
470        let client = self.clone();
471
472        pyo3_async_runtimes::tokio::future_into_py(py, async move {
473            client
474                .wait_until_authenticated(timeout_secs)
475                .await
476                .map_err(to_pyruntime_err)?;
477            Ok(())
478        })
479    }
480
481    /// Disconnects from the WebSocket server.
482    #[pyo3(name = "disconnect")]
483    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
484        let mut client = self.clone();
485
486        pyo3_async_runtimes::tokio::future_into_py(py, async move {
487            client.disconnect().await.map_err(to_pyruntime_err)?;
488            Ok(())
489        })
490    }
491
492    /// Sends a ping message to keep the connection alive.
493    #[pyo3(name = "send_ping")]
494    fn py_send_ping<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
495        let client = self.clone();
496
497        pyo3_async_runtimes::tokio::future_into_py(py, async move {
498            client.send_ping().await.map_err(to_pyruntime_err)?;
499            Ok(())
500        })
501    }
502
503    /// Closes the WebSocket connection.
504    #[pyo3(name = "close")]
505    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
506        let mut client = self.clone();
507
508        pyo3_async_runtimes::tokio::future_into_py(py, async move {
509            client.close().await.map_err(to_pyruntime_err)?;
510            Ok(())
511        })
512    }
513
514    /// Sets the account ID for execution report parsing.
    ///
    /// Exposed to Python as `set_account_id(account_id)`; forwards to the client's
    /// `set_account_id`.
    ///
    /// NOTE(review): presumably this is the value the `connect` stream handler reads
    /// through `account_id_shared()` before parsing execution messages - confirm.
515    #[pyo3(name = "set_account_id")]
516    fn py_set_account_id(&self, account_id: AccountId) {
517        self.set_account_id(account_id);
518    }
519
520    /// Caches an instrument for execution report parsing.
521    #[pyo3(name = "cache_instrument")]
522    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
523        let inst_any = pyobject_to_instrument_any(py, instrument)?;
524        self.cache_instrument(inst_any);
525        Ok(())
526    }
527
528    /// Caches a client order for truncated ID resolution.
    ///
    /// Exposed to Python as `cache_client_order(...)`. All five arguments are passed
    /// through unchanged, in the same order, to the client's `cache_client_order`;
    /// `venue_order_id` may be omitted (`None`).
529    #[pyo3(name = "cache_client_order")]
530    fn py_cache_client_order(
531        &self,
532        client_order_id: ClientOrderId,
533        venue_order_id: Option<VenueOrderId>,
534        instrument_id: InstrumentId,
535        trader_id: TraderId,
536        strategy_id: StrategyId,
537    ) {
538        self.cache_client_order(
539            client_order_id,
540            venue_order_id,
541            instrument_id,
542            trader_id,
543            strategy_id,
544        );
545    }
546
547    /// Subscribes to order book updates for the given instrument.
548    #[pyo3(name = "subscribe_book")]
549    fn py_subscribe_book<'py>(
550        &self,
551        py: Python<'py>,
552        instrument_id: InstrumentId,
553        depth: Option<u32>,
554    ) -> PyResult<Bound<'py, PyAny>> {
555        let client = self.clone();
556
557        pyo3_async_runtimes::tokio::future_into_py(py, async move {
558            client
559                .subscribe_book(instrument_id, depth)
560                .await
561                .map_err(to_pyruntime_err)?;
562            Ok(())
563        })
564    }
565
566    /// Subscribes to quote updates for the given instrument.
567    ///
568    /// Uses the Ticker channel with `event_trigger: "bbo"` for updates only on
569    /// best bid/offer changes.
570    #[pyo3(name = "subscribe_quotes")]
571    fn py_subscribe_quotes<'py>(
572        &self,
573        py: Python<'py>,
574        instrument_id: InstrumentId,
575    ) -> PyResult<Bound<'py, PyAny>> {
576        let client = self.clone();
577
578        pyo3_async_runtimes::tokio::future_into_py(py, async move {
579            client
580                .subscribe_quotes(instrument_id)
581                .await
582                .map_err(to_pyruntime_err)?;
583            Ok(())
584        })
585    }
586
587    /// Subscribes to trade updates for the given instrument.
588    #[pyo3(name = "subscribe_trades")]
589    fn py_subscribe_trades<'py>(
590        &self,
591        py: Python<'py>,
592        instrument_id: InstrumentId,
593    ) -> PyResult<Bound<'py, PyAny>> {
594        let client = self.clone();
595
596        pyo3_async_runtimes::tokio::future_into_py(py, async move {
597            client
598                .subscribe_trades(instrument_id)
599                .await
600                .map_err(to_pyruntime_err)?;
601            Ok(())
602        })
603    }
604
605    /// Subscribes to bar/OHLC updates for the given bar type.
606    #[pyo3(name = "subscribe_bars")]
607    fn py_subscribe_bars<'py>(
608        &self,
609        py: Python<'py>,
610        bar_type: BarType,
611    ) -> PyResult<Bound<'py, PyAny>> {
612        let client = self.clone();
613
614        pyo3_async_runtimes::tokio::future_into_py(py, async move {
615            client
616                .subscribe_bars(bar_type)
617                .await
618                .map_err(to_pyruntime_err)?;
619            Ok(())
620        })
621    }
622
623    /// Subscribes to execution updates (order and fill events).
624    ///
625    /// Requires authentication - call `authenticate()` first.
626    #[pyo3(name = "subscribe_executions")]
627    #[pyo3(signature = (snap_orders=true, snap_trades=true))]
628    fn py_subscribe_executions<'py>(
629        &self,
630        py: Python<'py>,
631        snap_orders: bool,
632        snap_trades: bool,
633    ) -> PyResult<Bound<'py, PyAny>> {
634        let client = self.clone();
635
636        pyo3_async_runtimes::tokio::future_into_py(py, async move {
637            client
638                .subscribe_executions(snap_orders, snap_trades)
639                .await
640                .map_err(to_pyruntime_err)?;
641            Ok(())
642        })
643    }
644
645    /// Unsubscribes from order book updates for the given instrument.
646    #[pyo3(name = "unsubscribe_book")]
647    fn py_unsubscribe_book<'py>(
648        &self,
649        py: Python<'py>,
650        instrument_id: InstrumentId,
651    ) -> PyResult<Bound<'py, PyAny>> {
652        let client = self.clone();
653
654        pyo3_async_runtimes::tokio::future_into_py(py, async move {
655            client
656                .unsubscribe_book(instrument_id)
657                .await
658                .map_err(to_pyruntime_err)?;
659            Ok(())
660        })
661    }
662
663    /// Unsubscribes from quote updates for the given instrument.
664    #[pyo3(name = "unsubscribe_quotes")]
665    fn py_unsubscribe_quotes<'py>(
666        &self,
667        py: Python<'py>,
668        instrument_id: InstrumentId,
669    ) -> PyResult<Bound<'py, PyAny>> {
670        let client = self.clone();
671
672        pyo3_async_runtimes::tokio::future_into_py(py, async move {
673            client
674                .unsubscribe_quotes(instrument_id)
675                .await
676                .map_err(to_pyruntime_err)?;
677            Ok(())
678        })
679    }
680
681    /// Unsubscribes from trade updates for the given instrument.
682    #[pyo3(name = "unsubscribe_trades")]
683    fn py_unsubscribe_trades<'py>(
684        &self,
685        py: Python<'py>,
686        instrument_id: InstrumentId,
687    ) -> PyResult<Bound<'py, PyAny>> {
688        let client = self.clone();
689
690        pyo3_async_runtimes::tokio::future_into_py(py, async move {
691            client
692                .unsubscribe_trades(instrument_id)
693                .await
694                .map_err(to_pyruntime_err)?;
695            Ok(())
696        })
697    }
698
699    /// Unsubscribes from bar/OHLC updates for the given bar type.
700    #[pyo3(name = "unsubscribe_bars")]
701    fn py_unsubscribe_bars<'py>(
702        &self,
703        py: Python<'py>,
704        bar_type: BarType,
705    ) -> PyResult<Bound<'py, PyAny>> {
706        let client = self.clone();
707
708        pyo3_async_runtimes::tokio::future_into_py(py, async move {
709            client
710                .unsubscribe_bars(bar_type)
711                .await
712                .map_err(to_pyruntime_err)?;
713            Ok(())
714        })
715    }
716}
717
718fn dispatch_order_status_report(
719    report: OrderStatusReport,
720    call_soon: &Py<PyAny>,
721    callback: &Py<PyAny>,
722) {
723    Python::attach(|py| match report.into_py_any(py) {
724        Ok(py_obj) => {
725            call_python_threadsafe(py, call_soon, callback, py_obj);
726        }
727        Err(e) => {
728            log::error!("Failed to convert OrderStatusReport to Python: {e}");
729        }
730    });
731}
732
733fn dispatch_fill_report(report: FillReport, call_soon: &Py<PyAny>, callback: &Py<PyAny>) {
734    Python::attach(|py| match report.into_py_any(py) {
735        Ok(py_obj) => {
736            call_python_threadsafe(py, call_soon, callback, py_obj);
737        }
738        Err(e) => {
739            log::error!("Failed to convert FillReport to Python: {e}");
740        }
741    });
742}