1use std::{
24 fmt::Debug,
25 sync::{Arc, Mutex},
26};
27
28use ahash::{AHashMap, AHashSet};
29use futures_util::StreamExt;
30use nautilus_common::live::get_runtime;
31use nautilus_core::{
32 AtomicMap, UUID4, UnixNanos,
33 python::{call_python_threadsafe, to_pyruntime_err},
34 time::{AtomicTime, get_atomic_clock_realtime},
35};
36use nautilus_model::{
37 data::{BarType, Data, InstrumentStatus, MarkPriceUpdate, OrderBookDeltas_API},
38 enums::{MarketStatusAction, OrderSide, OrderType, TimeInForce},
39 events::OrderCancelRejected,
40 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
41 instruments::{Instrument, InstrumentAny},
42 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
43 types::{Price, Quantity},
44};
45use nautilus_network::websocket::TransportBackend;
46use pyo3::{IntoPyObjectExt, prelude::*};
47use ustr::Ustr;
48
49use crate::{
50 common::{
51 enums::{AxCandleWidth, AxInstrumentState, AxMarketDataLevel},
52 parse::ax_timestamp_stn_to_unix_nanos,
53 },
54 execution::{
55 cleanup_terminal_order_tracking, create_order_accepted, create_order_canceled,
56 create_order_expired, create_order_filled, create_order_rejected,
57 },
58 http::models::AxOrderRejectReason,
59 websocket::{
60 data::{
61 AxMdWebSocketClient,
62 client::SymbolDataTypes,
63 parse::{
64 parse_book_l1_quote, parse_book_l2_deltas, parse_book_l3_deltas, parse_candle_bar,
65 parse_trade_tick,
66 },
67 },
68 messages::{AxDataWsMessage, AxMdCandle, AxMdMessage, AxOrdersWsMessage, AxWsOrderEvent},
69 orders::{AxOrdersWebSocketClient, OrdersCaches},
70 },
71};
72
/// PyO3 wrapper around [`AxMdWebSocketClient`], exposed to Python under the name
/// `AxMdWebSocketClient`.
#[pyclass(
    name = "AxMdWebSocketClient",
    module = "nautilus_trader.core.nautilus_pyo3.architect"
)]
#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.architect_ax")]
pub struct PyAxMdWebSocketClient {
    /// Wrapped client; the Python-facing methods clone it before starting async work.
    inner: AxMdWebSocketClient,
    /// Instruments keyed by symbol, filled by `cache_instrument` and shared with the
    /// message-handling task spawned in `connect`.
    instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
}
84
85impl Debug for PyAxMdWebSocketClient {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 f.debug_struct(stringify!(PyAxMdWebSocketClient))
88 .field("inner", &self.inner)
89 .finish_non_exhaustive()
90 }
91}
92
93#[pymethods]
94#[pyo3_stub_gen::derive::gen_stub_pymethods]
95impl PyAxMdWebSocketClient {
96 #[new]
97 #[pyo3(signature = (url, auth_token, heartbeat=30, proxy_url=None))]
98 fn py_new(url: String, auth_token: String, heartbeat: u64, proxy_url: Option<String>) -> Self {
99 Self {
100 inner: AxMdWebSocketClient::new(
101 url,
102 auth_token,
103 heartbeat,
104 TransportBackend::default(),
105 proxy_url,
106 ),
107 instruments_cache: Arc::new(AtomicMap::new()),
108 }
109 }
110
111 #[staticmethod]
112 #[pyo3(name = "without_auth")]
113 #[pyo3(signature = (url, heartbeat=30, proxy_url=None))]
114 fn py_without_auth(url: String, heartbeat: u64, proxy_url: Option<String>) -> Self {
115 Self {
116 inner: AxMdWebSocketClient::without_auth(
117 url,
118 heartbeat,
119 TransportBackend::default(),
120 proxy_url,
121 ),
122 instruments_cache: Arc::new(AtomicMap::new()),
123 }
124 }
125
    /// Returns the URL reported by the inner client.
    #[getter]
    #[pyo3(name = "url")]
    #[must_use]
    pub fn py_url(&self) -> &str {
        self.inner.url()
    }
132
    /// Returns the inner client's `is_active` flag.
    #[pyo3(name = "is_active")]
    #[must_use]
    pub fn py_is_active(&self) -> bool {
        self.inner.is_active()
    }
138
    /// Returns the inner client's `is_closed` flag.
    #[pyo3(name = "is_closed")]
    #[must_use]
    pub fn py_is_closed(&self) -> bool {
        self.inner.is_closed()
    }
144
    /// Returns the subscription count reported by the inner client.
    #[pyo3(name = "subscription_count")]
    #[must_use]
    pub fn py_subscription_count(&self) -> usize {
        self.inner.subscription_count()
    }
150
    /// Passes `token` to the inner client's `set_auth_token`.
    #[pyo3(name = "set_auth_token")]
    fn py_set_auth_token(&mut self, token: String) {
        self.inner.set_auth_token(token);
    }
155
156 #[pyo3(name = "cache_instrument")]
157 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
158 let inst = pyobject_to_instrument_any(py, instrument)?;
159 let symbol = inst.symbol().inner();
160 self.instruments_cache.insert(symbol, inst);
161 Ok(())
162 }
163
164 #[pyo3(name = "connect")]
165 #[expect(clippy::needless_pass_by_value)]
166 fn py_connect<'py>(
167 &mut self,
168 py: Python<'py>,
169 loop_: Py<PyAny>,
170 callback: Py<PyAny>,
171 ) -> PyResult<Bound<'py, PyAny>> {
172 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
173
174 let clock = get_atomic_clock_realtime();
175 let instruments = Arc::clone(&self.instruments_cache);
176 let symbol_data_types = self.inner.symbol_data_types();
177 let status_invalidations = self.inner.status_invalidations();
178 let mut client = self.inner.clone();
179
180 pyo3_async_runtimes::tokio::future_into_py(py, async move {
181 client.connect().await.map_err(to_pyruntime_err)?;
182
183 let stream = client.stream();
184
185 get_runtime().spawn(async move {
186 let _client = client;
187 tokio::pin!(stream);
188
189 let mut book_sequences: AHashMap<Ustr, u64> = AHashMap::new();
190 let mut candle_cache: AHashMap<(Ustr, AxCandleWidth), AxMdCandle> = AHashMap::new();
191 let mut instrument_states: AHashMap<Ustr, AxInstrumentState> = AHashMap::new();
192
193 while let Some(msg) = stream.next().await {
194 let ts_init = clock.get_time_ns();
195
196 drain_status_invalidations(&status_invalidations, &mut instrument_states);
197
198 match msg {
199 AxDataWsMessage::MdMessage(md_msg) => {
200 handle_md_message(
201 md_msg,
202 &instruments,
203 &symbol_data_types,
204 &mut book_sequences,
205 &mut candle_cache,
206 &mut instrument_states,
207 ts_init,
208 &call_soon,
209 &callback,
210 );
211 }
212 AxDataWsMessage::Reconnected => {
213 candle_cache.clear();
214 instrument_states.clear();
215 log::info!("AX WebSocket reconnected");
216 }
217 AxDataWsMessage::CandleUnsubscribed { symbol, width } => {
218 candle_cache.remove(&(symbol, width));
219 }
220 }
221 }
222 });
223
224 Ok(())
225 })
226 }
227
228 #[pyo3(name = "subscribe_book_deltas")]
229 fn py_subscribe_book_deltas<'py>(
230 &self,
231 py: Python<'py>,
232 instrument_id: InstrumentId,
233 level: AxMarketDataLevel,
234 ) -> PyResult<Bound<'py, PyAny>> {
235 let client = self.inner.clone();
236 let symbol = instrument_id.symbol.to_string();
237
238 pyo3_async_runtimes::tokio::future_into_py(py, async move {
239 client
240 .subscribe_book_deltas(&symbol, level)
241 .await
242 .map_err(to_pyruntime_err)
243 })
244 }
245
246 #[pyo3(name = "subscribe_quotes")]
247 fn py_subscribe_quotes<'py>(
248 &self,
249 py: Python<'py>,
250 instrument_id: InstrumentId,
251 ) -> PyResult<Bound<'py, PyAny>> {
252 let client = self.inner.clone();
253 let symbol = instrument_id.symbol.to_string();
254
255 pyo3_async_runtimes::tokio::future_into_py(py, async move {
256 client
257 .subscribe_quotes(&symbol)
258 .await
259 .map_err(to_pyruntime_err)
260 })
261 }
262
263 #[pyo3(name = "subscribe_trades")]
264 fn py_subscribe_trades<'py>(
265 &self,
266 py: Python<'py>,
267 instrument_id: InstrumentId,
268 ) -> PyResult<Bound<'py, PyAny>> {
269 let client = self.inner.clone();
270 let symbol = instrument_id.symbol.to_string();
271
272 pyo3_async_runtimes::tokio::future_into_py(py, async move {
273 client
274 .subscribe_trades(&symbol)
275 .await
276 .map_err(to_pyruntime_err)
277 })
278 }
279
280 #[pyo3(name = "subscribe_mark_prices")]
281 fn py_subscribe_mark_prices<'py>(
282 &self,
283 py: Python<'py>,
284 instrument_id: InstrumentId,
285 ) -> PyResult<Bound<'py, PyAny>> {
286 let client = self.inner.clone();
287 let symbol = instrument_id.symbol.to_string();
288
289 pyo3_async_runtimes::tokio::future_into_py(py, async move {
290 client
291 .subscribe_mark_prices(&symbol)
292 .await
293 .map_err(to_pyruntime_err)
294 })
295 }
296
297 #[pyo3(name = "subscribe_instrument_status")]
298 fn py_subscribe_instrument_status<'py>(
299 &self,
300 py: Python<'py>,
301 instrument_id: InstrumentId,
302 ) -> PyResult<Bound<'py, PyAny>> {
303 let client = self.inner.clone();
304 let symbol = instrument_id.symbol.to_string();
305
306 pyo3_async_runtimes::tokio::future_into_py(py, async move {
307 client
308 .subscribe_instrument_status(&symbol)
309 .await
310 .map_err(to_pyruntime_err)
311 })
312 }
313
314 #[pyo3(name = "unsubscribe_book_deltas")]
315 fn py_unsubscribe_book_deltas<'py>(
316 &self,
317 py: Python<'py>,
318 instrument_id: InstrumentId,
319 ) -> PyResult<Bound<'py, PyAny>> {
320 let client = self.inner.clone();
321 let symbol = instrument_id.symbol.to_string();
322
323 pyo3_async_runtimes::tokio::future_into_py(py, async move {
324 client
325 .unsubscribe_book_deltas(&symbol)
326 .await
327 .map_err(to_pyruntime_err)
328 })
329 }
330
331 #[pyo3(name = "subscribe_bars")]
332 fn py_subscribe_bars<'py>(
333 &self,
334 py: Python<'py>,
335 bar_type: BarType,
336 ) -> PyResult<Bound<'py, PyAny>> {
337 let client = self.inner.clone();
338 let symbol = bar_type.instrument_id().symbol.to_string();
339 let width = AxCandleWidth::try_from(&bar_type.spec()).map_err(to_pyruntime_err)?;
340
341 pyo3_async_runtimes::tokio::future_into_py(py, async move {
342 client
343 .subscribe_candles(&symbol, width)
344 .await
345 .map_err(to_pyruntime_err)
346 })
347 }
348
349 #[pyo3(name = "unsubscribe_quotes")]
350 fn py_unsubscribe_quotes<'py>(
351 &self,
352 py: Python<'py>,
353 instrument_id: InstrumentId,
354 ) -> PyResult<Bound<'py, PyAny>> {
355 let client = self.inner.clone();
356 let symbol = instrument_id.symbol.to_string();
357
358 pyo3_async_runtimes::tokio::future_into_py(py, async move {
359 client
360 .unsubscribe_quotes(&symbol)
361 .await
362 .map_err(to_pyruntime_err)
363 })
364 }
365
366 #[pyo3(name = "unsubscribe_trades")]
367 fn py_unsubscribe_trades<'py>(
368 &self,
369 py: Python<'py>,
370 instrument_id: InstrumentId,
371 ) -> PyResult<Bound<'py, PyAny>> {
372 let client = self.inner.clone();
373 let symbol = instrument_id.symbol.to_string();
374
375 pyo3_async_runtimes::tokio::future_into_py(py, async move {
376 client
377 .unsubscribe_trades(&symbol)
378 .await
379 .map_err(to_pyruntime_err)
380 })
381 }
382
383 #[pyo3(name = "unsubscribe_bars")]
384 fn py_unsubscribe_bars<'py>(
385 &self,
386 py: Python<'py>,
387 bar_type: BarType,
388 ) -> PyResult<Bound<'py, PyAny>> {
389 let client = self.inner.clone();
390 let symbol = bar_type.instrument_id().symbol.to_string();
391 let width = AxCandleWidth::try_from(&bar_type.spec()).map_err(to_pyruntime_err)?;
392
393 pyo3_async_runtimes::tokio::future_into_py(py, async move {
394 client
395 .unsubscribe_candles(&symbol, width)
396 .await
397 .map_err(to_pyruntime_err)
398 })
399 }
400
401 #[pyo3(name = "unsubscribe_mark_prices")]
402 fn py_unsubscribe_mark_prices<'py>(
403 &self,
404 py: Python<'py>,
405 instrument_id: InstrumentId,
406 ) -> PyResult<Bound<'py, PyAny>> {
407 let client = self.inner.clone();
408 let symbol = instrument_id.symbol.to_string();
409
410 pyo3_async_runtimes::tokio::future_into_py(py, async move {
411 client
412 .unsubscribe_mark_prices(&symbol)
413 .await
414 .map_err(to_pyruntime_err)
415 })
416 }
417
418 #[pyo3(name = "unsubscribe_instrument_status")]
419 fn py_unsubscribe_instrument_status<'py>(
420 &self,
421 py: Python<'py>,
422 instrument_id: InstrumentId,
423 ) -> PyResult<Bound<'py, PyAny>> {
424 let client = self.inner.clone();
425 let symbol = instrument_id.symbol.to_string();
426
427 pyo3_async_runtimes::tokio::future_into_py(py, async move {
428 client
429 .unsubscribe_instrument_status(&symbol)
430 .await
431 .map_err(to_pyruntime_err)
432 })
433 }
434
435 #[pyo3(name = "disconnect")]
436 fn py_disconnect<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
437 let client = self.inner.clone();
438
439 pyo3_async_runtimes::tokio::future_into_py(py, async move {
440 client.disconnect().await;
441 Ok(())
442 })
443 }
444
445 #[pyo3(name = "close")]
446 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
447 let mut client = self.inner.clone();
448
449 pyo3_async_runtimes::tokio::future_into_py(py, async move {
450 client.close().await;
451 Ok(())
452 })
453 }
454}
455
/// PyO3 wrapper around [`AxOrdersWebSocketClient`], exposed to Python under the name
/// `AxOrdersWebSocketClient`.
#[pyclass(
    name = "AxOrdersWebSocketClient",
    module = "nautilus_trader.core.nautilus_pyo3.architect"
)]
#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.architect_ax")]
pub struct PyAxOrdersWebSocketClient {
    /// Wrapped client; the Python-facing methods clone it before starting async work.
    inner: AxOrdersWebSocketClient,
}
466
467impl Debug for PyAxOrdersWebSocketClient {
468 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469 f.debug_struct(stringify!(PyAxOrdersWebSocketClient))
470 .field("inner", &self.inner)
471 .finish_non_exhaustive()
472 }
473}
474
475#[pymethods]
476#[pyo3_stub_gen::derive::gen_stub_pymethods]
477impl PyAxOrdersWebSocketClient {
478 #[new]
479 #[pyo3(signature = (url, account_id, trader_id, heartbeat=30, proxy_url=None))]
480 fn py_new(
481 url: String,
482 account_id: AccountId,
483 trader_id: TraderId,
484 heartbeat: u64,
485 proxy_url: Option<String>,
486 ) -> Self {
487 Self {
488 inner: AxOrdersWebSocketClient::new(
489 url,
490 account_id,
491 trader_id,
492 heartbeat,
493 TransportBackend::default(),
494 proxy_url,
495 ),
496 }
497 }
498
    /// Returns the URL reported by the inner client.
    #[getter]
    #[pyo3(name = "url")]
    #[must_use]
    pub fn py_url(&self) -> &str {
        self.inner.url()
    }
505
    /// Returns the account ID reported by the inner client.
    #[getter]
    #[pyo3(name = "account_id")]
    #[must_use]
    pub fn py_account_id(&self) -> AccountId {
        self.inner.account_id()
    }
512
    /// Returns the inner client's `is_active` flag.
    #[pyo3(name = "is_active")]
    #[must_use]
    pub fn py_is_active(&self) -> bool {
        self.inner.is_active()
    }
518
    /// Returns the inner client's `is_closed` flag.
    #[pyo3(name = "is_closed")]
    #[must_use]
    pub fn py_is_closed(&self) -> bool {
        self.inner.is_closed()
    }
524
525 #[pyo3(name = "cache_instrument")]
526 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
527 self.inner
528 .cache_instrument(pyobject_to_instrument_any(py, instrument)?);
529 Ok(())
530 }
531
    /// Forwards the order identifiers to the inner client's `register_external_order` and
    /// returns its boolean result unchanged.
    #[pyo3(name = "register_external_order")]
    fn py_register_external_order(
        &self,
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
        instrument_id: InstrumentId,
        strategy_id: StrategyId,
    ) -> bool {
        self.inner.register_external_order(
            client_order_id,
            venue_order_id,
            instrument_id,
            strategy_id,
        )
    }
