Skip to main content

nautilus_architect_ax/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for Ax WebSocket clients.
17//!
18//! [`PyAxMdWebSocketClient`] and [`PyAxOrdersWebSocketClient`] wrap the Rust clients
19//! and add instrument caches at the Python boundary. The inner clients are pure network
20//! components that emit venue-specific types; these wrappers parse them into Nautilus
21//! domain objects before passing them to Python callbacks.
22
23use std::{
24    fmt::Debug,
25    sync::{Arc, Mutex},
26};
27
28use ahash::{AHashMap, AHashSet};
29use futures_util::StreamExt;
30use nautilus_common::live::get_runtime;
31use nautilus_core::{
32    AtomicMap, UUID4, UnixNanos,
33    python::{call_python_threadsafe, to_pyruntime_err},
34    time::{AtomicTime, get_atomic_clock_realtime},
35};
36use nautilus_model::{
37    data::{BarType, Data, InstrumentStatus, MarkPriceUpdate, OrderBookDeltas_API},
38    enums::{MarketStatusAction, OrderSide, OrderType, TimeInForce},
39    events::OrderCancelRejected,
40    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
41    instruments::{Instrument, InstrumentAny},
42    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
43    types::{Price, Quantity},
44};
45use nautilus_network::websocket::TransportBackend;
46use pyo3::{IntoPyObjectExt, prelude::*};
47use ustr::Ustr;
48
49use crate::{
50    common::{
51        enums::{AxCandleWidth, AxInstrumentState, AxMarketDataLevel},
52        parse::ax_timestamp_stn_to_unix_nanos,
53    },
54    execution::{
55        cleanup_terminal_order_tracking, create_order_accepted, create_order_canceled,
56        create_order_expired, create_order_filled, create_order_rejected,
57    },
58    http::models::AxOrderRejectReason,
59    websocket::{
60        data::{
61            AxMdWebSocketClient,
62            client::SymbolDataTypes,
63            parse::{
64                parse_book_l1_quote, parse_book_l2_deltas, parse_book_l3_deltas, parse_candle_bar,
65                parse_trade_tick,
66            },
67        },
68        messages::{AxDataWsMessage, AxMdCandle, AxMdMessage, AxOrdersWsMessage, AxWsOrderEvent},
69        orders::{AxOrdersWebSocketClient, OrdersCaches},
70    },
71};
72
73/// Python wrapper around [`AxMdWebSocketClient`] that holds an instrument cache
74/// at the Python boundary for parsing venue messages into Nautilus domain types.
75#[pyclass(
76    name = "AxMdWebSocketClient",
77    module = "nautilus_trader.core.nautilus_pyo3.architect"
78)]
79#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.architect_ax")]
80pub struct PyAxMdWebSocketClient {
81    inner: AxMdWebSocketClient,
82    instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
83}
84
85impl Debug for PyAxMdWebSocketClient {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        f.debug_struct(stringify!(PyAxMdWebSocketClient))
88            .field("inner", &self.inner)
89            .finish_non_exhaustive()
90    }
91}
92
93#[pymethods]
94#[pyo3_stub_gen::derive::gen_stub_pymethods]
95impl PyAxMdWebSocketClient {
96    #[new]
97    #[pyo3(signature = (url, auth_token, heartbeat=30, proxy_url=None))]
98    fn py_new(url: String, auth_token: String, heartbeat: u64, proxy_url: Option<String>) -> Self {
99        Self {
100            inner: AxMdWebSocketClient::new(
101                url,
102                auth_token,
103                heartbeat,
104                TransportBackend::default(),
105                proxy_url,
106            ),
107            instruments_cache: Arc::new(AtomicMap::new()),
108        }
109    }
110
111    #[staticmethod]
112    #[pyo3(name = "without_auth")]
113    #[pyo3(signature = (url, heartbeat=30, proxy_url=None))]
114    fn py_without_auth(url: String, heartbeat: u64, proxy_url: Option<String>) -> Self {
115        Self {
116            inner: AxMdWebSocketClient::without_auth(
117                url,
118                heartbeat,
119                TransportBackend::default(),
120                proxy_url,
121            ),
122            instruments_cache: Arc::new(AtomicMap::new()),
123        }
124    }
125
126    #[getter]
127    #[pyo3(name = "url")]
128    #[must_use]
129    pub fn py_url(&self) -> &str {
130        self.inner.url()
131    }
132
133    #[pyo3(name = "is_active")]
134    #[must_use]
135    pub fn py_is_active(&self) -> bool {
136        self.inner.is_active()
137    }
138
139    #[pyo3(name = "is_closed")]
140    #[must_use]
141    pub fn py_is_closed(&self) -> bool {
142        self.inner.is_closed()
143    }
144
145    #[pyo3(name = "subscription_count")]
146    #[must_use]
147    pub fn py_subscription_count(&self) -> usize {
148        self.inner.subscription_count()
149    }
150
151    #[pyo3(name = "set_auth_token")]
152    fn py_set_auth_token(&mut self, token: String) {
153        self.inner.set_auth_token(token);
154    }
155
156    #[pyo3(name = "cache_instrument")]
157    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
158        let inst = pyobject_to_instrument_any(py, instrument)?;
159        let symbol = inst.symbol().inner();
160        self.instruments_cache.insert(symbol, inst);
161        Ok(())
162    }
163
164    #[pyo3(name = "connect")]
165    #[expect(clippy::needless_pass_by_value)]
166    fn py_connect<'py>(
167        &mut self,
168        py: Python<'py>,
169        loop_: Py<PyAny>,
170        callback: Py<PyAny>,
171    ) -> PyResult<Bound<'py, PyAny>> {
172        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
173
174        let clock = get_atomic_clock_realtime();
175        let instruments = Arc::clone(&self.instruments_cache);
176        let symbol_data_types = self.inner.symbol_data_types();
177        let status_invalidations = self.inner.status_invalidations();
178        let mut client = self.inner.clone();
179
180        pyo3_async_runtimes::tokio::future_into_py(py, async move {
181            client.connect().await.map_err(to_pyruntime_err)?;
182
183            let stream = client.stream();
184
185            get_runtime().spawn(async move {
186                let _client = client;
187                tokio::pin!(stream);
188
189                let mut book_sequences: AHashMap<Ustr, u64> = AHashMap::new();
190                let mut candle_cache: AHashMap<(Ustr, AxCandleWidth), AxMdCandle> = AHashMap::new();
191                let mut instrument_states: AHashMap<Ustr, AxInstrumentState> = AHashMap::new();
192
193                while let Some(msg) = stream.next().await {
194                    let ts_init = clock.get_time_ns();
195
196                    drain_status_invalidations(&status_invalidations, &mut instrument_states);
197
198                    match msg {
199                        AxDataWsMessage::MdMessage(md_msg) => {
200                            handle_md_message(
201                                md_msg,
202                                &instruments,
203                                &symbol_data_types,
204                                &mut book_sequences,
205                                &mut candle_cache,
206                                &mut instrument_states,
207                                ts_init,
208                                &call_soon,
209                                &callback,
210                            );
211                        }
212                        AxDataWsMessage::Reconnected => {
213                            candle_cache.clear();
214                            instrument_states.clear();
215                            log::info!("AX WebSocket reconnected");
216                        }
217                        AxDataWsMessage::CandleUnsubscribed { symbol, width } => {
218                            candle_cache.remove(&(symbol, width));
219                        }
220                    }
221                }
222            });
223
224            Ok(())
225        })
226    }
227
228    #[pyo3(name = "subscribe_book_deltas")]
229    fn py_subscribe_book_deltas<'py>(
230        &self,
231        py: Python<'py>,
232        instrument_id: InstrumentId,
233        level: AxMarketDataLevel,
234    ) -> PyResult<Bound<'py, PyAny>> {
235        let client = self.inner.clone();
236        let symbol = instrument_id.symbol.to_string();
237
238        pyo3_async_runtimes::tokio::future_into_py(py, async move {
239            client
240                .subscribe_book_deltas(&symbol, level)
241                .await
242                .map_err(to_pyruntime_err)
243        })
244    }
245
246    #[pyo3(name = "subscribe_quotes")]
247    fn py_subscribe_quotes<'py>(
248        &self,
249        py: Python<'py>,
250        instrument_id: InstrumentId,
251    ) -> PyResult<Bound<'py, PyAny>> {
252        let client = self.inner.clone();
253        let symbol = instrument_id.symbol.to_string();
254
255        pyo3_async_runtimes::tokio::future_into_py(py, async move {
256            client
257                .subscribe_quotes(&symbol)
258                .await
259                .map_err(to_pyruntime_err)
260        })
261    }
262
263    #[pyo3(name = "subscribe_trades")]
264    fn py_subscribe_trades<'py>(
265        &self,
266        py: Python<'py>,
267        instrument_id: InstrumentId,
268    ) -> PyResult<Bound<'py, PyAny>> {
269        let client = self.inner.clone();
270        let symbol = instrument_id.symbol.to_string();
271
272        pyo3_async_runtimes::tokio::future_into_py(py, async move {
273            client
274                .subscribe_trades(&symbol)
275                .await
276                .map_err(to_pyruntime_err)
277        })
278    }
279
280    #[pyo3(name = "subscribe_mark_prices")]
281    fn py_subscribe_mark_prices<'py>(
282        &self,
283        py: Python<'py>,
284        instrument_id: InstrumentId,
285    ) -> PyResult<Bound<'py, PyAny>> {
286        let client = self.inner.clone();
287        let symbol = instrument_id.symbol.to_string();
288
289        pyo3_async_runtimes::tokio::future_into_py(py, async move {
290            client
291                .subscribe_mark_prices(&symbol)
292                .await
293                .map_err(to_pyruntime_err)
294        })
295    }
296
297    #[pyo3(name = "subscribe_instrument_status")]
298    fn py_subscribe_instrument_status<'py>(
299        &self,
300        py: Python<'py>,
301        instrument_id: InstrumentId,
302    ) -> PyResult<Bound<'py, PyAny>> {
303        let client = self.inner.clone();
304        let symbol = instrument_id.symbol.to_string();
305
306        pyo3_async_runtimes::tokio::future_into_py(py, async move {
307            client
308                .subscribe_instrument_status(&symbol)
309                .await
310                .map_err(to_pyruntime_err)
311        })
312    }
313
314    #[pyo3(name = "unsubscribe_book_deltas")]
315    fn py_unsubscribe_book_deltas<'py>(
316        &self,
317        py: Python<'py>,
318        instrument_id: InstrumentId,
319    ) -> PyResult<Bound<'py, PyAny>> {
320        let client = self.inner.clone();
321        let symbol = instrument_id.symbol.to_string();
322
323        pyo3_async_runtimes::tokio::future_into_py(py, async move {
324            client
325                .unsubscribe_book_deltas(&symbol)
326                .await
327                .map_err(to_pyruntime_err)
328        })
329    }
330
331    #[pyo3(name = "subscribe_bars")]
332    fn py_subscribe_bars<'py>(
333        &self,
334        py: Python<'py>,
335        bar_type: BarType,
336    ) -> PyResult<Bound<'py, PyAny>> {
337        let client = self.inner.clone();
338        let symbol = bar_type.instrument_id().symbol.to_string();
339        let width = AxCandleWidth::try_from(&bar_type.spec()).map_err(to_pyruntime_err)?;
340
341        pyo3_async_runtimes::tokio::future_into_py(py, async move {
342            client
343                .subscribe_candles(&symbol, width)
344                .await
345                .map_err(to_pyruntime_err)
346        })
347    }
348
349    #[pyo3(name = "unsubscribe_quotes")]
350    fn py_unsubscribe_quotes<'py>(
351        &self,
352        py: Python<'py>,
353        instrument_id: InstrumentId,
354    ) -> PyResult<Bound<'py, PyAny>> {
355        let client = self.inner.clone();
356        let symbol = instrument_id.symbol.to_string();
357
358        pyo3_async_runtimes::tokio::future_into_py(py, async move {
359            client
360                .unsubscribe_quotes(&symbol)
361                .await
362                .map_err(to_pyruntime_err)
363        })
364    }
365
366    #[pyo3(name = "unsubscribe_trades")]
367    fn py_unsubscribe_trades<'py>(
368        &self,
369        py: Python<'py>,
370        instrument_id: InstrumentId,
371    ) -> PyResult<Bound<'py, PyAny>> {
372        let client = self.inner.clone();
373        let symbol = instrument_id.symbol.to_string();
374
375        pyo3_async_runtimes::tokio::future_into_py(py, async move {
376            client
377                .unsubscribe_trades(&symbol)
378                .await
379                .map_err(to_pyruntime_err)
380        })
381    }
382
383    #[pyo3(name = "unsubscribe_bars")]
384    fn py_unsubscribe_bars<'py>(
385        &self,
386        py: Python<'py>,
387        bar_type: BarType,
388    ) -> PyResult<Bound<'py, PyAny>> {
389        let client = self.inner.clone();
390        let symbol = bar_type.instrument_id().symbol.to_string();
391        let width = AxCandleWidth::try_from(&bar_type.spec()).map_err(to_pyruntime_err)?;
392
393        pyo3_async_runtimes::tokio::future_into_py(py, async move {
394            client
395                .unsubscribe_candles(&symbol, width)
396                .await
397                .map_err(to_pyruntime_err)
398        })
399    }
400
401    #[pyo3(name = "unsubscribe_mark_prices")]
402    fn py_unsubscribe_mark_prices<'py>(
403        &self,
404        py: Python<'py>,
405        instrument_id: InstrumentId,
406    ) -> PyResult<Bound<'py, PyAny>> {
407        let client = self.inner.clone();
408        let symbol = instrument_id.symbol.to_string();
409
410        pyo3_async_runtimes::tokio::future_into_py(py, async move {
411            client
412                .unsubscribe_mark_prices(&symbol)
413                .await
414                .map_err(to_pyruntime_err)
415        })
416    }
417
418    #[pyo3(name = "unsubscribe_instrument_status")]
419    fn py_unsubscribe_instrument_status<'py>(
420        &self,
421        py: Python<'py>,
422        instrument_id: InstrumentId,
423    ) -> PyResult<Bound<'py, PyAny>> {
424        let client = self.inner.clone();
425        let symbol = instrument_id.symbol.to_string();
426
427        pyo3_async_runtimes::tokio::future_into_py(py, async move {
428            client
429                .unsubscribe_instrument_status(&symbol)
430                .await
431                .map_err(to_pyruntime_err)
432        })
433    }
434
435    #[pyo3(name = "disconnect")]
436    fn py_disconnect<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
437        let client = self.inner.clone();
438
439        pyo3_async_runtimes::tokio::future_into_py(py, async move {
440            client.disconnect().await;
441            Ok(())
442        })
443    }
444
445    #[pyo3(name = "close")]
446    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
447        let mut client = self.inner.clone();
448
449        pyo3_async_runtimes::tokio::future_into_py(py, async move {
450            client.close().await;
451            Ok(())
452        })
453    }
454}
455
456/// Python wrapper around [`AxOrdersWebSocketClient`] that handles order event
457/// parsing at the Python boundary.
458#[pyclass(
459    name = "AxOrdersWebSocketClient",
460    module = "nautilus_trader.core.nautilus_pyo3.architect"
461)]
462#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.architect_ax")]
463pub struct PyAxOrdersWebSocketClient {
464    inner: AxOrdersWebSocketClient,
465}
466
467impl Debug for PyAxOrdersWebSocketClient {
468    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469        f.debug_struct(stringify!(PyAxOrdersWebSocketClient))
470            .field("inner", &self.inner)
471            .finish_non_exhaustive()
472    }
473}
474
475#[pymethods]
476#[pyo3_stub_gen::derive::gen_stub_pymethods]
477impl PyAxOrdersWebSocketClient {
478    #[new]
479    #[pyo3(signature = (url, account_id, trader_id, heartbeat=30, proxy_url=None))]
480    fn py_new(
481        url: String,
482        account_id: AccountId,
483        trader_id: TraderId,
484        heartbeat: u64,
485        proxy_url: Option<String>,
486    ) -> Self {
487        Self {
488            inner: AxOrdersWebSocketClient::new(
489                url,
490                account_id,
491                trader_id,
492                heartbeat,
493                TransportBackend::default(),
494                proxy_url,
495            ),
496        }
497    }
498
499    #[getter]
500    #[pyo3(name = "url")]
501    #[must_use]
502    pub fn py_url(&self) -> &str {
503        self.inner.url()
504    }
505
506    #[getter]
507    #[pyo3(name = "account_id")]
508    #[must_use]
509    pub fn py_account_id(&self) -> AccountId {
510        self.inner.account_id()
511    }
512
513    #[pyo3(name = "is_active")]
514    #[must_use]
515    pub fn py_is_active(&self) -> bool {
516        self.inner.is_active()
517    }
518
519    #[pyo3(name = "is_closed")]
520    #[must_use]
521    pub fn py_is_closed(&self) -> bool {
522        self.inner.is_closed()
523    }
524
525    #[pyo3(name = "cache_instrument")]
526    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
527        self.inner
528            .cache_instrument(pyobject_to_instrument_any(py, instrument)?);
529        Ok(())
530    }
531
532    #[pyo3(name = "register_external_order")]
533    fn py_register_external_order(
534        &self,
535        client_order_id: ClientOrderId,
536        venue_order_id: VenueOrderId,
537        instrument_id: InstrumentId,
538        strategy_id: StrategyId,
539    ) -> bool {
540        self.inner.register_external_order(
541            client_order_id,
542            venue_order_id,
543            instrument_id,
544            strategy_id,
545        )
546    }
547
548    #[pyo3(name = "connect")]
549    #[expect(clippy::needless_pass_by_value)]
550    fn py_connect<'py>(
551        &mut self,
552        py: Python<'py>,
553        loop_: Py<PyAny>,
554        callback: Py<PyAny>,
555        bearer_token: String,
556    ) -> PyResult<Bound<'py, PyAny>> {
557        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
558
559        let clock = get_atomic_clock_realtime();
560        let account_id = self.inner.account_id();
561        let caches = self.inner.caches().clone();
562        let mut client = self.inner.clone();
563
564        pyo3_async_runtimes::tokio::future_into_py(py, async move {
565            client
566                .connect(&bearer_token)
567                .await
568                .map_err(to_pyruntime_err)?;
569
570            let stream = client.stream();
571
572            get_runtime().spawn(async move {
573                let _client = client;
574                tokio::pin!(stream);
575
576                while let Some(msg) = stream.next().await {
577                    match msg {
578                        AxOrdersWsMessage::Event(event) => {
579                            handle_order_event(
580                                *event, &caches, account_id, clock, &call_soon, &callback,
581                            );
582                        }
583                        AxOrdersWsMessage::PlaceOrderResponse(resp) => {
584                            log::debug!(
585                                "Place order response: rid={}, oid={}",
586                                resp.rid,
587                                resp.res.oid
588                            );
589                        }
590                        AxOrdersWsMessage::CancelOrderResponse(resp) => {
591                            log::debug!(
592                                "Cancel order response: rid={}, received={}",
593                                resp.rid,
594                                resp.res.cxl_rx
595                            );
596                        }
597                        AxOrdersWsMessage::OpenOrdersResponse(resp) => {
598                            log::debug!(
599                                "Open orders response: rid={}, count={}",
600                                resp.rid,
601                                resp.res.len()
602                            );
603                        }
604                        AxOrdersWsMessage::Error(err) => {
605                            log::error!(
606                                "AX orders WebSocket error: code={:?}, message={}, rid={:?}",
607                                err.code,
608                                err.message,
609                                err.request_id
610                            );
611                        }
612                        AxOrdersWsMessage::Reconnected => {
613                            log::info!("AX orders WebSocket reconnected");
614                        }
615                        AxOrdersWsMessage::Authenticated => {
616                            log::info!("AX orders WebSocket authenticated");
617                        }
618                    }
619                }
620            });
621
622            Ok(())
623        })
624    }
625
626    #[pyo3(name = "submit_order")]
627    #[pyo3(signature = (
628        trader_id,
629        strategy_id,
630        instrument_id,
631        client_order_id,
632        order_side,
633        order_type,
634        quantity,
635        time_in_force,
636        price=None,
637        trigger_price=None,
638        post_only=false,
639    ))]
640    #[expect(clippy::too_many_arguments)]
641    fn py_submit_order<'py>(
642        &self,
643        py: Python<'py>,
644        trader_id: TraderId,
645        strategy_id: StrategyId,
646        instrument_id: InstrumentId,
647        client_order_id: ClientOrderId,
648        order_side: OrderSide,
649        order_type: OrderType,
650        quantity: Quantity,
651        time_in_force: TimeInForce,
652        price: Option<Price>,
653        trigger_price: Option<Price>,
654        post_only: bool,
655    ) -> PyResult<Bound<'py, PyAny>> {
656        let client = self.inner.clone();
657
658        pyo3_async_runtimes::tokio::future_into_py(py, async move {
659            client
660                .submit_order(
661                    trader_id,
662                    strategy_id,
663                    instrument_id,
664                    client_order_id,
665                    order_side,
666                    order_type,
667                    quantity,
668                    time_in_force,
669                    price,
670                    trigger_price,
671                    post_only,
672                )
673                .await
674                .map_err(to_pyruntime_err)?;
675            Ok(())
676        })
677    }
678
679    #[pyo3(name = "cancel_order")]
680    #[pyo3(signature = (client_order_id, venue_order_id=None))]
681    fn py_cancel_order<'py>(
682        &self,
683        py: Python<'py>,
684        client_order_id: ClientOrderId,
685        venue_order_id: Option<VenueOrderId>,
686    ) -> PyResult<Bound<'py, PyAny>> {
687        let client = self.inner.clone();
688
689        pyo3_async_runtimes::tokio::future_into_py(py, async move {
690            client
691                .cancel_order(client_order_id, venue_order_id)
692                .await
693                .map_err(to_pyruntime_err)?;
694            Ok(())
695        })
696    }
697
698    #[pyo3(name = "get_open_orders")]
699    fn py_get_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
700        let client = self.inner.clone();
701
702        pyo3_async_runtimes::tokio::future_into_py(py, async move {
703            client.get_open_orders().await.map_err(to_pyruntime_err)?;
704            Ok(())
705        })
706    }
707
708    #[pyo3(name = "disconnect")]
709    fn py_disconnect<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
710        let client = self.inner.clone();
711
712        pyo3_async_runtimes::tokio::future_into_py(py, async move {
713            client.disconnect().await;
714            Ok(())
715        })
716    }
717
718    #[pyo3(name = "close")]
719    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
720        let mut client = self.inner.clone();
721
722        pyo3_async_runtimes::tokio::future_into_py(py, async move {
723            client.close().await;
724            Ok(())
725        })
726    }
727}
728
729#[expect(clippy::too_many_arguments)]
730fn handle_md_message(
731    message: AxMdMessage,
732    instruments: &Arc<AtomicMap<Ustr, InstrumentAny>>,
733    symbol_data_types: &Arc<AtomicMap<String, SymbolDataTypes>>,
734    book_sequences: &mut AHashMap<Ustr, u64>,
735    candle_cache: &mut AHashMap<(Ustr, AxCandleWidth), AxMdCandle>,
736    instrument_states: &mut AHashMap<Ustr, AxInstrumentState>,
737    ts_init: UnixNanos,
738    call_soon: &Py<PyAny>,
739    callback: &Py<PyAny>,
740) {
741    let instruments_snap = instruments.load();
742    let sdt_snap = symbol_data_types.load();
743
744    match message {
745        AxMdMessage::BookL1(book) => {
746            let l1_subscribed = sdt_snap
747                .get(book.s.as_str())
748                .is_some_and(|e| e.quotes || e.book_level == Some(AxMarketDataLevel::Level1));
749
750            if !l1_subscribed {
751                return;
752            }
753
754            let Some(instrument) = instruments_snap.get(&book.s) else {
755                log::warn!("Instrument cache miss for L1 book symbol={}", book.s);
756                return;
757            };
758
759            match parse_book_l1_quote(&book, instrument, ts_init) {
760                Ok(quote) => {
761                    send_data_to_python(Data::Quote(quote), call_soon, callback);
762                }
763                Err(e) => log::error!("Failed to parse L1 quote: {e}"),
764            }
765        }
766        AxMdMessage::BookL2(book) => {
767            let Some(instrument) = instruments_snap.get(&book.s) else {
768                log::warn!("Instrument cache miss for L2 book symbol={}", book.s);
769                return;
770            };
771
772            let sequence = book_sequences
773                .entry(book.s)
774                .and_modify(|s| *s += 1)
775                .or_insert(1);
776
777            match parse_book_l2_deltas(&book, instrument, *sequence, ts_init) {
778                Ok(deltas) => {
779                    send_data_to_python(
780                        Data::Deltas(OrderBookDeltas_API::new(deltas)),
781                        call_soon,
782                        callback,
783                    );
784                }
785                Err(e) => log::error!("Failed to parse L2 deltas: {e}"),
786            }
787        }
788        AxMdMessage::BookL3(book) => {
789            let Some(instrument) = instruments_snap.get(&book.s) else {
790                log::warn!("Instrument cache miss for L3 book symbol={}", book.s);
791                return;
792            };
793
794            let sequence = book_sequences
795                .entry(book.s)
796                .and_modify(|s| *s += 1)
797                .or_insert(1);
798
799            match parse_book_l3_deltas(&book, instrument, *sequence, ts_init) {
800                Ok(deltas) => {
801                    send_data_to_python(
802                        Data::Deltas(OrderBookDeltas_API::new(deltas)),
803                        call_soon,
804                        callback,
805                    );
806                }
807                Err(e) => log::error!("Failed to parse L3 deltas: {e}"),
808            }
809        }
810        AxMdMessage::Trade(trade) => {
811            let trades_subscribed = sdt_snap.get(trade.s.as_str()).is_some_and(|e| e.trades);
812
813            if !trades_subscribed {
814                return;
815            }
816
817            let Some(instrument) = instruments_snap.get(&trade.s) else {
818                log::warn!("Instrument cache miss for trade symbol={}", trade.s);
819                return;
820            };
821
822            match parse_trade_tick(&trade, instrument, ts_init) {
823                Ok(tick) => {
824                    send_data_to_python(Data::Trade(tick), call_soon, callback);
825                }
826                Err(e) => log::error!("Failed to parse trade: {e}"),
827            }
828        }
829        AxMdMessage::Candle(candle) => {
830            let cache_key = (candle.symbol, candle.width);
831
832            let closed_candle = if let Some(cached) = candle_cache.get(&cache_key) {
833                if cached.ts == candle.ts {
834                    None
835                } else {
836                    Some(cached.clone())
837                }
838            } else {
839                None
840            };
841
842            candle_cache.insert(cache_key, candle);
843
844            if let Some(closed) = closed_candle {
845                let Some(instrument) = instruments_snap.get(&closed.symbol) else {
846                    log::warn!("Instrument cache miss for candle symbol={}", closed.symbol);
847                    return;
848                };
849
850                match parse_candle_bar(&closed, instrument, ts_init) {
851                    Ok(bar) => {
852                        send_data_to_python(Data::Bar(bar), call_soon, callback);
853                    }
854                    Err(e) => log::error!("Failed to parse candle: {e}"),
855                }
856            }
857        }
858        AxMdMessage::Ticker(ticker) => {
859            let Some(instrument) = instruments_snap.get(&ticker.s) else {
860                log::debug!("No instrument cached for ticker symbol '{}'", ticker.s);
861                return;
862            };
863
864            let instrument_id = instrument.id();
865            let price_precision = instrument.price_precision();
866            let ts_event = ax_timestamp_stn_to_unix_nanos(ticker.ts, ticker.tn).unwrap_or(ts_init);
867
868            let mark_prices_subscribed = sdt_snap
869                .get(ticker.s.as_str())
870                .is_some_and(|e| e.mark_prices);
871            if mark_prices_subscribed && let Some(mark_price) = ticker.m {
872                match Price::from_decimal_dp(mark_price, price_precision) {
873                    Ok(price) => {
874                        let update = MarkPriceUpdate::new(instrument_id, price, ts_event, ts_init);
875                        send_data_to_python(Data::MarkPriceUpdate(update), call_soon, callback);
876                    }
877                    Err(e) => {
878                        log::error!("Failed to parse mark price for {}: {e}", ticker.s);
879                    }
880                }
881            }
882
883            if let Some(state) = ticker.i {
884                let status_subscribed = sdt_snap
885                    .get(ticker.s.as_str())
886                    .is_some_and(|e| e.instrument_status);
887                if status_subscribed {
888                    let prev = instrument_states.insert(ticker.s, state);
889                    if prev != Some(state) {
890                        let action = MarketStatusAction::from(state);
891                        let status = InstrumentStatus::new(
892                            instrument_id,
893                            action,
894                            ts_event,
895                            ts_init,
896                            None,
897                            None,
898                            Some(state == AxInstrumentState::Open),
899                            None,
900                            None,
901                        );
902                        call_python_with_event(call_soon, callback, move |py| {
903                            status.into_py_any(py)
904                        });
905                    }
906                }
907            }
908        }
909        AxMdMessage::Heartbeat(_) => {}
910        AxMdMessage::SubscriptionResponse(_) => {}
911        AxMdMessage::Error(err) => {
912            log::error!("AX market data error: {err:?}");
913        }
914    }
915}
916
917fn handle_order_event(
918    event: AxWsOrderEvent,
919    caches: &OrdersCaches,
920    account_id: AccountId,
921    clock: &'static AtomicTime,
922    call_soon: &Py<PyAny>,
923    callback: &Py<PyAny>,
924) {
925    match event {
926        AxWsOrderEvent::Heartbeat => {}
927        AxWsOrderEvent::Acknowledged(msg) => {
928            if let Some(event) =
929                create_order_accepted(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
930            {
931                call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
932            }
933        }
934        AxWsOrderEvent::PartiallyFilled(msg) => {
935            if let Some(event) =
936                create_order_filled(&msg.o, &msg.xs, msg.ts, msg.tn, caches, account_id, clock)
937            {
938                call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
939            }
940        }
941        AxWsOrderEvent::Filled(msg) => {
942            if let Some(event) =
943                create_order_filled(&msg.o, &msg.xs, msg.ts, msg.tn, caches, account_id, clock)
944            {
945                cleanup_terminal_order_tracking(&msg.o, caches);
946                call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
947            }
948        }
949        AxWsOrderEvent::Canceled(msg) => {
950            if let Some(event) =
951                create_order_canceled(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
952            {
953                cleanup_terminal_order_tracking(&msg.o, caches);
954                call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
955            }
956        }
957        AxWsOrderEvent::Rejected(msg) => {
958            let known_reason = msg.r.filter(|r| !matches!(r, AxOrderRejectReason::Unknown));
959            let reason = known_reason
960                .as_ref()
961                .map(AsRef::as_ref)
962                .or(msg.txt.as_deref())
963                .unwrap_or("UNKNOWN");
964
965            if let Some(event) =
966                create_order_rejected(&msg.o, reason, msg.ts, msg.tn, caches, account_id, clock)
967            {
968                cleanup_terminal_order_tracking(&msg.o, caches);
969                call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
970            }
971        }
972        AxWsOrderEvent::Expired(msg) => {
973            if let Some(event) =
974                create_order_expired(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
975            {
976                cleanup_terminal_order_tracking(&msg.o, caches);
977                call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
978            }
979        }
980        AxWsOrderEvent::Replaced(msg) => {
981            if let Some(event) =
982                create_order_accepted(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
983            {
984                call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
985            }
986        }
987        AxWsOrderEvent::DoneForDay(msg) => {
988            if let Some(event) =
989                create_order_expired(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
990            {
991                cleanup_terminal_order_tracking(&msg.o, caches);
992                call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
993            }
994        }
995        AxWsOrderEvent::CancelRejected(msg) => {
996            let venue_order_id = VenueOrderId::new(&msg.oid);
997            if let Some(client_order_id) = caches.venue_to_client_id.get(&venue_order_id)
998                && let Some(metadata) = caches.orders_metadata.get(&client_order_id)
999            {
1000                let event = OrderCancelRejected::new(
1001                    metadata.trader_id,
1002                    metadata.strategy_id,
1003                    metadata.instrument_id,
1004                    metadata.client_order_id,
1005                    Ustr::from(msg.r.as_ref()),
1006                    UUID4::new(),
1007                    clock.get_time_ns(),
1008                    metadata.ts_init,
1009                    false,
1010                    Some(venue_order_id),
1011                    Some(account_id),
1012                );
1013                call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
1014            } else {
1015                log::warn!(
1016                    "Could not find metadata for cancel rejected order {}",
1017                    msg.oid
1018                );
1019            }
1020        }
1021    }
1022}
1023
1024fn drain_status_invalidations(
1025    invalidations: &Arc<Mutex<AHashSet<Ustr>>>,
1026    instrument_states: &mut AHashMap<Ustr, AxInstrumentState>,
1027) {
1028    if let Ok(mut set) = invalidations.lock() {
1029        for symbol in set.drain() {
1030            instrument_states.remove(&symbol);
1031        }
1032    }
1033}
1034
1035fn send_data_to_python(data: Data, call_soon: &Py<PyAny>, callback: &Py<PyAny>) {
1036    Python::attach(|py| {
1037        let py_obj = data_to_pycapsule(py, data);
1038        call_python_threadsafe(py, call_soon, callback, py_obj);
1039    });
1040}
1041
1042fn call_python_with_event<F>(call_soon: &Py<PyAny>, callback: &Py<PyAny>, event_fn: F)
1043where
1044    F: FnOnce(Python<'_>) -> PyResult<Py<PyAny>> + Send + 'static,
1045{
1046    Python::attach(|py| match event_fn(py) {
1047        Ok(py_obj) => {
1048            call_python_threadsafe(py, call_soon, callback, py_obj);
1049        }
1050        Err(e) => {
1051            log::error!("Error converting event to Python: {e}");
1052        }
1053    });
1054}