1use std::{
19 str::FromStr,
20 sync::atomic::Ordering,
21 time::{Duration, Instant},
22};
23
24use ahash::AHashMap;
25use dashmap::DashMap;
26use nautilus_common::live::get_runtime;
27use nautilus_core::{
28 UUID4,
29 python::{call_python_threadsafe, to_pyvalue_err},
30 time::get_atomic_clock_realtime,
31};
32use nautilus_model::{
33 data::{
34 Bar, BarType, Data, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
35 OrderBookDeltas, OrderBookDeltas_API,
36 },
37 enums::{AccountType, MarketStatusAction, OrderSide, OrderStatus, OrderType},
38 events::{AccountState, OrderAccepted, OrderCanceled},
39 identifiers::{
40 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
41 },
42 instruments::{Instrument, InstrumentAny},
43 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
44 types::{AccountBalance, Currency, Money},
45};
46use nautilus_network::mode::ConnectionMode;
47use pyo3::{IntoPyObjectExt, prelude::*, types::PyDict};
48use rust_decimal::Decimal;
49use ustr::Ustr;
50
51use crate::{
52 common::{
53 consts::DYDX_VENUE,
54 credential::DydxCredential,
55 enums::{DydxCandleResolution, DydxMarketStatus},
56 parse::{extract_raw_symbol, parse_price},
57 },
58 execution::types::OrderContext,
59 http::{client::DydxHttpClient, parse::parse_account_state},
60 python::encoder::PyDydxClientOrderIdEncoder,
61 websocket::{
62 DydxWsDispatchState, OrderIdentity,
63 client::DydxWebSocketClient,
64 enums::DydxWsOutputMessage,
65 fill_report_to_order_filled, parse as ws_parse,
66 parse::{parse_ws_fill_report, parse_ws_order_report, parse_ws_position_report},
67 },
68};
69
70#[pymethods]
71#[pyo3_stub_gen::derive::gen_stub_pymethods]
72impl DydxWebSocketClient {
    /// Builds a client by delegating to `Self::new_public`, which takes no credential.
    ///
    /// Exposed to Python as the static method `new_public`; `heartbeat` and `proxy_url`
    /// default to `None` when omitted.
    #[staticmethod]
    #[pyo3(name = "new_public")]
    #[pyo3(signature = (url, heartbeat=None, proxy_url=None))]
    fn py_new_public(url: String, heartbeat: Option<u64>, proxy_url: Option<String>) -> Self {
        Self::new_public(url, heartbeat, proxy_url)
    }
83
84 #[staticmethod]
89 #[pyo3(name = "new_private")]
90 #[pyo3(signature = (url, private_key, authenticator_ids, account_id, heartbeat=None, proxy_url=None))]
91 fn py_new_private(
92 url: String,
93 private_key: &str,
94 authenticator_ids: Vec<u64>,
95 account_id: AccountId,
96 heartbeat: Option<u64>,
97 proxy_url: Option<String>,
98 ) -> PyResult<Self> {
99 let credential = DydxCredential::from_private_key(private_key, authenticator_ids)
100 .map_err(to_pyvalue_err)?;
101 Ok(Self::new_private(
102 url, credential, account_id, heartbeat, proxy_url,
103 ))
104 }
105
    /// Returns the result of `is_connected()` on the underlying client.
    #[pyo3(name = "is_connected")]
    fn py_is_connected(&self) -> bool {
        self.is_connected()
    }
111
    /// Stores `account_id` on the client by forwarding to `set_account_id`.
    #[pyo3(name = "set_account_id")]
    fn py_set_account_id(&mut self, account_id: AccountId) {
        self.set_account_id(account_id);
    }
117
    /// Forwards `value` to `set_bars_timestamp_on_close`.
    ///
    /// The flag is read back in `py_connect` and passed to the candle parser.
    #[pyo3(name = "set_bars_timestamp_on_close")]
    fn py_set_bars_timestamp_on_close(&self, value: bool) {
        self.set_bars_timestamp_on_close(value);
    }
123
    /// Replaces this client's instrument cache with a clone of the handle returned by
    /// `http_client.instrument_cache()`.
    // NOTE(review): presumably the handle is reference-counted so both clients then see the
    // same cache — confirm against `DydxHttpClient::instrument_cache`.
    #[pyo3(name = "share_instrument_cache")]
    fn py_share_instrument_cache(&mut self, http_client: &DydxHttpClient) {
        self.set_instrument_cache(http_client.instrument_cache().clone());
    }
133
134 #[pyo3(name = "register_order_identity")]
135 fn py_register_order_identity(
136 &self,
137 client_order_id: ClientOrderId,
138 instrument_id: InstrumentId,
139 strategy_id: StrategyId,
140 order_side: OrderSide,
141 order_type: OrderType,
142 ) {
143 self.ws_dispatch_state().order_identities.insert(
144 client_order_id,
145 OrderIdentity {
146 instrument_id,
147 strategy_id,
148 order_side,
149 order_type,
150 },
151 );
152 }
153
    /// Removes the entry for `client_order_id` from the dispatch state's
    /// `order_identities` map; the removed value (if any) is discarded.
    #[pyo3(name = "remove_order_identity")]
    fn py_remove_order_identity(&self, client_order_id: ClientOrderId) {
        self.ws_dispatch_state()
            .order_identities
            .remove(&client_order_id);
    }
160
    /// Returns the client's account ID, or `None` when `account_id()` yields none.
    #[pyo3(name = "account_id")]
    fn py_account_id(&self) -> Option<AccountId> {
        self.account_id()
    }
166
    /// Returns a Python-facing encoder wrapper built from a clone of the client's
    /// encoder handle (`PyDydxClientOrderIdEncoder::from_arc`).
    #[pyo3(name = "encoder")]
    fn py_encoder(&self) -> PyDydxClientOrderIdEncoder {
        PyDydxClientOrderIdEncoder::from_arc(self.encoder().clone())
    }
172
    /// Returns the client's URL as an owned string.
    // NOTE(review): there is no `#[pyo3(name = ...)]` override here, so PyO3 derives the
    // Python property name from the function name (`py_url`), unlike the other methods in
    // this impl — confirm that is the intended Python-side name.
    #[getter]
    fn py_url(&self) -> String {
        self.url().to_string()
    }
178
    /// Connects the client and starts a background task that forwards venue messages
    /// to Python.
    ///
    /// Steps performed:
    /// 1. Looks up `call_soon_threadsafe` on `loop_`.
    /// 2. Converts `instruments` with `pyobject_to_instrument_any` and caches them.
    /// 3. Returns an awaitable that connects a clone of this client and, if a receiver
    ///    can be taken from it, spawns a task on `get_runtime()` that consumes
    ///    `DydxWsOutputMessage` values until the receiver yields `None`.
    ///
    /// Every object produced by the task is handed to `call_python_threadsafe` together
    /// with `call_soon` and `callback`.
    ///
    /// # Parameters
    ///
    /// - `loop_`: object exposing a `call_soon_threadsafe` attribute (presumably the
    ///   asyncio event loop — confirm with callers).
    /// - `instruments`: Python instrument objects to cache before connecting.
    /// - `callback`: Python callable receiving the converted data and events.
    /// - `trader_id`: trader ID stamped on generated order events; defaults to
    ///   `"TRADER-000"` when `None`.
    ///
    /// # Errors
    ///
    /// Fails immediately if `call_soon_threadsafe` cannot be looked up or an instrument
    /// cannot be converted. The returned awaitable fails with the connect error converted
    /// through `to_pyvalue_err`.
    #[pyo3(name = "connect")]
    #[pyo3(signature = (loop_, instruments, callback, trader_id=None))]
    #[expect(clippy::needless_pass_by_value)]
    fn py_connect<'py>(
        &mut self,
        py: Python<'py>,
        loop_: Py<PyAny>,
        instruments: Vec<Py<PyAny>>,
        callback: Py<PyAny>,
        trader_id: Option<TraderId>,
    ) -> PyResult<Bound<'py, PyAny>> {
        let call_soon = loop_.getattr(py, "call_soon_threadsafe")?;

        let mut instruments_any = Vec::new();

        for inst in instruments {
            let inst_any = pyobject_to_instrument_any(py, inst)?;
            instruments_any.push(inst_any);
        }

        self.cache_instruments(instruments_any);

        // Everything the spawned task needs is cloned/moved out of `self` up front.
        let mut client = self.clone();
        let bar_types = self.bar_types().clone();
        let dispatch_state = self.ws_dispatch_state().clone();
        let trader_id = trader_id.unwrap_or(TraderId::from("TRADER-000"));

        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            client.connect().await.map_err(to_pyvalue_err)?;

            // Without a receiver there is nothing to consume; the awaitable still resolves Ok.
            if let Some(mut rx) = client.take_receiver() {
                get_runtime().spawn(async move {
                    // The client is moved into the task and used below for cache,
                    // account-id and encoder lookups.
                    let _client = client; let clock = get_atomic_clock_realtime();
                    // Task-local state shared across messages.
                    let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
                    // Venue order id string -> (client_id, client_metadata).
                    let order_id_map: DashMap<String, (u32, u32)> = DashMap::new();
                    let bars_timestamp_on_close = _client.bars_timestamp_on_close();
                    // Latest bar seen per candle topic, held back until a newer one arrives.
                    let mut pending_bars: AHashMap<String, Bar> = AHashMap::new();
                    let mut seen_tickers: ahash::AHashSet<Ustr> = ahash::AHashSet::new();

                    while let Some(msg) = rx.recv().await {
                        // One init timestamp per message, shared by everything derived from it.
                        let ts_init = clock.get_time_ns();

                        match msg {
                            // Trades: parse into data items and forward each as a capsule.
                            DydxWsOutputMessage::Trades { id, contents } => {
                                let Some(instrument) = _client.instrument_cache().get_by_market(&id) else {
                                    log::warn!("No instrument cached for market {id}");
                                    continue;
                                };
                                let instrument_id = instrument.id();

                                match ws_parse::parse_trade_ticks(instrument_id, &instrument, &contents, ts_init) {
                                    Ok(items) => {
                                        Python::attach(|py| {
                                            for data in items {
                                                let py_obj = data_to_pycapsule(py, data);
                                                call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                            }
                                        });
                                    }
                                    Err(e) => log::error!("Failed to parse trade ticks for {id}: {e}"),
                                }
                            }
                            // Order book snapshot: forwarded as a single deltas object.
                            DydxWsOutputMessage::OrderbookSnapshot { id, contents } => {
                                let Some(instrument) = _client.instrument_cache().get_by_market(&id) else {
                                    log::warn!("No instrument cached for market {id}");
                                    continue;
                                };
                                let instrument_id = instrument.id();
                                let price_precision = instrument.price_precision();
                                let size_precision = instrument.size_precision();

                                match ws_parse::parse_orderbook_snapshot(
                                    &instrument_id,
                                    &contents,
                                    price_precision,
                                    size_precision,
                                    ts_init,
                                ) {
                                    Ok(deltas) => {
                                        Python::attach(|py| {
                                            let data = Data::Deltas(OrderBookDeltas_API::new(deltas));
                                            let py_obj = data_to_pycapsule(py, data);
                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                        });
                                    }
                                    Err(e) => log::error!("Failed to parse orderbook snapshot for {id}: {e}"),
                                }
                            }
                            // Incremental order book update: forwarded as a single deltas object.
                            DydxWsOutputMessage::OrderbookUpdate { id, contents } => {
                                let Some(instrument) = _client.instrument_cache().get_by_market(&id) else {
                                    log::warn!("No instrument cached for market {id}");
                                    continue;
                                };
                                let instrument_id = instrument.id();
                                let price_precision = instrument.price_precision();
                                let size_precision = instrument.size_precision();

                                match ws_parse::parse_orderbook_deltas(
                                    &instrument_id,
                                    &contents,
                                    price_precision,
                                    size_precision,
                                    ts_init,
                                ) {
                                    Ok(deltas) => {
                                        Python::attach(|py| {
                                            let data = Data::Deltas(OrderBookDeltas_API::new(deltas));
                                            let py_obj = data_to_pycapsule(py, data);
                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                        });
                                    }
                                    Err(e) => log::error!("Failed to parse orderbook deltas for {id}: {e}"),
                                }
                            }
                            // Batch of updates: merged into one deltas object; the whole
                            // batch is dropped if any single update fails to parse.
                            DydxWsOutputMessage::OrderbookBatch { id, updates } => {
                                let Some(instrument) = _client.instrument_cache().get_by_market(&id) else {
                                    log::warn!("No instrument cached for market {id}");
                                    continue;
                                };
                                let instrument_id = instrument.id();
                                let price_precision = instrument.price_precision();
                                let size_precision = instrument.size_precision();

                                let mut all_deltas = Vec::new();
                                let last_idx = updates.len().saturating_sub(1);
                                let mut parse_ok = true;

                                for (idx, update) in updates.iter().enumerate() {
                                    if idx < last_idx {
                                        // Every update except the last is parsed with the
                                        // trailing flag argument set to `false`.
                                        match ws_parse::parse_orderbook_deltas_with_flag(
                                            &instrument_id,
                                            update,
                                            price_precision,
                                            size_precision,
                                            ts_init,
                                            false,
                                        ) {
                                            Ok(deltas) => all_deltas.extend(deltas),
                                            Err(e) => {
                                                log::error!("Failed to parse batch orderbook deltas for {id}: {e}");
                                                parse_ok = false;
                                                break;
                                            }
                                        }
                                    } else {
                                        // The last update goes through the regular parser.
                                        match ws_parse::parse_orderbook_deltas(
                                            &instrument_id,
                                            update,
                                            price_precision,
                                            size_precision,
                                            ts_init,
                                        ) {
                                            Ok(last_deltas) => all_deltas.extend(last_deltas.deltas),
                                            Err(e) => {
                                                log::error!("Failed to parse batch orderbook deltas for {id}: {e}");
                                                parse_ok = false;
                                                break;
                                            }
                                        }
                                    }
                                }

                                if parse_ok && !all_deltas.is_empty() {
                                    let combined = OrderBookDeltas::new(instrument_id, all_deltas);
                                    Python::attach(|py| {
                                        let data = Data::Deltas(OrderBookDeltas_API::new(combined));
                                        let py_obj = data_to_pycapsule(py, data);
                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                    });
                                }
                            }
                            // Candles: `id` is the topic registered by `py_subscribe_bars`
                            // (`{ticker}/{resolution}`); the ticker is the part before '/'.
                            DydxWsOutputMessage::Candles { id, contents } => {
                                let ticker = id.split('/').next().unwrap_or(&id);

                                let Some(bar_type) = bar_types.get(&id).map(|r| *r) else {
                                    log::debug!("No bar type registered for candle topic {id}");
                                    continue;
                                };

                                let Some(instrument) = _client.instrument_cache().get_by_market(ticker) else {
                                    log::warn!("No instrument cached for market {ticker}");
                                    continue;
                                };

                                match ws_parse::parse_candle_bar(
                                    bar_type,
                                    &instrument,
                                    &contents,
                                    bars_timestamp_on_close,
                                    ts_init,
                                ) {
                                    Ok(bar) => {
                                        // A bar with the same `ts_event` replaces the buffered
                                        // one; a bar with a different `ts_event` causes the
                                        // previously buffered bar to be emitted.
                                        if let Some(prev_bar) = pending_bars.get(&id) {
                                            if bar.ts_event == prev_bar.ts_event {
                                                pending_bars.insert(id, bar);
                                            } else {
                                                let emit_bar = *prev_bar;
                                                pending_bars.insert(id.clone(), bar);
                                                Python::attach(|py| {
                                                    let py_obj = data_to_pycapsule(py, Data::Bar(emit_bar));
                                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                                });
                                            }
                                        } else {
                                            // First bar for this topic: buffer only.
                                            pending_bars.insert(id, bar);
                                        }
                                    }
                                    Err(e) => log::error!("Failed to parse candle bar for {id}: {e}"),
                                }
                            }
                            // Markets channel: oracle prices plus per-market trading data.
                            DydxWsOutputMessage::Markets(contents) => {
                                // Each oracle price is emitted twice: as a mark price update
                                // and as an index price update. Uncached tickers are skipped.
                                if let Some(ref oracle_prices) = contents.oracle_prices {
                                    for (ticker, oracle_data) in oracle_prices {
                                        let Some(instrument) = _client.instrument_cache().get_by_market(ticker) else {
                                            continue;
                                        };
                                        let instrument_id = instrument.id();

                                        let Ok(price) = parse_price(&oracle_data.oracle_price, "oracle_price") else {
                                            log::warn!("Failed to parse oracle price for {ticker}");
                                            continue;
                                        };

                                        let mark_price = MarkPriceUpdate::new(
                                            instrument_id,
                                            price,
                                            ts_init,
                                            ts_init,
                                        );
                                        Python::attach(|py| {
                                            match mark_price.into_py_any(py) {
                                                Ok(py_obj) => {
                                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                                }
                                                Err(e) => log::error!("Failed to convert MarkPriceUpdate to Python: {e}"),
                                            }
                                        });

                                        let index_price = IndexPriceUpdate::new(
                                            instrument_id,
                                            price,
                                            ts_init,
                                            ts_init,
                                        );
                                        Python::attach(|py| {
                                            match index_price.into_py_any(py) {
                                                Ok(py_obj) => {
                                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                                }
                                                Err(e) => log::error!("Failed to convert IndexPriceUpdate to Python: {e}"),
                                            }
                                        });
                                    }
                                }

                                // Status, discovery and funding-rate handling is applied to
                                // both the `trading` and the `markets` maps.
                                handle_markets_trading_data(
                                    contents.trading.as_ref(),
                                    _client.instrument_cache(),
                                    &mut seen_tickers,
                                    &call_soon,
                                    &callback,
                                    ts_init,
                                );
                                handle_markets_trading_data(
                                    contents.markets.as_ref(),
                                    _client.instrument_cache(),
                                    &mut seen_tickers,
                                    &call_soon,
                                    &callback,
                                    ts_init,
                                );

                                // Entries of the `markets` map may carry their own oracle
                                // price; it is emitted the same way as above.
                                if let Some(ref markets_map) = contents.markets {
                                    for (ticker, update) in markets_map {
                                        if let Some(ref oracle_price_str) = update.oracle_price {
                                            let Some(instrument) = _client.instrument_cache().get_by_market(ticker) else {
                                                continue;
                                            };
                                            let instrument_id = instrument.id();
                                            let Ok(price) = parse_price(oracle_price_str, "oracle_price") else {
                                                log::warn!("Failed to parse oracle price for {ticker}");
                                                continue;
                                            };

                                            let mark_price = MarkPriceUpdate::new(
                                                instrument_id,
                                                price,
                                                ts_init,
                                                ts_init,
                                            );
                                            Python::attach(|py| {
                                                match mark_price.into_py_any(py) {
                                                    Ok(py_obj) => {
                                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                                    }
                                                    Err(e) => log::error!("Failed to convert MarkPriceUpdate to Python: {e}"),
                                                }
                                            });

                                            let index_price = IndexPriceUpdate::new(
                                                instrument_id,
                                                price,
                                                ts_init,
                                                ts_init,
                                            );
                                            Python::attach(|py| {
                                                match index_price.into_py_any(py) {
                                                    Ok(py_obj) => {
                                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                                    }
                                                    Err(e) => log::error!("Failed to convert IndexPriceUpdate to Python: {e}"),
                                                }
                                            });
                                        }
                                    }
                                }
                            }
                            // Initial subaccount subscription: emit account state and any
                            // open perpetual positions.
                            DydxWsOutputMessage::SubaccountSubscribed(data) => {
                                let Some(account_id) = _client.account_id() else {
                                    log::warn!("Cannot parse subaccount subscription: account_id not set");
                                    continue;
                                };

                                let instrument_cache = _client.instrument_cache();

                                let inst_map = instrument_cache.to_instrument_id_map();
                                let oracle_map = instrument_cache.to_oracle_prices_map();

                                if let Some(ref subaccount) = data.contents.subaccount {
                                    match parse_account_state(
                                        subaccount,
                                        account_id,
                                        &inst_map,
                                        &oracle_map,
                                        ts_init,
                                        ts_init,
                                    ) {
                                        Ok(account_state) => {
                                            Python::attach(|py| {
                                                match account_state.into_py_any(py) {
                                                    Ok(py_obj) => {
                                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                                    }
                                                    Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
                                                }
                                            });
                                        }
                                        Err(e) => log::error!("Failed to parse account state: {e}"),
                                    }

                                    // Positions are still processed when the account state
                                    // above failed to parse.
                                    if let Some(ref positions) = subaccount.open_perpetual_positions {
                                        for (market, ws_position) in positions {
                                            match parse_ws_position_report(
                                                ws_position,
                                                instrument_cache,
                                                account_id,
                                                ts_init,
                                            ) {
                                                Ok(report) => {
                                                    Python::attach(|py| {
                                                        match pyo3::Py::new(py, report) {
                                                            Ok(py_obj) => {
                                                                call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
                                                            }
                                                            Err(e) => log::error!("Failed to convert PositionStatusReport to Python: {e}"),
                                                        }
                                                    });
                                                }
                                                Err(e) => log::error!("Failed to parse position for {market}: {e}"),
                                            }
                                        }
                                    }
                                } else {
                                    // No subaccount payload: emit a margin account state with
                                    // a single all-zero USDC balance.
                                    log::warn!("Subaccount subscription without initial state (new/empty subaccount)");

                                    let currency = Currency::get_or_create_crypto_with_context("USDC", None);
                                    let zero = Money::zero(currency);
                                    let balance = AccountBalance::new_checked(zero, zero, zero)
                                        .expect("zero balance should always be valid");
                                    let account_state = AccountState::new(
                                        account_id,
                                        AccountType::Margin,
                                        vec![balance],
                                        vec![],
                                        true,
                                        UUID4::new(),
                                        ts_init,
                                        ts_init,
                                        None,
                                    );
                                    Python::attach(|py| {
                                        match account_state.into_py_any(py) {
                                            Ok(py_obj) => {
                                                call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                            }
                                            Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
                                        }
                                    });
                                }
                            }
                            // Subaccount channel data: orders are parsed first but emitted
                            // only after the fills of the same message have been processed.
                            DydxWsOutputMessage::SubaccountsChannelData(data) => {
                                let Some(account_id) = _client.account_id() else {
                                    log::warn!("Cannot parse SubaccountsChannelData: account_id not set");
                                    continue;
                                };

                                let instrument_cache = _client.instrument_cache();
                                let encoder = _client.encoder();

                                // (client_id, client_metadata, venue order id) of orders whose
                                // report status is not open; cleaned up at the end of this arm.
                                let mut terminal_orders: Vec<(u32, u32, String)> = Vec::new();
                                // Per venue order: (sum of last_px * last_qty, sum of last_qty),
                                // accumulated only from fills that have no registered identity.
                                let mut cum_fill_totals: AHashMap<VenueOrderId, (Decimal, Decimal)> = AHashMap::new();

                                let mut pending_order_reports = Vec::new();

                                if let Some(ref orders) = data.contents.orders {
                                    for ws_order in orders {
                                        // Remember the venue id -> client id mapping so fills
                                        // can be resolved via `order_id_map`.
                                        if let Ok(client_id_u32) = ws_order.client_id.parse::<u32>() {
                                            let client_meta = ws_order.client_metadata
                                                .as_ref()
                                                .and_then(|s| s.parse::<u32>().ok())
                                                .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
                                            order_id_map.insert(ws_order.id.clone(), (client_id_u32, client_meta));
                                        }

                                        match parse_ws_order_report(
                                            ws_order,
                                            instrument_cache,
                                            &order_contexts,
                                            encoder,
                                            account_id,
                                            ts_init,
                                        ) {
                                            Ok(report) => {
                                                if !report.order_status.is_open()
                                                    && let Ok(cid) = ws_order.client_id.parse::<u32>()
                                                {
                                                    let meta = ws_order.client_metadata
                                                        .as_ref()
                                                        .and_then(|s| s.parse::<u32>().ok())
                                                        .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
                                                    terminal_orders.push((cid, meta, ws_order.id.clone()));
                                                }
                                                pending_order_reports.push(report);
                                            }
                                            Err(e) => log::error!("Failed to parse WS order: {e}"),
                                        }
                                    }
                                }

                                if let Some(ref fills) = data.contents.fills {
                                    for ws_fill in fills {
                                        match parse_ws_fill_report(
                                            ws_fill,
                                            instrument_cache,
                                            &order_id_map,
                                            &order_contexts,
                                            encoder,
                                            account_id,
                                            ts_init,
                                        ) {
                                            Ok(report) => {
                                                let identity = report.client_order_id.and_then(|cid| {
                                                    dispatch_state.order_identities.get(&cid).map(|r| (cid, r.clone()))
                                                });

                                                if let Some((cid, ident)) = identity {
                                                    // Registered order: make sure an accepted
                                                    // event was sent, then send a filled event.
                                                    ensure_accepted_to_python(
                                                        cid,
                                                        account_id,
                                                        report.venue_order_id,
                                                        &ident,
                                                        &dispatch_state,
                                                        trader_id,
                                                        ts_init,
                                                        &call_soon,
                                                        &callback,
                                                    );
                                                    dispatch_state.insert_filled(cid);
                                                    // Falls back to USD when the instrument is
                                                    // not in the cache.
                                                    let quote_currency = instrument_cache
                                                        .get(&report.instrument_id)
                                                        .map_or_else(Currency::USD, |i: InstrumentAny| i.quote_currency());
                                                    let filled = fill_report_to_order_filled(
                                                        &report, trader_id, &ident, quote_currency,
                                                    );
                                                    send_to_python(filled, &call_soon, &callback);
                                                } else {
                                                    // Unregistered order: accumulate totals for
                                                    // avg_px and forward the raw fill report.
                                                    let entry = cum_fill_totals
                                                        .entry(report.venue_order_id)
                                                        .or_default();
                                                    let qty = report.last_qty.as_decimal();
                                                    entry.0 += report.last_px.as_decimal() * qty;
                                                    entry.1 += qty;
                                                    send_to_python(report, &call_soon, &callback);
                                                }
                                            }
                                            Err(e) => log::error!("Failed to parse WS fill: {e}"),
                                        }
                                    }
                                }

                                // Set avg_px on order reports from the totals gathered above
                                // (notional / quantity), skipping zero quantities.
                                for report in &mut pending_order_reports {
                                    if let Some((notional, total_qty)) =
                                        cum_fill_totals.get(&report.venue_order_id)
                                        && !total_qty.is_zero()
                                    {
                                        report.avg_px = Some(notional / total_qty);
                                    }
                                }

                                for report in pending_order_reports {
                                    let identity = report.client_order_id.and_then(|cid| {
                                        dispatch_state.order_identities.get(&cid).map(|r| (cid, r.clone()))
                                    });

                                    if let Some((cid, ident)) = identity {
                                        match report.order_status {
                                            OrderStatus::Accepted => {
                                                // Skip if an accepted event was already
                                                // emitted or the order already has a fill.
                                                if dispatch_state.emitted_accepted.contains(&cid)
                                                    || dispatch_state.filled_orders.contains(&cid)
                                                {
                                                    log::debug!("Skipping duplicate Accepted for {cid}");
                                                    continue;
                                                }
                                                dispatch_state.insert_accepted(cid);
                                                let accepted = OrderAccepted::new(
                                                    trader_id,
                                                    ident.strategy_id,
                                                    ident.instrument_id,
                                                    cid,
                                                    report.venue_order_id,
                                                    account_id,
                                                    UUID4::new(),
                                                    report.ts_last,
                                                    ts_init,
                                                    false,
                                                );
                                                send_to_python(accepted, &call_soon, &callback);
                                            }
                                            OrderStatus::Canceled => {
                                                // An accepted event is guaranteed to precede
                                                // the canceled event.
                                                ensure_accepted_to_python(
                                                    cid,
                                                    account_id,
                                                    report.venue_order_id,
                                                    &ident,
                                                    &dispatch_state,
                                                    trader_id,
                                                    ts_init,
                                                    &call_soon,
                                                    &callback,
                                                );
                                                let canceled = OrderCanceled::new(
                                                    trader_id,
                                                    ident.strategy_id,
                                                    ident.instrument_id,
                                                    cid,
                                                    UUID4::new(),
                                                    report.ts_last,
                                                    ts_init,
                                                    false,
                                                    Some(report.venue_order_id),
                                                    Some(account_id),
                                                );
                                                send_to_python(canceled, &call_soon, &callback);
                                                dispatch_state.cleanup_terminal(&cid);
                                            }
                                            OrderStatus::Filled => {
                                                // No event is sent from the report itself;
                                                // only the dispatch state is cleaned up.
                                                dispatch_state.cleanup_terminal(&cid);
                                            }
                                            _ => {
                                                // Any other status: forward the report as is.
                                                send_to_python(report, &call_soon, &callback);
                                            }
                                        }
                                    } else {
                                        // No registered identity: forward the report as is.
                                        send_to_python(report, &call_soon, &callback);
                                    }
                                }

                                // Drop per-order bookkeeping for orders that are no longer open.
                                for (client_id, client_metadata, order_id) in terminal_orders {
                                    order_contexts.remove(&client_id);
                                    encoder.remove(client_id, client_metadata);
                                    order_id_map.remove(&order_id);
                                }
                            }
                            // Block height: forwarded as a plain dict with "type", "height"
                            // and an RFC 3339 "time"; `set_item` failures are ignored.
                            DydxWsOutputMessage::BlockHeight { height, time } => {
                                Python::attach(|py| {
                                    let dict = PyDict::new(py);
                                    let _ = dict.set_item("type", "block_height");
                                    let _ = dict.set_item("height", height);
                                    let _ = dict.set_item("time", time.to_rfc3339());
                                    if let Ok(py_obj) = dict.into_py_any(py) {
                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
                                    }
                                });
                            }
                            // Errors are logged only; nothing is sent to Python.
                            DydxWsOutputMessage::Error(err) => {
                                log::error!("dYdX WebSocket error: {err}");
                            }
                            // Buffered bars are discarded on reconnect.
                            DydxWsOutputMessage::Reconnected => {
                                log::info!("dYdX WebSocket reconnected");
                                pending_bars.clear();
                            }
                        }
                    }
                });
            }

            Ok(())
        })
    }
