1use std::sync::Arc;
19
20use ahash::AHashMap;
21use dashmap::DashMap;
22use futures_util::StreamExt;
23use nautilus_common::live::get_runtime;
24use nautilus_core::{
25 AtomicMap, AtomicSet, UUID4, UnixNanos,
26 python::{call_python_threadsafe, to_pyruntime_err, to_pyvalue_err},
27 time::{AtomicTime, get_atomic_clock_realtime},
28};
29use nautilus_model::{
30 data::{BarType, Data, OrderBookDeltas_API, QuoteTick},
31 enums::{
32 AggregationSource, BarAggregation, OrderSide, OrderType, PriceType, TimeInForce,
33 TriggerType,
34 },
35 events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
36 identifiers::{
37 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
38 },
39 instruments::{Instrument, InstrumentAny},
40 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
41 types::{Price, Quantity},
42};
43use nautilus_network::websocket::TransportBackend;
44use pyo3::{IntoPyObjectExt, prelude::*};
45use ustr::Ustr;
46
47use crate::{
48 common::{
49 consts::BYBIT_VENUE,
50 enums::{BybitEnvironment, BybitPositionIdx, BybitProductType},
51 parse::make_bybit_symbol,
52 },
53 python::params::{BybitWsAmendOrderParams, BybitWsCancelOrderParams, BybitWsPlaceOrderParams},
54 websocket::{
55 client::{BATCH_PROCESSING_LIMIT, BybitWebSocketClient, PendingPyRequest},
56 dispatch::PendingOperation,
57 messages::{BybitWebSocketError, BybitWsMessage},
58 parse::{
59 parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
60 parse_ticker_linear_funding, parse_ticker_linear_index_price,
61 parse_ticker_linear_mark_price, parse_ticker_linear_quote, parse_ticker_option_greeks,
62 parse_ticker_option_index_price, parse_ticker_option_mark_price,
63 parse_ticker_option_quote, parse_ws_account_state, parse_ws_fill_report,
64 parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
65 parse_ws_trade_tick,
66 },
67 },
68};
69
70fn validate_bar_type(bar_type: &BarType) -> anyhow::Result<()> {
71 let spec = bar_type.spec();
72
73 if spec.price_type != PriceType::Last {
74 anyhow::bail!(
75 "Invalid bar type: Bybit bars only support LAST price type, received {}",
76 spec.price_type
77 );
78 }
79
80 if bar_type.aggregation_source() != AggregationSource::External {
81 anyhow::bail!(
82 "Invalid bar type: Bybit bars only support EXTERNAL aggregation source, received {}",
83 bar_type.aggregation_source()
84 );
85 }
86
87 let step = spec.step.get();
88 if spec.aggregation == BarAggregation::Minute && step >= 60 {
89 let hours = step / 60;
90 anyhow::bail!("Invalid bar type: {step}-MINUTE not supported, use {hours}-HOUR instead");
91 }
92
93 Ok(())
94}
95
96#[pymethods]
97#[pyo3_stub_gen::derive::gen_stub_pymethods]
98impl BybitWebSocketError {
99 fn __repr__(&self) -> String {
100 format!(
101 "BybitWebSocketError(code={}, message='{}', conn_id={:?}, topic={:?})",
102 self.code, self.message, self.conn_id, self.topic
103 )
104 }
105
106 #[getter]
107 pub fn code(&self) -> i64 {
108 self.code
109 }
110
111 #[getter]
112 pub fn message(&self) -> &str {
113 &self.message
114 }
115
116 #[getter]
117 pub fn conn_id(&self) -> Option<&str> {
118 self.conn_id.as_deref()
119 }
120
121 #[getter]
122 pub fn topic(&self) -> Option<&str> {
123 self.topic.as_deref()
124 }
125
126 #[getter]
127 pub fn req_id(&self) -> Option<&str> {
128 self.req_id.as_deref()
129 }
130}
131
132#[pymethods]
133#[pyo3_stub_gen::derive::gen_stub_pymethods]
134impl BybitWebSocketClient {
135 #[staticmethod]
137 #[pyo3(name = "new_public")]
138 #[pyo3(signature = (product_type, environment, url=None, heartbeat=20, proxy_url=None))]
139 fn py_new_public(
140 product_type: BybitProductType,
141 environment: BybitEnvironment,
142 url: Option<String>,
143 heartbeat: u64,
144 proxy_url: Option<String>,
145 ) -> Self {
146 Self::new_public_with(
147 product_type,
148 environment,
149 url,
150 heartbeat,
151 TransportBackend::default(),
152 proxy_url,
153 )
154 }
155
156 #[staticmethod]
164 #[pyo3(name = "new_private")]
165 #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=20, proxy_url=None))]
166 fn py_new_private(
167 environment: BybitEnvironment,
168 api_key: Option<String>,
169 api_secret: Option<String>,
170 url: Option<String>,
171 heartbeat: u64,
172 proxy_url: Option<String>,
173 ) -> Self {
174 Self::new_private(
175 environment,
176 api_key,
177 api_secret,
178 url,
179 heartbeat,
180 TransportBackend::default(),
181 proxy_url,
182 )
183 }
184
185 #[staticmethod]
193 #[pyo3(name = "new_trade")]
194 #[pyo3(signature = (environment, api_key=None, api_secret=None, url=None, heartbeat=20, proxy_url=None))]
195 fn py_new_trade(
196 environment: BybitEnvironment,
197 api_key: Option<String>,
198 api_secret: Option<String>,
199 url: Option<String>,
200 heartbeat: u64,
201 proxy_url: Option<String>,
202 ) -> Self {
203 Self::new_trade(
204 environment,
205 api_key,
206 api_secret,
207 url,
208 heartbeat,
209 TransportBackend::default(),
210 proxy_url,
211 )
212 }
213
214 #[getter]
215 #[pyo3(name = "api_key_masked")]
216 #[must_use]
217 pub fn py_api_key_masked(&self) -> Option<String> {
218 self.credential().map(|c| c.api_key_masked())
219 }
220
221 #[pyo3(name = "is_active")]
223 fn py_is_active(&self) -> bool {
224 self.is_active()
225 }
226
227 #[pyo3(name = "is_closed")]
229 fn py_is_closed(&self) -> bool {
230 self.is_closed()
231 }
232
233 #[pyo3(name = "subscription_count")]
235 fn py_subscription_count(&self) -> usize {
236 self.subscription_count()
237 }
238
239 #[pyo3(name = "cache_instrument")]
241 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
242 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
243 Ok(())
244 }
245
246 #[pyo3(name = "set_account_id")]
248 fn py_set_account_id(&mut self, account_id: AccountId) {
249 self.set_account_id(account_id);
250 }
251
252 #[pyo3(name = "set_mm_level")]
254 fn py_set_mm_level(&self, mm_level: u8) {
255 self.set_mm_level(mm_level);
256 }
257
258 #[pyo3(name = "set_bars_timestamp_on_close")]
260 fn py_set_bars_timestamp_on_close(&self, value: bool) {
261 self.set_bars_timestamp_on_close(value);
262 }
263
264 #[pyo3(name = "add_option_greeks_sub")]
266 fn py_add_option_greeks_sub(&self, instrument_id: InstrumentId) {
267 self.add_option_greeks_sub(instrument_id);
268 }
269
270 #[pyo3(name = "remove_option_greeks_sub")]
272 fn py_remove_option_greeks_sub(&self, instrument_id: InstrumentId) {
273 self.remove_option_greeks_sub(&instrument_id);
274 }
275
276 #[pyo3(name = "connect")]
278 #[expect(clippy::needless_pass_by_value)] fn py_connect<'py>(
280 &mut self,
281 py: Python<'py>,
282 loop_: Py<PyAny>,
283 callback: Py<PyAny>,
284 ) -> PyResult<Bound<'py, PyAny>> {
285 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
286 let mut client = self.clone();
287
288 pyo3_async_runtimes::tokio::future_into_py(py, async move {
289 client.connect().await.map_err(to_pyruntime_err)?;
290
291 let stream = client.stream();
292 let clock = get_atomic_clock_realtime();
293 let product_type = client.product_type();
294 let account_id = client.account_id();
295 let bar_types_cache = client.bar_types_cache().clone();
296 let trade_subs = client.trade_subs().clone();
297 let option_greeks_subs = client.option_greeks_subs().clone();
298 let bars_timestamp_on_close = client.bars_timestamp_on_close();
299 let instruments = Arc::clone(client.instruments_cache_ref());
300 let pending_py_requests = Arc::clone(client.pending_py_requests());
301
302 get_runtime().spawn(async move {
303 let mut quote_cache = AHashMap::new();
304 let mut funding_cache: AHashMap<Ustr, (Option<String>, Option<String>)> =
305 AHashMap::new();
306 let _client = client;
307 let _resolve = |raw_symbol: &Ustr| -> Option<InstrumentAny> {
308 let key =
309 product_type.map_or(*raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
310 instruments.get_cloned(&key)
311 };
312
313 tokio::pin!(stream);
314
315 while let Some(msg) = stream.next().await {
316 match msg {
317 BybitWsMessage::Orderbook(ref msg) => {
318 handle_orderbook(
319 msg,
320 product_type,
321 &instruments,
322 &mut quote_cache,
323 clock,
324 &call_soon,
325 &callback,
326 );
327 }
328 BybitWsMessage::Trade(ref msg) => {
329 handle_trade(
330 msg,
331 product_type,
332 &instruments,
333 &trade_subs,
334 clock,
335 &call_soon,
336 &callback,
337 );
338 }
339 BybitWsMessage::Kline(ref msg) => {
340 handle_kline(
341 msg,
342 product_type,
343 &instruments,
344 &bar_types_cache,
345 bars_timestamp_on_close,
346 clock,
347 &call_soon,
348 &callback,
349 );
350 }
351 BybitWsMessage::TickerLinear(ref msg) => {
352 handle_ticker_linear(
353 msg,
354 product_type,
355 &instruments,
356 &mut quote_cache,
357 &mut funding_cache,
358 clock,
359 &call_soon,
360 &callback,
361 );
362 }
363 BybitWsMessage::TickerOption(ref msg) => {
364 handle_ticker_option(
365 msg,
366 product_type,
367 &instruments,
368 &mut quote_cache,
369 &option_greeks_subs,
370 clock,
371 &call_soon,
372 &callback,
373 );
374 }
375 BybitWsMessage::AccountOrder(ref msg) => {
376 handle_account_order(
377 msg,
378 &instruments,
379 account_id,
380 clock,
381 &call_soon,
382 &callback,
383 );
384 }
385 BybitWsMessage::AccountExecution(ref msg) => {
386 handle_account_execution(
387 msg,
388 &instruments,
389 account_id,
390 clock,
391 &call_soon,
392 &callback,
393 );
394 }
395 BybitWsMessage::AccountWallet(ref msg) => {
396 handle_account_wallet(msg, account_id, clock, &call_soon, &callback);
397 }
398 BybitWsMessage::AccountPosition(ref msg) => {
399 handle_account_position(
400 msg,
401 &instruments,
402 account_id,
403 clock,
404 &call_soon,
405 &callback,
406 );
407 }
408 BybitWsMessage::OrderResponse(ref resp) => {
409 handle_order_response(
410 resp,
411 &pending_py_requests,
412 account_id,
413 clock,
414 &call_soon,
415 &callback,
416 );
417 }
418 BybitWsMessage::Error(err) => {
419 send_to_python(err, &call_soon, &callback);
420 }
421 BybitWsMessage::Reconnected => {
422 quote_cache.clear();
423 funding_cache.clear();
424 log::info!("WebSocket reconnected");
425 }
426 BybitWsMessage::Auth(_) => {
427 log::info!("WebSocket authenticated");
428 }
429 }
430 }
431 });
432
433 Ok(())
434 })
435 }
436
437 #[pyo3(name = "close")]
438 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
439 let mut client = self.clone();
440
441 pyo3_async_runtimes::tokio::future_into_py(py, async move {
442 if let Err(e) = client.close().await {
443 log::error!("Error on close: {e}");
444 }
445 Ok(())
446 })
447 }
448
449 #[pyo3(name = "subscribe")]
451 fn py_subscribe<'py>(
452 &self,
453 py: Python<'py>,
454 topics: Vec<String>,
455 ) -> PyResult<Bound<'py, PyAny>> {
456 let client = self.clone();
457
458 pyo3_async_runtimes::tokio::future_into_py(py, async move {
459 client.subscribe(topics).await.map_err(to_pyruntime_err)?;
460 Ok(())
461 })
462 }
463
464 #[pyo3(name = "unsubscribe")]
466 fn py_unsubscribe<'py>(
467 &self,
468 py: Python<'py>,
469 topics: Vec<String>,
470 ) -> PyResult<Bound<'py, PyAny>> {
471 let client = self.clone();
472
473 pyo3_async_runtimes::tokio::future_into_py(py, async move {
474 client.unsubscribe(topics).await.map_err(to_pyruntime_err)?;
475 Ok(())
476 })
477 }
478
479 #[pyo3(name = "subscribe_orderbook")]
485 fn py_subscribe_orderbook<'py>(
486 &self,
487 py: Python<'py>,
488 instrument_id: InstrumentId,
489 depth: u32,
490 ) -> PyResult<Bound<'py, PyAny>> {
491 let client = self.clone();
492
493 pyo3_async_runtimes::tokio::future_into_py(py, async move {
494 client
495 .subscribe_orderbook(instrument_id, depth)
496 .await
497 .map_err(to_pyruntime_err)?;
498 Ok(())
499 })
500 }
501
502 #[pyo3(name = "unsubscribe_orderbook")]
504 fn py_unsubscribe_orderbook<'py>(
505 &self,
506 py: Python<'py>,
507 instrument_id: InstrumentId,
508 depth: u32,
509 ) -> PyResult<Bound<'py, PyAny>> {
510 let client = self.clone();
511
512 pyo3_async_runtimes::tokio::future_into_py(py, async move {
513 client
514 .unsubscribe_orderbook(instrument_id, depth)
515 .await
516 .map_err(to_pyruntime_err)?;
517 Ok(())
518 })
519 }
520
521 #[pyo3(name = "subscribe_trades")]
527 fn py_subscribe_trades<'py>(
528 &self,
529 py: Python<'py>,
530 instrument_id: InstrumentId,
531 ) -> PyResult<Bound<'py, PyAny>> {
532 let client = self.clone();
533
534 pyo3_async_runtimes::tokio::future_into_py(py, async move {
535 client
536 .subscribe_trades(instrument_id)
537 .await
538 .map_err(to_pyruntime_err)?;
539 Ok(())
540 })
541 }
542
543 #[pyo3(name = "unsubscribe_trades")]
545 fn py_unsubscribe_trades<'py>(
546 &self,
547 py: Python<'py>,
548 instrument_id: InstrumentId,
549 ) -> PyResult<Bound<'py, PyAny>> {
550 let client = self.clone();
551
552 pyo3_async_runtimes::tokio::future_into_py(py, async move {
553 client
554 .unsubscribe_trades(instrument_id)
555 .await
556 .map_err(to_pyruntime_err)?;
557 Ok(())
558 })
559 }
560
561 #[pyo3(name = "subscribe_ticker")]
567 fn py_subscribe_ticker<'py>(
568 &self,
569 py: Python<'py>,
570 instrument_id: InstrumentId,
571 ) -> PyResult<Bound<'py, PyAny>> {
572 let client = self.clone();
573
574 pyo3_async_runtimes::tokio::future_into_py(py, async move {
575 client
576 .subscribe_ticker(instrument_id)
577 .await
578 .map_err(to_pyruntime_err)?;
579 Ok(())
580 })
581 }
582
583 #[pyo3(name = "subscribe_option_greeks")]
584 fn py_subscribe_option_greeks<'py>(
585 &self,
586 py: Python<'py>,
587 instrument_id: InstrumentId,
588 ) -> PyResult<Bound<'py, PyAny>> {
589 self.add_option_greeks_sub(instrument_id);
590 let client = self.clone();
591
592 pyo3_async_runtimes::tokio::future_into_py(py, async move {
593 client
594 .subscribe_ticker(instrument_id)
595 .await
596 .map_err(to_pyruntime_err)?;
597 Ok(())
598 })
599 }
600
601 #[pyo3(name = "unsubscribe_option_greeks")]
602 fn py_unsubscribe_option_greeks<'py>(
603 &self,
604 py: Python<'py>,
605 instrument_id: InstrumentId,
606 ) -> PyResult<Bound<'py, PyAny>> {
607 self.remove_option_greeks_sub(&instrument_id);
608 let client = self.clone();
609
610 pyo3_async_runtimes::tokio::future_into_py(py, async move {
611 client
612 .unsubscribe_ticker(instrument_id)
613 .await
614 .map_err(to_pyruntime_err)?;
615 Ok(())
616 })
617 }
618
619 #[pyo3(name = "unsubscribe_ticker")]
621 fn py_unsubscribe_ticker<'py>(
622 &self,
623 py: Python<'py>,
624 instrument_id: InstrumentId,
625 ) -> PyResult<Bound<'py, PyAny>> {
626 let client = self.clone();
627
628 pyo3_async_runtimes::tokio::future_into_py(py, async move {
629 client
630 .unsubscribe_ticker(instrument_id)
631 .await
632 .map_err(to_pyruntime_err)?;
633 Ok(())
634 })
635 }
636
637 #[pyo3(name = "subscribe_bars")]
643 fn py_subscribe_bars<'py>(
644 &self,
645 py: Python<'py>,
646 bar_type: BarType,
647 ) -> PyResult<Bound<'py, PyAny>> {
648 validate_bar_type(&bar_type).map_err(to_pyvalue_err)?;
649
650 let client = self.clone();
651 pyo3_async_runtimes::tokio::future_into_py(py, async move {
652 client
653 .subscribe_bars(bar_type)
654 .await
655 .map_err(to_pyruntime_err)?;
656 Ok(())
657 })
658 }
659
660 #[pyo3(name = "unsubscribe_bars")]
662 fn py_unsubscribe_bars<'py>(
663 &self,
664 py: Python<'py>,
665 bar_type: BarType,
666 ) -> PyResult<Bound<'py, PyAny>> {
667 validate_bar_type(&bar_type).map_err(to_pyvalue_err)?;
668
669 let client = self.clone();
670 pyo3_async_runtimes::tokio::future_into_py(py, async move {
671 client
672 .unsubscribe_bars(bar_type)
673 .await
674 .map_err(to_pyruntime_err)?;
675 Ok(())
676 })
677 }
678
679 #[pyo3(name = "subscribe_orders")]
689 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
690 let client = self.clone();
691
692 pyo3_async_runtimes::tokio::future_into_py(py, async move {
693 client.subscribe_orders().await.map_err(to_pyruntime_err)?;
694 Ok(())
695 })
696 }
697
698 #[pyo3(name = "unsubscribe_orders")]
700 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
701 let client = self.clone();
702
703 pyo3_async_runtimes::tokio::future_into_py(py, async move {
704 client
705 .unsubscribe_orders()
706 .await
707 .map_err(to_pyruntime_err)?;
708 Ok(())
709 })
710 }
711
712 #[pyo3(name = "subscribe_executions")]
722 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
723 let client = self.clone();
724
725 pyo3_async_runtimes::tokio::future_into_py(py, async move {
726 client
727 .subscribe_executions()
728 .await
729 .map_err(to_pyruntime_err)?;
730 Ok(())
731 })
732 }
733
734 #[pyo3(name = "unsubscribe_executions")]
736 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
737 let client = self.clone();
738
739 pyo3_async_runtimes::tokio::future_into_py(py, async move {
740 client
741 .unsubscribe_executions()
742 .await
743 .map_err(to_pyruntime_err)?;
744 Ok(())
745 })
746 }
747
748 #[pyo3(name = "subscribe_positions")]
758 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
759 let client = self.clone();
760
761 pyo3_async_runtimes::tokio::future_into_py(py, async move {
762 client
763 .subscribe_positions()
764 .await
765 .map_err(to_pyruntime_err)?;
766 Ok(())
767 })
768 }
769
770 #[pyo3(name = "unsubscribe_positions")]
772 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
773 let client = self.clone();
774
775 pyo3_async_runtimes::tokio::future_into_py(py, async move {
776 client
777 .unsubscribe_positions()
778 .await
779 .map_err(to_pyruntime_err)?;
780 Ok(())
781 })
782 }
783
784 #[pyo3(name = "subscribe_wallet")]
794 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
795 let client = self.clone();
796
797 pyo3_async_runtimes::tokio::future_into_py(py, async move {
798 client.subscribe_wallet().await.map_err(to_pyruntime_err)?;
799 Ok(())
800 })
801 }
802
803 #[pyo3(name = "unsubscribe_wallet")]
805 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
806 let client = self.clone();
807
808 pyo3_async_runtimes::tokio::future_into_py(py, async move {
809 client
810 .unsubscribe_wallet()
811 .await
812 .map_err(to_pyruntime_err)?;
813 Ok(())
814 })
815 }
816
817 #[pyo3(name = "wait_until_active")]
819 fn py_wait_until_active<'py>(
820 &self,
821 py: Python<'py>,
822 timeout_secs: f64,
823 ) -> PyResult<Bound<'py, PyAny>> {
824 let client = self.clone();
825
826 pyo3_async_runtimes::tokio::future_into_py(py, async move {
827 client
828 .wait_until_active(timeout_secs)
829 .await
830 .map_err(to_pyruntime_err)?;
831 Ok(())
832 })
833 }
834
835 #[pyo3(name = "submit_order")]
837 #[pyo3(signature = (
838 product_type,
839 trader_id,
840 strategy_id,
841 instrument_id,
842 client_order_id,
843 order_side,
844 order_type,
845 quantity,
846 is_quote_quantity=false,
847 time_in_force=None,
848 price=None,
849 trigger_price=None,
850 trigger_type=None,
851 post_only=None,
852 reduce_only=None,
853 is_leverage=false,
854 position_idx=None,
855 ))]
856 #[expect(clippy::too_many_arguments)]
857 fn py_submit_order<'py>(
858 &self,
859 py: Python<'py>,
860 product_type: BybitProductType,
861 trader_id: TraderId,
862 strategy_id: StrategyId,
863 instrument_id: InstrumentId,
864 client_order_id: ClientOrderId,
865 order_side: OrderSide,
866 order_type: OrderType,
867 quantity: Quantity,
868 is_quote_quantity: bool,
869 time_in_force: Option<TimeInForce>,
870 price: Option<Price>,
871 trigger_price: Option<Price>,
872 trigger_type: Option<TriggerType>,
873 post_only: Option<bool>,
874 reduce_only: Option<bool>,
875 is_leverage: bool,
876 position_idx: Option<BybitPositionIdx>,
877 ) -> PyResult<Bound<'py, PyAny>> {
878 let client = self.clone();
879 let pending_py_requests = Arc::clone(self.pending_py_requests());
880
881 pyo3_async_runtimes::tokio::future_into_py(py, async move {
882 let req_id = client
883 .submit_order(
884 product_type,
885 instrument_id,
886 client_order_id,
887 order_side,
888 order_type,
889 quantity,
890 is_quote_quantity,
891 time_in_force,
892 price,
893 trigger_price,
894 trigger_type,
895 post_only,
896 reduce_only,
897 is_leverage,
898 position_idx,
899 )
900 .await
901 .map_err(to_pyruntime_err)?;
902 pending_py_requests.insert(
903 req_id,
904 vec![PendingPyRequest {
905 client_order_id,
906 operation: PendingOperation::Place,
907 trader_id,
908 strategy_id,
909 instrument_id,
910 venue_order_id: None,
911 }],
912 );
913 Ok(())
914 })
915 }
916
917 #[pyo3(name = "modify_order")]
919 #[pyo3(signature = (
920 product_type,
921 trader_id,
922 strategy_id,
923 instrument_id,
924 client_order_id,
925 venue_order_id=None,
926 quantity=None,
927 price=None,
928 ))]
929 #[expect(clippy::too_many_arguments)]
930 fn py_modify_order<'py>(
931 &self,
932 py: Python<'py>,
933 product_type: BybitProductType,
934 trader_id: TraderId,
935 strategy_id: StrategyId,
936 instrument_id: InstrumentId,
937 client_order_id: ClientOrderId,
938 venue_order_id: Option<VenueOrderId>,
939 quantity: Option<Quantity>,
940 price: Option<Price>,
941 ) -> PyResult<Bound<'py, PyAny>> {
942 let client = self.clone();
943 let pending_py_requests = Arc::clone(self.pending_py_requests());
944
945 pyo3_async_runtimes::tokio::future_into_py(py, async move {
946 let req_id = client
947 .modify_order(
948 product_type,
949 instrument_id,
950 client_order_id,
951 venue_order_id,
952 quantity,
953 price,
954 )
955 .await
956 .map_err(to_pyruntime_err)?;
957 pending_py_requests.insert(
958 req_id,
959 vec![PendingPyRequest {
960 client_order_id,
961 operation: PendingOperation::Amend,
962 trader_id,
963 strategy_id,
964 instrument_id,
965 venue_order_id,
966 }],
967 );
968 Ok(())
969 })
970 }
971
972 #[pyo3(name = "cancel_order")]
974 #[pyo3(signature = (
975 product_type,
976 trader_id,
977 strategy_id,
978 instrument_id,
979 client_order_id,
980 venue_order_id=None,
981 ))]
982 #[expect(clippy::too_many_arguments)]
983 fn py_cancel_order<'py>(
984 &self,
985 py: Python<'py>,
986 product_type: BybitProductType,
987 trader_id: TraderId,
988 strategy_id: StrategyId,
989 instrument_id: InstrumentId,
990 client_order_id: ClientOrderId,
991 venue_order_id: Option<VenueOrderId>,
992 ) -> PyResult<Bound<'py, PyAny>> {
993 let client = self.clone();
994 let pending_py_requests = Arc::clone(self.pending_py_requests());
995
996 pyo3_async_runtimes::tokio::future_into_py(py, async move {
997 let req_id = client
998 .cancel_order_by_id(product_type, instrument_id, client_order_id, venue_order_id)
999 .await
1000 .map_err(to_pyruntime_err)?;
1001 pending_py_requests.insert(
1002 req_id,
1003 vec![PendingPyRequest {
1004 client_order_id,
1005 operation: PendingOperation::Cancel,
1006 trader_id,
1007 strategy_id,
1008 instrument_id,
1009 venue_order_id,
1010 }],
1011 );
1012 Ok(())
1013 })
1014 }
1015
1016 #[pyo3(name = "build_place_order_params")]
1018 #[pyo3(signature = (
1019 product_type,
1020 instrument_id,
1021 client_order_id,
1022 order_side,
1023 order_type,
1024 quantity,
1025 is_quote_quantity=false,
1026 time_in_force=None,
1027 price=None,
1028 trigger_price=None,
1029 trigger_type=None,
1030 post_only=None,
1031 reduce_only=None,
1032 is_leverage=false,
1033 take_profit=None,
1034 stop_loss=None,
1035 position_idx=None,
1036 ))]
1037 #[expect(clippy::too_many_arguments)]
1038 fn py_build_place_order_params(
1039 &self,
1040 product_type: BybitProductType,
1041 instrument_id: InstrumentId,
1042 client_order_id: ClientOrderId,
1043 order_side: OrderSide,
1044 order_type: OrderType,
1045 quantity: Quantity,
1046 is_quote_quantity: bool,
1047 time_in_force: Option<TimeInForce>,
1048 price: Option<Price>,
1049 trigger_price: Option<Price>,
1050 trigger_type: Option<TriggerType>,
1051 post_only: Option<bool>,
1052 reduce_only: Option<bool>,
1053 is_leverage: bool,
1054 take_profit: Option<Price>,
1055 stop_loss: Option<Price>,
1056 position_idx: Option<BybitPositionIdx>,
1057 ) -> PyResult<BybitWsPlaceOrderParams> {
1058 let params = self
1059 .build_place_order_params(
1060 product_type,
1061 instrument_id,
1062 client_order_id,
1063 order_side,
1064 order_type,
1065 quantity,
1066 is_quote_quantity,
1067 time_in_force,
1068 price,
1069 trigger_price,
1070 trigger_type,
1071 post_only,
1072 reduce_only,
1073 is_leverage,
1074 take_profit,
1075 stop_loss,
1076 position_idx,
1077 )
1078 .map_err(to_pyruntime_err)?;
1079 Ok(params.into())
1080 }
1081
1082 #[pyo3(name = "batch_cancel_orders")]
1084 fn py_batch_cancel_orders<'py>(
1085 &self,
1086 py: Python<'py>,
1087 trader_id: TraderId,
1088 strategy_id: StrategyId,
1089 orders: Vec<BybitWsCancelOrderParams>,
1090 ) -> PyResult<Bound<'py, PyAny>> {
1091 let client = self.clone();
1092 let pending_py_requests = Arc::clone(self.pending_py_requests());
1093
1094 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1095 let order_params: Vec<crate::websocket::messages::BybitWsCancelOrderParams> = orders
1096 .into_iter()
1097 .map(|p| p.try_into())
1098 .collect::<Result<Vec<_>, _>>()
1099 .map_err(to_pyruntime_err)?;
1100
1101 let per_order = build_pending_entries(
1102 &order_params,
1103 PendingOperation::Cancel,
1104 trader_id,
1105 strategy_id,
1106 );
1107
1108 let req_ids = client
1109 .batch_cancel_orders(order_params)
1110 .await
1111 .map_err(to_pyruntime_err)?;
1112
1113 register_batch_pending(req_ids, &per_order, &pending_py_requests);
1114 Ok(())
1115 })
1116 }
1117
1118 #[pyo3(name = "build_amend_order_params")]
1120 fn py_build_amend_order_params(
1121 &self,
1122 product_type: BybitProductType,
1123 instrument_id: InstrumentId,
1124 venue_order_id: Option<VenueOrderId>,
1125 client_order_id: Option<ClientOrderId>,
1126 quantity: Option<Quantity>,
1127 price: Option<Price>,
1128 ) -> PyResult<crate::python::params::BybitWsAmendOrderParams> {
1129 let params = self
1130 .build_amend_order_params(
1131 product_type,
1132 instrument_id,
1133 venue_order_id,
1134 client_order_id,
1135 quantity,
1136 price,
1137 )
1138 .map_err(to_pyruntime_err)?;
1139 Ok(params.into())
1140 }
1141
1142 #[pyo3(name = "build_cancel_order_params")]
1144 fn py_build_cancel_order_params(
1145 &self,
1146 product_type: BybitProductType,
1147 instrument_id: InstrumentId,
1148 venue_order_id: Option<VenueOrderId>,
1149 client_order_id: Option<ClientOrderId>,
1150 ) -> PyResult<crate::python::params::BybitWsCancelOrderParams> {
1151 let params = self
1152 .build_cancel_order_params(product_type, instrument_id, venue_order_id, client_order_id)
1153 .map_err(to_pyruntime_err)?;
1154 Ok(params.into())
1155 }
1156
1157 #[pyo3(name = "batch_modify_orders")]
1158 fn py_batch_modify_orders<'py>(
1159 &self,
1160 py: Python<'py>,
1161 trader_id: TraderId,
1162 strategy_id: StrategyId,
1163 orders: Vec<BybitWsAmendOrderParams>,
1164 ) -> PyResult<Bound<'py, PyAny>> {
1165 let client = self.clone();
1166 let pending_py_requests = Arc::clone(self.pending_py_requests());
1167
1168 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1169 let order_params: Vec<crate::websocket::messages::BybitWsAmendOrderParams> = orders
1170 .into_iter()
1171 .map(|p| p.try_into())
1172 .collect::<Result<Vec<_>, _>>()
1173 .map_err(to_pyruntime_err)?;
1174
1175 let per_order = build_pending_entries(
1176 &order_params,
1177 PendingOperation::Amend,
1178 trader_id,
1179 strategy_id,
1180 );
1181
1182 let req_ids = client
1183 .batch_amend_orders(order_params)
1184 .await
1185 .map_err(to_pyruntime_err)?;
1186
1187 register_batch_pending(req_ids, &per_order, &pending_py_requests);
1188 Ok(())
1189 })
1190 }
1191
1192 #[pyo3(name = "batch_place_orders")]
1194 fn py_batch_place_orders<'py>(
1195 &self,
1196 py: Python<'py>,
1197 trader_id: TraderId,
1198 strategy_id: StrategyId,
1199 orders: Vec<BybitWsPlaceOrderParams>,
1200 ) -> PyResult<Bound<'py, PyAny>> {
1201 let client = self.clone();
1202 let pending_py_requests = Arc::clone(self.pending_py_requests());
1203
1204 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1205 let order_params: Vec<crate::websocket::messages::BybitWsPlaceOrderParams> = orders
1206 .into_iter()
1207 .map(|p| p.try_into())
1208 .collect::<Result<Vec<_>, _>>()
1209 .map_err(to_pyruntime_err)?;
1210
1211 let per_order = build_pending_entries(
1212 &order_params,
1213 PendingOperation::Place,
1214 trader_id,
1215 strategy_id,
1216 );
1217
1218 let req_ids = client
1219 .batch_place_orders(order_params)
1220 .await
1221 .map_err(to_pyruntime_err)?;
1222
1223 register_batch_pending(req_ids, &per_order, &pending_py_requests);
1224 Ok(())
1225 })
1226 }
1227}
1228
1229trait BatchOrderParams {
1230 fn order_link_id(&self) -> Option<&str>;
1231 fn symbol(&self) -> Ustr;
1232 fn category(&self) -> BybitProductType;
1233 fn venue_order_id(&self) -> Option<VenueOrderId>;
1234}
1235
1236impl BatchOrderParams for crate::websocket::messages::BybitWsCancelOrderParams {
1237 fn order_link_id(&self) -> Option<&str> {
1238 self.order_link_id.as_deref()
1239 }
1240 fn symbol(&self) -> Ustr {
1241 self.symbol
1242 }
1243 fn category(&self) -> BybitProductType {
1244 self.category
1245 }
1246 fn venue_order_id(&self) -> Option<VenueOrderId> {
1247 self.order_id.as_ref().map(VenueOrderId::new)
1248 }
1249}
1250
1251impl BatchOrderParams for crate::websocket::messages::BybitWsAmendOrderParams {
1252 fn order_link_id(&self) -> Option<&str> {
1253 self.order_link_id.as_deref()
1254 }
1255 fn symbol(&self) -> Ustr {
1256 self.symbol
1257 }
1258 fn category(&self) -> BybitProductType {
1259 self.category
1260 }
1261 fn venue_order_id(&self) -> Option<VenueOrderId> {
1262 self.order_id.as_ref().map(VenueOrderId::new)
1263 }
1264}
1265
1266impl BatchOrderParams for crate::websocket::messages::BybitWsPlaceOrderParams {
1267 fn order_link_id(&self) -> Option<&str> {
1268 self.order_link_id.as_deref()
1269 }
1270 fn symbol(&self) -> Ustr {
1271 self.symbol
1272 }
1273 fn category(&self) -> BybitProductType {
1274 self.category
1275 }
1276 fn venue_order_id(&self) -> Option<VenueOrderId> {
1277 None
1278 }
1279}
1280
1281fn build_pending_entries<P: BatchOrderParams>(
1282 params: &[P],
1283 operation: PendingOperation,
1284 trader_id: TraderId,
1285 strategy_id: StrategyId,
1286) -> Vec<PendingPyRequest> {
1287 params
1288 .iter()
1289 .map(|p| PendingPyRequest {
1290 client_order_id: p
1291 .order_link_id()
1292 .filter(|s| !s.is_empty())
1293 .map_or(ClientOrderId::from("UNKNOWN"), ClientOrderId::new),
1294 operation,
1295 trader_id,
1296 strategy_id,
1297 instrument_id: InstrumentId::new(
1298 Symbol::new(make_bybit_symbol(p.symbol().as_str(), p.category()).as_str()),
1299 *BYBIT_VENUE,
1300 ),
1301 venue_order_id: p.venue_order_id(),
1302 })
1303 .collect()
1304}
1305
1306fn register_batch_pending(
1307 req_ids: Vec<String>,
1308 per_order: &[PendingPyRequest],
1309 pending_py_requests: &DashMap<String, Vec<PendingPyRequest>>,
1310) {
1311 for (req_id, chunk) in req_ids
1312 .into_iter()
1313 .zip(per_order.chunks(BATCH_PROCESSING_LIMIT))
1314 {
1315 pending_py_requests.insert(req_id, chunk.to_vec());
1316 }
1317}
1318
1319fn resolve_instrument(
1320 raw_symbol: &Ustr,
1321 product_type: Option<BybitProductType>,
1322 instruments: &AtomicMap<Ustr, InstrumentAny>,
1323) -> Option<InstrumentAny> {
1324 let key = product_type.map_or(*raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
1325 instruments.get_cloned(&key)
1326}
1327
1328fn send_data_to_python(data: Data, call_soon: &Py<PyAny>, callback: &Py<PyAny>) {
1329 Python::attach(|py| {
1330 let py_obj = data_to_pycapsule(py, data);
1331 call_python_threadsafe(py, call_soon, callback, py_obj);
1332 });
1333}
1334
1335fn send_to_python<T: for<'py> IntoPyObjectExt<'py>>(
1336 value: T,
1337 call_soon: &Py<PyAny>,
1338 callback: &Py<PyAny>,
1339) {
1340 Python::attach(|py| {
1341 if let Ok(py_obj) = value.into_py_any(py) {
1342 call_python_threadsafe(py, call_soon, callback, py_obj);
1343 }
1344 });
1345}
1346
1347fn handle_orderbook(
1348 msg: &crate::websocket::messages::BybitWsOrderbookDepthMsg,
1349 product_type: Option<BybitProductType>,
1350 instruments: &AtomicMap<Ustr, InstrumentAny>,
1351 quote_cache: &mut AHashMap<InstrumentId, QuoteTick>,
1352 clock: &AtomicTime,
1353 call_soon: &Py<PyAny>,
1354 callback: &Py<PyAny>,
1355) {
1356 let Some(instrument) = resolve_instrument(&msg.data.s, product_type, instruments) else {
1357 return;
1358 };
1359 let ts_init = clock.get_time_ns();
1360
1361 match parse_orderbook_deltas(msg, &instrument, ts_init) {
1362 Ok(deltas) => {
1363 send_data_to_python(
1364 Data::Deltas(OrderBookDeltas_API::new(deltas)),
1365 call_soon,
1366 callback,
1367 );
1368 }
1369 Err(e) => log::error!("Failed to parse orderbook deltas: {e}"),
1370 }
1371
1372 let instrument_id = instrument.id();
1373 let last_quote = quote_cache.get(&instrument_id);
1374
1375 match parse_orderbook_quote(msg, &instrument, last_quote, ts_init) {
1376 Ok(quote) => {
1377 quote_cache.insert(instrument_id, quote);
1378 send_data_to_python(Data::Quote(quote), call_soon, callback);
1379 }
1380 Err(e) => log::error!("Failed to parse orderbook quote: {e}"),
1381 }
1382}
1383
1384fn handle_trade(
1385 msg: &crate::websocket::messages::BybitWsTradeMsg,
1386 product_type: Option<BybitProductType>,
1387 instruments: &AtomicMap<Ustr, InstrumentAny>,
1388 trade_subs: &AtomicSet<InstrumentId>,
1389 clock: &AtomicTime,
1390 call_soon: &Py<PyAny>,
1391 callback: &Py<PyAny>,
1392) {
1393 let ts_init = clock.get_time_ns();
1394
1395 for trade in &msg.data {
1396 let Some(instrument) = resolve_instrument(&trade.s, product_type, instruments) else {
1397 continue;
1398 };
1399
1400 if product_type == Some(BybitProductType::Option)
1401 && !trade_subs.is_empty()
1402 && !trade_subs.contains(&instrument.id())
1403 {
1404 continue;
1405 }
1406
1407 match parse_ws_trade_tick(trade, &instrument, ts_init) {
1408 Ok(tick) => send_data_to_python(Data::Trade(tick), call_soon, callback),
1409 Err(e) => log::error!("Failed to parse trade tick: {e}"),
1410 }
1411 }
1412}
1413
1414#[expect(clippy::too_many_arguments)]
1415fn handle_kline(
1416 msg: &crate::websocket::messages::BybitWsKlineMsg,
1417 product_type: Option<BybitProductType>,
1418 instruments: &AtomicMap<Ustr, InstrumentAny>,
1419 bar_types_cache: &AtomicMap<String, BarType>,
1420 bars_timestamp_on_close: bool,
1421 clock: &AtomicTime,
1422 call_soon: &Py<PyAny>,
1423 callback: &Py<PyAny>,
1424) {
1425 let Ok((_, raw_symbol)) = parse_kline_topic(msg.topic.as_str()) else {
1426 return;
1427 };
1428 let ustr_symbol = Ustr::from(raw_symbol);
1429 let Some(instrument) = resolve_instrument(&ustr_symbol, product_type, instruments) else {
1430 return;
1431 };
1432 let Some(bar_type) = bar_types_cache.load().get(msg.topic.as_str()).copied() else {
1433 return;
1434 };
1435
1436 let ts_init = clock.get_time_ns();
1437
1438 for kline in &msg.data {
1439 if !kline.confirm {
1440 continue;
1441 }
1442
1443 match parse_ws_kline_bar(
1444 kline,
1445 &instrument,
1446 bar_type,
1447 bars_timestamp_on_close,
1448 ts_init,
1449 ) {
1450 Ok(bar) => send_data_to_python(Data::Bar(bar), call_soon, callback),
1451 Err(e) => log::error!("Failed to parse kline bar: {e}"),
1452 }
1453 }
1454}
1455
1456#[expect(clippy::too_many_arguments)]
1457fn handle_ticker_linear(
1458 msg: &crate::websocket::messages::BybitWsTickerLinearMsg,
1459 product_type: Option<BybitProductType>,
1460 instruments: &AtomicMap<Ustr, InstrumentAny>,
1461 quote_cache: &mut AHashMap<InstrumentId, QuoteTick>,
1462 funding_cache: &mut AHashMap<Ustr, (Option<String>, Option<String>)>,
1463 clock: &AtomicTime,
1464 call_soon: &Py<PyAny>,
1465 callback: &Py<PyAny>,
1466) {
1467 let Some(instrument) = resolve_instrument(&msg.data.symbol, product_type, instruments) else {
1468 return;
1469 };
1470 let instrument_id = instrument.id();
1471 let ts_init = clock.get_time_ns();
1472
1473 if msg.data.bid1_price.is_some() {
1474 match parse_ticker_linear_quote(msg, &instrument, ts_init) {
1475 Ok(quote) => {
1476 let last = quote_cache.get(&instrument_id);
1477
1478 if last.is_none_or(|q| *q != quote) {
1479 quote_cache.insert(instrument_id, quote);
1480 send_data_to_python(Data::Quote(quote), call_soon, callback);
1481 }
1482 }
1483 Err(e) => log::debug!("Skipping partial ticker update: {e}"),
1484 }
1485 }
1486
1487 let ts_event = match parse_millis_i64(msg.ts, "ticker.ts") {
1488 Ok(ts) => ts,
1489 Err(e) => {
1490 log::error!("Failed to parse ticker timestamp: {e}");
1491 return;
1492 }
1493 };
1494
1495 let cache_entry = funding_cache.entry(msg.data.symbol).or_insert((None, None));
1496 let mut changed = false;
1497
1498 if let Some(rate) = &msg.data.funding_rate
1499 && cache_entry.0.as_ref() != Some(rate)
1500 {
1501 cache_entry.0 = Some(rate.clone());
1502 changed = true;
1503 }
1504
1505 if let Some(next_time) = &msg.data.next_funding_time
1506 && cache_entry.1.as_ref() != Some(next_time)
1507 {
1508 cache_entry.1 = Some(next_time.clone());
1509 changed = true;
1510 }
1511
1512 if changed {
1513 match parse_ticker_linear_funding(&msg.data, instrument_id, ts_event, ts_init) {
1514 Ok(update) => send_to_python(update, call_soon, callback),
1515 Err(e) => log::debug!("Skipping funding rate update: {e}"),
1516 }
1517 }
1518
1519 if msg.data.mark_price.is_some() {
1520 match parse_ticker_linear_mark_price(&msg.data, &instrument, ts_event, ts_init) {
1521 Ok(update) => send_to_python(update, call_soon, callback),
1522 Err(e) => log::debug!("Skipping mark price update: {e}"),
1523 }
1524 }
1525
1526 if msg.data.index_price.is_some() {
1527 match parse_ticker_linear_index_price(&msg.data, &instrument, ts_event, ts_init) {
1528 Ok(update) => send_to_python(update, call_soon, callback),
1529 Err(e) => log::debug!("Skipping index price update: {e}"),
1530 }
1531 }
1532}
1533
1534#[expect(clippy::too_many_arguments)]
1535fn handle_ticker_option(
1536 msg: &crate::websocket::messages::BybitWsTickerOptionMsg,
1537 product_type: Option<BybitProductType>,
1538 instruments: &AtomicMap<Ustr, InstrumentAny>,
1539 quote_cache: &mut AHashMap<InstrumentId, QuoteTick>,
1540 option_greeks_subs: &AtomicSet<InstrumentId>,
1541 clock: &AtomicTime,
1542 call_soon: &Py<PyAny>,
1543 callback: &Py<PyAny>,
1544) {
1545 let Some(instrument) = resolve_instrument(&msg.data.symbol, product_type, instruments) else {
1546 return;
1547 };
1548 let instrument_id = instrument.id();
1549 let ts_init = clock.get_time_ns();
1550
1551 match parse_ticker_option_quote(msg, &instrument, ts_init) {
1552 Ok(quote) => {
1553 let last = quote_cache.get(&instrument_id);
1554
1555 if last.is_none_or(|q| *q != quote) {
1556 quote_cache.insert(instrument_id, quote);
1557 send_data_to_python(Data::Quote(quote), call_soon, callback);
1558 }
1559 }
1560 Err(e) => log::error!("Failed to parse ticker option quote: {e}"),
1561 }
1562
1563 match parse_ticker_option_mark_price(msg, &instrument, ts_init) {
1564 Ok(update) => send_to_python(update, call_soon, callback),
1565 Err(e) => log::error!("Failed to parse ticker option mark price: {e}"),
1566 }
1567
1568 match parse_ticker_option_index_price(msg, &instrument, ts_init) {
1569 Ok(update) => send_to_python(update, call_soon, callback),
1570 Err(e) => log::error!("Failed to parse ticker option index price: {e}"),
1571 }
1572
1573 if option_greeks_subs.contains(&instrument_id) {
1574 match parse_ticker_option_greeks(msg, &instrument, ts_init) {
1575 Ok(greeks) => send_to_python(greeks, call_soon, callback),
1576 Err(e) => log::error!("Failed to parse option greeks: {e}"),
1577 }
1578 }
1579}
1580
1581fn handle_account_order(
1582 msg: &crate::websocket::messages::BybitWsAccountOrderMsg,
1583 instruments: &AtomicMap<Ustr, InstrumentAny>,
1584 account_id: Option<AccountId>,
1585 clock: &AtomicTime,
1586 call_soon: &Py<PyAny>,
1587 callback: &Py<PyAny>,
1588) {
1589 let ts_init = clock.get_time_ns();
1590
1591 for order in &msg.data {
1592 let symbol = make_bybit_symbol(order.symbol, order.category);
1593 let Some(instrument) = instruments.get_cloned(&symbol) else {
1594 log::warn!("No instrument for order update: {symbol}");
1595 continue;
1596 };
1597 let Some(account_id) = account_id else {
1598 continue;
1599 };
1600
1601 match parse_ws_order_status_report(order, &instrument, account_id, ts_init) {
1602 Ok(report) => send_to_python(report, call_soon, callback),
1603 Err(e) => log::error!("Failed to parse order status report: {e}"),
1604 }
1605 }
1606}
1607
1608fn handle_account_execution(
1609 msg: &crate::websocket::messages::BybitWsAccountExecutionMsg,
1610 instruments: &AtomicMap<Ustr, InstrumentAny>,
1611 account_id: Option<AccountId>,
1612 clock: &AtomicTime,
1613 call_soon: &Py<PyAny>,
1614 callback: &Py<PyAny>,
1615) {
1616 let ts_init = clock.get_time_ns();
1617
1618 for exec in &msg.data {
1619 let symbol = make_bybit_symbol(exec.symbol, exec.category);
1620 let Some(instrument) = instruments.get_cloned(&symbol) else {
1621 log::warn!("No instrument for execution update: {symbol}");
1622 continue;
1623 };
1624 let Some(account_id) = account_id else {
1625 continue;
1626 };
1627
1628 match parse_ws_fill_report(exec, account_id, &instrument, ts_init) {
1629 Ok(report) => send_to_python(report, call_soon, callback),
1630 Err(e) => log::error!("Failed to parse fill report: {e}"),
1631 }
1632 }
1633}
1634
1635fn handle_account_wallet(
1636 msg: &crate::websocket::messages::BybitWsAccountWalletMsg,
1637 account_id: Option<AccountId>,
1638 clock: &AtomicTime,
1639 call_soon: &Py<PyAny>,
1640 callback: &Py<PyAny>,
1641) {
1642 let ts_init = clock.get_time_ns();
1643 let ts_event = parse_millis_i64(msg.creation_time, "wallet.creation_time").unwrap_or(ts_init);
1644 let Some(account_id) = account_id else {
1645 return;
1646 };
1647
1648 for wallet in &msg.data {
1649 match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1650 Ok(state) => send_to_python(state, call_soon, callback),
1651 Err(e) => log::error!("Failed to parse account state: {e}"),
1652 }
1653 }
1654}
1655
1656fn handle_account_position(
1657 msg: &crate::websocket::messages::BybitWsAccountPositionMsg,
1658 instruments: &AtomicMap<Ustr, InstrumentAny>,
1659 account_id: Option<AccountId>,
1660 clock: &AtomicTime,
1661 call_soon: &Py<PyAny>,
1662 callback: &Py<PyAny>,
1663) {
1664 let ts_init = clock.get_time_ns();
1665
1666 for position in &msg.data {
1667 let symbol = make_bybit_symbol(position.symbol, position.category);
1668 let Some(instrument) = instruments.get_cloned(&symbol) else {
1669 log::warn!("No instrument for position update: {symbol}");
1670 continue;
1671 };
1672 let Some(account_id) = account_id else {
1673 continue;
1674 };
1675
1676 match parse_ws_position_status_report(position, account_id, &instrument, ts_init) {
1677 Ok(report) => send_to_python(report, call_soon, callback),
1678 Err(e) => log::error!("Failed to parse position status report: {e}"),
1679 }
1680 }
1681}
1682
1683fn handle_order_response(
1684 resp: &crate::websocket::messages::BybitWsOrderResponse,
1685 pending_py_requests: &DashMap<String, Vec<PendingPyRequest>>,
1686 account_id: Option<AccountId>,
1687 clock: &AtomicTime,
1688 call_soon: &Py<PyAny>,
1689 callback: &Py<PyAny>,
1690) {
1691 if resp.ret_code == 0 {
1692 let entries = resp
1693 .req_id
1694 .as_ref()
1695 .and_then(|rid| pending_py_requests.remove(rid))
1696 .map(|(_, v)| v);
1697
1698 if let Some(entries) = entries {
1700 let batch_errors = resp.extract_batch_errors();
1701 let data_array = resp.data.as_array();
1702 let ts_init = clock.get_time_ns();
1703
1704 for (idx, error) in batch_errors.iter().enumerate() {
1705 if error.code == 0 {
1706 continue;
1707 }
1708
1709 let pending = data_array
1710 .and_then(|arr| arr.get(idx))
1711 .and_then(|item| item.get("orderLinkId"))
1712 .and_then(|v| v.as_str())
1713 .filter(|s| !s.is_empty())
1714 .and_then(|oli| {
1715 let cid = ClientOrderId::new(oli);
1716 entries.iter().find(|e| e.client_order_id == cid)
1717 })
1718 .or_else(|| entries.get(idx));
1719
1720 if let Some(pending) = pending {
1721 let reason = Ustr::from(&error.msg);
1722 emit_rejection(pending, reason, account_id, ts_init, call_soon, callback);
1723 } else {
1724 log::warn!(
1725 "Batch error at index {idx} without correlation: code={}, msg={}",
1726 error.code,
1727 error.msg,
1728 );
1729 }
1730 }
1731 }
1732 return;
1733 }
1734
1735 let entries = resp
1737 .req_id
1738 .as_ref()
1739 .and_then(|rid| pending_py_requests.remove(rid))
1740 .map(|(_, v)| v)
1741 .or_else(|| {
1742 let order_link_id = resp
1744 .data
1745 .get("orderLinkId")
1746 .and_then(|v| v.as_str())
1747 .filter(|s| !s.is_empty())?;
1748 let cid = ClientOrderId::new(order_link_id);
1749 let key = pending_py_requests
1750 .iter()
1751 .find(|entry| entry.value().iter().any(|e| e.client_order_id == cid))
1752 .map(|entry| entry.key().clone())?;
1753 pending_py_requests.remove(&key).map(|(_, v)| v)
1754 });
1755
1756 let Some(entries) = entries else {
1757 log::warn!(
1758 "Unmatched order response: ret_code={}, ret_msg={}",
1759 resp.ret_code,
1760 resp.ret_msg,
1761 );
1762 return;
1763 };
1764
1765 let ts_init = clock.get_time_ns();
1766 let reason = Ustr::from(&resp.ret_msg);
1767
1768 for pending in &entries {
1769 emit_rejection(pending, reason, account_id, ts_init, call_soon, callback);
1770 }
1771}
1772
1773fn emit_rejection(
1774 pending: &PendingPyRequest,
1775 reason: Ustr,
1776 account_id: Option<AccountId>,
1777 ts_init: UnixNanos,
1778 call_soon: &Py<PyAny>,
1779 callback: &Py<PyAny>,
1780) {
1781 match pending.operation {
1782 PendingOperation::Place => {
1783 let event = OrderRejected::new(
1784 pending.trader_id,
1785 pending.strategy_id,
1786 pending.instrument_id,
1787 pending.client_order_id,
1788 account_id.unwrap_or(AccountId::from("BYBIT-000")),
1789 reason,
1790 UUID4::new(),
1791 ts_init,
1792 ts_init,
1793 false,
1794 false,
1795 );
1796 send_to_python(event, call_soon, callback);
1797 }
1798 PendingOperation::Cancel => {
1799 let event = OrderCancelRejected::new(
1800 pending.trader_id,
1801 pending.strategy_id,
1802 pending.instrument_id,
1803 pending.client_order_id,
1804 reason,
1805 UUID4::new(),
1806 ts_init,
1807 ts_init,
1808 false,
1809 pending.venue_order_id,
1810 account_id,
1811 );
1812 send_to_python(event, call_soon, callback);
1813 }
1814 PendingOperation::Amend => {
1815 let event = OrderModifyRejected::new(
1816 pending.trader_id,
1817 pending.strategy_id,
1818 pending.instrument_id,
1819 pending.client_order_id,
1820 reason,
1821 UUID4::new(),
1822 ts_init,
1823 ts_init,
1824 false,
1825 pending.venue_order_id,
1826 account_id,
1827 );
1828 send_to_python(event, call_soon, callback);
1829 }
1830 }
1831}