Skip to main content

nautilus_bybit/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Bybit WebSocket client.
17
18use std::sync::Arc;
19
20use ahash::AHashMap;
21use dashmap::DashMap;
22use futures_util::StreamExt;
23use nautilus_common::live::get_runtime;
24use nautilus_core::{
25    AtomicMap, AtomicSet, UUID4, UnixNanos,
26    python::{call_python_threadsafe, to_pyruntime_err, to_pyvalue_err},
27    time::{AtomicTime, get_atomic_clock_realtime},
28};
29use nautilus_model::{
30    data::{BarType, Data, OrderBookDeltas_API, QuoteTick},
31    enums::{
32        AggregationSource, BarAggregation, OrderSide, OrderType, PriceType, TimeInForce,
33        TriggerType,
34    },
35    events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
36    identifiers::{
37        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
38    },
39    instruments::{Instrument, InstrumentAny},
40    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
41    types::{Price, Quantity},
42};
43use nautilus_network::websocket::TransportBackend;
44use pyo3::{IntoPyObjectExt, prelude::*};
45use ustr::Ustr;
46
47use crate::{
48    common::{
49        consts::BYBIT_VENUE,
50        enums::{BybitEnvironment, BybitPositionIdx, BybitProductType},
51        parse::make_bybit_symbol,
52    },
53    python::params::{BybitWsAmendOrderParams, BybitWsCancelOrderParams, BybitWsPlaceOrderParams},
54    websocket::{
55        client::{BATCH_PROCESSING_LIMIT, BybitWebSocketClient, PendingPyRequest},
56        dispatch::PendingOperation,
57        messages::{BybitWebSocketError, BybitWsMessage},
58        parse::{
59            parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
60            parse_ticker_linear_funding, parse_ticker_linear_index_price,
61            parse_ticker_linear_mark_price, parse_ticker_linear_quote, parse_ticker_option_greeks,
62            parse_ticker_option_index_price, parse_ticker_option_mark_price,
63            parse_ticker_option_quote, parse_ws_account_state, parse_ws_fill_report,
64            parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
65            parse_ws_trade_tick,
66        },
67    },
68};
69
70fn validate_bar_type(bar_type: &BarType) -> anyhow::Result<()> {
71    let spec = bar_type.spec();
72
73    if spec.price_type != PriceType::Last {
74        anyhow::bail!(
75            "Invalid bar type: Bybit bars only support LAST price type, received {}",
76            spec.price_type
77        );
78    }
79
80    if bar_type.aggregation_source() != AggregationSource::External {
81        anyhow::bail!(
82            "Invalid bar type: Bybit bars only support EXTERNAL aggregation source, received {}",
83            bar_type.aggregation_source()
84        );
85    }
86
87    let step = spec.step.get();
88    if spec.aggregation == BarAggregation::Minute && step >= 60 {
89        let hours = step / 60;
90        anyhow::bail!("Invalid bar type: {step}-MINUTE not supported, use {hours}-HOUR instead");
91    }
92
93    Ok(())
94}
95
96#[pymethods]
97#[pyo3_stub_gen::derive::gen_stub_pymethods]
98impl BybitWebSocketError {
99    fn __repr__(&self) -> String {
100        format!(
101            "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
102            self.code, self.message, self.conn_id, self.topic
103        )
104    }
105
106    #[getter]
107    pub fn code(&self) -> i64 {
108        self.code
109    }
110
111    #[getter]
112    pub fn message(&self) -> &str {
113        &self.message
114    }
115
116    #[getter]
117    pub fn conn_id(&self) -> Option<&str> {
118        self.conn_id.as_deref()
119    }
120
121    #[getter]
122    pub fn topic(&self) -> Option<&str> {
123        self.topic.as_deref()
124    }
125
126    #[getter]
127    pub fn req_id(&self) -> Option<&str> {
128        self.req_id.as_deref()
129    }
130}
131
132#[pymethods]
133#[pyo3_stub_gen::derive::gen_stub_pymethods]
134impl BybitWebSocketClient {
135    /// Creates a new Bybit public WebSocket client.
136    #[staticmethod]
137    #[pyo3(name = "new_public")]
138    #[pyo3(signature = (product_type, environment, url=None, heartbeat=20, proxy_url=None))]
139    fn py_new_public(
140        product_type: BybitProductType,
141        environment: BybitEnvironment,
142        url: Option<String>,
143        heartbeat: u64,
144        proxy_url: Option<String>,
145    ) -> Self {
146        Self::new_public_with(
147            product_type,
148            environment,
149            url,
150            heartbeat,
151            TransportBackend::default(),
152            proxy_url,
153        )
154    }
155
156    /// Creates a new Bybit private WebSocket client.
157    ///
158    /// If `api_key` or `api_secret` are not provided, they will be loaded from
159    /// environment variables based on the environment:
160    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
161    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
162    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
163    #[staticmethod]
164    #[pyo3(name = "new_private")]
165    #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=20, proxy_url=None))]
166    fn py_new_private(
167        environment: BybitEnvironment,
168        api_key: Option<String>,
169        api_secret: Option<String>,
170        url: Option<String>,
171        heartbeat: u64,
172        proxy_url: Option<String>,
173    ) -> Self {
174        Self::new_private(
175            environment,
176            api_key,
177            api_secret,
178            url,
179            heartbeat,
180            TransportBackend::default(),
181            proxy_url,
182        )
183    }
184
185    /// Creates a new Bybit trade WebSocket client for order operations.
186    ///
187    /// If `api_key` or `api_secret` are not provided, they will be loaded from
188    /// environment variables based on the environment:
189    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
190    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
191    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
192    #[staticmethod]
193    #[pyo3(name = "new_trade")]
194    #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=20, proxy_url=None))]
195    fn py_new_trade(
196        environment: BybitEnvironment,
197        api_key: Option<String>,
198        api_secret: Option<String>,
199        url: Option<String>,
200        heartbeat: u64,
201        proxy_url: Option<String>,
202    ) -> Self {
203        Self::new_trade(
204            environment,
205            api_key,
206            api_secret,
207            url,
208            heartbeat,
209            TransportBackend::default(),
210            proxy_url,
211        )
212    }
213
214    #[getter]
215    #[pyo3(name = "api_key_masked")]
216    #[must_use]
217    pub fn py_api_key_masked(&self) -> Option<String> {
218        self.credential().map(|c| c.api_key_masked())
219    }
220
221    /// Returns a value indicating whether the client is active.
222    #[pyo3(name = "is_active")]
223    fn py_is_active(&self) -> bool {
224        self.is_active()
225    }
226
227    /// Returns a value indicating whether the client is closed.
228    #[pyo3(name = "is_closed")]
229    fn py_is_closed(&self) -> bool {
230        self.is_closed()
231    }
232
233    /// Returns the number of currently registered subscriptions.
234    #[pyo3(name = "subscription_count")]
235    fn py_subscription_count(&self) -> usize {
236        self.subscription_count()
237    }
238
239    /// Adds an instrument to the shared instruments cache.
240    #[pyo3(name = "cache_instrument")]
241    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
242        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
243        Ok(())
244    }
245
246    /// Sets the account ID for account message parsing.
247    #[pyo3(name = "set_account_id")]
248    fn py_set_account_id(&mut self, account_id: AccountId) {
249        self.set_account_id(account_id);
250    }
251
252    /// Sets the account market maker level.
253    #[pyo3(name = "set_mm_level")]
254    fn py_set_mm_level(&self, mm_level: u8) {
255        self.set_mm_level(mm_level);
256    }
257
258    /// Sets whether bar timestamps use the close time.
259    #[pyo3(name = "set_bars_timestamp_on_close")]
260    fn py_set_bars_timestamp_on_close(&self, value: bool) {
261        self.set_bars_timestamp_on_close(value);
262    }
263
264    /// Adds an instrument ID to the option greeks subscription set.
265    #[pyo3(name = "add_option_greeks_sub")]
266    fn py_add_option_greeks_sub(&self, instrument_id: InstrumentId) {
267        self.add_option_greeks_sub(instrument_id);
268    }
269
270    /// Removes an instrument ID from the option greeks subscription set.
271    #[pyo3(name = "remove_option_greeks_sub")]
272    fn py_remove_option_greeks_sub(&self, instrument_id: InstrumentId) {
273        self.remove_option_greeks_sub(&instrument_id);
274    }
275
276    /// Disconnects the WebSocket client and stops the background task.
277    #[pyo3(name = "connect")]
278    #[expect(clippy::needless_pass_by_value)] // PyO3 extracted parameter
279    fn py_connect<'py>(
280        &mut self,
281        py: Python<'py>,
282        loop_: Py<PyAny>,
283        callback: Py<PyAny>,
284    ) -> PyResult<Bound<'py, PyAny>> {
285        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
286        let mut client = self.clone();
287
288        pyo3_async_runtimes::tokio::future_into_py(py, async move {
289            client.connect().await.map_err(to_pyruntime_err)?;
290
291            let stream = client.stream();
292            let clock = get_atomic_clock_realtime();
293            let product_type = client.product_type();
294            let account_id = client.account_id();
295            let bar_types_cache = client.bar_types_cache().clone();
296            let trade_subs = client.trade_subs().clone();
297            let option_greeks_subs = client.option_greeks_subs().clone();
298            let bars_timestamp_on_close = client.bars_timestamp_on_close();
299            let instruments = Arc::clone(client.instruments_cache_ref());
300            let pending_py_requests = Arc::clone(client.pending_py_requests());
301
302            get_runtime().spawn(async move {
303                let mut quote_cache = AHashMap::new();
304                let mut funding_cache: AHashMap<Ustr, (Option<String>, Option<String>)> =
305                    AHashMap::new();
306                let _client = client;
307                let _resolve = |raw_symbol: &Ustr| -> Option<InstrumentAny> {
308                    let key =
309                        product_type.map_or(*raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
310                    instruments.get_cloned(&key)
311                };
312
313                tokio::pin!(stream);
314
315                while let Some(msg) = stream.next().await {
316                    match msg {
317                        BybitWsMessage::Orderbook(ref msg) => {
318                            handle_orderbook(
319                                msg,
320                                product_type,
321                                &instruments,
322                                &mut quote_cache,
323                                clock,
324                                &call_soon,
325                                &callback,
326                            );
327                        }
328                        BybitWsMessage::Trade(ref msg) => {
329                            handle_trade(
330                                msg,
331                                product_type,
332                                &instruments,
333                                &trade_subs,
334                                clock,
335                                &call_soon,
336                                &callback,
337                            );
338                        }
339                        BybitWsMessage::Kline(ref msg) => {
340                            handle_kline(
341                                msg,
342                                product_type,
343                                &instruments,
344                                &bar_types_cache,
345                                bars_timestamp_on_close,
346                                clock,
347                                &call_soon,
348                                &callback,
349                            );
350                        }
351                        BybitWsMessage::TickerLinear(ref msg) => {
352                            handle_ticker_linear(
353                                msg,
354                                product_type,
355                                &instruments,
356                                &mut quote_cache,
357                                &mut funding_cache,
358                                clock,
359                                &call_soon,
360                                &callback,
361                            );
362                        }
363                        BybitWsMessage::TickerOption(ref msg) => {
364                            handle_ticker_option(
365                                msg,
366                                product_type,
367                                &instruments,
368                                &mut quote_cache,
369                                &option_greeks_subs,
370                                clock,
371                                &call_soon,
372                                &callback,
373                            );
374                        }
375                        BybitWsMessage::AccountOrder(ref msg) => {
376                            handle_account_order(
377                                msg,
378                                &instruments,
379                                account_id,
380                                clock,
381                                &call_soon,
382                                &callback,
383                            );
384                        }
385                        BybitWsMessage::AccountExecution(ref msg) => {
386                            handle_account_execution(
387                                msg,
388                                &instruments,
389                                account_id,
390                                clock,
391                                &call_soon,
392                                &callback,
393                            );
394                        }
395                        BybitWsMessage::AccountWallet(ref msg) => {
396                            handle_account_wallet(msg, account_id, clock, &call_soon, &callback);
397                        }
398                        BybitWsMessage::AccountPosition(ref msg) => {
399                            handle_account_position(
400                                msg,
401                                &instruments,
402                                account_id,
403                                clock,
404                                &call_soon,
405                                &callback,
406                            );
407                        }
408                        BybitWsMessage::OrderResponse(ref resp) => {
409                            handle_order_response(
410                                resp,
411                                &pending_py_requests,
412                                account_id,
413                                clock,
414                                &call_soon,
415                                &callback,
416                            );
417                        }
418                        BybitWsMessage::Error(err) => {
419                            send_to_python(err, &call_soon, &callback);
420                        }
421                        BybitWsMessage::Reconnected => {
422                            quote_cache.clear();
423                            funding_cache.clear();
424                            log::info!("WebSocket reconnected");
425                        }
426                        BybitWsMessage::Auth(_) => {
427                            log::info!("WebSocket authenticated");
428                        }
429                    }
430                }
431            });
432
433            Ok(())
434        })
435    }
436
437    #[pyo3(name = "close")]
438    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
439        let mut client = self.clone();
440
441        pyo3_async_runtimes::tokio::future_into_py(py, async move {
442            if let Err(e) = client.close().await {
443                log::error!("Error on close: {e}");
444            }
445            Ok(())
446        })
447    }
448
449    /// Subscribe to the provided topic strings.
450    #[pyo3(name = "subscribe")]
451    fn py_subscribe<'py>(
452        &self,
453        py: Python<'py>,
454        topics: Vec<String>,
455    ) -> PyResult<Bound<'py, PyAny>> {
456        let client = self.clone();
457
458        pyo3_async_runtimes::tokio::future_into_py(py, async move {
459            client.subscribe(topics).await.map_err(to_pyruntime_err)?;
460            Ok(())
461        })
462    }
463
464    /// Unsubscribe from the provided topics.
465    #[pyo3(name = "unsubscribe")]
466    fn py_unsubscribe<'py>(
467        &self,
468        py: Python<'py>,
469        topics: Vec<String>,
470    ) -> PyResult<Bound<'py, PyAny>> {
471        let client = self.clone();
472
473        pyo3_async_runtimes::tokio::future_into_py(py, async move {
474            client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
475            Ok(())
476        })
477    }
478
479    /// Subscribes to orderbook updates for a specific instrument.
480    ///
481    /// # References
482    ///
483    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook>
484    #[pyo3(name = "subscribe_orderbook")]
485    fn py_subscribe_orderbook<'py>(
486        &self,
487        py: Python<'py>,
488        instrument_id: InstrumentId,
489        depth: u32,
490    ) -> PyResult<Bound<'py, PyAny>> {
491        let client = self.clone();
492
493        pyo3_async_runtimes::tokio::future_into_py(py, async move {
494            client
495                .subscribe_orderbook(instrument_id, depth)
496                .await
497                .map_err(to_pyruntime_err)?;
498            Ok(())
499        })
500    }
501
502    /// Unsubscribes from orderbook updates for a specific instrument.
503    #[pyo3(name = "unsubscribe_orderbook")]
504    fn py_unsubscribe_orderbook<'py>(
505        &self,
506        py: Python<'py>,
507        instrument_id: InstrumentId,
508        depth: u32,
509    ) -> PyResult<Bound<'py, PyAny>> {
510        let client = self.clone();
511
512        pyo3_async_runtimes::tokio::future_into_py(py, async move {
513            client
514                .unsubscribe_orderbook(instrument_id, depth)
515                .await
516                .map_err(to_pyruntime_err)?;
517            Ok(())
518        })
519    }
520
521    /// Subscribes to public trade updates for a specific instrument.
522    ///
523    /// # References
524    ///
525    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/trade>
526    #[pyo3(name = "subscribe_trades")]
527    fn py_subscribe_trades<'py>(
528        &self,
529        py: Python<'py>,
530        instrument_id: InstrumentId,
531    ) -> PyResult<Bound<'py, PyAny>> {
532        let client = self.clone();
533
534        pyo3_async_runtimes::tokio::future_into_py(py, async move {
535            client
536                .subscribe_trades(instrument_id)
537                .await
538                .map_err(to_pyruntime_err)?;
539            Ok(())
540        })
541    }
542
543    /// Unsubscribes from public trade updates for a specific instrument.
544    #[pyo3(name = "unsubscribe_trades")]
545    fn py_unsubscribe_trades<'py>(
546        &self,
547        py: Python<'py>,
548        instrument_id: InstrumentId,
549    ) -> PyResult<Bound<'py, PyAny>> {
550        let client = self.clone();
551
552        pyo3_async_runtimes::tokio::future_into_py(py, async move {
553            client
554                .unsubscribe_trades(instrument_id)
555                .await
556                .map_err(to_pyruntime_err)?;
557            Ok(())
558        })
559    }
560
561    /// Subscribes to ticker updates for a specific instrument.
562    ///
563    /// # References
564    ///
565    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/ticker>
566    #[pyo3(name = "subscribe_ticker")]
567    fn py_subscribe_ticker<'py>(
568        &self,
569        py: Python<'py>,
570        instrument_id: InstrumentId,
571    ) -> PyResult<Bound<'py, PyAny>> {
572        let client = self.clone();
573
574        pyo3_async_runtimes::tokio::future_into_py(py, async move {
575            client
576                .subscribe_ticker(instrument_id)
577                .await
578                .map_err(to_pyruntime_err)?;
579            Ok(())
580        })
581    }
582
583    #[pyo3(name = "subscribe_option_greeks")]
584    fn py_subscribe_option_greeks<'py>(
585        &self,
586        py: Python<'py>,
587        instrument_id: InstrumentId,
588    ) -> PyResult<Bound<'py, PyAny>> {
589        self.add_option_greeks_sub(instrument_id);
590        let client = self.clone();
591
592        pyo3_async_runtimes::tokio::future_into_py(py, async move {
593            client
594                .subscribe_ticker(instrument_id)
595                .await
596                .map_err(to_pyruntime_err)?;
597            Ok(())
598        })
599    }
600
601    #[pyo3(name = "unsubscribe_option_greeks")]
602    fn py_unsubscribe_option_greeks<'py>(
603        &self,
604        py: Python<'py>,
605        instrument_id: InstrumentId,
606    ) -> PyResult<Bound<'py, PyAny>> {
607        self.remove_option_greeks_sub(&instrument_id);
608        let client = self.clone();
609
610        pyo3_async_runtimes::tokio::future_into_py(py, async move {
611            client
612                .unsubscribe_ticker(instrument_id)
613                .await
614                .map_err(to_pyruntime_err)?;
615            Ok(())
616        })
617    }
618
619    /// Unsubscribes from ticker updates for a specific instrument.
620    #[pyo3(name = "unsubscribe_ticker")]
621    fn py_unsubscribe_ticker<'py>(
622        &self,
623        py: Python<'py>,
624        instrument_id: InstrumentId,
625    ) -> PyResult<Bound<'py, PyAny>> {
626        let client = self.clone();
627
628        pyo3_async_runtimes::tokio::future_into_py(py, async move {
629            client
630                .unsubscribe_ticker(instrument_id)
631                .await
632                .map_err(to_pyruntime_err)?;
633            Ok(())
634        })
635    }
636
637    /// Subscribes to kline/candlestick updates for a specific instrument.
638    ///
639    /// # References
640    ///
641    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/kline>
642    #[pyo3(name = "subscribe_bars")]
643    fn py_subscribe_bars<'py>(
644        &self,
645        py: Python<'py>,
646        bar_type: BarType,
647    ) -> PyResult<Bound<'py, PyAny>> {
648        validate_bar_type(&bar_type).map_err(to_pyvalue_err)?;
649
650        let client = self.clone();
651        pyo3_async_runtimes::tokio::future_into_py(py, async move {
652            client
653                .subscribe_bars(bar_type)
654                .await
655                .map_err(to_pyruntime_err)?;
656            Ok(())
657        })
658    }
659
660    /// Unsubscribes from kline/candlestick updates for a specific instrument.
661    #[pyo3(name = "unsubscribe_bars")]
662    fn py_unsubscribe_bars<'py>(
663        &self,
664        py: Python<'py>,
665        bar_type: BarType,
666    ) -> PyResult<Bound<'py, PyAny>> {
667        validate_bar_type(&bar_type).map_err(to_pyvalue_err)?;
668
669        let client = self.clone();
670        pyo3_async_runtimes::tokio::future_into_py(py, async move {
671            client
672                .unsubscribe_bars(bar_type)
673                .await
674                .map_err(to_pyruntime_err)?;
675            Ok(())
676        })
677    }
678
679    /// Subscribes to order updates.
680    ///
681    /// # Errors
682    ///
683    /// Returns an error if the subscription request fails or if not authenticated.
684    ///
685    /// # References
686    ///
687    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/order>
688    #[pyo3(name = "subscribe_orders")]
689    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
690        let client = self.clone();
691
692        pyo3_async_runtimes::tokio::future_into_py(py, async move {
693            client.subscribe_orders().await.map_err(to_pyruntime_err)?;
694            Ok(())
695        })
696    }
697
698    /// Unsubscribes from order updates.
699    #[pyo3(name = "unsubscribe_orders")]
700    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
701        let client = self.clone();
702
703        pyo3_async_runtimes::tokio::future_into_py(py, async move {
704            client
705                .unsubscribe_orders()
706                .await
707                .map_err(to_pyruntime_err)?;
708            Ok(())
709        })
710    }
711
712    /// Subscribes to execution/fill updates.
713    ///
714    /// # Errors
715    ///
716    /// Returns an error if the subscription request fails or if not authenticated.
717    ///
718    /// # References
719    ///
720    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/execution>
721    #[pyo3(name = "subscribe_executions")]
722    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
723        let client = self.clone();
724
725        pyo3_async_runtimes::tokio::future_into_py(py, async move {
726            client
727                .subscribe_executions()
728                .await
729                .map_err(to_pyruntime_err)?;
730            Ok(())
731        })
732    }
733
734    /// Unsubscribes from execution/fill updates.
735    #[pyo3(name = "unsubscribe_executions")]
736    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
737        let client = self.clone();
738
739        pyo3_async_runtimes::tokio::future_into_py(py, async move {
740            client
741                .unsubscribe_executions()
742                .await
743                .map_err(to_pyruntime_err)?;
744            Ok(())
745        })
746    }
747
748    /// Subscribes to position updates.
749    ///
750    /// # Errors
751    ///
752    /// Returns an error if the subscription request fails or if not authenticated.
753    ///
754    /// # References
755    ///
756    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/position>
757    #[pyo3(name = "subscribe_positions")]
758    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
759        let client = self.clone();
760
761        pyo3_async_runtimes::tokio::future_into_py(py, async move {
762            client
763                .subscribe_positions()
764                .await
765                .map_err(to_pyruntime_err)?;
766            Ok(())
767        })
768    }
769
770    /// Unsubscribes from position updates.
771    #[pyo3(name = "unsubscribe_positions")]
772    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
773        let client = self.clone();
774
775        pyo3_async_runtimes::tokio::future_into_py(py, async move {
776            client
777                .unsubscribe_positions()
778                .await
779                .map_err(to_pyruntime_err)?;
780            Ok(())
781        })
782    }
783
784    /// Subscribes to wallet/balance updates.
785    ///
786    /// # Errors
787    ///
788    /// Returns an error if the subscription request fails or if not authenticated.
789    ///
790    /// # References
791    ///
792    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/wallet>
793    #[pyo3(name = "subscribe_wallet")]
794    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
795        let client = self.clone();
796
797        pyo3_async_runtimes::tokio::future_into_py(py, async move {
798            client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
799            Ok(())
800        })
801    }
802
803    /// Unsubscribes from wallet/balance updates.
804    #[pyo3(name = "unsubscribe_wallet")]
805    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
806        let client = self.clone();
807
808        pyo3_async_runtimes::tokio::future_into_py(py, async move {
809            client
810                .unsubscribe_wallet()
811                .await
812                .map_err(to_pyruntime_err)?;
813            Ok(())
814        })
815    }
816
817    /// Waits until the WebSocket client becomes active or times out.
818    #[pyo3(name = "wait_until_active")]
819    fn py_wait_until_active<'py>(
820        &self,
821        py: Python<'py>,
822        timeout_secs: f64,
823    ) -> PyResult<Bound<'py, PyAny>> {
824        let client = self.clone();
825
826        pyo3_async_runtimes::tokio::future_into_py(py, async move {
827            client
828                .wait_until_active(timeout_secs)
829                .await
830                .map_err(to_pyruntime_err)?;
831            Ok(())
832        })
833    }
834
835    /// Submits an order using Nautilus domain objects.
836    #[pyo3(name = "submit_order")]
837    #[pyo3(signature = (
838        product_type,
839        trader_id,
840        strategy_id,
841        instrument_id,
842        client_order_id,
843        order_side,
844        order_type,
845        quantity,
846        is_quote_quantity=false,
847        time_in_force=None,
848        price=None,
849        trigger_price=None,
850        trigger_type=None,
851        post_only=None,
852        reduce_only=None,
853        is_leverage=false,
854        position_idx=None,
855    ))]
856    #[expect(clippy::too_many_arguments)]
857    fn py_submit_order<'py>(
858        &self,
859        py: Python<'py>,
860        product_type: BybitProductType,
861        trader_id: TraderId,
862        strategy_id: StrategyId,
863        instrument_id: InstrumentId,
864        client_order_id: ClientOrderId,
865        order_side: OrderSide,
866        order_type: OrderType,
867        quantity: Quantity,
868        is_quote_quantity: bool,
869        time_in_force: Option<TimeInForce>,
870        price: Option<Price>,
871        trigger_price: Option<Price>,
872        trigger_type: Option<TriggerType>,
873        post_only: Option<bool>,
874        reduce_only: Option<bool>,
875        is_leverage: bool,
876        position_idx: Option<BybitPositionIdx>,
877    ) -> PyResult<Bound<'py, PyAny>> {
878        let client = self.clone();
879        let pending_py_requests = Arc::clone(self.pending_py_requests());
880
881        pyo3_async_runtimes::tokio::future_into_py(py, async move {
882            let req_id = client
883                .submit_order(
884                    product_type,
885                    instrument_id,
886                    client_order_id,
887                    order_side,
888                    order_type,
889                    quantity,
890                    is_quote_quantity,
891                    time_in_force,
892                    price,
893                    trigger_price,
894                    trigger_type,
895                    post_only,
896                    reduce_only,
897                    is_leverage,
898                    position_idx,
899                )
900                .await
901                .map_err(to_pyruntime_err)?;
902            pending_py_requests.insert(
903                req_id,
904                vec![PendingPyRequest {
905                    client_order_id,
906                    operation: PendingOperation::Place,
907                    trader_id,
908                    strategy_id,
909                    instrument_id,
910                    venue_order_id: None,
911                }],
912            );
913            Ok(())
914        })
915    }
916
917    /// Modifies an existing order using Nautilus domain objects.
918    #[pyo3(name = "modify_order")]
919    #[pyo3(signature = (
920        product_type,
921        trader_id,
922        strategy_id,
923        instrument_id,
924        client_order_id,
925        venue_order_id=None,
926        quantity=None,
927        price=None,
928    ))]
929    #[expect(clippy::too_many_arguments)]
930    fn py_modify_order<'py>(
931        &self,
932        py: Python<'py>,
933        product_type: BybitProductType,
934        trader_id: TraderId,
935        strategy_id: StrategyId,
936        instrument_id: InstrumentId,
937        client_order_id: ClientOrderId,
938        venue_order_id: Option<VenueOrderId>,
939        quantity: Option<Quantity>,
940        price: Option<Price>,
941    ) -> PyResult<Bound<'py, PyAny>> {
942        let client = self.clone();
943        let pending_py_requests = Arc::clone(self.pending_py_requests());
944
945        pyo3_async_runtimes::tokio::future_into_py(py, async move {
946            let req_id = client
947                .modify_order(
948                    product_type,
949                    instrument_id,
950                    client_order_id,
951                    venue_order_id,
952                    quantity,
953                    price,
954                )
955                .await
956                .map_err(to_pyruntime_err)?;
957            pending_py_requests.insert(
958                req_id,
959                vec![PendingPyRequest {
960                    client_order_id,
961                    operation: PendingOperation::Amend,
962                    trader_id,
963                    strategy_id,
964                    instrument_id,
965                    venue_order_id,
966                }],
967            );
968            Ok(())
969        })
970    }
971
972    /// Cancels an order via WebSocket, returning the request ID for correlation.
973    #[pyo3(name = "cancel_order")]
974    #[pyo3(signature = (
975        product_type,
976        trader_id,
977        strategy_id,
978        instrument_id,
979        client_order_id,
980        venue_order_id=None,
981    ))]
982    #[expect(clippy::too_many_arguments)]
983    fn py_cancel_order<'py>(
984        &self,
985        py: Python<'py>,
986        product_type: BybitProductType,
987        trader_id: TraderId,
988        strategy_id: StrategyId,
989        instrument_id: InstrumentId,
990        client_order_id: ClientOrderId,
991        venue_order_id: Option<VenueOrderId>,
992    ) -> PyResult<Bound<'py, PyAny>> {
993        let client = self.clone();
994        let pending_py_requests = Arc::clone(self.pending_py_requests());
995
996        pyo3_async_runtimes::tokio::future_into_py(py, async move {
997            let req_id = client
998                .cancel_order_by_id(product_type, instrument_id, client_order_id, venue_order_id)
999                .await
1000                .map_err(to_pyruntime_err)?;
1001            pending_py_requests.insert(
1002                req_id,
1003                vec![PendingPyRequest {
1004                    client_order_id,
1005                    operation: PendingOperation::Cancel,
1006                    trader_id,
1007                    strategy_id,
1008                    instrument_id,
1009                    venue_order_id,
1010                }],
1011            );
1012            Ok(())
1013        })
1014    }
1015
1016    /// Builds order params for placing an order.
1017    #[pyo3(name = "build_place_order_params")]
1018    #[pyo3(signature = (
1019        product_type,
1020        instrument_id,
1021        client_order_id,
1022        order_side,
1023        order_type,
1024        quantity,
1025        is_quote_quantity=false,
1026        time_in_force=None,
1027        price=None,
1028        trigger_price=None,
1029        trigger_type=None,
1030        post_only=None,
1031        reduce_only=None,
1032        is_leverage=false,
1033        take_profit=None,
1034        stop_loss=None,
1035        position_idx=None,
1036    ))]
1037    #[expect(clippy::too_many_arguments)]
1038    fn py_build_place_order_params(
1039        &self,
1040        product_type: BybitProductType,
1041        instrument_id: InstrumentId,
1042        client_order_id: ClientOrderId,
1043        order_side: OrderSide,
1044        order_type: OrderType,
1045        quantity: Quantity,
1046        is_quote_quantity: bool,
1047        time_in_force: Option<TimeInForce>,
1048        price: Option<Price>,
1049        trigger_price: Option<Price>,
1050        trigger_type: Option<TriggerType>,
1051        post_only: Option<bool>,
1052        reduce_only: Option<bool>,
1053        is_leverage: bool,
1054        take_profit: Option<Price>,
1055        stop_loss: Option<Price>,
1056        position_idx: Option<BybitPositionIdx>,
1057    ) -> PyResult<BybitWsPlaceOrderParams> {
1058        let params = self
1059            .build_place_order_params(
1060                product_type,
1061                instrument_id,
1062                client_order_id,
1063                order_side,
1064                order_type,
1065                quantity,
1066                is_quote_quantity,
1067                time_in_force,
1068                price,
1069                trigger_price,
1070                trigger_type,
1071                post_only,
1072                reduce_only,
1073                is_leverage,
1074                take_profit,
1075                stop_loss,
1076                position_idx,
1077            )
1078            .map_err(to_pyruntime_err)?;
1079        Ok(params.into())
1080    }
1081
1082    /// Batch cancels multiple orders via WebSocket, returning the request ID for correlation.
1083    #[pyo3(name = "batch_cancel_orders")]
1084    fn py_batch_cancel_orders<'py>(
1085        &self,
1086        py: Python<'py>,
1087        trader_id: TraderId,
1088        strategy_id: StrategyId,
1089        orders: Vec<BybitWsCancelOrderParams>,
1090    ) -> PyResult<Bound<'py, PyAny>> {
1091        let client = self.clone();
1092        let pending_py_requests = Arc::clone(self.pending_py_requests());
1093
1094        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1095            let order_params: Vec<crate::websocket::messages::BybitWsCancelOrderParams> = orders
1096                .into_iter()
1097                .map(|p| p.try_into())
1098                .collect::<Result<Vec<_>, _>>()
1099                .map_err(to_pyruntime_err)?;
1100
1101            let per_order = build_pending_entries(
1102                &order_params,
1103                PendingOperation::Cancel,
1104                trader_id,
1105                strategy_id,
1106            );
1107
1108            let req_ids = client
1109                .batch_cancel_orders(order_params)
1110                .await
1111                .map_err(to_pyruntime_err)?;
1112
1113            register_batch_pending(req_ids, &per_order, &pending_py_requests);
1114            Ok(())
1115        })
1116    }
1117
1118    /// Builds order params for amending an order.
1119    #[pyo3(name = "build_amend_order_params")]
1120    fn py_build_amend_order_params(
1121        &self,
1122        product_type: BybitProductType,
1123        instrument_id: InstrumentId,
1124        venue_order_id: Option<VenueOrderId>,
1125        client_order_id: Option<ClientOrderId>,
1126        quantity: Option<Quantity>,
1127        price: Option<Price>,
1128    ) -> PyResult<crate::python::params::BybitWsAmendOrderParams> {
1129        let params = self
1130            .build_amend_order_params(
1131                product_type,
1132                instrument_id,
1133                venue_order_id,
1134                client_order_id,
1135                quantity,
1136                price,
1137            )
1138            .map_err(to_pyruntime_err)?;
1139        Ok(params.into())
1140    }
1141
1142    /// Builds order params for canceling an order via WebSocket.
1143    #[pyo3(name = "build_cancel_order_params")]
1144    fn py_build_cancel_order_params(
1145        &self,
1146        product_type: BybitProductType,
1147        instrument_id: InstrumentId,
1148        venue_order_id: Option<VenueOrderId>,
1149        client_order_id: Option<ClientOrderId>,
1150    ) -> PyResult<crate::python::params::BybitWsCancelOrderParams> {
1151        let params = self
1152            .build_cancel_order_params(product_type, instrument_id, venue_order_id, client_order_id)
1153            .map_err(to_pyruntime_err)?;
1154        Ok(params.into())
1155    }
1156
1157    #[pyo3(name = "batch_modify_orders")]
1158    fn py_batch_modify_orders<'py>(
1159        &self,
1160        py: Python<'py>,
1161        trader_id: TraderId,
1162        strategy_id: StrategyId,
1163        orders: Vec<BybitWsAmendOrderParams>,
1164    ) -> PyResult<Bound<'py, PyAny>> {
1165        let client = self.clone();
1166        let pending_py_requests = Arc::clone(self.pending_py_requests());
1167
1168        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1169            let order_params: Vec<crate::websocket::messages::BybitWsAmendOrderParams> = orders
1170                .into_iter()
1171                .map(|p| p.try_into())
1172                .collect::<Result<Vec<_>, _>>()
1173                .map_err(to_pyruntime_err)?;
1174
1175            let per_order = build_pending_entries(
1176                &order_params,
1177                PendingOperation::Amend,
1178                trader_id,
1179                strategy_id,
1180            );
1181
1182            let req_ids = client
1183                .batch_amend_orders(order_params)
1184                .await
1185                .map_err(to_pyruntime_err)?;
1186
1187            register_batch_pending(req_ids, &per_order, &pending_py_requests);
1188            Ok(())
1189        })
1190    }
1191
1192    /// Batch creates multiple orders via WebSocket, returning the request ID for correlation.
1193    #[pyo3(name = "batch_place_orders")]
1194    fn py_batch_place_orders<'py>(
1195        &self,
1196        py: Python<'py>,
1197        trader_id: TraderId,
1198        strategy_id: StrategyId,
1199        orders: Vec<BybitWsPlaceOrderParams>,
1200    ) -> PyResult<Bound<'py, PyAny>> {
1201        let client = self.clone();
1202        let pending_py_requests = Arc::clone(self.pending_py_requests());
1203
1204        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1205            let order_params: Vec<crate::websocket::messages::BybitWsPlaceOrderParams> = orders
1206                .into_iter()
1207                .map(|p| p.try_into())
1208                .collect::<Result<Vec<_>, _>>()
1209                .map_err(to_pyruntime_err)?;
1210
1211            let per_order = build_pending_entries(
1212                &order_params,
1213                PendingOperation::Place,
1214                trader_id,
1215                strategy_id,
1216            );
1217
1218            let req_ids = client
1219                .batch_place_orders(order_params)
1220                .await
1221                .map_err(to_pyruntime_err)?;
1222
1223            register_batch_pending(req_ids, &per_order, &pending_py_requests);
1224            Ok(())
1225        })
1226    }
1227}
1228
1229trait BatchOrderParams {
1230    fn order_link_id(&self) -> Option<&str>;
1231    fn symbol(&self) -> Ustr;
1232    fn category(&self) -> BybitProductType;
1233    fn venue_order_id(&self) -> Option<VenueOrderId>;
1234}
1235
1236impl BatchOrderParams for crate::websocket::messages::BybitWsCancelOrderParams {
1237    fn order_link_id(&self) -> Option<&str> {
1238        self.order_link_id.as_deref()
1239    }
1240    fn symbol(&self) -> Ustr {
1241        self.symbol
1242    }
1243    fn category(&self) -> BybitProductType {
1244        self.category
1245    }
1246    fn venue_order_id(&self) -> Option<VenueOrderId> {
1247        self.order_id.as_ref().map(VenueOrderId::new)
1248    }
1249}
1250
1251impl BatchOrderParams for crate::websocket::messages::BybitWsAmendOrderParams {
1252    fn order_link_id(&self) -> Option<&str> {
1253        self.order_link_id.as_deref()
1254    }
1255    fn symbol(&self) -> Ustr {
1256        self.symbol
1257    }
1258    fn category(&self) -> BybitProductType {
1259        self.category
1260    }
1261    fn venue_order_id(&self) -> Option<VenueOrderId> {
1262        self.order_id.as_ref().map(VenueOrderId::new)
1263    }
1264}
1265
1266impl BatchOrderParams for crate::websocket::messages::BybitWsPlaceOrderParams {
1267    fn order_link_id(&self) -> Option<&str> {
1268        self.order_link_id.as_deref()
1269    }
1270    fn symbol(&self) -> Ustr {
1271        self.symbol
1272    }
1273    fn category(&self) -> BybitProductType {
1274        self.category
1275    }
1276    fn venue_order_id(&self) -> Option<VenueOrderId> {
1277        None
1278    }
1279}
1280
1281fn build_pending_entries<P: BatchOrderParams>(
1282    params: &[P],
1283    operation: PendingOperation,
1284    trader_id: TraderId,
1285    strategy_id: StrategyId,
1286) -> Vec<PendingPyRequest> {
1287    params
1288        .iter()
1289        .map(|p| PendingPyRequest {
1290            client_order_id: p
1291                .order_link_id()
1292                .filter(|s| !s.is_empty())
1293                .map_or(ClientOrderId::from("UNKNOWN"), ClientOrderId::new),
1294            operation,
1295            trader_id,
1296            strategy_id,
1297            instrument_id: InstrumentId::new(
1298                Symbol::new(make_bybit_symbol(p.symbol().as_str(), p.category()).as_str()),
1299                *BYBIT_VENUE,
1300            ),
1301            venue_order_id: p.venue_order_id(),
1302        })
1303        .collect()
1304}
1305
1306fn register_batch_pending(
1307    req_ids: Vec<String>,
1308    per_order: &[PendingPyRequest],
1309    pending_py_requests: &DashMap<String, Vec<PendingPyRequest>>,
1310) {
1311    for (req_id, chunk) in req_ids
1312        .into_iter()
1313        .zip(per_order.chunks(BATCH_PROCESSING_LIMIT))
1314    {
1315        pending_py_requests.insert(req_id, chunk.to_vec());
1316    }
1317}
1318
1319fn resolve_instrument(
1320    raw_symbol: &Ustr,
1321    product_type: Option<BybitProductType>,
1322    instruments: &AtomicMap<Ustr, InstrumentAny>,
1323) -> Option<InstrumentAny> {
1324    let key = product_type.map_or(*raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
1325    instruments.get_cloned(&key)
1326}
1327
1328fn send_data_to_python(data: Data, call_soon: &Py<PyAny>, callback: &Py<PyAny>) {
1329    Python::attach(|py| {
1330        let py_obj = data_to_pycapsule(py, data);
1331        call_python_threadsafe(py, call_soon, callback, py_obj);
1332    });
1333}
1334
1335fn send_to_python<T: for<'py> IntoPyObjectExt<'py>>(
1336    value: T,
1337    call_soon: &Py<PyAny>,
1338    callback: &Py<PyAny>,
1339) {
1340    Python::attach(|py| {
1341        if let Ok(py_obj) = value.into_py_any(py) {
1342            call_python_threadsafe(py, call_soon, callback, py_obj);
1343        }
1344    });
1345}
1346
1347fn handle_orderbook(
1348    msg: &crate::websocket::messages::BybitWsOrderbookDepthMsg,
1349    product_type: Option<BybitProductType>,
1350    instruments: &AtomicMap<Ustr, InstrumentAny>,
1351    quote_cache: &mut AHashMap<InstrumentId, QuoteTick>,
1352    clock: &AtomicTime,
1353    call_soon: &Py<PyAny>,
1354    callback: &Py<PyAny>,
1355) {
1356    let Some(instrument) = resolve_instrument(&msg.data.s, product_type, instruments) else {
1357        return;
1358    };
1359    let ts_init = clock.get_time_ns();
1360
1361    match parse_orderbook_deltas(msg, &instrument, ts_init) {
1362        Ok(deltas) => {
1363            send_data_to_python(
1364                Data::Deltas(OrderBookDeltas_API::new(deltas)),
1365                call_soon,
1366                callback,
1367            );
1368        }
1369        Err(e) => log::error!("Failed to parse orderbook deltas: {e}"),
1370    }
1371
1372    let instrument_id = instrument.id();
1373    let last_quote = quote_cache.get(&instrument_id);
1374
1375    match parse_orderbook_quote(msg, &instrument, last_quote, ts_init) {
1376        Ok(quote) => {
1377            quote_cache.insert(instrument_id, quote);
1378            send_data_to_python(Data::Quote(quote), call_soon, callback);
1379        }
1380        Err(e) => log::error!("Failed to parse orderbook quote: {e}"),
1381    }
1382}
1383
1384fn handle_trade(
1385    msg: &crate::websocket::messages::BybitWsTradeMsg,
1386    product_type: Option<BybitProductType>,
1387    instruments: &AtomicMap<Ustr, InstrumentAny>,
1388    trade_subs: &AtomicSet<InstrumentId>,
1389    clock: &AtomicTime,
1390    call_soon: &Py<PyAny>,
1391    callback: &Py<PyAny>,
1392) {
1393    let ts_init = clock.get_time_ns();
1394
1395    for trade in &msg.data {
1396        let Some(instrument) = resolve_instrument(&trade.s, product_type, instruments) else {
1397            continue;
1398        };
1399
1400        if product_type == Some(BybitProductType::Option)
1401            && !trade_subs.is_empty()
1402            && !trade_subs.contains(&instrument.id())
1403        {
1404            continue;
1405        }
1406
1407        match parse_ws_trade_tick(trade, &instrument, ts_init) {
1408            Ok(tick) => send_data_to_python(Data::Trade(tick), call_soon, callback),
1409            Err(e) => log::error!("Failed to parse trade tick: {e}"),
1410        }
1411    }
1412}
1413
1414#[expect(clippy::too_many_arguments)]
1415fn handle_kline(
1416    msg: &crate::websocket::messages::BybitWsKlineMsg,
1417    product_type: Option<BybitProductType>,
1418    instruments: &AtomicMap<Ustr, InstrumentAny>,
1419    bar_types_cache: &AtomicMap<String, BarType>,
1420    bars_timestamp_on_close: bool,
1421    clock: &AtomicTime,
1422    call_soon: &Py<PyAny>,
1423    callback: &Py<PyAny>,
1424) {
1425    let Ok((_, raw_symbol)) = parse_kline_topic(msg.topic.as_str()) else {
1426        return;
1427    };
1428    let ustr_symbol = Ustr::from(raw_symbol);
1429    let Some(instrument) = resolve_instrument(&ustr_symbol, product_type, instruments) else {
1430        return;
1431    };
1432    let Some(bar_type) = bar_types_cache.load().get(msg.topic.as_str()).copied() else {
1433        return;
1434    };
1435
1436    let ts_init = clock.get_time_ns();
1437
1438    for kline in &msg.data {
1439        if !kline.confirm {
1440            continue;
1441        }
1442
1443        match parse_ws_kline_bar(
1444            kline,
1445            &instrument,
1446            bar_type,
1447            bars_timestamp_on_close,
1448            ts_init,
1449        ) {
1450            Ok(bar) => send_data_to_python(Data::Bar(bar), call_soon, callback),
1451            Err(e) => log::error!("Failed to parse kline bar: {e}"),
1452        }
1453    }
1454}
1455
1456#[expect(clippy::too_many_arguments)]
1457fn handle_ticker_linear(
1458    msg: &crate::websocket::messages::BybitWsTickerLinearMsg,
1459    product_type: Option<BybitProductType>,
1460    instruments: &AtomicMap<Ustr, InstrumentAny>,
1461    quote_cache: &mut AHashMap<InstrumentId, QuoteTick>,
1462    funding_cache: &mut AHashMap<Ustr, (Option<String>, Option<String>)>,
1463    clock: &AtomicTime,
1464    call_soon: &Py<PyAny>,
1465    callback: &Py<PyAny>,
1466) {
1467    let Some(instrument) = resolve_instrument(&msg.data.symbol, product_type, instruments) else {
1468        return;
1469    };
1470    let instrument_id = instrument.id();
1471    let ts_init = clock.get_time_ns();
1472
1473    if msg.data.bid1_price.is_some() {
1474        match parse_ticker_linear_quote(msg, &instrument, ts_init) {
1475            Ok(quote) => {
1476                let last = quote_cache.get(&instrument_id);
1477
1478                if last.is_none_or(|q| *q != quote) {
1479                    quote_cache.insert(instrument_id, quote);
1480                    send_data_to_python(Data::Quote(quote), call_soon, callback);
1481                }
1482            }
1483            Err(e) => log::debug!("Skipping partial ticker update: {e}"),
1484        }
1485    }
1486
1487    let ts_event = match parse_millis_i64(msg.ts, "ticker.ts") {
1488        Ok(ts) => ts,
1489        Err(e) => {
1490            log::error!("Failed to parse ticker timestamp: {e}");
1491            return;
1492        }
1493    };
1494
1495    let cache_entry = funding_cache.entry(msg.data.symbol).or_insert((None, None));
1496    let mut changed = false;
1497
1498    if let Some(rate) = &msg.data.funding_rate
1499        && cache_entry.0.as_ref() != Some(rate)
1500    {
1501        cache_entry.0 = Some(rate.clone());
1502        changed = true;
1503    }
1504
1505    if let Some(next_time) = &msg.data.next_funding_time
1506        && cache_entry.1.as_ref() != Some(next_time)
1507    {
1508        cache_entry.1 = Some(next_time.clone());
1509        changed = true;
1510    }
1511
1512    if changed {
1513        match parse_ticker_linear_funding(&msg.data, instrument_id, ts_event, ts_init) {
1514            Ok(update) => send_to_python(update, call_soon, callback),
1515            Err(e) => log::debug!("Skipping funding rate update: {e}"),
1516        }
1517    }
1518
1519    if msg.data.mark_price.is_some() {
1520        match parse_ticker_linear_mark_price(&msg.data, &instrument, ts_event, ts_init) {
1521            Ok(update) => send_to_python(update, call_soon, callback),
1522            Err(e) => log::debug!("Skipping mark price update: {e}"),
1523        }
1524    }
1525
1526    if msg.data.index_price.is_some() {
1527        match parse_ticker_linear_index_price(&msg.data, &instrument, ts_event, ts_init) {
1528            Ok(update) => send_to_python(update, call_soon, callback),
1529            Err(e) => log::debug!("Skipping index price update: {e}"),
1530        }
1531    }
1532}
1533
1534#[expect(clippy::too_many_arguments)]
1535fn handle_ticker_option(
1536    msg: &crate::websocket::messages::BybitWsTickerOptionMsg,
1537    product_type: Option<BybitProductType>,
1538    instruments: &AtomicMap<Ustr, InstrumentAny>,
1539    quote_cache: &mut AHashMap<InstrumentId, QuoteTick>,
1540    option_greeks_subs: &AtomicSet<InstrumentId>,
1541    clock: &AtomicTime,
1542    call_soon: &Py<PyAny>,
1543    callback: &Py<PyAny>,
1544) {
1545    let Some(instrument) = resolve_instrument(&msg.data.symbol, product_type, instruments) else {
1546        return;
1547    };
1548    let instrument_id = instrument.id();
1549    let ts_init = clock.get_time_ns();
1550
1551    match parse_ticker_option_quote(msg, &instrument, ts_init) {
1552        Ok(quote) => {
1553            let last = quote_cache.get(&instrument_id);
1554
1555            if last.is_none_or(|q| *q != quote) {
1556                quote_cache.insert(instrument_id, quote);
1557                send_data_to_python(Data::Quote(quote), call_soon, callback);
1558            }
1559        }
1560        Err(e) => log::error!("Failed to parse ticker option quote: {e}"),
1561    }
1562
1563    match parse_ticker_option_mark_price(msg, &instrument, ts_init) {
1564        Ok(update) => send_to_python(update, call_soon, callback),
1565        Err(e) => log::error!("Failed to parse ticker option mark price: {e}"),
1566    }
1567
1568    match parse_ticker_option_index_price(msg, &instrument, ts_init) {
1569        Ok(update) => send_to_python(update, call_soon, callback),
1570        Err(e) => log::error!("Failed to parse ticker option index price: {e}"),
1571    }
1572
1573    if option_greeks_subs.contains(&instrument_id) {
1574        match parse_ticker_option_greeks(msg, &instrument, ts_init) {
1575            Ok(greeks) => send_to_python(greeks, call_soon, callback),
1576            Err(e) => log::error!("Failed to parse option greeks: {e}"),
1577        }
1578    }
1579}
1580
1581fn handle_account_order(
1582    msg: &crate::websocket::messages::BybitWsAccountOrderMsg,
1583    instruments: &AtomicMap<Ustr, InstrumentAny>,
1584    account_id: Option<AccountId>,
1585    clock: &AtomicTime,
1586    call_soon: &Py<PyAny>,
1587    callback: &Py<PyAny>,
1588) {
1589    let ts_init = clock.get_time_ns();
1590
1591    for order in &msg.data {
1592        let symbol = make_bybit_symbol(order.symbol, order.category);
1593        let Some(instrument) = instruments.get_cloned(&symbol) else {
1594            log::warn!("No instrument for order update: {symbol}");
1595            continue;
1596        };
1597        let Some(account_id) = account_id else {
1598            continue;
1599        };
1600
1601        match parse_ws_order_status_report(order, &instrument, account_id, ts_init) {
1602            Ok(report) => send_to_python(report, call_soon, callback),
1603            Err(e) => log::error!("Failed to parse order status report: {e}"),
1604        }
1605    }
1606}
1607
1608fn handle_account_execution(
1609    msg: &crate::websocket::messages::BybitWsAccountExecutionMsg,
1610    instruments: &AtomicMap<Ustr, InstrumentAny>,
1611    account_id: Option<AccountId>,
1612    clock: &AtomicTime,
1613    call_soon: &Py<PyAny>,
1614    callback: &Py<PyAny>,
1615) {
1616    let ts_init = clock.get_time_ns();
1617
1618    for exec in &msg.data {
1619        let symbol = make_bybit_symbol(exec.symbol, exec.category);
1620        let Some(instrument) = instruments.get_cloned(&symbol) else {
1621            log::warn!("No instrument for execution update: {symbol}");
1622            continue;
1623        };
1624        let Some(account_id) = account_id else {
1625            continue;
1626        };
1627
1628        match parse_ws_fill_report(exec, account_id, &instrument, ts_init) {
1629            Ok(report) => send_to_python(report, call_soon, callback),
1630            Err(e) => log::error!("Failed to parse fill report: {e}"),
1631        }
1632    }
1633}
1634
1635fn handle_account_wallet(
1636    msg: &crate::websocket::messages::BybitWsAccountWalletMsg,
1637    account_id: Option<AccountId>,
1638    clock: &AtomicTime,
1639    call_soon: &Py<PyAny>,
1640    callback: &Py<PyAny>,
1641) {
1642    let ts_init = clock.get_time_ns();
1643    let ts_event = parse_millis_i64(msg.creation_time, "wallet.creation_time").unwrap_or(ts_init);
1644    let Some(account_id) = account_id else {
1645        return;
1646    };
1647
1648    for wallet in &msg.data {
1649        match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1650            Ok(state) => send_to_python(state, call_soon, callback),
1651            Err(e) => log::error!("Failed to parse account state: {e}"),
1652        }
1653    }
1654}
1655
1656fn handle_account_position(
1657    msg: &crate::websocket::messages::BybitWsAccountPositionMsg,
1658    instruments: &AtomicMap<Ustr, InstrumentAny>,
1659    account_id: Option<AccountId>,
1660    clock: &AtomicTime,
1661    call_soon: &Py<PyAny>,
1662    callback: &Py<PyAny>,
1663) {
1664    let ts_init = clock.get_time_ns();
1665
1666    for position in &msg.data {
1667        let symbol = make_bybit_symbol(position.symbol, position.category);
1668        let Some(instrument) = instruments.get_cloned(&symbol) else {
1669            log::warn!("No instrument for position update: {symbol}");
1670            continue;
1671        };
1672        let Some(account_id) = account_id else {
1673            continue;
1674        };
1675
1676        match parse_ws_position_status_report(position, account_id, &instrument, ts_init) {
1677            Ok(report) => send_to_python(report, call_soon, callback),
1678            Err(e) => log::error!("Failed to parse position status report: {e}"),
1679        }
1680    }
1681}
1682
1683fn handle_order_response(
1684    resp: &crate::websocket::messages::BybitWsOrderResponse,
1685    pending_py_requests: &DashMap<String, Vec<PendingPyRequest>>,
1686    account_id: Option<AccountId>,
1687    clock: &AtomicTime,
1688    call_soon: &Py<PyAny>,
1689    callback: &Py<PyAny>,
1690) {
1691    if resp.ret_code == 0 {
1692        let entries = resp
1693            .req_id
1694            .as_ref()
1695            .and_then(|rid| pending_py_requests.remove(rid))
1696            .map(|(_, v)| v);
1697
1698        // Check for per-order failures in batch retExtInfo
1699        if let Some(entries) = entries {
1700            let batch_errors = resp.extract_batch_errors();
1701            let data_array = resp.data.as_array();
1702            let ts_init = clock.get_time_ns();
1703
1704            for (idx, error) in batch_errors.iter().enumerate() {
1705                if error.code == 0 {
1706                    continue;
1707                }
1708
1709                let pending = data_array
1710                    .and_then(|arr| arr.get(idx))
1711                    .and_then(|item| item.get("orderLinkId"))
1712                    .and_then(|v| v.as_str())
1713                    .filter(|s| !s.is_empty())
1714                    .and_then(|oli| {
1715                        let cid = ClientOrderId::new(oli);
1716                        entries.iter().find(|e| e.client_order_id == cid)
1717                    })
1718                    .or_else(|| entries.get(idx));
1719
1720                if let Some(pending) = pending {
1721                    let reason = Ustr::from(&error.msg);
1722                    emit_rejection(pending, reason, account_id, ts_init, call_soon, callback);
1723                } else {
1724                    log::warn!(
1725                        "Batch error at index {idx} without correlation: code={}, msg={}",
1726                        error.code,
1727                        error.msg,
1728                    );
1729                }
1730            }
1731        }
1732        return;
1733    }
1734
1735    // Try to find the pending entries by req_id, then by orderLinkId
1736    let entries = resp
1737        .req_id
1738        .as_ref()
1739        .and_then(|rid| pending_py_requests.remove(rid))
1740        .map(|(_, v)| v)
1741        .or_else(|| {
1742            // Bybit sometimes omits req_id, search by orderLinkId instead
1743            let order_link_id = resp
1744                .data
1745                .get("orderLinkId")
1746                .and_then(|v| v.as_str())
1747                .filter(|s| !s.is_empty())?;
1748            let cid = ClientOrderId::new(order_link_id);
1749            let key = pending_py_requests
1750                .iter()
1751                .find(|entry| entry.value().iter().any(|e| e.client_order_id == cid))
1752                .map(|entry| entry.key().clone())?;
1753            pending_py_requests.remove(&key).map(|(_, v)| v)
1754        });
1755
1756    let Some(entries) = entries else {
1757        log::warn!(
1758            "Unmatched order response: ret_code={}, ret_msg={}",
1759            resp.ret_code,
1760            resp.ret_msg,
1761        );
1762        return;
1763    };
1764
1765    let ts_init = clock.get_time_ns();
1766    let reason = Ustr::from(&resp.ret_msg);
1767
1768    for pending in &entries {
1769        emit_rejection(pending, reason, account_id, ts_init, call_soon, callback);
1770    }
1771}
1772
1773fn emit_rejection(
1774    pending: &PendingPyRequest,
1775    reason: Ustr,
1776    account_id: Option<AccountId>,
1777    ts_init: UnixNanos,
1778    call_soon: &Py<PyAny>,
1779    callback: &Py<PyAny>,
1780) {
1781    match pending.operation {
1782        PendingOperation::Place => {
1783            let event = OrderRejected::new(
1784                pending.trader_id,
1785                pending.strategy_id,
1786                pending.instrument_id,
1787                pending.client_order_id,
1788                account_id.unwrap_or(AccountId::from("BYBIT-000")),
1789                reason,
1790                UUID4::new(),
1791                ts_init,
1792                ts_init,
1793                false,
1794                false,
1795            );
1796            send_to_python(event, call_soon, callback);
1797        }
1798        PendingOperation::Cancel => {
1799            let event = OrderCancelRejected::new(
1800                pending.trader_id,
1801                pending.strategy_id,
1802                pending.instrument_id,
1803                pending.client_order_id,
1804                reason,
1805                UUID4::new(),
1806                ts_init,
1807                ts_init,
1808                false,
1809                pending.venue_order_id,
1810                account_id,
1811            );
1812            send_to_python(event, call_soon, callback);
1813        }
1814        PendingOperation::Amend => {
1815            let event = OrderModifyRejected::new(
1816                pending.trader_id,
1817                pending.strategy_id,
1818                pending.instrument_id,
1819                pending.client_order_id,
1820                reason,
1821                UUID4::new(),
1822                ts_init,
1823                ts_init,
1824                false,
1825                pending.venue_order_id,
1826                account_id,
1827            );
1828            send_to_python(event, call_soon, callback);
1829        }
1830    }
1831}