799
800 #[pyo3(name = "disconnect")]
809 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
810 let mut client = self.clone();
811 pyo3_async_runtimes::tokio::future_into_py(py, async move {
812 client.disconnect().await.map_err(to_pyvalue_err)?;
813 Ok(())
814 })
815 }
816
817 #[pyo3(name = "wait_until_active")]
818 fn py_wait_until_active<'py>(
819 &self,
820 py: Python<'py>,
821 timeout_secs: f64,
822 ) -> PyResult<Bound<'py, PyAny>> {
823 let connection_mode = self.connection_mode_atomic();
824
825 pyo3_async_runtimes::tokio::future_into_py(py, async move {
826 let timeout = Duration::from_secs_f64(timeout_secs);
827 let start = Instant::now();
828
829 loop {
830 let mode = connection_mode.load();
831 let mode_u8 = mode.load(Ordering::Relaxed);
832 let is_connected = matches!(
833 mode_u8,
834 x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
835 );
836
837 if is_connected {
838 break;
839 }
840
841 if start.elapsed() > timeout {
842 return Err(to_pyvalue_err(std::io::Error::new(
843 std::io::ErrorKind::TimedOut,
844 format!("Client did not become active within {timeout_secs}s"),
845 )));
846 }
847 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
848 }
849
850 Ok(())
851 })
852 }
853
854 #[pyo3(name = "cache_instrument")]
858 fn py_cache_instrument(&self, instrument: Py<PyAny>, py: Python<'_>) -> PyResult<()> {
859 let inst_any = pyobject_to_instrument_any(py, instrument)?;
860 self.cache_instrument(inst_any);
861 Ok(())
862 }
863
864 #[pyo3(name = "cache_instruments")]
868 fn py_cache_instruments(&self, instruments: Vec<Py<PyAny>>, py: Python<'_>) -> PyResult<()> {
869 let mut instruments_any = Vec::new();
870
871 for inst in instruments {
872 let inst_any = pyobject_to_instrument_any(py, inst)?;
873 instruments_any.push(inst_any);
874 }
875 self.cache_instruments(instruments_any);
876 Ok(())
877 }
878
    /// Returns the negation of `is_connected()`.
    #[pyo3(name = "is_closed")]
    fn py_is_closed(&self) -> bool {
        !self.is_connected()
    }
