1use std::{
19 future::Future,
20 sync::{Arc, Mutex},
21 time::{Duration, Instant},
22};
23
24use ahash::AHashMap;
25use anyhow::Context;
26use async_trait::async_trait;
27use futures_util::{StreamExt, pin_mut};
28use nautilus_common::{
29 clients::ExecutionClient,
30 live::{get_runtime, runner::get_exec_event_sender},
31 messages::execution::{
32 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
33 GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
34 GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
35 GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
36 SubmitOrderList,
37 },
38};
39use nautilus_core::{
40 MUTEX_POISONED, UnixNanos,
41 env::get_or_env_var,
42 time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
45use nautilus_model::{
46 accounts::AccountAny,
47 enums::{OmsType, OrderSide, OrderType, TimeInForce},
48 identifiers::{AccountId, ClientId, InstrumentId, Venue},
49 instruments::{Instrument, InstrumentAny},
50 orders::{Order, OrderAny},
51 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
52 types::{AccountBalance, MarginBalance, Price},
53};
54use tokio::task::JoinHandle;
55use ustr::Ustr;
56
57use crate::{
58 common::{
59 consts::BYBIT_VENUE,
60 credential::credential_env_vars,
61 enums::{
62 BybitAccountType, BybitEnvironment, BybitOrderSide, BybitOrderType, BybitPositionIdx,
63 BybitPositionMode, BybitProductType, BybitTimeInForce, BybitTpSlMode,
64 resolve_trigger_type,
65 },
66 parse::{
67 BybitTpSlParams, extract_raw_symbol, get_price_str, nanos_to_millis,
68 parse_bybit_tp_sl_params, spot_leverage, spot_market_unit, trigger_direction,
69 },
70 symbol::BybitSymbol,
71 },
72 config::BybitExecClientConfig,
73 http::client::BybitHttpClient,
74 websocket::{
75 client::BybitWebSocketClient,
76 dispatch::{OrderIdentity, PendingOperation, WsDispatchState, dispatch_ws_message},
77 messages::{BybitWsAmendOrderParams, BybitWsCancelOrderParams, BybitWsPlaceOrderParams},
78 },
79};
80
81#[must_use]
87pub fn resolve_position_idx(
88 position_mode: Option<BybitPositionMode>,
89 order_side: BybitOrderSide,
90 is_reduce_only: bool,
91 manual_override: Option<BybitPositionIdx>,
92) -> Option<BybitPositionIdx> {
93 if manual_override.is_some() {
94 return manual_override;
95 }
96 let mode = position_mode?;
97 match mode {
98 BybitPositionMode::BothSides => Some(match (order_side, is_reduce_only) {
99 (BybitOrderSide::Buy, false) | (BybitOrderSide::Sell, true) => {
100 BybitPositionIdx::BuyHedge
101 }
102 (BybitOrderSide::Sell, false) | (BybitOrderSide::Buy, true) => {
103 BybitPositionIdx::SellHedge
104 }
105 (BybitOrderSide::Unknown, _) => BybitPositionIdx::OneWay,
106 }),
107 BybitPositionMode::MergedSingle => Some(BybitPositionIdx::OneWay),
108 }
109}
110
111fn parse_derivative_symbol(symbol_str: &str) -> Option<BybitSymbol> {
112 let symbol = match BybitSymbol::new(symbol_str) {
113 Ok(s) => s,
114 Err(e) => {
115 log::warn!("Failed to parse symbol {symbol_str}: {e}");
116 return None;
117 }
118 };
119 matches!(
120 symbol.product_type(),
121 BybitProductType::Linear | BybitProductType::Inverse
122 )
123 .then_some(symbol)
124}
125
126fn is_unchanged_error<E: std::fmt::Display>(err: &E, code: &str) -> bool {
127 let msg = err.to_string().to_lowercase();
128 if msg.contains("not been modified") {
129 return true;
130 }
131 !code.is_empty() && msg.contains(code)
132}
133
134fn is_low_margin_error<E: std::fmt::Display>(err: &E) -> bool {
135 err.to_string()
136 .contains("needs to be equal to or greater than")
137}
138
139#[derive(Debug)]
141pub struct BybitExecutionClient {
142 core: ExecutionClientCore,
143 clock: &'static AtomicTime,
144 config: BybitExecClientConfig,
145 emitter: ExecutionEventEmitter,
146 http_client: BybitHttpClient,
147 ws_private: BybitWebSocketClient,
148 ws_trade: BybitWebSocketClient,
149 ws_private_stream_handle: Option<JoinHandle<()>>,
150 ws_trade_stream_handle: Option<JoinHandle<()>>,
151 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
152 instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
153 dispatch_state: Arc<WsDispatchState>,
154}
155
156impl BybitExecutionClient {
157 pub fn new(core: ExecutionClientCore, config: BybitExecClientConfig) -> anyhow::Result<Self> {
163 let (key_var, secret_var) = credential_env_vars(config.environment);
164 let api_key = get_or_env_var(config.api_key.clone(), key_var)?;
165 let api_secret = get_or_env_var(config.api_secret.clone(), secret_var)?;
166
167 let http_client = BybitHttpClient::with_credentials(
168 api_key.clone(),
169 api_secret.clone(),
170 Some(config.http_base_url()),
171 config.http_timeout_secs,
172 config.max_retries,
173 config.retry_delay_initial_ms,
174 config.retry_delay_max_ms,
175 config.recv_window_ms,
176 config.proxy_url.clone(),
177 )?;
178
179 let ws_private = BybitWebSocketClient::new_private(
180 config.environment,
181 Some(api_key.clone()),
182 Some(api_secret.clone()),
183 Some(config.ws_private_url()),
184 config.heartbeat_interval_secs,
185 config.transport_backend,
186 config.proxy_url.clone(),
187 );
188
189 let ws_trade = BybitWebSocketClient::new_trade(
190 config.environment,
191 Some(api_key),
192 Some(api_secret),
193 Some(config.ws_trade_url()),
194 config.heartbeat_interval_secs,
195 config.transport_backend,
196 config.proxy_url.clone(),
197 );
198
199 let clock = get_atomic_clock_realtime();
200 let emitter = ExecutionEventEmitter::new(
201 clock,
202 core.trader_id,
203 core.account_id,
204 core.account_type,
205 None,
206 );
207
208 Ok(Self {
209 core,
210 clock,
211 config,
212 emitter,
213 http_client,
214 ws_private,
215 ws_trade,
216 ws_private_stream_handle: None,
217 ws_trade_stream_handle: None,
218 pending_tasks: Mutex::new(Vec::new()),
219 instruments_cache: Arc::new(AHashMap::new()),
220 dispatch_state: Arc::new(WsDispatchState::default()),
221 })
222 }
223
224 fn product_types(&self) -> Vec<BybitProductType> {
225 if self.config.product_types.is_empty() {
226 vec![BybitProductType::Linear]
227 } else {
228 self.config.product_types.clone()
229 }
230 }
231
232 fn update_account_state(&self) {
233 let http_client = self.http_client.clone();
234 let account_id = self.core.account_id;
235 let emitter = self.emitter.clone();
236
237 self.spawn_task("query_account", async move {
238 let account_state = http_client
239 .request_account_state(BybitAccountType::Unified, account_id)
240 .await
241 .context("failed to request Bybit account state")?;
242 emitter.send_account_state(account_state);
243 Ok(())
244 });
245 }
246
247 fn spawn_task<F>(&self, description: &'static str, fut: F)
248 where
249 F: Future<Output = anyhow::Result<()>> + Send + 'static,
250 {
251 let runtime = get_runtime();
252 let handle = runtime.spawn(async move {
253 if let Err(e) = fut.await {
254 log::warn!("{description} failed: {e:?}");
255 }
256 });
257
258 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
259 tasks.retain(|handle| !handle.is_finished());
260 tasks.push(handle);
261 }
262
263 fn abort_pending_tasks(&self) {
264 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
265 for handle in tasks.drain(..) {
266 handle.abort();
267 }
268 }
269
270 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
272 let account_id = self.core.account_id;
273
274 if self.core.cache().account(&account_id).is_some() {
275 log::info!("Account {account_id} registered");
276 return Ok(());
277 }
278
279 let start = Instant::now();
280 let timeout = Duration::from_secs_f64(timeout_secs);
281 let interval = Duration::from_millis(10);
282
283 loop {
284 tokio::time::sleep(interval).await;
285
286 if self.core.cache().account(&account_id).is_some() {
287 log::info!("Account {account_id} registered");
288 return Ok(());
289 }
290
291 if start.elapsed() >= timeout {
292 anyhow::bail!(
293 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
294 );
295 }
296 }
297 }
298
299 fn get_product_type_for_instrument(&self, instrument_id: InstrumentId) -> BybitProductType {
300 BybitProductType::from_suffix(instrument_id.symbol.as_str()).unwrap_or_else(|| {
301 log::warn!("No product-type suffix on {instrument_id}, defaulting to Linear");
302 BybitProductType::Linear
303 })
304 }
305
306 fn resolve_position_idx(
307 &self,
308 instrument_id: InstrumentId,
309 order_side: BybitOrderSide,
310 is_reduce_only: bool,
311 manual_override: Option<BybitPositionIdx>,
312 ) -> Option<BybitPositionIdx> {
313 let product_type = self.get_product_type_for_instrument(instrument_id);
314 if !matches!(
315 product_type,
316 BybitProductType::Linear | BybitProductType::Inverse
317 ) {
318 return None;
319 }
320 let mode = self
321 .config
322 .position_mode
323 .as_ref()
324 .and_then(|map| map.get(instrument_id.symbol.as_str()).copied());
325 resolve_position_idx(mode, order_side, is_reduce_only, manual_override)
326 }
327
328 async fn apply_account_configuration(&self) -> anyhow::Result<()> {
329 self.apply_leverages_setting().await;
330 self.apply_position_modes_setting().await;
331 self.apply_margin_mode_setting().await
332 }
333
334 async fn apply_leverages_setting(&self) {
335 let Some(leverages) = &self.config.futures_leverages else {
336 return;
337 };
338
339 for (symbol_str, leverage) in leverages {
340 self.apply_leverage_entry(symbol_str, *leverage).await;
341 }
342 }
343
344 async fn apply_leverage_entry(&self, symbol_str: &str, leverage: u32) {
345 let Some(symbol) = parse_derivative_symbol(symbol_str) else {
346 return;
347 };
348 let lev = leverage.to_string();
349 let result = self
350 .http_client
351 .set_leverage(symbol.product_type(), symbol.raw_symbol(), &lev, &lev)
352 .await;
353
354 match result {
355 Ok(_) => log::info!("Set leverage for {symbol_str} to {leverage}"),
356 Err(e) if is_unchanged_error(&e, "110043") => {
357 log::info!("Leverage already set for {symbol_str} to {leverage}");
358 }
359 Err(e) => log::error!("Failed to set leverage for {symbol_str}: {e}"),
360 }
361 }
362
363 async fn apply_position_modes_setting(&self) {
364 let Some(modes) = &self.config.position_mode else {
365 return;
366 };
367
368 for (symbol_str, mode) in modes {
369 self.apply_position_mode_entry(symbol_str, *mode).await;
370 }
371 }
372
373 async fn apply_position_mode_entry(&self, symbol_str: &str, mode: BybitPositionMode) {
374 let Some(symbol) = parse_derivative_symbol(symbol_str) else {
375 return;
376 };
377 let result = self
378 .http_client
379 .switch_mode(
380 symbol.product_type(),
381 mode,
382 Some(symbol.raw_symbol().to_string()),
383 None,
384 )
385 .await;
386
387 match result {
388 Ok(_) => log::info!("Set symbol `{symbol_str}` position mode to `{mode:?}`"),
389 Err(e) if is_unchanged_error(&e, "110025") => {
390 log::info!("Symbol `{symbol_str}` position mode already set to `{mode:?}`");
391 }
392 Err(e) => log::error!("Failed to set position mode for {symbol_str}: {e}"),
393 }
394 }
395
396 async fn apply_margin_mode_setting(&self) -> anyhow::Result<()> {
397 let Some(margin_mode) = self.config.margin_mode else {
398 return Ok(());
399 };
400
401 let result = self.http_client.set_margin_mode(margin_mode).await;
402
403 match result {
404 Ok(_) => {
405 log::info!("Set account margin mode to {margin_mode:?}");
406 Ok(())
407 }
408 Err(e) if is_unchanged_error(&e, "") => {
409 log::info!("Margin mode already set to {margin_mode:?}");
410 Ok(())
411 }
412 Err(e) if is_low_margin_error(&e) => {
413 log::warn!("Cannot set margin mode: {e}");
414 Ok(())
415 }
416 Err(e) => Err(anyhow::Error::from(e).context("failed to set margin mode")),
417 }
418 }
419
420 fn map_order_type(order_type: OrderType) -> anyhow::Result<(BybitOrderType, bool)> {
421 match order_type {
422 OrderType::Market => Ok((BybitOrderType::Market, false)),
423 OrderType::Limit => Ok((BybitOrderType::Limit, false)),
424 OrderType::StopMarket | OrderType::MarketIfTouched => {
425 Ok((BybitOrderType::Market, true))
426 }
427 OrderType::StopLimit | OrderType::LimitIfTouched => Ok((BybitOrderType::Limit, true)),
428 _ => anyhow::bail!("unsupported order type for Bybit: {order_type}"),
429 }
430 }
431
432 fn map_time_in_force(tif: TimeInForce, is_post_only: bool) -> BybitTimeInForce {
433 if is_post_only {
434 return BybitTimeInForce::PostOnly;
435 }
436
437 match tif {
438 TimeInForce::Gtc => BybitTimeInForce::Gtc,
439 TimeInForce::Ioc => BybitTimeInForce::Ioc,
440 TimeInForce::Fok => BybitTimeInForce::Fok,
441 _ => BybitTimeInForce::Gtc,
442 }
443 }
444
445 fn build_ws_place_params(
446 order: &OrderAny,
447 product_type: BybitProductType,
448 raw_symbol: &str,
449 tp_sl: &BybitTpSlParams,
450 position_idx: Option<BybitPositionIdx>,
451 ) -> anyhow::Result<BybitWsPlaceOrderParams> {
452 let bybit_side = BybitOrderSide::try_from(order.order_side())?;
453 let (bybit_order_type, is_conditional) = Self::map_order_type(order.order_type())?;
454 let has_tp_sl = tp_sl.has_tp_sl();
455 let trigger_dir = trigger_direction(order.order_type(), order.order_side(), is_conditional);
456
457 Ok(BybitWsPlaceOrderParams {
458 category: product_type,
459 symbol: Ustr::from(raw_symbol),
460 side: bybit_side,
461 order_type: bybit_order_type,
462 qty: order.quantity().to_string(),
463 is_leverage: spot_leverage(product_type, tp_sl.is_leverage),
464 market_unit: spot_market_unit(
465 product_type,
466 bybit_order_type,
467 order.is_quote_quantity(),
468 ),
469 price: order.price().map(|p: Price| p.to_string()),
470 time_in_force: if bybit_order_type == BybitOrderType::Market {
471 None
472 } else {
473 Some(Self::map_time_in_force(
474 order.time_in_force(),
475 order.is_post_only(),
476 ))
477 },
478 order_link_id: Some(order.client_order_id().to_string()),
479 reduce_only: if order.is_reduce_only() {
480 Some(true)
481 } else {
482 None
483 },
484 close_on_trigger: tp_sl.close_on_trigger,
485 trigger_price: order.trigger_price().map(|p: Price| p.to_string()),
486 trigger_by: if is_conditional {
487 Some(resolve_trigger_type(order.trigger_type()))
488 } else {
489 None
490 },
491 trigger_direction: trigger_dir.map(|d| d as i32),
492 tpsl_mode: if has_tp_sl {
493 Some(BybitTpSlMode::Full)
494 } else {
495 None
496 },
497 take_profit: tp_sl.take_profit.map(|p| p.to_string()),
498 stop_loss: tp_sl.stop_loss.map(|p| p.to_string()),
499 tp_trigger_by: tp_sl.tp_trigger_by.or(tp_sl
500 .take_profit
501 .map(|_| resolve_trigger_type(order.trigger_type()))),
502 sl_trigger_by: tp_sl.sl_trigger_by.or(tp_sl
503 .stop_loss
504 .map(|_| resolve_trigger_type(order.trigger_type()))),
505 sl_trigger_price: tp_sl.sl_trigger_price.clone(),
506 tp_trigger_price: tp_sl.tp_trigger_price.clone(),
507 sl_order_type: tp_sl.sl_order_type,
508 tp_order_type: tp_sl.tp_order_type,
509 sl_limit_price: tp_sl.sl_limit_price.clone(),
510 tp_limit_price: tp_sl.tp_limit_price.clone(),
511 order_iv: tp_sl.order_iv.clone(),
512 mmp: tp_sl.mmp,
513 position_idx,
514 })
515 }
516}
517
518#[async_trait(?Send)]
519impl ExecutionClient for BybitExecutionClient {
520 fn is_connected(&self) -> bool {
521 self.core.is_connected()
522 }
523
524 fn client_id(&self) -> ClientId {
525 self.core.client_id
526 }
527
528 fn account_id(&self) -> AccountId {
529 self.core.account_id
530 }
531
532 fn venue(&self) -> Venue {
533 *BYBIT_VENUE
534 }
535
536 fn oms_type(&self) -> OmsType {
537 self.core.oms_type
538 }
539
540 fn get_account(&self) -> Option<AccountAny> {
541 self.core.cache().account(&self.core.account_id).cloned()
542 }
543
544 async fn connect(&mut self) -> anyhow::Result<()> {
545 if self.core.is_connected() {
546 return Ok(());
547 }
548
549 self.http_client.reset_cancellation_token();
551
552 let product_types = self.product_types();
553
554 if !self.core.instruments_initialized() {
555 let mut all_instruments = Vec::new();
556
557 for product_type in &product_types {
558 let instruments = self
559 .http_client
560 .request_instruments(*product_type, None, None)
561 .await
562 .with_context(|| {
563 format!("failed to request Bybit instruments for {product_type:?}")
564 })?;
565
566 if instruments.is_empty() {
567 log::warn!("No instruments returned for {product_type:?}");
568 continue;
569 }
570
571 log::info!("Loaded {} {product_type:?} instruments", instruments.len());
572
573 self.http_client.cache_instruments(&instruments);
574 all_instruments.extend(instruments);
575 }
576
577 if !all_instruments.is_empty() {
578 let mut instruments_map = AHashMap::new();
579 for instrument in &all_instruments {
580 instruments_map.insert(instrument.id().symbol.inner(), instrument.clone());
581 }
582 self.instruments_cache = Arc::new(instruments_map);
583 }
584 self.core.set_instruments_initialized();
585 }
586
587 self.ws_private.set_account_id(self.core.account_id);
588 self.ws_trade.set_account_id(self.core.account_id);
589
590 self.ws_private.connect().await?;
591 self.ws_private.wait_until_active(10.0).await?;
592 log::info!("Connected to private WebSocket");
593
594 if self.ws_private_stream_handle.is_none() {
595 let stream = self.ws_private.stream();
596 let emitter = self.emitter.clone();
597 let account_id = self.core.account_id;
598 let instruments = Arc::clone(&self.instruments_cache);
599 let state = Arc::clone(&self.dispatch_state);
600 let clock = self.clock;
601
602 let handle = get_runtime().spawn(async move {
603 pin_mut!(stream);
604 while let Some(message) = stream.next().await {
605 dispatch_ws_message(
606 &message,
607 &emitter,
608 &state,
609 account_id,
610 &instruments,
611 clock,
612 );
613 }
614 });
615 self.ws_private_stream_handle = Some(handle);
616 }
617
618 if self.config.environment == BybitEnvironment::Demo {
620 log::warn!("Demo mode: Trade WebSocket not available, orders use HTTP REST API");
621 } else {
622 self.ws_trade.connect().await?;
623 self.ws_trade.wait_until_active(10.0).await?;
624 log::info!("Connected to trade WebSocket");
625
626 if self.ws_trade_stream_handle.is_none() {
627 let stream = self.ws_trade.stream();
628 let emitter = self.emitter.clone();
629 let account_id = self.core.account_id;
630 let instruments = Arc::clone(&self.instruments_cache);
631 let state = Arc::clone(&self.dispatch_state);
632 let clock = self.clock;
633
634 let handle = get_runtime().spawn(async move {
635 pin_mut!(stream);
636 while let Some(message) = stream.next().await {
637 dispatch_ws_message(
638 &message,
639 &emitter,
640 &state,
641 account_id,
642 &instruments,
643 clock,
644 );
645 }
646 });
647 self.ws_trade_stream_handle = Some(handle);
648 }
649 }
650
651 self.ws_private.subscribe_orders().await?;
652 self.ws_private.subscribe_executions().await?;
653 self.ws_private.subscribe_positions().await?;
654 self.ws_private.subscribe_wallet().await?;
655
656 self.apply_account_configuration().await?;
657
658 let account_state = self
659 .http_client
660 .request_account_state(BybitAccountType::Unified, self.core.account_id)
661 .await
662 .context("failed to request Bybit account state")?;
663
664 if !account_state.balances.is_empty() {
665 log::info!(
666 "Received account state with {} balance(s)",
667 account_state.balances.len()
668 );
669 }
670 self.emitter.send_account_state(account_state);
671
672 self.await_account_registered(30.0).await?;
673
674 self.core.set_connected();
675 log::info!("Connected: client_id={}", self.core.client_id);
676 Ok(())
677 }
678
679 async fn disconnect(&mut self) -> anyhow::Result<()> {
680 if self.core.is_disconnected() {
681 return Ok(());
682 }
683
684 self.abort_pending_tasks();
685 self.http_client.cancel_all_requests();
686
687 if let Err(e) = self.ws_private.close().await {
688 log::warn!("Error closing private websocket: {e:?}");
689 }
690
691 if let Err(e) = self.ws_trade.close().await {
692 log::warn!("Error closing trade websocket: {e:?}");
693 }
694
695 if let Some(handle) = self.ws_private_stream_handle.take() {
696 handle.abort();
697 }
698
699 if let Some(handle) = self.ws_trade_stream_handle.take() {
700 handle.abort();
701 }
702
703 self.core.set_disconnected();
704 log::info!("Disconnected: client_id={}", self.core.client_id);
705 Ok(())
706 }
707
708 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
709 self.update_account_state();
710 Ok(())
711 }
712
713 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
714 let instrument_id = cmd.instrument_id;
715 let product_type = self.get_product_type_for_instrument(instrument_id);
716 let client_order_id = cmd.client_order_id;
717 let venue_order_id = cmd.venue_order_id;
718 let account_id = self.core.account_id;
719 let http_client = self.http_client.clone();
720 let emitter = self.emitter.clone();
721
722 self.spawn_task("query_order", async move {
723 match http_client
724 .query_order(
725 account_id,
726 product_type,
727 instrument_id,
728 Some(client_order_id),
729 venue_order_id,
730 )
731 .await
732 {
733 Ok(Some(report)) => {
734 emitter.send_order_status_report(report);
735 }
736 Ok(None) => {
737 log::warn!("Order not found: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}");
738 }
739 Err(e) => {
740 log::error!("Failed to query order: {e}");
741 }
742 }
743 Ok(())
744 });
745
746 Ok(())
747 }
748
749 fn generate_account_state(
750 &self,
751 balances: Vec<AccountBalance>,
752 margins: Vec<MarginBalance>,
753 reported: bool,
754 ts_event: UnixNanos,
755 ) -> anyhow::Result<()> {
756 self.emitter
757 .emit_account_state(balances, margins, reported, ts_event);
758 Ok(())
759 }
760
761 fn start(&mut self) -> anyhow::Result<()> {
762 if self.core.is_started() {
763 return Ok(());
764 }
765
766 let sender = get_exec_event_sender();
767 self.emitter.set_sender(sender);
768 self.core.set_started();
769
770 let http_client = self.http_client.clone();
771 let product_types = self.config.product_types.clone();
772
773 get_runtime().spawn(async move {
774 let mut all_instruments = Vec::new();
775
776 for product_type in product_types {
777 match http_client
778 .request_instruments(product_type, None, None)
779 .await
780 {
781 Ok(instruments) => {
782 if instruments.is_empty() {
783 log::warn!("No instruments returned for {product_type:?}");
784 continue;
785 }
786 http_client.cache_instruments(&instruments);
787 all_instruments.extend(instruments);
788 }
789 Err(e) => {
790 log::error!("Failed to request instruments for {product_type:?}: {e}");
791 }
792 }
793 }
794
795 if all_instruments.is_empty() {
796 log::warn!(
797 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
798 );
799 } else {
800 log::info!("Instruments initialized: count={}", all_instruments.len());
801 }
802 });
803
804 log::info!(
805 "Started: client_id={}, account_id={}, account_type={:?}, product_types={:?}, environment={:?}, proxy_url={:?}",
806 self.core.client_id,
807 self.core.account_id,
808 self.core.account_type,
809 self.config.product_types,
810 self.config.environment,
811 self.config.proxy_url,
812 );
813 Ok(())
814 }
815
816 fn stop(&mut self) -> anyhow::Result<()> {
817 if self.core.is_stopped() {
818 return Ok(());
819 }
820
821 self.core.set_stopped();
822 self.core.set_disconnected();
823
824 if let Some(handle) = self.ws_private_stream_handle.take() {
825 handle.abort();
826 }
827
828 if let Some(handle) = self.ws_trade_stream_handle.take() {
829 handle.abort();
830 }
831 self.abort_pending_tasks();
832 log::info!("Stopped: client_id={}", self.core.client_id);
833 Ok(())
834 }
835
836 async fn generate_order_status_report(
837 &self,
838 cmd: &GenerateOrderStatusReport,
839 ) -> anyhow::Result<Option<OrderStatusReport>> {
840 let Some(instrument_id) = cmd.instrument_id else {
841 log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
842 return Ok(None);
843 };
844
845 let product_type = self.get_product_type_for_instrument(instrument_id);
846
847 let mut reports = self
848 .http_client
849 .request_order_status_reports(
850 self.core.account_id,
851 product_type,
852 Some(instrument_id),
853 false,
854 None,
855 None,
856 None,
857 )
858 .await?;
859
860 if let Some(client_order_id) = cmd.client_order_id {
861 reports.retain(|report| report.client_order_id == Some(client_order_id));
862 }
863
864 if let Some(venue_order_id) = cmd.venue_order_id {
865 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
866 }
867
868 Ok(reports.into_iter().next())
869 }
870
871 async fn generate_order_status_reports(
872 &self,
873 cmd: &GenerateOrderStatusReports,
874 ) -> anyhow::Result<Vec<OrderStatusReport>> {
875 let mut reports = Vec::new();
876
877 if let Some(instrument_id) = cmd.instrument_id {
878 let product_type = self.get_product_type_for_instrument(instrument_id);
879 let mut fetched = self
880 .http_client
881 .request_order_status_reports(
882 self.core.account_id,
883 product_type,
884 Some(instrument_id),
885 cmd.open_only,
886 None,
887 None,
888 None,
889 )
890 .await?;
891 reports.append(&mut fetched);
892 } else {
893 for product_type in self.product_types() {
894 let mut fetched = self
895 .http_client
896 .request_order_status_reports(
897 self.core.account_id,
898 product_type,
899 None,
900 cmd.open_only,
901 None,
902 None,
903 None,
904 )
905 .await?;
906 reports.append(&mut fetched);
907 }
908 }
909
910 if let Some(start) = cmd.start {
911 reports.retain(|r| r.ts_last >= start);
912 }
913
914 if let Some(end) = cmd.end {
915 reports.retain(|r| r.ts_last <= end);
916 }
917
918 Ok(reports)
919 }
920
921 async fn generate_fill_reports(
922 &self,
923 cmd: GenerateFillReports,
924 ) -> anyhow::Result<Vec<FillReport>> {
925 let start_ms = nanos_to_millis(cmd.start);
926 let end_ms = nanos_to_millis(cmd.end);
927 let mut reports = Vec::new();
928
929 if let Some(instrument_id) = cmd.instrument_id {
930 let product_type = self.get_product_type_for_instrument(instrument_id);
931 let mut fetched = self
932 .http_client
933 .request_fill_reports(
934 self.core.account_id,
935 product_type,
936 Some(instrument_id),
937 start_ms,
938 end_ms,
939 None,
940 )
941 .await?;
942 reports.append(&mut fetched);
943 } else {
944 for product_type in self.product_types() {
945 let mut fetched = self
946 .http_client
947 .request_fill_reports(
948 self.core.account_id,
949 product_type,
950 None,
951 start_ms,
952 end_ms,
953 None,
954 )
955 .await?;
956 reports.append(&mut fetched);
957 }
958 }
959
960 if let Some(venue_order_id) = cmd.venue_order_id {
961 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
962 }
963
964 Ok(reports)
965 }
966
967 async fn generate_position_status_reports(
968 &self,
969 cmd: &GeneratePositionStatusReports,
970 ) -> anyhow::Result<Vec<PositionStatusReport>> {
971 let mut reports = Vec::new();
972
973 if let Some(instrument_id) = cmd.instrument_id {
974 let product_type = self.get_product_type_for_instrument(instrument_id);
975
976 if product_type != BybitProductType::Spot {
978 let mut fetched = self
979 .http_client
980 .request_position_status_reports(
981 self.core.account_id,
982 product_type,
983 Some(instrument_id),
984 )
985 .await?;
986 reports.append(&mut fetched);
987 }
988 } else {
989 for product_type in self.product_types() {
990 if product_type == BybitProductType::Spot {
992 continue;
993 }
994 let mut fetched = self
995 .http_client
996 .request_position_status_reports(self.core.account_id, product_type, None)
997 .await?;
998 reports.append(&mut fetched);
999 }
1000 }
1001
1002 Ok(reports)
1003 }
1004
1005 async fn generate_mass_status(
1006 &self,
1007 lookback_mins: Option<u64>,
1008 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1009 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1010
1011 let ts_now = self.clock.get_time_ns();
1012
1013 let start = lookback_mins.map(|mins| {
1014 let lookback_ns = mins * 60 * 1_000_000_000;
1015 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1016 });
1017
1018 let order_cmd = GenerateOrderStatusReportsBuilder::default()
1019 .ts_init(ts_now)
1020 .open_only(false)
1021 .start(start)
1022 .build()
1023 .map_err(|e| anyhow::anyhow!("{e}"))?;
1024
1025 let fill_cmd = GenerateFillReportsBuilder::default()
1026 .ts_init(ts_now)
1027 .start(start)
1028 .build()
1029 .map_err(|e| anyhow::anyhow!("{e}"))?;
1030
1031 let position_cmd = GeneratePositionStatusReportsBuilder::default()
1032 .ts_init(ts_now)
1033 .start(start)
1034 .build()
1035 .map_err(|e| anyhow::anyhow!("{e}"))?;
1036
1037 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1038 self.generate_order_status_reports(&order_cmd),
1039 self.generate_fill_reports(fill_cmd),
1040 self.generate_position_status_reports(&position_cmd),
1041 )?;
1042
1043 log::info!("Received {} OrderStatusReports", order_reports.len());
1044 log::info!("Received {} FillReports", fill_reports.len());
1045 log::info!("Received {} PositionReports", position_reports.len());
1046
1047 let mut mass_status = ExecutionMassStatus::new(
1048 self.core.client_id,
1049 self.core.account_id,
1050 *BYBIT_VENUE,
1051 ts_now,
1052 None,
1053 );
1054
1055 mass_status.add_order_reports(order_reports);
1056 mass_status.add_fill_reports(fill_reports);
1057 mass_status.add_position_reports(position_reports);
1058
1059 Ok(Some(mass_status))
1060 }
1061
1062 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
1063 let order = {
1064 let cache = self.core.cache();
1065 let order = cache
1066 .order(&cmd.client_order_id)
1067 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
1068
1069 if order.is_closed() {
1070 log::warn!("Cannot submit closed order {}", order.client_order_id());
1071 return Ok(());
1072 }
1073
1074 order.clone()
1075 };
1076
1077 if let Err(e) = BybitOrderSide::try_from(order.order_side()) {
1079 self.emitter.emit_order_denied(&order, &e.to_string());
1080 return Ok(());
1081 }
1082
1083 if let Err(e) = Self::map_order_type(order.order_type()) {
1084 self.emitter.emit_order_denied(&order, &e.to_string());
1085 return Ok(());
1086 }
1087
1088 let tp_sl = match parse_bybit_tp_sl_params(cmd.params.as_ref()) {
1089 Ok(p) => p,
1090 Err(e) => {
1091 self.emitter.emit_order_denied(&order, &e.to_string());
1092 return Ok(());
1093 }
1094 };
1095
1096 if self.config.environment == BybitEnvironment::Demo
1097 && (tp_sl.has_tp_sl() || tp_sl.order_iv.is_some() || tp_sl.mmp.is_some())
1098 {
1099 self.emitter.emit_order_denied(
1100 &order,
1101 "Native TP/SL and option params are not supported in demo mode",
1102 );
1103 return Ok(());
1104 }
1105
1106 log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
1107 self.emitter.emit_order_submitted(&order);
1108
1109 let instrument_id = order.instrument_id();
1110 let product_type = self.get_product_type_for_instrument(instrument_id);
1111 let client_order_id = order.client_order_id();
1112 let strategy_id = order.strategy_id();
1113 let emitter = self.emitter.clone();
1114 let clock = self.clock;
1115
1116 self.dispatch_state.order_identities.insert(
1118 client_order_id,
1119 OrderIdentity {
1120 instrument_id,
1121 strategy_id,
1122 order_side: order.order_side(),
1123 order_type: order.order_type(),
1124 },
1125 );
1126
1127 let bybit_side =
1128 BybitOrderSide::try_from(order.order_side()).expect("order side validated above");
1129 let position_idx = self.resolve_position_idx(
1130 instrument_id,
1131 bybit_side,
1132 order.is_reduce_only(),
1133 tp_sl.position_idx,
1134 );
1135
1136 if self.config.environment == BybitEnvironment::Demo {
1137 let http_client = self.http_client.clone();
1138 let account_id = self.core.account_id;
1139 let order_side = order.order_side();
1140 let order_type = order.order_type();
1141 let quantity = order.quantity();
1142 let time_in_force = order.time_in_force();
1143 let price = order.price();
1144 let trigger_price = order.trigger_price();
1145 let post_only = order.is_post_only();
1146 let reduce_only = order.is_reduce_only();
1147 let is_quote_quantity = order.is_quote_quantity();
1148 let is_leverage = tp_sl.is_leverage;
1149
1150 self.spawn_task("submit_order_http", async move {
1151 let result = http_client
1152 .submit_order(
1153 account_id,
1154 product_type,
1155 instrument_id,
1156 client_order_id,
1157 order_side,
1158 order_type,
1159 quantity,
1160 Some(time_in_force),
1161 price,
1162 trigger_price,
1163 Some(post_only),
1164 reduce_only,
1165 is_quote_quantity,
1166 is_leverage,
1167 position_idx,
1168 )
1169 .await;
1170
1171 if let Err(e) = result {
1172 let ts_event = clock.get_time_ns();
1173 emitter.emit_order_rejected_event(
1174 strategy_id,
1175 instrument_id,
1176 client_order_id,
1177 &format!("submit-order-error: {e}"),
1178 ts_event,
1179 false,
1180 );
1181 anyhow::bail!("submit order failed: {e}");
1182 }
1183
1184 Ok(())
1185 });
1186
1187 return Ok(());
1188 }
1189
1190 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1191 let params =
1192 Self::build_ws_place_params(&order, product_type, raw_symbol, &tp_sl, position_idx)?;
1193
1194 let ws_trade = self.ws_trade.clone();
1195 let dispatch_state = Arc::clone(&self.dispatch_state);
1196
1197 self.spawn_task("submit_order", async move {
1198 match ws_trade.place_order(params).await {
1199 Ok(req_id) => {
1200 dispatch_state.pending_requests.insert(
1201 req_id,
1202 (vec![client_order_id], vec![None], PendingOperation::Place),
1203 );
1204 }
1205 Err(e) => {
1206 dispatch_state.order_identities.remove(&client_order_id);
1207 let ts_event = clock.get_time_ns();
1208 emitter.emit_order_rejected_event(
1209 strategy_id,
1210 instrument_id,
1211 client_order_id,
1212 &format!("submit-order-error: {e}"),
1213 ts_event,
1214 false,
1215 );
1216 anyhow::bail!("submit order failed: {e}");
1217 }
1218 }
1219
1220 Ok(())
1221 });
1222
1223 Ok(())
1224 }
1225
1226 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
1227 if cmd.order_list.client_order_ids.is_empty() {
1228 return Ok(());
1229 }
1230
1231 let tp_sl = match parse_bybit_tp_sl_params(cmd.params.as_ref()) {
1232 Ok(p) => p,
1233 Err(e) => {
1234 let cache = self.core.cache();
1235
1236 for cid in &cmd.order_list.client_order_ids {
1237 if let Some(order) = cache.order(cid) {
1238 self.emitter.emit_order_denied(order, &e.to_string());
1239 }
1240 }
1241 return Ok(());
1242 }
1243 };
1244
1245 if self.config.environment == BybitEnvironment::Demo
1246 && (tp_sl.has_tp_sl() || tp_sl.order_iv.is_some() || tp_sl.mmp.is_some())
1247 {
1248 let cache = self.core.cache();
1249
1250 for cid in &cmd.order_list.client_order_ids {
1251 if let Some(order) = cache.order(cid) {
1252 self.emitter.emit_order_denied(
1253 order,
1254 "Native TP/SL and option params are not supported in demo mode",
1255 );
1256 }
1257 }
1258 return Ok(());
1259 }
1260
1261 let instrument_id = cmd.instrument_id;
1262 let product_type = self.get_product_type_for_instrument(instrument_id);
1263 let strategy_id = cmd.strategy_id;
1264
1265 let mut valid_orders = Vec::with_capacity(cmd.order_list.client_order_ids.len());
1266 {
1267 let cache = self.core.cache();
1268 let mut deny_reason: Option<String> = None;
1269
1270 for cid in &cmd.order_list.client_order_ids {
1271 let Some(order) = cache.order(cid) else {
1272 deny_reason = Some(format!("Order not found in cache: {cid}"));
1273 break;
1274 };
1275
1276 if order.is_closed() {
1277 deny_reason = Some(format!("Cannot submit closed order {cid}"));
1278 break;
1279 }
1280
1281 if let Err(e) = BybitOrderSide::try_from(order.order_side()) {
1282 deny_reason = Some(e.to_string());
1283 break;
1284 }
1285
1286 if let Err(e) = Self::map_order_type(order.order_type()) {
1287 deny_reason = Some(e.to_string());
1288 break;
1289 }
1290
1291 valid_orders.push(order.clone());
1292 }
1293
1294 if let Some(reason) = deny_reason {
1296 for cid in &cmd.order_list.client_order_ids {
1297 if let Some(order) = cache.order(cid) {
1298 self.emitter.emit_order_denied(order, &reason);
1299 }
1300 }
1301 return Ok(());
1302 }
1303 }
1304
1305 if valid_orders.is_empty() {
1306 return Ok(());
1307 }
1308
1309 for order in &valid_orders {
1310 self.emitter.emit_order_submitted(order);
1311 self.dispatch_state.order_identities.insert(
1312 order.client_order_id(),
1313 OrderIdentity {
1314 instrument_id,
1315 strategy_id,
1316 order_side: order.order_side(),
1317 order_type: order.order_type(),
1318 },
1319 );
1320 }
1321
1322 let emitter = self.emitter.clone();
1323 let clock = self.clock;
1324
1325 if self.config.environment == BybitEnvironment::Demo {
1327 let http_client = self.http_client.clone();
1328 let account_id = self.core.account_id;
1329 let is_leverage = tp_sl.is_leverage;
1330
1331 let order_data: Vec<_> = valid_orders
1332 .iter()
1333 .map(|o| {
1334 let bybit_side = BybitOrderSide::try_from(o.order_side())
1335 .expect("order side validated above");
1336 let position_idx = self.resolve_position_idx(
1337 instrument_id,
1338 bybit_side,
1339 o.is_reduce_only(),
1340 tp_sl.position_idx,
1341 );
1342 (
1343 o.client_order_id(),
1344 o.order_side(),
1345 o.order_type(),
1346 o.quantity(),
1347 o.time_in_force(),
1348 o.price(),
1349 o.trigger_price(),
1350 o.is_post_only(),
1351 o.is_reduce_only(),
1352 o.is_quote_quantity(),
1353 position_idx,
1354 )
1355 })
1356 .collect();
1357
1358 self.spawn_task("submit_order_list_http", async move {
1359 for (
1360 cid,
1361 side,
1362 otype,
1363 qty,
1364 tif,
1365 price,
1366 trigger,
1367 post_only,
1368 reduce,
1369 quote_qty,
1370 position_idx,
1371 ) in order_data
1372 {
1373 if let Err(e) = http_client
1374 .submit_order(
1375 account_id,
1376 product_type,
1377 instrument_id,
1378 cid,
1379 side,
1380 otype,
1381 qty,
1382 Some(tif),
1383 price,
1384 trigger,
1385 Some(post_only),
1386 reduce,
1387 quote_qty,
1388 is_leverage,
1389 position_idx,
1390 )
1391 .await
1392 {
1393 let ts_event = clock.get_time_ns();
1394 emitter.emit_order_rejected_event(
1395 strategy_id,
1396 instrument_id,
1397 cid,
1398 &format!("submit-order-error: {e}"),
1399 ts_event,
1400 false,
1401 );
1402 }
1403 }
1404 Ok(())
1405 });
1406
1407 return Ok(());
1408 }
1409
1410 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1412
1413 let mut order_params = Vec::with_capacity(valid_orders.len());
1414 let mut client_order_ids = Vec::with_capacity(valid_orders.len());
1415
1416 for order in &valid_orders {
1417 let bybit_side =
1418 BybitOrderSide::try_from(order.order_side()).expect("order side validated above");
1419 let position_idx = self.resolve_position_idx(
1420 instrument_id,
1421 bybit_side,
1422 order.is_reduce_only(),
1423 tp_sl.position_idx,
1424 );
1425 let params =
1426 Self::build_ws_place_params(order, product_type, raw_symbol, &tp_sl, position_idx)
1427 .expect("validated above");
1428 order_params.push(params);
1429 client_order_ids.push(order.client_order_id());
1430 }
1431
1432 let ws_trade = self.ws_trade.clone();
1433 let dispatch_state = Arc::clone(&self.dispatch_state);
1434
1435 self.spawn_task("submit_order_list", async move {
1436 match ws_trade.batch_place_orders(order_params).await {
1437 Ok(req_ids) => {
1438 for (req_id, chunk_cids) in req_ids
1439 .into_iter()
1440 .zip(client_order_ids.chunks(20).map(|c| c.to_vec()))
1441 {
1442 let chunk_voids = vec![None; chunk_cids.len()];
1443 dispatch_state
1444 .pending_requests
1445 .insert(req_id, (chunk_cids, chunk_voids, PendingOperation::Place));
1446 }
1447 }
1448 Err(e) => {
1449 for cid in &client_order_ids {
1450 dispatch_state.order_identities.remove(cid);
1451 }
1452
1453 let ts_event = clock.get_time_ns();
1454
1455 for cid in &client_order_ids {
1456 emitter.emit_order_rejected_event(
1457 strategy_id,
1458 instrument_id,
1459 *cid,
1460 &format!("submit-order-list-error: {e}"),
1461 ts_event,
1462 false,
1463 );
1464 }
1465 anyhow::bail!("submit order list failed: {e}");
1466 }
1467 }
1468 Ok(())
1469 });
1470
1471 Ok(())
1472 }
1473
1474 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1475 let instrument_id = cmd.instrument_id;
1476 let product_type = self.get_product_type_for_instrument(instrument_id);
1477 let client_order_id = cmd.client_order_id;
1478 let strategy_id = cmd.strategy_id;
1479 let venue_order_id = cmd.venue_order_id;
1480 let emitter = self.emitter.clone();
1481 let clock = self.clock;
1482
1483 let has_order_iv = cmd
1484 .params
1485 .as_ref()
1486 .and_then(|p| p.get("order_iv"))
1487 .is_some();
1488
1489 if self.config.environment == BybitEnvironment::Demo && has_order_iv {
1490 let ts_event = self.clock.get_time_ns();
1491 self.emitter.emit_order_modify_rejected_event(
1492 strategy_id,
1493 instrument_id,
1494 client_order_id,
1495 venue_order_id,
1496 "Option params (order_iv) are not supported in demo mode",
1497 ts_event,
1498 );
1499 return Ok(());
1500 }
1501
1502 if self.config.environment == BybitEnvironment::Demo {
1503 let http_client = self.http_client.clone();
1504 let account_id = self.core.account_id;
1505 let quantity = cmd.quantity;
1506 let price = cmd.price;
1507
1508 self.spawn_task("modify_order_http", async move {
1509 let result = http_client
1510 .modify_order(
1511 account_id,
1512 product_type,
1513 instrument_id,
1514 Some(client_order_id),
1515 venue_order_id,
1516 quantity,
1517 price,
1518 )
1519 .await;
1520
1521 if let Err(e) = result {
1522 let ts_event = clock.get_time_ns();
1523 emitter.emit_order_modify_rejected_event(
1524 strategy_id,
1525 instrument_id,
1526 client_order_id,
1527 venue_order_id,
1528 &format!("modify-order-error: {e}"),
1529 ts_event,
1530 );
1531 anyhow::bail!("modify order failed: {e}");
1532 }
1533
1534 Ok(())
1535 });
1536
1537 return Ok(());
1538 }
1539
1540 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1541
1542 let order_iv = if let Some(value) = cmd.params.as_ref().and_then(|p| p.get("order_iv")) {
1543 match get_price_str(cmd.params.as_ref().unwrap(), "order_iv") {
1544 Some(s) => Some(s),
1545 None => {
1546 let ts_event = self.clock.get_time_ns();
1547 self.emitter.emit_order_modify_rejected_event(
1548 strategy_id,
1549 instrument_id,
1550 client_order_id,
1551 venue_order_id,
1552 &format!("invalid type for 'order_iv': {value}, expected string or number"),
1553 ts_event,
1554 );
1555 return Ok(());
1556 }
1557 }
1558 } else {
1559 None
1560 };
1561
1562 let params = BybitWsAmendOrderParams {
1563 category: product_type,
1564 symbol: Ustr::from(raw_symbol),
1565 order_id: cmd.venue_order_id.map(|v| v.to_string()),
1566 order_link_id: Some(cmd.client_order_id.to_string()),
1567 qty: cmd.quantity.map(|q| q.to_string()),
1568 price: cmd.price.map(|p| p.to_string()),
1569 trigger_price: None,
1570 take_profit: None,
1571 stop_loss: None,
1572 tp_trigger_by: None,
1573 sl_trigger_by: None,
1574 order_iv,
1575 };
1576
1577 let ws_trade = self.ws_trade.clone();
1578 let dispatch_state = Arc::clone(&self.dispatch_state);
1579
1580 self.spawn_task("modify_order", async move {
1581 match ws_trade.amend_order(params).await {
1582 Ok(req_id) => {
1583 dispatch_state.pending_requests.insert(
1584 req_id,
1585 (
1586 vec![client_order_id],
1587 vec![venue_order_id],
1588 PendingOperation::Amend,
1589 ),
1590 );
1591 }
1592 Err(e) => {
1593 let ts_event = clock.get_time_ns();
1594 emitter.emit_order_modify_rejected_event(
1595 strategy_id,
1596 instrument_id,
1597 client_order_id,
1598 venue_order_id,
1599 &format!("modify-order-error: {e}"),
1600 ts_event,
1601 );
1602 anyhow::bail!("modify order failed: {e}");
1603 }
1604 }
1605
1606 Ok(())
1607 });
1608
1609 Ok(())
1610 }
1611
1612 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1613 let instrument_id = cmd.instrument_id;
1614 let product_type = self.get_product_type_for_instrument(instrument_id);
1615 let client_order_id = cmd.client_order_id;
1616 let strategy_id = cmd.strategy_id;
1617 let venue_order_id = cmd.venue_order_id;
1618 let emitter = self.emitter.clone();
1619 let clock = self.clock;
1620
1621 if self.config.environment == BybitEnvironment::Demo {
1622 let http_client = self.http_client.clone();
1623 let account_id = self.core.account_id;
1624
1625 self.spawn_task("cancel_order_http", async move {
1626 let result = http_client
1627 .cancel_order(
1628 account_id,
1629 product_type,
1630 instrument_id,
1631 Some(client_order_id),
1632 venue_order_id,
1633 )
1634 .await;
1635
1636 if let Err(e) = result {
1637 let ts_event = clock.get_time_ns();
1638 emitter.emit_order_cancel_rejected_event(
1639 strategy_id,
1640 instrument_id,
1641 client_order_id,
1642 venue_order_id,
1643 &format!("cancel-order-error: {e}"),
1644 ts_event,
1645 );
1646 anyhow::bail!("cancel order failed: {e}");
1647 }
1648
1649 Ok(())
1650 });
1651
1652 return Ok(());
1653 }
1654
1655 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1656
1657 let params = BybitWsCancelOrderParams {
1658 category: product_type,
1659 symbol: Ustr::from(raw_symbol),
1660 order_id: cmd.venue_order_id.map(|v| v.to_string()),
1661 order_link_id: Some(cmd.client_order_id.to_string()),
1662 };
1663
1664 let ws_trade = self.ws_trade.clone();
1665 let dispatch_state = Arc::clone(&self.dispatch_state);
1666
1667 self.spawn_task("cancel_order", async move {
1668 match ws_trade.cancel_order(params).await {
1669 Ok(req_id) => {
1670 dispatch_state.pending_requests.insert(
1671 req_id,
1672 (
1673 vec![client_order_id],
1674 vec![venue_order_id],
1675 PendingOperation::Cancel,
1676 ),
1677 );
1678 }
1679 Err(e) => {
1680 let ts_event = clock.get_time_ns();
1681 emitter.emit_order_cancel_rejected_event(
1682 strategy_id,
1683 instrument_id,
1684 client_order_id,
1685 venue_order_id,
1686 &format!("cancel-order-error: {e}"),
1687 ts_event,
1688 );
1689 anyhow::bail!("cancel order failed: {e}");
1690 }
1691 }
1692
1693 Ok(())
1694 });
1695
1696 Ok(())
1697 }
1698
1699 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1700 if cmd.order_side != OrderSide::NoOrderSide {
1701 log::warn!(
1702 "Bybit does not support order_side filtering for cancel all orders; \
1703 ignoring order_side={:?} and canceling all orders",
1704 cmd.order_side,
1705 );
1706 }
1707
1708 let instrument_id = cmd.instrument_id;
1709 let product_type = self.get_product_type_for_instrument(instrument_id);
1710 let account_id = self.core.account_id;
1711 let http_client = self.http_client.clone();
1712
1713 self.spawn_task("cancel_all_orders", async move {
1714 match http_client
1715 .cancel_all_orders(account_id, product_type, instrument_id)
1716 .await
1717 {
1718 Ok(reports) => {
1719 for report in reports {
1720 log::debug!("Cancelled order: {report:?}");
1721 }
1722 }
1723 Err(e) => {
1724 log::error!("Failed to cancel all orders for {instrument_id}: {e}");
1725 }
1726 }
1727 Ok(())
1728 });
1729
1730 Ok(())
1731 }
1732
1733 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1734 if cmd.cancels.is_empty() {
1735 return Ok(());
1736 }
1737
1738 let instrument_id = cmd.instrument_id;
1739 let product_type = self.get_product_type_for_instrument(instrument_id);
1740
1741 if self.config.environment == BybitEnvironment::Demo {
1743 let http_client = self.http_client.clone();
1744 let account_id = self.core.account_id;
1745 let strategy_id = cmd.strategy_id;
1746 let emitter = self.emitter.clone();
1747 let clock = self.clock;
1748 let cancels: Vec<_> = cmd
1749 .cancels
1750 .iter()
1751 .map(|c| (c.client_order_id, c.venue_order_id))
1752 .collect();
1753
1754 self.spawn_task("batch_cancel_orders_http", async move {
1755 for (client_order_id, venue_order_id) in cancels {
1756 if let Err(e) = http_client
1757 .cancel_order(
1758 account_id,
1759 product_type,
1760 instrument_id,
1761 Some(client_order_id),
1762 venue_order_id,
1763 )
1764 .await
1765 {
1766 let ts_event = clock.get_time_ns();
1767 emitter.emit_order_cancel_rejected_event(
1768 strategy_id,
1769 instrument_id,
1770 client_order_id,
1771 venue_order_id,
1772 &format!("cancel-order-error: {e}"),
1773 ts_event,
1774 );
1775 }
1776 }
1777 Ok(())
1778 });
1779
1780 return Ok(());
1781 }
1782
1783 let raw_symbol = Ustr::from(extract_raw_symbol(instrument_id.symbol.as_str()));
1784
1785 let mut cancel_params = Vec::with_capacity(cmd.cancels.len());
1786 let client_order_ids: Vec<_> = cmd.cancels.iter().map(|c| c.client_order_id).collect();
1787 let venue_order_ids: Vec<_> = cmd.cancels.iter().map(|c| c.venue_order_id).collect();
1788 for cancel in &cmd.cancels {
1789 cancel_params.push(BybitWsCancelOrderParams {
1790 category: product_type,
1791 symbol: raw_symbol,
1792 order_id: cancel.venue_order_id.map(|v| v.to_string()),
1793 order_link_id: Some(cancel.client_order_id.to_string()),
1794 });
1795 }
1796
1797 let ws_trade = self.ws_trade.clone();
1798 let dispatch_state = Arc::clone(&self.dispatch_state);
1799
1800 self.spawn_task("batch_cancel_orders", async move {
1801 match ws_trade.batch_cancel_orders(cancel_params).await {
1802 Ok(req_ids) => {
1803 for (req_id, (chunk_cids, chunk_voids)) in req_ids.into_iter().zip(
1804 client_order_ids
1805 .chunks(20)
1806 .map(|c| c.to_vec())
1807 .zip(venue_order_ids.chunks(20).map(|c| c.to_vec())),
1808 ) {
1809 dispatch_state
1810 .pending_requests
1811 .insert(req_id, (chunk_cids, chunk_voids, PendingOperation::Cancel));
1812 }
1813 }
1814 Err(e) => {
1815 anyhow::bail!("batch cancel orders failed: {e}");
1816 }
1817 }
1818 Ok(())
1819 });
1820
1821 Ok(())
1822 }
1823}
1824
1825#[cfg(test)]
1826mod tests {
1827 use rstest::rstest;
1828
1829 use super::*;
1830 use crate::common::enums::BybitMarketUnit;
1831
1832 #[rstest]
1833 #[case::spot_market_base(
1834 BybitProductType::Spot,
1835 BybitOrderType::Market,
1836 false,
1837 Some(BybitMarketUnit::BaseCoin)
1838 )]
1839 #[case::spot_market_quote(
1840 BybitProductType::Spot,
1841 BybitOrderType::Market,
1842 true,
1843 Some(BybitMarketUnit::QuoteCoin)
1844 )]
1845 #[case::spot_limit(BybitProductType::Spot, BybitOrderType::Limit, true, None)]
1846 #[case::linear_market(BybitProductType::Linear, BybitOrderType::Market, true, None)]
1847 fn test_ws_params_market_unit(
1848 #[case] product_type: BybitProductType,
1849 #[case] order_type: BybitOrderType,
1850 #[case] is_quote_quantity: bool,
1851 #[case] expected: Option<BybitMarketUnit>,
1852 ) {
1853 let params = BybitWsPlaceOrderParams {
1854 category: product_type,
1855 symbol: ustr::Ustr::from("BTCUSDT"),
1856 side: BybitOrderSide::Buy,
1857 order_type,
1858 qty: "1.0".to_string(),
1859 is_leverage: None,
1860 market_unit: spot_market_unit(product_type, order_type, is_quote_quantity),
1861 price: None,
1862 time_in_force: None,
1863 order_link_id: None,
1864 reduce_only: None,
1865 close_on_trigger: None,
1866 trigger_price: None,
1867 trigger_by: None,
1868 trigger_direction: None,
1869 tpsl_mode: None,
1870 take_profit: None,
1871 stop_loss: None,
1872 tp_trigger_by: None,
1873 sl_trigger_by: None,
1874 sl_trigger_price: None,
1875 tp_trigger_price: None,
1876 sl_order_type: None,
1877 tp_order_type: None,
1878 sl_limit_price: None,
1879 tp_limit_price: None,
1880 order_iv: None,
1881 mmp: None,
1882 position_idx: None,
1883 };
1884
1885 assert_eq!(params.market_unit, expected);
1886 }
1887
1888 #[rstest]
1889 #[case::market(OrderType::Market, BybitOrderType::Market, false)]
1890 #[case::limit(OrderType::Limit, BybitOrderType::Limit, false)]
1891 #[case::stop_market(OrderType::StopMarket, BybitOrderType::Market, true)]
1892 #[case::stop_limit(OrderType::StopLimit, BybitOrderType::Limit, true)]
1893 #[case::market_if_touched(OrderType::MarketIfTouched, BybitOrderType::Market, true)]
1894 #[case::limit_if_touched(OrderType::LimitIfTouched, BybitOrderType::Limit, true)]
1895 fn test_map_order_type(
1896 #[case] input: OrderType,
1897 #[case] expected_type: BybitOrderType,
1898 #[case] expected_conditional: bool,
1899 ) {
1900 let (bybit_type, is_conditional) = BybitExecutionClient::map_order_type(input).unwrap();
1901 assert_eq!(bybit_type, expected_type);
1902 assert_eq!(is_conditional, expected_conditional);
1903 }
1904
1905 #[rstest]
1906 fn test_map_order_type_rejects_trailing_stop() {
1907 assert!(BybitExecutionClient::map_order_type(OrderType::TrailingStopMarket).is_err());
1908 }
1909
1910 #[rstest]
1911 #[case::buy_open(BybitOrderSide::Buy, false, BybitPositionIdx::BuyHedge)]
1912 #[case::sell_open(BybitOrderSide::Sell, false, BybitPositionIdx::SellHedge)]
1913 #[case::sell_close_long(BybitOrderSide::Sell, true, BybitPositionIdx::BuyHedge)]
1914 #[case::buy_close_short(BybitOrderSide::Buy, true, BybitPositionIdx::SellHedge)]
1915 fn test_resolve_position_idx_hedge_mode(
1916 #[case] side: BybitOrderSide,
1917 #[case] is_reduce_only: bool,
1918 #[case] expected: BybitPositionIdx,
1919 ) {
1920 let idx = resolve_position_idx(
1921 Some(BybitPositionMode::BothSides),
1922 side,
1923 is_reduce_only,
1924 None,
1925 );
1926 assert_eq!(idx, Some(expected));
1927 }
1928
1929 #[rstest]
1930 fn test_resolve_position_idx_one_way_mode() {
1931 let idx = resolve_position_idx(
1932 Some(BybitPositionMode::MergedSingle),
1933 BybitOrderSide::Buy,
1934 false,
1935 None,
1936 );
1937 assert_eq!(idx, Some(BybitPositionIdx::OneWay));
1938 }
1939
1940 #[rstest]
1941 fn test_resolve_position_idx_manual_override_wins() {
1942 let idx = resolve_position_idx(
1943 Some(BybitPositionMode::BothSides),
1944 BybitOrderSide::Buy,
1945 false,
1946 Some(BybitPositionIdx::SellHedge),
1947 );
1948 assert_eq!(idx, Some(BybitPositionIdx::SellHedge));
1949 }
1950
1951 #[rstest]
1952 fn test_resolve_position_idx_returns_none_when_unconfigured() {
1953 let idx = resolve_position_idx(None, BybitOrderSide::Buy, false, None);
1954 assert!(idx.is_none());
1955 }
1956
1957 #[rstest]
1958 #[case::linear("BTCUSDT-LINEAR", true)]
1959 #[case::inverse("BTCUSD-INVERSE", true)]
1960 #[case::spot("BTCUSDT-SPOT", false)]
1961 #[case::option("BTC-30JUN25-100000-C-OPTION", false)]
1962 fn test_parse_derivative_symbol_filters_product_type(
1963 #[case] symbol_str: &str,
1964 #[case] keeps: bool,
1965 ) {
1966 let result = parse_derivative_symbol(symbol_str);
1967 assert_eq!(result.is_some(), keeps);
1968 }
1969
1970 #[rstest]
1971 fn test_parse_derivative_symbol_rejects_malformed() {
1972 assert!(parse_derivative_symbol("not-a-real-symbol").is_none());
1973 }
1974
1975 #[rstest]
1976 #[case::matches_msg("Position mode has not been modified", "110025", true)]
1977 #[case::matches_code("retCode 110025: noop", "110025", true)]
1978 #[case::matches_msg_only("Already not been modified", "", true)]
1979 #[case::wrong_code("retCode 99999: other", "110025", false)]
1980 #[case::empty_no_modified_msg("retCode 99999", "", false)]
1981 fn test_is_unchanged_error(#[case] msg: &str, #[case] code: &str, #[case] expected: bool) {
1982 let err = anyhow::anyhow!("{msg}");
1983 assert_eq!(is_unchanged_error(&err, code), expected);
1984 }
1985
1986 #[rstest]
1987 #[case::matches("Margin needs to be equal to or greater than 0.5", true)]
1988 #[case::no_match("Some other error", false)]
1989 fn test_is_low_margin_error(#[case] msg: &str, #[case] expected: bool) {
1990 let err = anyhow::anyhow!("{msg}");
1991 assert_eq!(is_low_margin_error(&err), expected);
1992 }
1993}