Skip to main content

nautilus_dydx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the dYdX WebSocket client.
17
18use std::{
19    str::FromStr,
20    sync::atomic::Ordering,
21    time::{Duration, Instant},
22};
23
24use ahash::AHashMap;
25use dashmap::DashMap;
26use nautilus_common::live::get_runtime;
27use nautilus_core::{
28    UUID4,
29    python::{call_python_threadsafe, to_pyvalue_err},
30    time::get_atomic_clock_realtime,
31};
32use nautilus_model::{
33    data::{
34        Bar, BarType, Data, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
35        OrderBookDeltas, OrderBookDeltas_API,
36    },
37    enums::{AccountType, MarketStatusAction, OrderSide, OrderStatus, OrderType},
38    events::{AccountState, OrderAccepted, OrderCanceled},
39    identifiers::{
40        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
41    },
42    instruments::{Instrument, InstrumentAny},
43    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
44    types::{AccountBalance, Currency, Money},
45};
46use nautilus_network::mode::ConnectionMode;
47use pyo3::{IntoPyObjectExt, prelude::*, types::PyDict};
48use rust_decimal::Decimal;
49use ustr::Ustr;
50
51use crate::{
52    common::{
53        consts::DYDX_VENUE,
54        credential::DydxCredential,
55        enums::{DydxCandleResolution, DydxMarketStatus},
56        parse::{extract_raw_symbol, parse_price},
57    },
58    execution::types::OrderContext,
59    http::{client::DydxHttpClient, parse::parse_account_state},
60    python::encoder::PyDydxClientOrderIdEncoder,
61    websocket::{
62        DydxWsDispatchState, OrderIdentity,
63        client::DydxWebSocketClient,
64        enums::DydxWsOutputMessage,
65        fill_report_to_order_filled, parse as ws_parse,
66        parse::{parse_ws_fill_report, parse_ws_order_report, parse_ws_position_report},
67    },
68};
69
70#[pymethods]
71#[pyo3_stub_gen::derive::gen_stub_pymethods]
72impl DydxWebSocketClient {
    /// Creates a new public WebSocket client for market data.
    ///
    /// This creates a new independent instrument cache. To share a cache with
    /// the HTTP client, use `Self.new_public_with_cache` instead.
    ///
    /// Exposed to Python as the static method `new_public`. `heartbeat` and
    /// `proxy_url` are optional (default `None`); all arguments are forwarded
    /// unchanged to `Self::new_public`.
    #[staticmethod]
    #[pyo3(name = "new_public")]
    #[pyo3(signature = (url, heartbeat=None, proxy_url=None))]
    fn py_new_public(url: String, heartbeat: Option<u64>, proxy_url: Option<String>) -> Self {
        // NOTE(review): the unit of `heartbeat` is not visible here (presumably seconds) — confirm.
        Self::new_public(url, heartbeat, proxy_url)
    }
83
84    /// Creates a new private WebSocket client for account updates.
85    ///
86    /// This creates a new independent instrument cache. To share a cache with
87    /// the HTTP client, use `Self.new_private_with_cache` instead.
88    #[staticmethod]
89    #[pyo3(name = "new_private")]
90    #[pyo3(signature = (url, private_key, authenticator_ids, account_id, heartbeat=None, proxy_url=None))]
91    fn py_new_private(
92        url: String,
93        private_key: &str,
94        authenticator_ids: Vec<u64>,
95        account_id: AccountId,
96        heartbeat: Option<u64>,
97        proxy_url: Option<String>,
98    ) -> PyResult<Self> {
99        let credential = DydxCredential::from_private_key(private_key, authenticator_ids)
100            .map_err(to_pyvalue_err)?;
101        Ok(Self::new_private(
102            url, credential, account_id, heartbeat, proxy_url,
103        ))
104    }
105
    /// Returns `true` when the client is connected.
    ///
    /// Exposed to Python as `is_connected`; simply forwards to the Rust-side
    /// `is_connected` method.
    #[pyo3(name = "is_connected")]
    fn py_is_connected(&self) -> bool {
        self.is_connected()
    }
111
    /// Sets the account ID for account message parsing.
    ///
    /// Exposed to Python as `set_account_id`. The handler task spawned by
    /// `connect` skips subaccount messages (logging a warning) while no
    /// account ID is available.
    #[pyo3(name = "set_account_id")]
    fn py_set_account_id(&mut self, account_id: AccountId) {
        // NOTE(review): `connect` hands a clone of this client to its handler task; whether an
        // ID set after connecting reaches that clone depends on how the field is stored — confirm.
        self.set_account_id(account_id);
    }
117
    /// Sets whether bar timestamps use the close time.
    ///
    /// Exposed to Python as `set_bars_timestamp_on_close`.
    #[pyo3(name = "set_bars_timestamp_on_close")]
    fn py_set_bars_timestamp_on_close(&self, value: bool) {
        // NOTE(review): the handler task started by `connect` reads `bars_timestamp_on_close()`
        // once before entering its message loop, so this presumably has to be set before
        // connecting to affect bar parsing — confirm.
        self.set_bars_timestamp_on_close(value);
    }
123
124    /// Shares the HTTP client's instrument cache with this WebSocket client.
125    ///
126    /// The HTTP client's cache includes CLOB pair ID and market ticker indices
127    /// needed for parsing SubaccountsChannelData into typed execution events.
128    /// Must be called before `connect()`.
129    #[pyo3(name = "share_instrument_cache")]
130    fn py_share_instrument_cache(&mut self, http_client: &DydxHttpClient) {
131        self.set_instrument_cache(http_client.instrument_cache().clone());
132    }
133
134    #[pyo3(name = "register_order_identity")]
135    fn py_register_order_identity(
136        &self,
137        client_order_id: ClientOrderId,
138        instrument_id: InstrumentId,
139        strategy_id: StrategyId,
140        order_side: OrderSide,
141        order_type: OrderType,
142    ) {
143        self.ws_dispatch_state().order_identities.insert(
144            client_order_id,
145            OrderIdentity {
146                instrument_id,
147                strategy_id,
148                order_side,
149                order_type,
150            },
151        );
152    }
153
154    #[pyo3(name = "remove_order_identity")]
155    fn py_remove_order_identity(&self, client_order_id: ClientOrderId) {
156        self.ws_dispatch_state()
157            .order_identities
158            .remove(&client_order_id);
159    }
160
    /// Returns the account ID if set.
    ///
    /// Exposed to Python as `account_id`; yields `None` when no account ID has
    /// been set.
    #[pyo3(name = "account_id")]
    fn py_account_id(&self) -> Option<AccountId> {
        self.account_id()
    }
166
167    /// Returns a reference to the shared client order ID encoder.
168    #[pyo3(name = "encoder")]
169    fn py_encoder(&self) -> PyDydxClientOrderIdEncoder {
170        PyDydxClientOrderIdEncoder::from_arc(self.encoder().clone())
171    }
172
    /// Returns the URL of this WebSocket client.
    ///
    /// Exposed to Python as a read-only property; the URL is returned as a new
    /// `String`.
    // NOTE(review): unlike the sibling methods there is no `#[pyo3(name = "...")]` here, so
    // PyO3 presumably names the Python property after the Rust function (`py_url`) rather
    // than `url` — confirm this is intended before renaming, since callers may rely on it.
    #[getter]
    fn py_url(&self) -> String {
        self.url().to_string()
    }
178
179    /// Connects the websocket client in handler mode with automatic reconnection.
180    ///
181    /// Spawns a background handler task that owns the WebSocketClient and processes
182    /// raw messages into venue-specific `DydxWsOutputMessage` values.
183    #[pyo3(name = "connect")]
184    #[pyo3(signature = (loop_, instruments, callback, trader_id=None))]
185    #[expect(clippy::needless_pass_by_value)]
186    fn py_connect<'py>(
187        &mut self,
188        py: Python<'py>,
189        loop_: Py<PyAny>,
190        instruments: Vec<Py<PyAny>>,
191        callback: Py<PyAny>,
192        trader_id: Option<TraderId>,
193    ) -> PyResult<Bound<'py, PyAny>> {
194        let call_soon = loop_.getattr(py, "call_soon_threadsafe")?;
195
196        let mut instruments_any = Vec::new();
197
198        for inst in instruments {
199            let inst_any = pyobject_to_instrument_any(py, inst)?;
200            instruments_any.push(inst_any);
201        }
202
203        self.cache_instruments(instruments_any);
204
205        let mut client = self.clone();
206        let bar_types = self.bar_types().clone();
207        let dispatch_state = self.ws_dispatch_state().clone();
208        let trader_id = trader_id.unwrap_or(TraderId::from("TRADER-000"));
209
210        pyo3_async_runtimes::tokio::future_into_py(py, async move {
211            client.connect().await.map_err(to_pyvalue_err)?;
212
213            if let Some(mut rx) = client.take_receiver() {
214                get_runtime().spawn(async move {
215                    let _client = client; // Keep client alive in spawned task
216                    let clock = get_atomic_clock_realtime();
217                    let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
218                    let order_id_map: DashMap<String, (u32, u32)> = DashMap::new();
219                    let bars_timestamp_on_close = _client.bars_timestamp_on_close();
220                    let mut pending_bars: AHashMap<String, Bar> = AHashMap::new();
221                    let mut seen_tickers: ahash::AHashSet<Ustr> = ahash::AHashSet::new();
222
223                    while let Some(msg) = rx.recv().await {
224                        let ts_init = clock.get_time_ns();
225
226                        match msg {
227                            DydxWsOutputMessage::Trades { id, contents } => {
228                                let Some(instrument) = _client.instrument_cache().get_by_market(&id) else {
229                                    log::warn!("No instrument cached for market {id}");
230                                    continue;
231                                };
232                                let instrument_id = instrument.id();
233
234                                match ws_parse::parse_trade_ticks(instrument_id, &instrument, &contents, ts_init) {
235                                    Ok(items) => {
236                                        Python::attach(|py| {
237                                            for data in items {
238                                                let py_obj = data_to_pycapsule(py, data);
239                                                call_python_threadsafe(py, &call_soon, &callback, py_obj);
240                                            }
241                                        });
242                                    }
243                                    Err(e) => log::error!("Failed to parse trade ticks for {id}: {e}"),
244                                }
245                            }
246                            DydxWsOutputMessage::OrderbookSnapshot { id, contents } => {
247                                let Some(instrument) = _client.instrument_cache().get_by_market(&id) else {
248                                    log::warn!("No instrument cached for market {id}");
249                                    continue;
250                                };
251                                let instrument_id = instrument.id();
252                                let price_precision = instrument.price_precision();
253                                let size_precision = instrument.size_precision();
254
255                                match ws_parse::parse_orderbook_snapshot(
256                                    &instrument_id,
257                                    &contents,
258                                    price_precision,
259                                    size_precision,
260                                    ts_init,
261                                ) {
262                                    Ok(deltas) => {
263                                        Python::attach(|py| {
264                                            let data = Data::Deltas(OrderBookDeltas_API::new(deltas));
265                                            let py_obj = data_to_pycapsule(py, data);
266                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
267                                        });
268                                    }
269                                    Err(e) => log::error!("Failed to parse orderbook snapshot for {id}: {e}"),
270                                }
271                            }
272                            DydxWsOutputMessage::OrderbookUpdate { id, contents } => {
273                                let Some(instrument) = _client.instrument_cache().get_by_market(&id) else {
274                                    log::warn!("No instrument cached for market {id}");
275                                    continue;
276                                };
277                                let instrument_id = instrument.id();
278                                let price_precision = instrument.price_precision();
279                                let size_precision = instrument.size_precision();
280
281                                match ws_parse::parse_orderbook_deltas(
282                                    &instrument_id,
283                                    &contents,
284                                    price_precision,
285                                    size_precision,
286                                    ts_init,
287                                ) {
288                                    Ok(deltas) => {
289                                        Python::attach(|py| {
290                                            let data = Data::Deltas(OrderBookDeltas_API::new(deltas));
291                                            let py_obj = data_to_pycapsule(py, data);
292                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
293                                        });
294                                    }
295                                    Err(e) => log::error!("Failed to parse orderbook deltas for {id}: {e}"),
296                                }
297                            }
298                            DydxWsOutputMessage::OrderbookBatch { id, updates } => {
299                                let Some(instrument) = _client.instrument_cache().get_by_market(&id) else {
300                                    log::warn!("No instrument cached for market {id}");
301                                    continue;
302                                };
303                                let instrument_id = instrument.id();
304                                let price_precision = instrument.price_precision();
305                                let size_precision = instrument.size_precision();
306
307                                let mut all_deltas = Vec::new();
308                                let last_idx = updates.len().saturating_sub(1);
309                                let mut parse_ok = true;
310
311                                for (idx, update) in updates.iter().enumerate() {
312                                    if idx < last_idx {
313                                        match ws_parse::parse_orderbook_deltas_with_flag(
314                                            &instrument_id,
315                                            update,
316                                            price_precision,
317                                            size_precision,
318                                            ts_init,
319                                            false,
320                                        ) {
321                                            Ok(deltas) => all_deltas.extend(deltas),
322                                            Err(e) => {
323                                                log::error!("Failed to parse batch orderbook deltas for {id}: {e}");
324                                                parse_ok = false;
325                                                break;
326                                            }
327                                        }
328                                    } else {
329                                        match ws_parse::parse_orderbook_deltas(
330                                            &instrument_id,
331                                            update,
332                                            price_precision,
333                                            size_precision,
334                                            ts_init,
335                                        ) {
336                                            Ok(last_deltas) => all_deltas.extend(last_deltas.deltas),
337                                            Err(e) => {
338                                                log::error!("Failed to parse batch orderbook deltas for {id}: {e}");
339                                                parse_ok = false;
340                                                break;
341                                            }
342                                        }
343                                    }
344                                }
345
346                                if parse_ok && !all_deltas.is_empty() {
347                                    let combined = OrderBookDeltas::new(instrument_id, all_deltas);
348                                    Python::attach(|py| {
349                                        let data = Data::Deltas(OrderBookDeltas_API::new(combined));
350                                        let py_obj = data_to_pycapsule(py, data);
351                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
352                                    });
353                                }
354                            }
355                            DydxWsOutputMessage::Candles { id, contents } => {
356                                let ticker = id.split('/').next().unwrap_or(&id);
357
358                                let Some(bar_type) = bar_types.get(&id).map(|r| *r) else {
359                                    log::debug!("No bar type registered for candle topic {id}");
360                                    continue;
361                                };
362
363                                let Some(instrument) = _client.instrument_cache().get_by_market(ticker) else {
364                                    log::warn!("No instrument cached for market {ticker}");
365                                    continue;
366                                };
367
368                                match ws_parse::parse_candle_bar(
369                                    bar_type,
370                                    &instrument,
371                                    &contents,
372                                    bars_timestamp_on_close,
373                                    ts_init,
374                                ) {
375                                    Ok(bar) => {
376                                        if let Some(prev_bar) = pending_bars.get(&id) {
377                                            if bar.ts_event == prev_bar.ts_event {
378                                                pending_bars.insert(id, bar);
379                                            } else {
380                                                let emit_bar = *prev_bar;
381                                                pending_bars.insert(id.clone(), bar);
382                                                Python::attach(|py| {
383                                                    let py_obj = data_to_pycapsule(py, Data::Bar(emit_bar));
384                                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
385                                                });
386                                            }
387                                        } else {
388                                            pending_bars.insert(id, bar);
389                                        }
390                                    }
391                                    Err(e) => log::error!("Failed to parse candle bar for {id}: {e}"),
392                                }
393                            }
394                            DydxWsOutputMessage::Markets(contents) => {
395                                if let Some(ref oracle_prices) = contents.oracle_prices {
396                                    for (ticker, oracle_data) in oracle_prices {
397                                        let Some(instrument) = _client.instrument_cache().get_by_market(ticker) else {
398                                            continue;
399                                        };
400                                        let instrument_id = instrument.id();
401
402                                        let Ok(price) = parse_price(&oracle_data.oracle_price, "oracle_price") else {
403                                            log::warn!("Failed to parse oracle price for {ticker}");
404                                            continue;
405                                        };
406
407                                        let mark_price = MarkPriceUpdate::new(
408                                            instrument_id,
409                                            price,
410                                            ts_init,
411                                            ts_init,
412                                        );
413                                        Python::attach(|py| {
414                                            match mark_price.into_py_any(py) {
415                                                Ok(py_obj) => {
416                                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
417                                                }
418                                                Err(e) => log::error!("Failed to convert MarkPriceUpdate to Python: {e}"),
419                                            }
420                                        });
421
422                                        let index_price = IndexPriceUpdate::new(
423                                            instrument_id,
424                                            price,
425                                            ts_init,
426                                            ts_init,
427                                        );
428                                        Python::attach(|py| {
429                                            match index_price.into_py_any(py) {
430                                                Ok(py_obj) => {
431                                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
432                                                }
433                                                Err(e) => log::error!("Failed to convert IndexPriceUpdate to Python: {e}"),
434                                            }
435                                        });
436                                    }
437                                }
438
439                                handle_markets_trading_data(
440                                    contents.trading.as_ref(),
441                                    _client.instrument_cache(),
442                                    &mut seen_tickers,
443                                    &call_soon,
444                                    &callback,
445                                    ts_init,
446                                );
447                                handle_markets_trading_data(
448                                    contents.markets.as_ref(),
449                                    _client.instrument_cache(),
450                                    &mut seen_tickers,
451                                    &call_soon,
452                                    &callback,
453                                    ts_init,
454                                );
455
456                                // Parse oracle prices from initial snapshot markets entries
457                                if let Some(ref markets_map) = contents.markets {
458                                    for (ticker, update) in markets_map {
459                                        if let Some(ref oracle_price_str) = update.oracle_price {
460                                            let Some(instrument) = _client.instrument_cache().get_by_market(ticker) else {
461                                                continue;
462                                            };
463                                            let instrument_id = instrument.id();
464                                            let Ok(price) = parse_price(oracle_price_str, "oracle_price") else {
465                                                log::warn!("Failed to parse oracle price for {ticker}");
466                                                continue;
467                                            };
468
469                                            let mark_price = MarkPriceUpdate::new(
470                                                instrument_id,
471                                                price,
472                                                ts_init,
473                                                ts_init,
474                                            );
475                                            Python::attach(|py| {
476                                                match mark_price.into_py_any(py) {
477                                                    Ok(py_obj) => {
478                                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
479                                                    }
480                                                    Err(e) => log::error!("Failed to convert MarkPriceUpdate to Python: {e}"),
481                                                }
482                                            });
483
484                                            let index_price = IndexPriceUpdate::new(
485                                                instrument_id,
486                                                price,
487                                                ts_init,
488                                                ts_init,
489                                            );
490                                            Python::attach(|py| {
491                                                match index_price.into_py_any(py) {
492                                                    Ok(py_obj) => {
493                                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
494                                                    }
495                                                    Err(e) => log::error!("Failed to convert IndexPriceUpdate to Python: {e}"),
496                                                }
497                                            });
498                                        }
499                                    }
500                                }
501                            }
502                            DydxWsOutputMessage::SubaccountSubscribed(data) => {
503                                let Some(account_id) = _client.account_id() else {
504                                    log::warn!("Cannot parse subaccount subscription: account_id not set");
505                                    continue;
506                                };
507
508                                let instrument_cache = _client.instrument_cache();
509
510                                let inst_map = instrument_cache.to_instrument_id_map();
511                                let oracle_map = instrument_cache.to_oracle_prices_map();
512
513                                if let Some(ref subaccount) = data.contents.subaccount {
514                                    match parse_account_state(
515                                        subaccount,
516                                        account_id,
517                                        &inst_map,
518                                        &oracle_map,
519                                        ts_init,
520                                        ts_init,
521                                    ) {
522                                        Ok(account_state) => {
523                                            Python::attach(|py| {
524                                                match account_state.into_py_any(py) {
525                                                    Ok(py_obj) => {
526                                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
527                                                    }
528                                                    Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
529                                                }
530                                            });
531                                        }
532                                    Err(e) => log::error!("Failed to parse account state: {e}"),
533                                }
534
535                                if let Some(ref positions) = subaccount.open_perpetual_positions {
536                                    for (market, ws_position) in positions {
537                                        match parse_ws_position_report(
538                                            ws_position,
539                                            instrument_cache,
540                                            account_id,
541                                            ts_init,
542                                        ) {
543                                            Ok(report) => {
544                                                Python::attach(|py| {
545                                                    match pyo3::Py::new(py, report) {
546                                                        Ok(py_obj) => {
547                                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
548                                                        }
549                                                        Err(e) => log::error!("Failed to convert PositionStatusReport to Python: {e}"),
550                                                    }
551                                                });
552                                            }
553                                            Err(e) => log::error!("Failed to parse position for {market}: {e}"),
554                                        }
555                                    }
556                                }
557                                } else {
558                                    log::warn!("Subaccount subscription without initial state (new/empty subaccount)");
559
560                                    // Emit zero-balance account state so account gets registered
561                                    let currency = Currency::get_or_create_crypto_with_context("USDC", None);
562                                    let zero = Money::zero(currency);
563                                    let balance = AccountBalance::new_checked(zero, zero, zero)
564                                        .expect("zero balance should always be valid");
565                                    let account_state = AccountState::new(
566                                        account_id,
567                                        AccountType::Margin,
568                                        vec![balance],
569                                        vec![],
570                                        true,
571                                        UUID4::new(),
572                                        ts_init,
573                                        ts_init,
574                                        None,
575                                    );
576                                    Python::attach(|py| {
577                                        match account_state.into_py_any(py) {
578                                            Ok(py_obj) => {
579                                                call_python_threadsafe(py, &call_soon, &callback, py_obj);
580                                            }
581                                            Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
582                                        }
583                                    });
584                                }
585                            }
586                            DydxWsOutputMessage::SubaccountsChannelData(data) => {
587                                let Some(account_id) = _client.account_id() else {
588                                    log::warn!("Cannot parse SubaccountsChannelData: account_id not set");
589                                    continue;
590                                };
591
592                                let instrument_cache = _client.instrument_cache();
593                                let encoder = _client.encoder();
594
595                                let mut terminal_orders: Vec<(u32, u32, String)> = Vec::new();
596                                let mut cum_fill_totals: AHashMap<VenueOrderId, (Decimal, Decimal)> = AHashMap::new();
597
598                                // Phase 1: Parse orders and build order_id_map (needed for fill correlation)
599                                let mut pending_order_reports = Vec::new();
600
601                                if let Some(ref orders) = data.contents.orders {
602                                    for ws_order in orders {
603                                        if let Ok(client_id_u32) = ws_order.client_id.parse::<u32>() {
604                                            let client_meta = ws_order.client_metadata
605                                                .as_ref()
606                                                .and_then(|s| s.parse::<u32>().ok())
607                                                .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
608                                            order_id_map.insert(ws_order.id.clone(), (client_id_u32, client_meta));
609                                        }
610
611                                        match parse_ws_order_report(
612                                            ws_order,
613                                            instrument_cache,
614                                            &order_contexts,
615                                            encoder,
616                                            account_id,
617                                            ts_init,
618                                        ) {
619                                            Ok(report) => {
620                                                if !report.order_status.is_open()
621                                                    && let Ok(cid) = ws_order.client_id.parse::<u32>()
622                                                {
623                                                    let meta = ws_order.client_metadata
624                                                        .as_ref()
625                                                        .and_then(|s| s.parse::<u32>().ok())
626                                                        .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
627                                                    terminal_orders.push((cid, meta, ws_order.id.clone()));
628                                                }
629                                                pending_order_reports.push(report);
630                                            }
631                                            Err(e) => log::error!("Failed to parse WS order: {e}"),
632                                        }
633                                    }
634                                }
635
636                                // Phase 2: Process fills (tracked get OrderFilled, untracked get FillReport)
637                                if let Some(ref fills) = data.contents.fills {
638                                    for ws_fill in fills {
639                                        match parse_ws_fill_report(
640                                            ws_fill,
641                                            instrument_cache,
642                                            &order_id_map,
643                                            &order_contexts,
644                                            encoder,
645                                            account_id,
646                                            ts_init,
647                                        ) {
648                                            Ok(report) => {
649                                                let identity = report.client_order_id.and_then(|cid| {
650                                                    dispatch_state.order_identities.get(&cid).map(|r| (cid, r.clone()))
651                                                });
652
653                                                if let Some((cid, ident)) = identity {
654                                                    ensure_accepted_to_python(
655                                                        cid,
656                                                        account_id,
657                                                        report.venue_order_id,
658                                                        &ident,
659                                                        &dispatch_state,
660                                                        trader_id,
661                                                        ts_init,
662                                                        &call_soon,
663                                                        &callback,
664                                                    );
665                                                    dispatch_state.insert_filled(cid);
666                                                    let quote_currency = instrument_cache
667                                                        .get(&report.instrument_id)
668                                                        .map_or_else(Currency::USD, |i: InstrumentAny| i.quote_currency());
669                                                    let filled = fill_report_to_order_filled(
670                                                        &report, trader_id, &ident, quote_currency,
671                                                    );
672                                                    send_to_python(filled, &call_soon, &callback);
673                                                } else {
674                                                    let entry = cum_fill_totals
675                                                        .entry(report.venue_order_id)
676                                                        .or_default();
677                                                    let qty = report.last_qty.as_decimal();
678                                                    entry.0 += report.last_px.as_decimal() * qty;
679                                                    entry.1 += qty;
680                                                    send_to_python(report, &call_soon, &callback);
681                                                }
682                                            }
683                                            Err(e) => log::error!("Failed to parse WS fill: {e}"),
684                                        }
685                                    }
686                                }
687
688                                // Phase 3: Process order status updates
689                                for report in &mut pending_order_reports {
690                                    if let Some((notional, total_qty)) =
691                                        cum_fill_totals.get(&report.venue_order_id)
692                                        && !total_qty.is_zero()
693                                    {
694                                        report.avg_px = Some(notional / total_qty);
695                                    }
696                                }
697
698                                for report in pending_order_reports {
699                                    let identity = report.client_order_id.and_then(|cid| {
700                                        dispatch_state.order_identities.get(&cid).map(|r| (cid, r.clone()))
701                                    });
702
703                                    if let Some((cid, ident)) = identity {
704                                        match report.order_status {
705                                            OrderStatus::Accepted => {
706                                                if dispatch_state.emitted_accepted.contains(&cid)
707                                                    || dispatch_state.filled_orders.contains(&cid)
708                                                {
709                                                    log::debug!("Skipping duplicate Accepted for {cid}");
710                                                    continue;
711                                                }
712                                                dispatch_state.insert_accepted(cid);
713                                                let accepted = OrderAccepted::new(
714                                                    trader_id,
715                                                    ident.strategy_id,
716                                                    ident.instrument_id,
717                                                    cid,
718                                                    report.venue_order_id,
719                                                    account_id,
720                                                    UUID4::new(),
721                                                    report.ts_last,
722                                                    ts_init,
723                                                    false,
724                                                );
725                                                send_to_python(accepted, &call_soon, &callback);
726                                            }
727                                            OrderStatus::Canceled => {
728                                                ensure_accepted_to_python(
729                                                    cid,
730                                                    account_id,
731                                                    report.venue_order_id,
732                                                    &ident,
733                                                    &dispatch_state,
734                                                    trader_id,
735                                                    ts_init,
736                                                    &call_soon,
737                                                    &callback,
738                                                );
739                                                let canceled = OrderCanceled::new(
740                                                    trader_id,
741                                                    ident.strategy_id,
742                                                    ident.instrument_id,
743                                                    cid,
744                                                    UUID4::new(),
745                                                    report.ts_last,
746                                                    ts_init,
747                                                    false,
748                                                    Some(report.venue_order_id),
749                                                    Some(account_id),
750                                                );
751                                                send_to_python(canceled, &call_soon, &callback);
752                                                dispatch_state.cleanup_terminal(&cid);
753                                            }
754                                            OrderStatus::Filled => {
755                                                dispatch_state.cleanup_terminal(&cid);
756                                            }
757                                            _ => {
758                                                send_to_python(report, &call_soon, &callback);
759                                            }
760                                        }
761                                    } else {
762                                        send_to_python(report, &call_soon, &callback);
763                                    }
764                                }
765
766                                // Deferred cleanup after fills are correlated
767                                for (client_id, client_metadata, order_id) in terminal_orders {
768                                    order_contexts.remove(&client_id);
769                                    encoder.remove(client_id, client_metadata);
770                                    order_id_map.remove(&order_id);
771                                }
772                            }
773                            DydxWsOutputMessage::BlockHeight { height, time } => {
774                                Python::attach(|py| {
775                                    let dict = PyDict::new(py);
776                                    let _ = dict.set_item("type", "block_height");
777                                    let _ = dict.set_item("height", height);
778                                    let _ = dict.set_item("time", time.to_rfc3339());
779                                    if let Ok(py_obj) = dict.into_py_any(py) {
780                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
781                                    }
782                                });
783                            }
784                            DydxWsOutputMessage::Error(err) => {
785                                log::error!("dYdX WebSocket error: {err}");
786                            }
787                            DydxWsOutputMessage::Reconnected => {
788                                log::info!("dYdX WebSocket reconnected");
789                                pending_bars.clear();
790                            }
791                        }
792                    }
793                });
794            }
795
796            Ok(())
797        })
798    }
799
800    /// Disconnects the websocket client gracefully.
801    ///
802    /// Sends a disconnect command to the handler, sets the stop signal, then
803    /// awaits the handler task with a timeout before aborting.
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if the underlying client cannot be accessed.
808    #[pyo3(name = "disconnect")]
809    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
810        let mut client = self.clone();
811        pyo3_async_runtimes::tokio::future_into_py(py, async move {
812            client.disconnect().await.map_err(to_pyvalue_err)?;
813            Ok(())
814        })
815    }
816
817    #[pyo3(name = "wait_until_active")]
818    fn py_wait_until_active<'py>(
819        &self,
820        py: Python<'py>,
821        timeout_secs: f64,
822    ) -> PyResult<Bound<'py, PyAny>> {
823        let connection_mode = self.connection_mode_atomic();
824
825        pyo3_async_runtimes::tokio::future_into_py(py, async move {
826            let timeout = Duration::from_secs_f64(timeout_secs);
827            let start = Instant::now();
828
829            loop {
830                let mode = connection_mode.load();
831                let mode_u8 = mode.load(Ordering::Relaxed);
832                let is_connected = matches!(
833                    mode_u8,
834                    x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
835                );
836
837                if is_connected {
838                    break;
839                }
840
841                if start.elapsed() > timeout {
842                    return Err(to_pyvalue_err(std::io::Error::new(
843                        std::io::ErrorKind::TimedOut,
844                        format!("Client did not become active within {timeout_secs}s"),
845                    )));
846                }
847                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
848            }
849
850            Ok(())
851        })
852    }
853
854    /// Caches a single instrument.
855    ///
856    /// Any existing instrument with the same ID will be replaced.
857    #[pyo3(name = "cache_instrument")]
858    fn py_cache_instrument(&self, instrument: Py<PyAny>, py: Python<'_>) -> PyResult<()> {
859        let inst_any = pyobject_to_instrument_any(py, instrument)?;
860        self.cache_instrument(inst_any);
861        Ok(())
862    }
863
864    /// Caches multiple instruments.
865    ///
866    /// Any existing instruments with the same IDs will be replaced.
867    #[pyo3(name = "cache_instruments")]
868    fn py_cache_instruments(&self, instruments: Vec<Py<PyAny>>, py: Python<'_>) -> PyResult<()> {
869        let mut instruments_any = Vec::new();
870
871        for inst in instruments {
872            let inst_any = pyobject_to_instrument_any(py, inst)?;
873            instruments_any.push(inst_any);
874        }
875        self.cache_instruments(instruments_any);
876        Ok(())
877    }
878
879    #[pyo3(name = "is_closed")]
880    fn py_is_closed(&self) -> bool {
881        !self.is_connected()
882    }
883
884    /// Subscribes to public trade updates for a specific instrument.
885    ///
886    /// # References
887    ///
888    /// <https://docs.dydx.trade/developers/indexer/websockets#trades-channel>
889    #[pyo3(name = "subscribe_trades")]
890    fn py_subscribe_trades<'py>(
891        &self,
892        py: Python<'py>,
893        instrument_id: InstrumentId,
894    ) -> PyResult<Bound<'py, PyAny>> {
895        let client = self.clone();
896        pyo3_async_runtimes::tokio::future_into_py(py, async move {
897            client
898                .subscribe_trades(instrument_id)
899                .await
900                .map_err(to_pyvalue_err)?;
901            Ok(())
902        })
903    }
904
905    /// Unsubscribes from public trade updates for a specific instrument.
906    #[pyo3(name = "unsubscribe_trades")]
907    fn py_unsubscribe_trades<'py>(
908        &self,
909        py: Python<'py>,
910        instrument_id: InstrumentId,
911    ) -> PyResult<Bound<'py, PyAny>> {
912        let client = self.clone();
913        pyo3_async_runtimes::tokio::future_into_py(py, async move {
914            client
915                .unsubscribe_trades(instrument_id)
916                .await
917                .map_err(to_pyvalue_err)?;
918            Ok(())
919        })
920    }
921
922    /// Subscribes to orderbook updates for a specific instrument.
923    ///
924    /// # References
925    ///
926    /// <https://docs.dydx.trade/developers/indexer/websockets#orderbook-channel>
927    #[pyo3(name = "subscribe_orderbook")]
928    fn py_subscribe_orderbook<'py>(
929        &self,
930        py: Python<'py>,
931        instrument_id: InstrumentId,
932    ) -> PyResult<Bound<'py, PyAny>> {
933        let client = self.clone();
934        pyo3_async_runtimes::tokio::future_into_py(py, async move {
935            client
936                .subscribe_orderbook(instrument_id)
937                .await
938                .map_err(to_pyvalue_err)?;
939            Ok(())
940        })
941    }
942
943    /// Unsubscribes from orderbook updates for a specific instrument.
944    #[pyo3(name = "unsubscribe_orderbook")]
945    fn py_unsubscribe_orderbook<'py>(
946        &self,
947        py: Python<'py>,
948        instrument_id: InstrumentId,
949    ) -> PyResult<Bound<'py, PyAny>> {
950        let client = self.clone();
951        pyo3_async_runtimes::tokio::future_into_py(py, async move {
952            client
953                .unsubscribe_orderbook(instrument_id)
954                .await
955                .map_err(to_pyvalue_err)?;
956            Ok(())
957        })
958    }
959
960    #[pyo3(name = "subscribe_bars")]
961    fn py_subscribe_bars<'py>(
962        &self,
963        py: Python<'py>,
964        bar_type: BarType,
965    ) -> PyResult<Bound<'py, PyAny>> {
966        let spec = bar_type.spec();
967        let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
968        let resolution = resolution.to_string();
969
970        let client = self.clone();
971        let instrument_id = bar_type.instrument_id();
972        let bar_types = self.bar_types().clone();
973
974        // Build topic for bar type registration (e.g., "ETH-USD/1MIN")
975        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
976        let topic = format!("{ticker}/{resolution}");
977
978        pyo3_async_runtimes::tokio::future_into_py(py, async move {
979            bar_types.insert(topic, bar_type);
980
981            client
982                .subscribe_candles(instrument_id, &resolution)
983                .await
984                .map_err(to_pyvalue_err)?;
985            Ok(())
986        })
987    }
988
989    #[pyo3(name = "unsubscribe_bars")]
990    fn py_unsubscribe_bars<'py>(
991        &self,
992        py: Python<'py>,
993        bar_type: BarType,
994    ) -> PyResult<Bound<'py, PyAny>> {
995        let spec = bar_type.spec();
996        let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
997        let resolution = resolution.to_string();
998
999        let client = self.clone();
1000        let instrument_id = bar_type.instrument_id();
1001        let bar_types = self.bar_types().clone();
1002
1003        // Build topic for unregistration
1004        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1005        let topic = format!("{ticker}/{resolution}");
1006
1007        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1008            client
1009                .unsubscribe_candles(instrument_id, &resolution)
1010                .await
1011                .map_err(to_pyvalue_err)?;
1012
1013            bar_types.remove(&topic);
1014
1015            Ok(())
1016        })
1017    }
1018
1019    /// Subscribes to market updates for all instruments.
1020    ///
1021    /// # Errors
1022    ///
1023    /// Returns an error if the subscription request fails.
1024    ///
1025    /// # References
1026    ///
1027    /// <https://docs.dydx.trade/developers/indexer/websockets#markets-channel>
1028    #[pyo3(name = "subscribe_markets")]
1029    fn py_subscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1030        let client = self.clone();
1031        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1032            client.subscribe_markets().await.map_err(to_pyvalue_err)?;
1033            Ok(())
1034        })
1035    }
1036
1037    /// Unsubscribes from market updates.
1038    ///
1039    /// # Errors
1040    ///
1041    /// Returns an error if the unsubscription request fails.
1042    #[pyo3(name = "unsubscribe_markets")]
1043    fn py_unsubscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1044        let client = self.clone();
1045        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1046            client.unsubscribe_markets().await.map_err(to_pyvalue_err)?;
1047            Ok(())
1048        })
1049    }
1050
1051    /// Subscribes to subaccount updates (orders, fills, positions, balances).
1052    ///
1053    /// This requires authentication and will only work for private WebSocket clients
1054    /// created with `Self.new_private`.
1055    ///
1056    /// # References
1057    ///
1058    /// <https://docs.dydx.trade/developers/indexer/websockets#subaccounts-channel>
1059    #[pyo3(name = "subscribe_subaccount")]
1060    fn py_subscribe_subaccount<'py>(
1061        &self,
1062        py: Python<'py>,
1063        address: String,
1064        subaccount_number: u32,
1065    ) -> PyResult<Bound<'py, PyAny>> {
1066        let client = self.clone();
1067        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1068            client
1069                .subscribe_subaccount(&address, subaccount_number)
1070                .await
1071                .map_err(to_pyvalue_err)?;
1072            Ok(())
1073        })
1074    }
1075
1076    /// Unsubscribes from subaccount updates.
1077    #[pyo3(name = "unsubscribe_subaccount")]
1078    fn py_unsubscribe_subaccount<'py>(
1079        &self,
1080        py: Python<'py>,
1081        address: String,
1082        subaccount_number: u32,
1083    ) -> PyResult<Bound<'py, PyAny>> {
1084        let client = self.clone();
1085        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1086            client
1087                .unsubscribe_subaccount(&address, subaccount_number)
1088                .await
1089                .map_err(to_pyvalue_err)?;
1090            Ok(())
1091        })
1092    }
1093
1094    /// Subscribes to block height updates.
1095    ///
1096    /// # Errors
1097    ///
1098    /// Returns an error if the subscription request fails.
1099    ///
1100    /// # References
1101    ///
1102    /// <https://docs.dydx.trade/developers/indexer/websockets#block-height-channel>
1103    #[pyo3(name = "subscribe_block_height")]
1104    fn py_subscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1105        let client = self.clone();
1106        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1107            client
1108                .subscribe_block_height()
1109                .await
1110                .map_err(to_pyvalue_err)?;
1111            Ok(())
1112        })
1113    }
1114
1115    /// Unsubscribes from block height updates.
1116    ///
1117    /// # Errors
1118    ///
1119    /// Returns an error if the unsubscription request fails.
1120    #[pyo3(name = "unsubscribe_block_height")]
1121    fn py_unsubscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
1122        let client = self.clone();
1123        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1124            client
1125                .unsubscribe_block_height()
1126                .await
1127                .map_err(to_pyvalue_err)?;
1128            Ok(())
1129        })
1130    }
1131}
1132
1133fn instrument_id_from_ticker(ticker: &str) -> InstrumentId {
1134    let symbol = format!("{ticker}-PERP");
1135    InstrumentId::new(Symbol::new(&symbol), *DYDX_VENUE)
1136}
1137
1138fn handle_markets_trading_data(
1139    trading: Option<
1140        &std::collections::HashMap<String, crate::websocket::messages::DydxMarketTradingUpdate>,
1141    >,
1142    instrument_cache: &std::sync::Arc<crate::common::instrument_cache::InstrumentCache>,
1143    seen_tickers: &mut ahash::AHashSet<Ustr>,
1144    call_soon: &Py<PyAny>,
1145    callback: &Py<PyAny>,
1146    ts_init: nautilus_core::UnixNanos,
1147) {
1148    let Some(trading_map) = trading else {
1149        return;
1150    };
1151
1152    for (ticker, update) in trading_map {
1153        let instrument_id = instrument_id_from_ticker(ticker);
1154
1155        if let Some(status) = &update.status {
1156            let action = MarketStatusAction::from(*status);
1157            let is_trading = matches!(status, DydxMarketStatus::Active);
1158
1159            let instrument_status = InstrumentStatus::new(
1160                instrument_id,
1161                action,
1162                ts_init,
1163                ts_init,
1164                None,
1165                None,
1166                Some(is_trading),
1167                None,
1168                None,
1169            );
1170
1171            if instrument_cache.get_by_market(ticker).is_some() {
1172                Python::attach(|py| match instrument_status.into_py_any(py) {
1173                    Ok(py_obj) => {
1174                        call_python_threadsafe(py, call_soon, callback, py_obj);
1175                    }
1176                    Err(e) => log::error!("Failed to convert InstrumentStatus to Python: {e}"),
1177                });
1178            }
1179        }
1180
1181        let ticker_ustr = Ustr::from(ticker.as_str());
1182        if !seen_tickers.contains(&ticker_ustr) {
1183            let is_active = update
1184                .status
1185                .as_ref()
1186                .is_none_or(|s| matches!(s, crate::common::enums::DydxMarketStatus::Active));
1187            if instrument_cache.get_by_market(ticker).is_some() {
1188                seen_tickers.insert(ticker_ustr);
1189            } else if is_active {
1190                seen_tickers.insert(ticker_ustr);
1191                log::info!("New instrument discovered via WebSocket: {ticker}");
1192                Python::attach(|py| {
1193                    let dict = PyDict::new(py);
1194                    let _ = dict.set_item("type", "new_instrument_discovered");
1195                    let _ = dict.set_item("ticker", ticker);
1196                    if let Ok(py_obj) = dict.into_py_any(py) {
1197                        call_python_threadsafe(py, call_soon, callback, py_obj);
1198                    }
1199                });
1200            }
1201        }
1202
1203        if let Some(ref rate_str) = update.next_funding_rate {
1204            if let Ok(rate) = Decimal::from_str(rate_str) {
1205                let funding_rate = FundingRateUpdate {
1206                    instrument_id,
1207                    rate,
1208                    interval: Some(60),
1209                    next_funding_ns: None,
1210                    ts_event: ts_init,
1211                    ts_init,
1212                };
1213                Python::attach(|py| match funding_rate.into_py_any(py) {
1214                    Ok(py_obj) => {
1215                        call_python_threadsafe(py, call_soon, callback, py_obj);
1216                    }
1217                    Err(e) => log::error!("Failed to convert FundingRateUpdate to Python: {e}"),
1218                });
1219            } else {
1220                log::warn!("Failed to parse next_funding_rate for {ticker}: {rate_str}");
1221            }
1222        }
1223    }
1224}
1225
1226#[expect(clippy::too_many_arguments)]
1227fn ensure_accepted_to_python(
1228    client_order_id: ClientOrderId,
1229    account_id: AccountId,
1230    venue_order_id: VenueOrderId,
1231    identity: &OrderIdentity,
1232    state: &DydxWsDispatchState,
1233    trader_id: TraderId,
1234    ts_init: nautilus_core::UnixNanos,
1235    call_soon: &Py<PyAny>,
1236    callback: &Py<PyAny>,
1237) {
1238    if state.emitted_accepted.contains(&client_order_id) {
1239        return;
1240    }
1241    state.insert_accepted(client_order_id);
1242    let accepted = OrderAccepted::new(
1243        trader_id,
1244        identity.strategy_id,
1245        identity.instrument_id,
1246        client_order_id,
1247        venue_order_id,
1248        account_id,
1249        UUID4::new(),
1250        ts_init,
1251        ts_init,
1252        false,
1253    );
1254    send_to_python(accepted, call_soon, callback);
1255}
1256
1257fn send_to_python<T: for<'py> IntoPyObjectExt<'py>>(
1258    value: T,
1259    call_soon: &Py<PyAny>,
1260    callback: &Py<PyAny>,
1261) {
1262    Python::attach(|py| match value.into_py_any(py) {
1263        Ok(py_obj) => {
1264            call_python_threadsafe(py, call_soon, callback, py_obj);
1265        }
1266        Err(e) => log::error!("Failed to convert to Python: {e}"),
1267    });
1268}