883
884 #[pyo3(name = "subscribe_trades")]
890 fn py_subscribe_trades<'py>(
891 &self,
892 py: Python<'py>,
893 instrument_id: InstrumentId,
894 ) -> PyResult<Bound<'py, PyAny>> {
895 let client = self.clone();
896 pyo3_async_runtimes::tokio::future_into_py(py, async move {
897 client
898 .subscribe_trades(instrument_id)
899 .await
900 .map_err(to_pyvalue_err)?;
901 Ok(())
902 })
903 }
904
905 #[pyo3(name = "unsubscribe_trades")]
907 fn py_unsubscribe_trades<'py>(
908 &self,
909 py: Python<'py>,
910 instrument_id: InstrumentId,
911 ) -> PyResult<Bound<'py, PyAny>> {
912 let client = self.clone();
913 pyo3_async_runtimes::tokio::future_into_py(py, async move {
914 client
915 .unsubscribe_trades(instrument_id)
916 .await
917 .map_err(to_pyvalue_err)?;
918 Ok(())
919 })
920 }
921
922 #[pyo3(name = "subscribe_orderbook")]
928 fn py_subscribe_orderbook<'py>(
929 &self,
930 py: Python<'py>,
931 instrument_id: InstrumentId,
932 ) -> PyResult<Bound<'py, PyAny>> {
933 let client = self.clone();
934 pyo3_async_runtimes::tokio::future_into_py(py, async move {
935 client
936 .subscribe_orderbook(instrument_id)
937 .await
938 .map_err(to_pyvalue_err)?;
939 Ok(())
940 })
941 }
942
943 #[pyo3(name = "unsubscribe_orderbook")]
945 fn py_unsubscribe_orderbook<'py>(
946 &self,
947 py: Python<'py>,
948 instrument_id: InstrumentId,
949 ) -> PyResult<Bound<'py, PyAny>> {
950 let client = self.clone();
951 pyo3_async_runtimes::tokio::future_into_py(py, async move {
952 client
953 .unsubscribe_orderbook(instrument_id)
954 .await
955 .map_err(to_pyvalue_err)?;
956 Ok(())
957 })
958 }
959
960 #[pyo3(name = "subscribe_bars")]
961 fn py_subscribe_bars<'py>(
962 &self,
963 py: Python<'py>,
964 bar_type: BarType,
965 ) -> PyResult<Bound<'py, PyAny>> {
966 let spec = bar_type.spec();
967 let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
968 let resolution = resolution.to_string();
969
970 let client = self.clone();
971 let instrument_id = bar_type.instrument_id();
972 let bar_types = self.bar_types().clone();
973
974 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
976 let topic = format!("{ticker}/{resolution}");
977
978 pyo3_async_runtimes::tokio::future_into_py(py, async move {
979 bar_types.insert(topic, bar_type);
980
981 client
982 .subscribe_candles(instrument_id, &resolution)
983 .await
984 .map_err(to_pyvalue_err)?;
985 Ok(())
986 })
987 }
988
989 #[pyo3(name = "unsubscribe_bars")]
990 fn py_unsubscribe_bars<'py>(
991 &self,
992 py: Python<'py>,
993 bar_type: BarType,
994 ) -> PyResult<Bound<'py, PyAny>> {
995 let spec = bar_type.spec();
996 let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
997 let resolution = resolution.to_string();
998
999 let client = self.clone();
1000 let instrument_id = bar_type.instrument_id();
1001 let bar_types = self.bar_types().clone();
1002
1003 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1005 let topic = format!("{ticker}/{resolution}");
1006
1007 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1008 client
1009 .unsubscribe_candles(instrument_id, &resolution)
1010 .await
1011 .map_err(to_pyvalue_err)?;
1012
1013 bar_types.remove(&topic);
1014
1015 Ok(())
1016 })
1017 }
1018
1019 #[pyo3(name = "subscribe_markets")]
1029 fn py_subscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1030 let client = self.clone();
1031 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1032 client.subscribe_markets().await.map_err(to_pyvalue_err)?;
1033 Ok(())
1034 })
1035 }
1036
1037 #[pyo3(name = "unsubscribe_markets")]
1043 fn py_unsubscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1044 let client = self.clone();
1045 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1046 client.unsubscribe_markets().await.map_err(to_pyvalue_err)?;
1047 Ok(())
1048 })
1049 }
1050
1051 #[pyo3(name = "subscribe_subaccount")]
1060 fn py_subscribe_subaccount<'py>(
1061 &self,
1062 py: Python<'py>,
1063 address: String,
1064 subaccount_number: u32,
1065 ) -> PyResult<Bound<'py, PyAny>> {
1066 let client = self.clone();
1067 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1068 client
1069 .subscribe_subaccount(&address, subaccount_number)
1070 .await
1071 .map_err(to_pyvalue_err)?;
1072 Ok(())
1073 })
1074 }
1075
1076 #[pyo3(name = "unsubscribe_subaccount")]
1078 fn py_unsubscribe_subaccount<'py>(
1079 &self,
1080 py: Python<'py>,
1081 address: String,
1082 subaccount_number: u32,
1083 ) -> PyResult<Bound<'py, PyAny>> {
1084 let client = self.clone();
1085 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1086 client
1087 .unsubscribe_subaccount(&address, subaccount_number)
1088 .await
1089 .map_err(to_pyvalue_err)?;
1090 Ok(())
1091 })
1092 }
1093
1094 #[pyo3(name = "subscribe_block_height")]
1104 fn py_subscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1105 let client = self.clone();
1106 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1107 client
1108 .subscribe_block_height()
1109 .await
1110 .map_err(to_pyvalue_err)?;
1111 Ok(())
1112 })
1113 }
1114
1115 #[pyo3(name = "unsubscribe_block_height")]
1121 fn py_unsubscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1122 let client = self.clone();
1123 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1124 client
1125 .unsubscribe_block_height()
1126 .await
1127 .map_err(to_pyvalue_err)?;
1128 Ok(())
1129 })
1130 }
1131}
1132
1133fn instrument_id_from_ticker(ticker: &str) -> InstrumentId {
1134 let symbol = format!("{ticker}-PERP");
1135 InstrumentId::new(Symbol::new(&symbol), *DYDX_VENUE)
1136}
1137
1138fn handle_markets_trading_data(
1139 trading: Option<
1140 &std::collections::HashMap<String, crate::websocket::messages::DydxMarketTradingUpdate>,
1141 >,
1142 instrument_cache: &std::sync::Arc<crate::common::instrument_cache::InstrumentCache>,
1143 seen_tickers: &mut ahash::AHashSet<Ustr>,
1144 call_soon: &Py<PyAny>,
1145 callback: &Py<PyAny>,
1146 ts_init: nautilus_core::UnixNanos,
1147) {
1148 let Some(trading_map) = trading else {
1149 return;
1150 };
1151
1152 for (ticker, update) in trading_map {
1153 let instrument_id = instrument_id_from_ticker(ticker);
1154
1155 if let Some(status) = &update.status {
1156 let action = MarketStatusAction::from(*status);
1157 let is_trading = matches!(status, DydxMarketStatus::Active);
1158
1159 let instrument_status = InstrumentStatus::new(
1160 instrument_id,
1161 action,
1162 ts_init,
1163 ts_init,
1164 None,
1165 None,
1166 Some(is_trading),
1167 None,
1168 None,
1169 );
1170
1171 if instrument_cache.get_by_market(ticker).is_some() {
1172 Python::attach(|py| match instrument_status.into_py_any(py) {
1173 Ok(py_obj) => {
1174 call_python_threadsafe(py, call_soon, callback, py_obj);
1175 }
1176 Err(e) => log::error!("Failed to convert InstrumentStatus to Python: {e}"),
1177 });
1178 }
1179 }
1180
1181 let ticker_ustr = Ustr::from(ticker.as_str());
1182 if !seen_tickers.contains(&ticker_ustr) {
1183 let is_active = update
1184 .status
1185 .as_ref()
1186 .is_none_or(|s| matches!(s, crate::common::enums::DydxMarketStatus::Active));
1187 if instrument_cache.get_by_market(ticker).is_some() {
1188 seen_tickers.insert(ticker_ustr);
1189 } else if is_active {
1190 seen_tickers.insert(ticker_ustr);
1191 log::info!("New instrument discovered via WebSocket: {ticker}");
1192 Python::attach(|py| {
1193 let dict = PyDict::new(py);
1194 let _ = dict.set_item("type", "new_instrument_discovered");
1195 let _ = dict.set_item("ticker", ticker);
1196 if let Ok(py_obj) = dict.into_py_any(py) {
1197 call_python_threadsafe(py, call_soon, callback, py_obj);
1198 }
1199 });
1200 }
1201 }
1202
1203 if let Some(ref rate_str) = update.next_funding_rate {
1204 if let Ok(rate) = Decimal::from_str(rate_str) {
1205 let funding_rate = FundingRateUpdate {
1206 instrument_id,
1207 rate,
1208 interval: Some(60),
1209 next_funding_ns: None,
1210 ts_event: ts_init,
1211 ts_init,
1212 };
1213 Python::attach(|py| match funding_rate.into_py_any(py) {
1214 Ok(py_obj) => {
1215 call_python_threadsafe(py, call_soon, callback, py_obj);
1216 }
1217 Err(e) => log::error!("Failed to convert FundingRateUpdate to Python: {e}"),
1218 });
1219 } else {
1220 log::warn!("Failed to parse next_funding_rate for {ticker}: {rate_str}");
1221 }
1222 }
1223 }
1224}
1225
1226#[expect(clippy::too_many_arguments)]
1227fn ensure_accepted_to_python(
1228 client_order_id: ClientOrderId,
1229 account_id: AccountId,
1230 venue_order_id: VenueOrderId,
1231 identity: &OrderIdentity,
1232 state: &DydxWsDispatchState,
1233 trader_id: TraderId,
1234 ts_init: nautilus_core::UnixNanos,
1235 call_soon: &Py<PyAny>,
1236 callback: &Py<PyAny>,
1237) {
1238 if state.emitted_accepted.contains(&client_order_id) {
1239 return;
1240 }
1241 state.insert_accepted(client_order_id);
1242 let accepted = OrderAccepted::new(
1243 trader_id,
1244 identity.strategy_id,
1245 identity.instrument_id,
1246 client_order_id,
1247 venue_order_id,
1248 account_id,
1249 UUID4::new(),
1250 ts_init,
1251 ts_init,
1252 false,
1253 );
1254 send_to_python(accepted, call_soon, callback);
1255}
1256
1257fn send_to_python<T: for<'py> IntoPyObjectExt<'py>>(
1258 value: T,
1259 call_soon: &Py<PyAny>,
1260 callback: &Py<PyAny>,
1261) {
1262 Python::attach(|py| match value.into_py_any(py) {
1263 Ok(py_obj) => {
1264 call_python_threadsafe(py, call_soon, callback, py_obj);
1265 }
1266 Err(e) => log::error!("Failed to convert to Python: {e}"),
1267 });
1268}