547
548 #[pyo3(name = "connect")]
549 #[expect(clippy::needless_pass_by_value)]
550 fn py_connect<'py>(
551 &mut self,
552 py: Python<'py>,
553 loop_: Py<PyAny>,
554 callback: Py<PyAny>,
555 bearer_token: String,
556 ) -> PyResult<Bound<'py, PyAny>> {
557 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
558
559 let clock = get_atomic_clock_realtime();
560 let account_id = self.inner.account_id();
561 let caches = self.inner.caches().clone();
562 let mut client = self.inner.clone();
563
564 pyo3_async_runtimes::tokio::future_into_py(py, async move {
565 client
566 .connect(&bearer_token)
567 .await
568 .map_err(to_pyruntime_err)?;
569
570 let stream = client.stream();
571
572 get_runtime().spawn(async move {
573 let _client = client;
574 tokio::pin!(stream);
575
576 while let Some(msg) = stream.next().await {
577 match msg {
578 AxOrdersWsMessage::Event(event) => {
579 handle_order_event(
580 *event, &caches, account_id, clock, &call_soon, &callback,
581 );
582 }
583 AxOrdersWsMessage::PlaceOrderResponse(resp) => {
584 log::debug!(
585 "Place order response: rid={}, oid={}",
586 resp.rid,
587 resp.res.oid
588 );
589 }
590 AxOrdersWsMessage::CancelOrderResponse(resp) => {
591 log::debug!(
592 "Cancel order response: rid={}, received={}",
593 resp.rid,
594 resp.res.cxl_rx
595 );
596 }
597 AxOrdersWsMessage::OpenOrdersResponse(resp) => {
598 log::debug!(
599 "Open orders response: rid={}, count={}",
600 resp.rid,
601 resp.res.len()
602 );
603 }
604 AxOrdersWsMessage::Error(err) => {
605 log::error!(
606 "AX orders WebSocket error: code={:?}, message={}, rid={:?}",
607 err.code,
608 err.message,
609 err.request_id
610 );
611 }
612 AxOrdersWsMessage::Reconnected => {
613 log::info!("AX orders WebSocket reconnected");
614 }
615 AxOrdersWsMessage::Authenticated => {
616 log::info!("AX orders WebSocket authenticated");
617 }
618 }
619 }
620 });
621
622 Ok(())
623 })
624 }
625
626 #[pyo3(name = "submit_order")]
627 #[pyo3(signature = (
628 trader_id,
629 strategy_id,
630 instrument_id,
631 client_order_id,
632 order_side,
633 order_type,
634 quantity,
635 time_in_force,
636 price=None,
637 trigger_price=None,
638 post_only=false,
639 ))]
640 #[expect(clippy::too_many_arguments)]
641 fn py_submit_order<'py>(
642 &self,
643 py: Python<'py>,
644 trader_id: TraderId,
645 strategy_id: StrategyId,
646 instrument_id: InstrumentId,
647 client_order_id: ClientOrderId,
648 order_side: OrderSide,
649 order_type: OrderType,
650 quantity: Quantity,
651 time_in_force: TimeInForce,
652 price: Option<Price>,
653 trigger_price: Option<Price>,
654 post_only: bool,
655 ) -> PyResult<Bound<'py, PyAny>> {
656 let client = self.inner.clone();
657
658 pyo3_async_runtimes::tokio::future_into_py(py, async move {
659 client
660 .submit_order(
661 trader_id,
662 strategy_id,
663 instrument_id,
664 client_order_id,
665 order_side,
666 order_type,
667 quantity,
668 time_in_force,
669 price,
670 trigger_price,
671 post_only,
672 )
673 .await
674 .map_err(to_pyruntime_err)?;
675 Ok(())
676 })
677 }
678
679 #[pyo3(name = "cancel_order")]
680 #[pyo3(signature = (client_order_id, venue_order_id=None))]
681 fn py_cancel_order<'py>(
682 &self,
683 py: Python<'py>,
684 client_order_id: ClientOrderId,
685 venue_order_id: Option<VenueOrderId>,
686 ) -> PyResult<Bound<'py, PyAny>> {
687 let client = self.inner.clone();
688
689 pyo3_async_runtimes::tokio::future_into_py(py, async move {
690 client
691 .cancel_order(client_order_id, venue_order_id)
692 .await
693 .map_err(to_pyruntime_err)?;
694 Ok(())
695 })
696 }
697
698 #[pyo3(name = "get_open_orders")]
699 fn py_get_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
700 let client = self.inner.clone();
701
702 pyo3_async_runtimes::tokio::future_into_py(py, async move {
703 client.get_open_orders().await.map_err(to_pyruntime_err)?;
704 Ok(())
705 })
706 }
707
708 #[pyo3(name = "disconnect")]
709 fn py_disconnect<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
710 let client = self.inner.clone();
711
712 pyo3_async_runtimes::tokio::future_into_py(py, async move {
713 client.disconnect().await;
714 Ok(())
715 })
716 }
717
718 #[pyo3(name = "close")]
719 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
720 let mut client = self.inner.clone();
721
722 pyo3_async_runtimes::tokio::future_into_py(py, async move {
723 client.close().await;
724 Ok(())
725 })
726 }
727}
728
729#[expect(clippy::too_many_arguments)]
730fn handle_md_message(
731 message: AxMdMessage,
732 instruments: &Arc<AtomicMap<Ustr, InstrumentAny>>,
733 symbol_data_types: &Arc<AtomicMap<String, SymbolDataTypes>>,
734 book_sequences: &mut AHashMap<Ustr, u64>,
735 candle_cache: &mut AHashMap<(Ustr, AxCandleWidth), AxMdCandle>,
736 instrument_states: &mut AHashMap<Ustr, AxInstrumentState>,
737 ts_init: UnixNanos,
738 call_soon: &Py<PyAny>,
739 callback: &Py<PyAny>,
740) {
741 let instruments_snap = instruments.load();
742 let sdt_snap = symbol_data_types.load();
743
744 match message {
745 AxMdMessage::BookL1(book) => {
746 let l1_subscribed = sdt_snap
747 .get(book.s.as_str())
748 .is_some_and(|e| e.quotes || e.book_level == Some(AxMarketDataLevel::Level1));
749
750 if !l1_subscribed {
751 return;
752 }
753
754 let Some(instrument) = instruments_snap.get(&book.s) else {
755 log::warn!("Instrument cache miss for L1 book symbol={}", book.s);
756 return;
757 };
758
759 match parse_book_l1_quote(&book, instrument, ts_init) {
760 Ok(quote) => {
761 send_data_to_python(Data::Quote(quote), call_soon, callback);
762 }
763 Err(e) => log::error!("Failed to parse L1 quote: {e}"),
764 }
765 }
766 AxMdMessage::BookL2(book) => {
767 let Some(instrument) = instruments_snap.get(&book.s) else {
768 log::warn!("Instrument cache miss for L2 book symbol={}", book.s);
769 return;
770 };
771
772 let sequence = book_sequences
773 .entry(book.s)
774 .and_modify(|s| *s += 1)
775 .or_insert(1);
776
777 match parse_book_l2_deltas(&book, instrument, *sequence, ts_init) {
778 Ok(deltas) => {
779 send_data_to_python(
780 Data::Deltas(OrderBookDeltas_API::new(deltas)),
781 call_soon,
782 callback,
783 );
784 }
785 Err(e) => log::error!("Failed to parse L2 deltas: {e}"),
786 }
787 }
788 AxMdMessage::BookL3(book) => {
789 let Some(instrument) = instruments_snap.get(&book.s) else {
790 log::warn!("Instrument cache miss for L3 book symbol={}", book.s);
791 return;
792 };
793
794 let sequence = book_sequences
795 .entry(book.s)
796 .and_modify(|s| *s += 1)
797 .or_insert(1);
798
799 match parse_book_l3_deltas(&book, instrument, *sequence, ts_init) {
800 Ok(deltas) => {
801 send_data_to_python(
802 Data::Deltas(OrderBookDeltas_API::new(deltas)),
803 call_soon,
804 callback,
805 );
806 }
807 Err(e) => log::error!("Failed to parse L3 deltas: {e}"),
808 }
809 }
810 AxMdMessage::Trade(trade) => {
811 let trades_subscribed = sdt_snap.get(trade.s.as_str()).is_some_and(|e| e.trades);
812
813 if !trades_subscribed {
814 return;
815 }
816
817 let Some(instrument) = instruments_snap.get(&trade.s) else {
818 log::warn!("Instrument cache miss for trade symbol={}", trade.s);
819 return;
820 };
821
822 match parse_trade_tick(&trade, instrument, ts_init) {
823 Ok(tick) => {
824 send_data_to_python(Data::Trade(tick), call_soon, callback);
825 }
826 Err(e) => log::error!("Failed to parse trade: {e}"),
827 }
828 }
829 AxMdMessage::Candle(candle) => {
830 let cache_key = (candle.symbol, candle.width);
831
832 let closed_candle = if let Some(cached) = candle_cache.get(&cache_key) {
833 if cached.ts == candle.ts {
834 None
835 } else {
836 Some(cached.clone())
837 }
838 } else {
839 None
840 };
841
842 candle_cache.insert(cache_key, candle);
843
844 if let Some(closed) = closed_candle {
845 let Some(instrument) = instruments_snap.get(&closed.symbol) else {
846 log::warn!("Instrument cache miss for candle symbol={}", closed.symbol);
847 return;
848 };
849
850 match parse_candle_bar(&closed, instrument, ts_init) {
851 Ok(bar) => {
852 send_data_to_python(Data::Bar(bar), call_soon, callback);
853 }
854 Err(e) => log::error!("Failed to parse candle: {e}"),
855 }
856 }
857 }
858 AxMdMessage::Ticker(ticker) => {
859 let Some(instrument) = instruments_snap.get(&ticker.s) else {
860 log::debug!("No instrument cached for ticker symbol '{}'", ticker.s);
861 return;
862 };
863
864 let instrument_id = instrument.id();
865 let price_precision = instrument.price_precision();
866 let ts_event = ax_timestamp_stn_to_unix_nanos(ticker.ts, ticker.tn).unwrap_or(ts_init);
867
868 let mark_prices_subscribed = sdt_snap
869 .get(ticker.s.as_str())
870 .is_some_and(|e| e.mark_prices);
871 if mark_prices_subscribed && let Some(mark_price) = ticker.m {
872 match Price::from_decimal_dp(mark_price, price_precision) {
873 Ok(price) => {
874 let update = MarkPriceUpdate::new(instrument_id, price, ts_event, ts_init);
875 send_data_to_python(Data::MarkPriceUpdate(update), call_soon, callback);
876 }
877 Err(e) => {
878 log::error!("Failed to parse mark price for {}: {e}", ticker.s);
879 }
880 }
881 }
882
883 if let Some(state) = ticker.i {
884 let status_subscribed = sdt_snap
885 .get(ticker.s.as_str())
886 .is_some_and(|e| e.instrument_status);
887 if status_subscribed {
888 let prev = instrument_states.insert(ticker.s, state);
889 if prev != Some(state) {
890 let action = MarketStatusAction::from(state);
891 let status = InstrumentStatus::new(
892 instrument_id,
893 action,
894 ts_event,
895 ts_init,
896 None,
897 None,
898 Some(state == AxInstrumentState::Open),
899 None,
900 None,
901 );
902 call_python_with_event(call_soon, callback, move |py| {
903 status.into_py_any(py)
904 });
905 }
906 }
907 }
908 }
909 AxMdMessage::Heartbeat(_) => {}
910 AxMdMessage::SubscriptionResponse(_) => {}
911 AxMdMessage::Error(err) => {
912 log::error!("AX market data error: {err:?}");
913 }
914 }
915}
916
917fn handle_order_event(
918 event: AxWsOrderEvent,
919 caches: &OrdersCaches,
920 account_id: AccountId,
921 clock: &'static AtomicTime,
922 call_soon: &Py<PyAny>,
923 callback: &Py<PyAny>,
924) {
925 match event {
926 AxWsOrderEvent::Heartbeat => {}
927 AxWsOrderEvent::Acknowledged(msg) => {
928 if let Some(event) =
929 create_order_accepted(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
930 {
931 call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
932 }
933 }
934 AxWsOrderEvent::PartiallyFilled(msg) => {
935 if let Some(event) =
936 create_order_filled(&msg.o, &msg.xs, msg.ts, msg.tn, caches, account_id, clock)
937 {
938 call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
939 }
940 }
941 AxWsOrderEvent::Filled(msg) => {
942 if let Some(event) =
943 create_order_filled(&msg.o, &msg.xs, msg.ts, msg.tn, caches, account_id, clock)
944 {
945 cleanup_terminal_order_tracking(&msg.o, caches);
946 call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
947 }
948 }
949 AxWsOrderEvent::Canceled(msg) => {
950 if let Some(event) =
951 create_order_canceled(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
952 {
953 cleanup_terminal_order_tracking(&msg.o, caches);
954 call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
955 }
956 }
957 AxWsOrderEvent::Rejected(msg) => {
958 let known_reason = msg.r.filter(|r| !matches!(r, AxOrderRejectReason::Unknown));
959 let reason = known_reason
960 .as_ref()
961 .map(AsRef::as_ref)
962 .or(msg.txt.as_deref())
963 .unwrap_or("UNKNOWN");
964
965 if let Some(event) =
966 create_order_rejected(&msg.o, reason, msg.ts, msg.tn, caches, account_id, clock)
967 {
968 cleanup_terminal_order_tracking(&msg.o, caches);
969 call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
970 }
971 }
972 AxWsOrderEvent::Expired(msg) => {
973 if let Some(event) =
974 create_order_expired(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
975 {
976 cleanup_terminal_order_tracking(&msg.o, caches);
977 call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
978 }
979 }
980 AxWsOrderEvent::Replaced(msg) => {
981 if let Some(event) =
982 create_order_accepted(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
983 {
984 call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
985 }
986 }
987 AxWsOrderEvent::DoneForDay(msg) => {
988 if let Some(event) =
989 create_order_expired(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
990 {
991 cleanup_terminal_order_tracking(&msg.o, caches);
992 call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
993 }
994 }
995 AxWsOrderEvent::CancelRejected(msg) => {
996 let venue_order_id = VenueOrderId::new(&msg.oid);
997 if let Some(client_order_id) = caches.venue_to_client_id.get(&venue_order_id)
998 && let Some(metadata) = caches.orders_metadata.get(&client_order_id)
999 {
1000 let event = OrderCancelRejected::new(
1001 metadata.trader_id,
1002 metadata.strategy_id,
1003 metadata.instrument_id,
1004 metadata.client_order_id,
1005 Ustr::from(msg.r.as_ref()),
1006 UUID4::new(),
1007 clock.get_time_ns(),
1008 metadata.ts_init,
1009 false,
1010 Some(venue_order_id),
1011 Some(account_id),
1012 );
1013 call_python_with_event(call_soon, callback, move |py| event.into_py_any(py));
1014 } else {
1015 log::warn!(
1016 "Could not find metadata for cancel rejected order {}",
1017 msg.oid
1018 );
1019 }
1020 }
1021 }
1022}
1023
1024fn drain_status_invalidations(
1025 invalidations: &Arc<Mutex<AHashSet<Ustr>>>,
1026 instrument_states: &mut AHashMap<Ustr, AxInstrumentState>,
1027) {
1028 if let Ok(mut set) = invalidations.lock() {
1029 for symbol in set.drain() {
1030 instrument_states.remove(&symbol);
1031 }
1032 }
1033}
1034
1035fn send_data_to_python(data: Data, call_soon: &Py<PyAny>, callback: &Py<PyAny>) {
1036 Python::attach(|py| {
1037 let py_obj = data_to_pycapsule(py, data);
1038 call_python_threadsafe(py, call_soon, callback, py_obj);
1039 });
1040}
1041
1042fn call_python_with_event<F>(call_soon: &Py<PyAny>, callback: &Py<PyAny>, event_fn: F)
1043where
1044 F: FnOnce(Python<'_>) -> PyResult<Py<PyAny>> + Send + 'static,
1045{
1046 Python::attach(|py| match event_fn(py) {
1047 Ok(py_obj) => {
1048 call_python_threadsafe(py, call_soon, callback, py_obj);
1049 }
1050 Err(e) => {
1051 log::error!("Error converting event to Python: {e}");
1052 }
1053 });
1054}