Skip to main content

nautilus_bybit/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Bybit adapter.
17
18use std::{
19    future::Future,
20    sync::{Arc, Mutex},
21    time::{Duration, Instant},
22};
23
24use ahash::AHashMap;
25use anyhow::Context;
26use async_trait::async_trait;
27use futures_util::{StreamExt, pin_mut};
28use nautilus_common::{
29    clients::ExecutionClient,
30    live::{get_runtime, runner::get_exec_event_sender},
31    messages::execution::{
32        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
33        GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
34        GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
35        GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
36        SubmitOrderList,
37    },
38};
39use nautilus_core::{
40    MUTEX_POISONED, UnixNanos,
41    env::get_or_env_var,
42    time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
45use nautilus_model::{
46    accounts::AccountAny,
47    enums::{OmsType, OrderSide, OrderType, TimeInForce},
48    identifiers::{AccountId, ClientId, InstrumentId, Venue},
49    instruments::{Instrument, InstrumentAny},
50    orders::{Order, OrderAny},
51    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
52    types::{AccountBalance, MarginBalance, Price},
53};
54use tokio::task::JoinHandle;
55use ustr::Ustr;
56
57use crate::{
58    common::{
59        consts::BYBIT_VENUE,
60        credential::credential_env_vars,
61        enums::{
62            BybitAccountType, BybitEnvironment, BybitOrderSide, BybitOrderType, BybitPositionIdx,
63            BybitPositionMode, BybitProductType, BybitTimeInForce, BybitTpSlMode,
64            resolve_trigger_type,
65        },
66        parse::{
67            BybitTpSlParams, extract_raw_symbol, get_price_str, nanos_to_millis,
68            parse_bybit_tp_sl_params, spot_leverage, spot_market_unit, trigger_direction,
69        },
70        symbol::BybitSymbol,
71    },
72    config::BybitExecClientConfig,
73    http::client::BybitHttpClient,
74    websocket::{
75        client::BybitWebSocketClient,
76        dispatch::{OrderIdentity, PendingOperation, WsDispatchState, dispatch_ws_message},
77        messages::{BybitWsAmendOrderParams, BybitWsCancelOrderParams, BybitWsPlaceOrderParams},
78    },
79};
80
81/// Resolves the `positionIdx` to send with an order under a given position mode.
82///
83/// In hedge mode `positionIdx` identifies the position being affected (1 = long,
84/// 2 = short), not the trade direction. A reduce-only sell closes a long position
85/// and a reduce-only buy closes a short position. A manual override always wins.
86#[must_use]
87pub fn resolve_position_idx(
88    position_mode: Option<BybitPositionMode>,
89    order_side: BybitOrderSide,
90    is_reduce_only: bool,
91    manual_override: Option<BybitPositionIdx>,
92) -> Option<BybitPositionIdx> {
93    if manual_override.is_some() {
94        return manual_override;
95    }
96    let mode = position_mode?;
97    match mode {
98        BybitPositionMode::BothSides => Some(match (order_side, is_reduce_only) {
99            (BybitOrderSide::Buy, false) | (BybitOrderSide::Sell, true) => {
100                BybitPositionIdx::BuyHedge
101            }
102            (BybitOrderSide::Sell, false) | (BybitOrderSide::Buy, true) => {
103                BybitPositionIdx::SellHedge
104            }
105            (BybitOrderSide::Unknown, _) => BybitPositionIdx::OneWay,
106        }),
107        BybitPositionMode::MergedSingle => Some(BybitPositionIdx::OneWay),
108    }
109}
110
111fn parse_derivative_symbol(symbol_str: &str) -> Option<BybitSymbol> {
112    let symbol = match BybitSymbol::new(symbol_str) {
113        Ok(s) => s,
114        Err(e) => {
115            log::warn!("Failed to parse symbol {symbol_str}: {e}");
116            return None;
117        }
118    };
119    matches!(
120        symbol.product_type(),
121        BybitProductType::Linear | BybitProductType::Inverse
122    )
123    .then_some(symbol)
124}
125
126fn is_unchanged_error<E: std::fmt::Display>(err: &E, code: &str) -> bool {
127    let msg = err.to_string().to_lowercase();
128    if msg.contains("not been modified") {
129        return true;
130    }
131    !code.is_empty() && msg.contains(code)
132}
133
134fn is_low_margin_error<E: std::fmt::Display>(err: &E) -> bool {
135    err.to_string()
136        .contains("needs to be equal to or greater than")
137}
138
139/// Live execution client for Bybit.
140#[derive(Debug)]
141pub struct BybitExecutionClient {
142    core: ExecutionClientCore,
143    clock: &'static AtomicTime,
144    config: BybitExecClientConfig,
145    emitter: ExecutionEventEmitter,
146    http_client: BybitHttpClient,
147    ws_private: BybitWebSocketClient,
148    ws_trade: BybitWebSocketClient,
149    ws_private_stream_handle: Option<JoinHandle<()>>,
150    ws_trade_stream_handle: Option<JoinHandle<()>>,
151    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
152    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
153    dispatch_state: Arc<WsDispatchState>,
154}
155
156impl BybitExecutionClient {
157    /// Creates a new [`BybitExecutionClient`].
158    ///
159    /// # Errors
160    ///
161    /// Returns an error if the client fails to initialize.
162    pub fn new(core: ExecutionClientCore, config: BybitExecClientConfig) -> anyhow::Result<Self> {
163        let (key_var, secret_var) = credential_env_vars(config.environment);
164        let api_key = get_or_env_var(config.api_key.clone(), key_var)?;
165        let api_secret = get_or_env_var(config.api_secret.clone(), secret_var)?;
166
167        let http_client = BybitHttpClient::with_credentials(
168            api_key.clone(),
169            api_secret.clone(),
170            Some(config.http_base_url()),
171            config.http_timeout_secs,
172            config.max_retries,
173            config.retry_delay_initial_ms,
174            config.retry_delay_max_ms,
175            config.recv_window_ms,
176            config.proxy_url.clone(),
177        )?;
178
179        let ws_private = BybitWebSocketClient::new_private(
180            config.environment,
181            Some(api_key.clone()),
182            Some(api_secret.clone()),
183            Some(config.ws_private_url()),
184            config.heartbeat_interval_secs,
185            config.transport_backend,
186            config.proxy_url.clone(),
187        );
188
189        let ws_trade = BybitWebSocketClient::new_trade(
190            config.environment,
191            Some(api_key),
192            Some(api_secret),
193            Some(config.ws_trade_url()),
194            config.heartbeat_interval_secs,
195            config.transport_backend,
196            config.proxy_url.clone(),
197        );
198
199        let clock = get_atomic_clock_realtime();
200        let emitter = ExecutionEventEmitter::new(
201            clock,
202            core.trader_id,
203            core.account_id,
204            core.account_type,
205            None,
206        );
207
208        Ok(Self {
209            core,
210            clock,
211            config,
212            emitter,
213            http_client,
214            ws_private,
215            ws_trade,
216            ws_private_stream_handle: None,
217            ws_trade_stream_handle: None,
218            pending_tasks: Mutex::new(Vec::new()),
219            instruments_cache: Arc::new(AHashMap::new()),
220            dispatch_state: Arc::new(WsDispatchState::default()),
221        })
222    }
223
224    fn product_types(&self) -> Vec<BybitProductType> {
225        if self.config.product_types.is_empty() {
226            vec![BybitProductType::Linear]
227        } else {
228            self.config.product_types.clone()
229        }
230    }
231
232    fn update_account_state(&self) {
233        let http_client = self.http_client.clone();
234        let account_id = self.core.account_id;
235        let emitter = self.emitter.clone();
236
237        self.spawn_task("query_account", async move {
238            let account_state = http_client
239                .request_account_state(BybitAccountType::Unified, account_id)
240                .await
241                .context("failed to request Bybit account state")?;
242            emitter.send_account_state(account_state);
243            Ok(())
244        });
245    }
246
247    fn spawn_task<F>(&self, description: &'static str, fut: F)
248    where
249        F: Future<Output = anyhow::Result<()>> + Send + 'static,
250    {
251        let runtime = get_runtime();
252        let handle = runtime.spawn(async move {
253            if let Err(e) = fut.await {
254                log::warn!("{description} failed: {e:?}");
255            }
256        });
257
258        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
259        tasks.retain(|handle| !handle.is_finished());
260        tasks.push(handle);
261    }
262
263    fn abort_pending_tasks(&self) {
264        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
265        for handle in tasks.drain(..) {
266            handle.abort();
267        }
268    }
269
270    /// Polls the cache until the account is registered or timeout is reached.
271    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
272        let account_id = self.core.account_id;
273
274        if self.core.cache().account(&account_id).is_some() {
275            log::info!("Account {account_id} registered");
276            return Ok(());
277        }
278
279        let start = Instant::now();
280        let timeout = Duration::from_secs_f64(timeout_secs);
281        let interval = Duration::from_millis(10);
282
283        loop {
284            tokio::time::sleep(interval).await;
285
286            if self.core.cache().account(&account_id).is_some() {
287                log::info!("Account {account_id} registered");
288                return Ok(());
289            }
290
291            if start.elapsed() >= timeout {
292                anyhow::bail!(
293                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
294                );
295            }
296        }
297    }
298
299    fn get_product_type_for_instrument(&self, instrument_id: InstrumentId) -> BybitProductType {
300        BybitProductType::from_suffix(instrument_id.symbol.as_str()).unwrap_or_else(|| {
301            log::warn!("No product-type suffix on {instrument_id}, defaulting to Linear");
302            BybitProductType::Linear
303        })
304    }
305
306    fn resolve_position_idx(
307        &self,
308        instrument_id: InstrumentId,
309        order_side: BybitOrderSide,
310        is_reduce_only: bool,
311        manual_override: Option<BybitPositionIdx>,
312    ) -> Option<BybitPositionIdx> {
313        let product_type = self.get_product_type_for_instrument(instrument_id);
314        if !matches!(
315            product_type,
316            BybitProductType::Linear | BybitProductType::Inverse
317        ) {
318            return None;
319        }
320        let mode = self
321            .config
322            .position_mode
323            .as_ref()
324            .and_then(|map| map.get(instrument_id.symbol.as_str()).copied());
325        resolve_position_idx(mode, order_side, is_reduce_only, manual_override)
326    }
327
328    async fn apply_account_configuration(&self) -> anyhow::Result<()> {
329        self.apply_leverages_setting().await;
330        self.apply_position_modes_setting().await;
331        self.apply_margin_mode_setting().await
332    }
333
334    async fn apply_leverages_setting(&self) {
335        let Some(leverages) = &self.config.futures_leverages else {
336            return;
337        };
338
339        for (symbol_str, leverage) in leverages {
340            self.apply_leverage_entry(symbol_str, *leverage).await;
341        }
342    }
343
344    async fn apply_leverage_entry(&self, symbol_str: &str, leverage: u32) {
345        let Some(symbol) = parse_derivative_symbol(symbol_str) else {
346            return;
347        };
348        let lev = leverage.to_string();
349        let result = self
350            .http_client
351            .set_leverage(symbol.product_type(), symbol.raw_symbol(), &lev, &lev)
352            .await;
353
354        match result {
355            Ok(_) => log::info!("Set leverage for {symbol_str} to {leverage}"),
356            Err(e) if is_unchanged_error(&e, "110043") => {
357                log::info!("Leverage already set for {symbol_str} to {leverage}");
358            }
359            Err(e) => log::error!("Failed to set leverage for {symbol_str}: {e}"),
360        }
361    }
362
363    async fn apply_position_modes_setting(&self) {
364        let Some(modes) = &self.config.position_mode else {
365            return;
366        };
367
368        for (symbol_str, mode) in modes {
369            self.apply_position_mode_entry(symbol_str, *mode).await;
370        }
371    }
372
373    async fn apply_position_mode_entry(&self, symbol_str: &str, mode: BybitPositionMode) {
374        let Some(symbol) = parse_derivative_symbol(symbol_str) else {
375            return;
376        };
377        let result = self
378            .http_client
379            .switch_mode(
380                symbol.product_type(),
381                mode,
382                Some(symbol.raw_symbol().to_string()),
383                None,
384            )
385            .await;
386
387        match result {
388            Ok(_) => log::info!("Set symbol `{symbol_str}` position mode to `{mode:?}`"),
389            Err(e) if is_unchanged_error(&e, "110025") => {
390                log::info!("Symbol `{symbol_str}` position mode already set to `{mode:?}`");
391            }
392            Err(e) => log::error!("Failed to set position mode for {symbol_str}: {e}"),
393        }
394    }
395
396    async fn apply_margin_mode_setting(&self) -> anyhow::Result<()> {
397        let Some(margin_mode) = self.config.margin_mode else {
398            return Ok(());
399        };
400
401        let result = self.http_client.set_margin_mode(margin_mode).await;
402
403        match result {
404            Ok(_) => {
405                log::info!("Set account margin mode to {margin_mode:?}");
406                Ok(())
407            }
408            Err(e) if is_unchanged_error(&e, "") => {
409                log::info!("Margin mode already set to {margin_mode:?}");
410                Ok(())
411            }
412            Err(e) if is_low_margin_error(&e) => {
413                log::warn!("Cannot set margin mode: {e}");
414                Ok(())
415            }
416            Err(e) => Err(anyhow::Error::from(e).context("failed to set margin mode")),
417        }
418    }
419
420    fn map_order_type(order_type: OrderType) -> anyhow::Result<(BybitOrderType, bool)> {
421        match order_type {
422            OrderType::Market => Ok((BybitOrderType::Market, false)),
423            OrderType::Limit => Ok((BybitOrderType::Limit, false)),
424            OrderType::StopMarket | OrderType::MarketIfTouched => {
425                Ok((BybitOrderType::Market, true))
426            }
427            OrderType::StopLimit | OrderType::LimitIfTouched => Ok((BybitOrderType::Limit, true)),
428            _ => anyhow::bail!("unsupported order type for Bybit: {order_type}"),
429        }
430    }
431
432    fn map_time_in_force(tif: TimeInForce, is_post_only: bool) -> BybitTimeInForce {
433        if is_post_only {
434            return BybitTimeInForce::PostOnly;
435        }
436
437        match tif {
438            TimeInForce::Gtc => BybitTimeInForce::Gtc,
439            TimeInForce::Ioc => BybitTimeInForce::Ioc,
440            TimeInForce::Fok => BybitTimeInForce::Fok,
441            _ => BybitTimeInForce::Gtc,
442        }
443    }
444
445    fn build_ws_place_params(
446        order: &OrderAny,
447        product_type: BybitProductType,
448        raw_symbol: &str,
449        tp_sl: &BybitTpSlParams,
450        position_idx: Option<BybitPositionIdx>,
451    ) -> anyhow::Result<BybitWsPlaceOrderParams> {
452        let bybit_side = BybitOrderSide::try_from(order.order_side())?;
453        let (bybit_order_type, is_conditional) = Self::map_order_type(order.order_type())?;
454        let has_tp_sl = tp_sl.has_tp_sl();
455        let trigger_dir = trigger_direction(order.order_type(), order.order_side(), is_conditional);
456
457        Ok(BybitWsPlaceOrderParams {
458            category: product_type,
459            symbol: Ustr::from(raw_symbol),
460            side: bybit_side,
461            order_type: bybit_order_type,
462            qty: order.quantity().to_string(),
463            is_leverage: spot_leverage(product_type, tp_sl.is_leverage),
464            market_unit: spot_market_unit(
465                product_type,
466                bybit_order_type,
467                order.is_quote_quantity(),
468            ),
469            price: order.price().map(|p: Price| p.to_string()),
470            time_in_force: if bybit_order_type == BybitOrderType::Market {
471                None
472            } else {
473                Some(Self::map_time_in_force(
474                    order.time_in_force(),
475                    order.is_post_only(),
476                ))
477            },
478            order_link_id: Some(order.client_order_id().to_string()),
479            reduce_only: if order.is_reduce_only() {
480                Some(true)
481            } else {
482                None
483            },
484            close_on_trigger: tp_sl.close_on_trigger,
485            trigger_price: order.trigger_price().map(|p: Price| p.to_string()),
486            trigger_by: if is_conditional {
487                Some(resolve_trigger_type(order.trigger_type()))
488            } else {
489                None
490            },
491            trigger_direction: trigger_dir.map(|d| d as i32),
492            tpsl_mode: if has_tp_sl {
493                Some(BybitTpSlMode::Full)
494            } else {
495                None
496            },
497            take_profit: tp_sl.take_profit.map(|p| p.to_string()),
498            stop_loss: tp_sl.stop_loss.map(|p| p.to_string()),
499            tp_trigger_by: tp_sl.tp_trigger_by.or(tp_sl
500                .take_profit
501                .map(|_| resolve_trigger_type(order.trigger_type()))),
502            sl_trigger_by: tp_sl.sl_trigger_by.or(tp_sl
503                .stop_loss
504                .map(|_| resolve_trigger_type(order.trigger_type()))),
505            sl_trigger_price: tp_sl.sl_trigger_price.clone(),
506            tp_trigger_price: tp_sl.tp_trigger_price.clone(),
507            sl_order_type: tp_sl.sl_order_type,
508            tp_order_type: tp_sl.tp_order_type,
509            sl_limit_price: tp_sl.sl_limit_price.clone(),
510            tp_limit_price: tp_sl.tp_limit_price.clone(),
511            order_iv: tp_sl.order_iv.clone(),
512            mmp: tp_sl.mmp,
513            position_idx,
514        })
515    }
516}
517
518#[async_trait(?Send)]
519impl ExecutionClient for BybitExecutionClient {
520    fn is_connected(&self) -> bool {
521        self.core.is_connected()
522    }
523
524    fn client_id(&self) -> ClientId {
525        self.core.client_id
526    }
527
528    fn account_id(&self) -> AccountId {
529        self.core.account_id
530    }
531
532    fn venue(&self) -> Venue {
533        *BYBIT_VENUE
534    }
535
536    fn oms_type(&self) -> OmsType {
537        self.core.oms_type
538    }
539
540    fn get_account(&self) -> Option<AccountAny> {
541        self.core.cache().account(&self.core.account_id).cloned()
542    }
543
544    async fn connect(&mut self) -> anyhow::Result<()> {
545        if self.core.is_connected() {
546            return Ok(());
547        }
548
549        // Reset after a prior disconnect so REST calls are not short-circuited
550        self.http_client.reset_cancellation_token();
551
552        let product_types = self.product_types();
553
554        if !self.core.instruments_initialized() {
555            let mut all_instruments = Vec::new();
556
557            for product_type in &product_types {
558                let instruments = self
559                    .http_client
560                    .request_instruments(*product_type, None, None)
561                    .await
562                    .with_context(|| {
563                        format!("failed to request Bybit instruments for {product_type:?}")
564                    })?;
565
566                if instruments.is_empty() {
567                    log::warn!("No instruments returned for {product_type:?}");
568                    continue;
569                }
570
571                log::info!("Loaded {} {product_type:?} instruments", instruments.len());
572
573                self.http_client.cache_instruments(&instruments);
574                all_instruments.extend(instruments);
575            }
576
577            if !all_instruments.is_empty() {
578                let mut instruments_map = AHashMap::new();
579                for instrument in &all_instruments {
580                    instruments_map.insert(instrument.id().symbol.inner(), instrument.clone());
581                }
582                self.instruments_cache = Arc::new(instruments_map);
583            }
584            self.core.set_instruments_initialized();
585        }
586
587        self.ws_private.set_account_id(self.core.account_id);
588        self.ws_trade.set_account_id(self.core.account_id);
589
590        self.ws_private.connect().await?;
591        self.ws_private.wait_until_active(10.0).await?;
592        log::info!("Connected to private WebSocket");
593
594        if self.ws_private_stream_handle.is_none() {
595            let stream = self.ws_private.stream();
596            let emitter = self.emitter.clone();
597            let account_id = self.core.account_id;
598            let instruments = Arc::clone(&self.instruments_cache);
599            let state = Arc::clone(&self.dispatch_state);
600            let clock = self.clock;
601
602            let handle = get_runtime().spawn(async move {
603                pin_mut!(stream);
604                while let Some(message) = stream.next().await {
605                    dispatch_ws_message(
606                        &message,
607                        &emitter,
608                        &state,
609                        account_id,
610                        &instruments,
611                        clock,
612                    );
613                }
614            });
615            self.ws_private_stream_handle = Some(handle);
616        }
617
618        // Demo environment does not support Trade WebSocket API
619        if self.config.environment == BybitEnvironment::Demo {
620            log::warn!("Demo mode: Trade WebSocket not available, orders use HTTP REST API");
621        } else {
622            self.ws_trade.connect().await?;
623            self.ws_trade.wait_until_active(10.0).await?;
624            log::info!("Connected to trade WebSocket");
625
626            if self.ws_trade_stream_handle.is_none() {
627                let stream = self.ws_trade.stream();
628                let emitter = self.emitter.clone();
629                let account_id = self.core.account_id;
630                let instruments = Arc::clone(&self.instruments_cache);
631                let state = Arc::clone(&self.dispatch_state);
632                let clock = self.clock;
633
634                let handle = get_runtime().spawn(async move {
635                    pin_mut!(stream);
636                    while let Some(message) = stream.next().await {
637                        dispatch_ws_message(
638                            &message,
639                            &emitter,
640                            &state,
641                            account_id,
642                            &instruments,
643                            clock,
644                        );
645                    }
646                });
647                self.ws_trade_stream_handle = Some(handle);
648            }
649        }
650
651        self.ws_private.subscribe_orders().await?;
652        self.ws_private.subscribe_executions().await?;
653        self.ws_private.subscribe_positions().await?;
654        self.ws_private.subscribe_wallet().await?;
655
656        self.apply_account_configuration().await?;
657
658        let account_state = self
659            .http_client
660            .request_account_state(BybitAccountType::Unified, self.core.account_id)
661            .await
662            .context("failed to request Bybit account state")?;
663
664        if !account_state.balances.is_empty() {
665            log::info!(
666                "Received account state with {} balance(s)",
667                account_state.balances.len()
668            );
669        }
670        self.emitter.send_account_state(account_state);
671
672        self.await_account_registered(30.0).await?;
673
674        self.core.set_connected();
675        log::info!("Connected: client_id={}", self.core.client_id);
676        Ok(())
677    }
678
679    async fn disconnect(&mut self) -> anyhow::Result<()> {
680        if self.core.is_disconnected() {
681            return Ok(());
682        }
683
684        self.abort_pending_tasks();
685        self.http_client.cancel_all_requests();
686
687        if let Err(e) = self.ws_private.close().await {
688            log::warn!("Error closing private websocket: {e:?}");
689        }
690
691        if let Err(e) = self.ws_trade.close().await {
692            log::warn!("Error closing trade websocket: {e:?}");
693        }
694
695        if let Some(handle) = self.ws_private_stream_handle.take() {
696            handle.abort();
697        }
698
699        if let Some(handle) = self.ws_trade_stream_handle.take() {
700            handle.abort();
701        }
702
703        self.core.set_disconnected();
704        log::info!("Disconnected: client_id={}", self.core.client_id);
705        Ok(())
706    }
707
708    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
709        self.update_account_state();
710        Ok(())
711    }
712
713    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
714        let instrument_id = cmd.instrument_id;
715        let product_type = self.get_product_type_for_instrument(instrument_id);
716        let client_order_id = cmd.client_order_id;
717        let venue_order_id = cmd.venue_order_id;
718        let account_id = self.core.account_id;
719        let http_client = self.http_client.clone();
720        let emitter = self.emitter.clone();
721
722        self.spawn_task("query_order", async move {
723            match http_client
724                .query_order(
725                    account_id,
726                    product_type,
727                    instrument_id,
728                    Some(client_order_id),
729                    venue_order_id,
730                )
731                .await
732            {
733                Ok(Some(report)) => {
734                    emitter.send_order_status_report(report);
735                }
736                Ok(None) => {
737                    log::warn!("Order not found: client_order_id={client_order_id}, venue_order_id={venue_order_id:?}");
738                }
739                Err(e) => {
740                    log::error!("Failed to query order: {e}");
741                }
742            }
743            Ok(())
744        });
745
746        Ok(())
747    }
748
749    fn generate_account_state(
750        &self,
751        balances: Vec<AccountBalance>,
752        margins: Vec<MarginBalance>,
753        reported: bool,
754        ts_event: UnixNanos,
755    ) -> anyhow::Result<()> {
756        self.emitter
757            .emit_account_state(balances, margins, reported, ts_event);
758        Ok(())
759    }
760
761    fn start(&mut self) -> anyhow::Result<()> {
762        if self.core.is_started() {
763            return Ok(());
764        }
765
766        let sender = get_exec_event_sender();
767        self.emitter.set_sender(sender);
768        self.core.set_started();
769
770        let http_client = self.http_client.clone();
771        let product_types = self.config.product_types.clone();
772
773        get_runtime().spawn(async move {
774            let mut all_instruments = Vec::new();
775
776            for product_type in product_types {
777                match http_client
778                    .request_instruments(product_type, None, None)
779                    .await
780                {
781                    Ok(instruments) => {
782                        if instruments.is_empty() {
783                            log::warn!("No instruments returned for {product_type:?}");
784                            continue;
785                        }
786                        http_client.cache_instruments(&instruments);
787                        all_instruments.extend(instruments);
788                    }
789                    Err(e) => {
790                        log::error!("Failed to request instruments for {product_type:?}: {e}");
791                    }
792                }
793            }
794
795            if all_instruments.is_empty() {
796                log::warn!(
797                    "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
798                );
799            } else {
800                log::info!("Instruments initialized: count={}", all_instruments.len());
801            }
802        });
803
804        log::info!(
805            "Started: client_id={}, account_id={}, account_type={:?}, product_types={:?}, environment={:?}, proxy_url={:?}",
806            self.core.client_id,
807            self.core.account_id,
808            self.core.account_type,
809            self.config.product_types,
810            self.config.environment,
811            self.config.proxy_url,
812        );
813        Ok(())
814    }
815
816    fn stop(&mut self) -> anyhow::Result<()> {
817        if self.core.is_stopped() {
818            return Ok(());
819        }
820
821        self.core.set_stopped();
822        self.core.set_disconnected();
823
824        if let Some(handle) = self.ws_private_stream_handle.take() {
825            handle.abort();
826        }
827
828        if let Some(handle) = self.ws_trade_stream_handle.take() {
829            handle.abort();
830        }
831        self.abort_pending_tasks();
832        log::info!("Stopped: client_id={}", self.core.client_id);
833        Ok(())
834    }
835
836    async fn generate_order_status_report(
837        &self,
838        cmd: &GenerateOrderStatusReport,
839    ) -> anyhow::Result<Option<OrderStatusReport>> {
840        let Some(instrument_id) = cmd.instrument_id else {
841            log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
842            return Ok(None);
843        };
844
845        let product_type = self.get_product_type_for_instrument(instrument_id);
846
847        let mut reports = self
848            .http_client
849            .request_order_status_reports(
850                self.core.account_id,
851                product_type,
852                Some(instrument_id),
853                false,
854                None,
855                None,
856                None,
857            )
858            .await?;
859
860        if let Some(client_order_id) = cmd.client_order_id {
861            reports.retain(|report| report.client_order_id == Some(client_order_id));
862        }
863
864        if let Some(venue_order_id) = cmd.venue_order_id {
865            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
866        }
867
868        Ok(reports.into_iter().next())
869    }
870
871    async fn generate_order_status_reports(
872        &self,
873        cmd: &GenerateOrderStatusReports,
874    ) -> anyhow::Result<Vec<OrderStatusReport>> {
875        let mut reports = Vec::new();
876
877        if let Some(instrument_id) = cmd.instrument_id {
878            let product_type = self.get_product_type_for_instrument(instrument_id);
879            let mut fetched = self
880                .http_client
881                .request_order_status_reports(
882                    self.core.account_id,
883                    product_type,
884                    Some(instrument_id),
885                    cmd.open_only,
886                    None,
887                    None,
888                    None,
889                )
890                .await?;
891            reports.append(&mut fetched);
892        } else {
893            for product_type in self.product_types() {
894                let mut fetched = self
895                    .http_client
896                    .request_order_status_reports(
897                        self.core.account_id,
898                        product_type,
899                        None,
900                        cmd.open_only,
901                        None,
902                        None,
903                        None,
904                    )
905                    .await?;
906                reports.append(&mut fetched);
907            }
908        }
909
910        if let Some(start) = cmd.start {
911            reports.retain(|r| r.ts_last >= start);
912        }
913
914        if let Some(end) = cmd.end {
915            reports.retain(|r| r.ts_last <= end);
916        }
917
918        Ok(reports)
919    }
920
921    async fn generate_fill_reports(
922        &self,
923        cmd: GenerateFillReports,
924    ) -> anyhow::Result<Vec<FillReport>> {
925        let start_ms = nanos_to_millis(cmd.start);
926        let end_ms = nanos_to_millis(cmd.end);
927        let mut reports = Vec::new();
928
929        if let Some(instrument_id) = cmd.instrument_id {
930            let product_type = self.get_product_type_for_instrument(instrument_id);
931            let mut fetched = self
932                .http_client
933                .request_fill_reports(
934                    self.core.account_id,
935                    product_type,
936                    Some(instrument_id),
937                    start_ms,
938                    end_ms,
939                    None,
940                )
941                .await?;
942            reports.append(&mut fetched);
943        } else {
944            for product_type in self.product_types() {
945                let mut fetched = self
946                    .http_client
947                    .request_fill_reports(
948                        self.core.account_id,
949                        product_type,
950                        None,
951                        start_ms,
952                        end_ms,
953                        None,
954                    )
955                    .await?;
956                reports.append(&mut fetched);
957            }
958        }
959
960        if let Some(venue_order_id) = cmd.venue_order_id {
961            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
962        }
963
964        Ok(reports)
965    }
966
967    async fn generate_position_status_reports(
968        &self,
969        cmd: &GeneratePositionStatusReports,
970    ) -> anyhow::Result<Vec<PositionStatusReport>> {
971        let mut reports = Vec::new();
972
973        if let Some(instrument_id) = cmd.instrument_id {
974            let product_type = self.get_product_type_for_instrument(instrument_id);
975
976            // Skip Spot - positions API only supports derivatives
977            if product_type != BybitProductType::Spot {
978                let mut fetched = self
979                    .http_client
980                    .request_position_status_reports(
981                        self.core.account_id,
982                        product_type,
983                        Some(instrument_id),
984                    )
985                    .await?;
986                reports.append(&mut fetched);
987            }
988        } else {
989            for product_type in self.product_types() {
990                // Skip Spot - positions API only supports derivatives
991                if product_type == BybitProductType::Spot {
992                    continue;
993                }
994                let mut fetched = self
995                    .http_client
996                    .request_position_status_reports(self.core.account_id, product_type, None)
997                    .await?;
998                reports.append(&mut fetched);
999            }
1000        }
1001
1002        Ok(reports)
1003    }
1004
1005    async fn generate_mass_status(
1006        &self,
1007        lookback_mins: Option<u64>,
1008    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1009        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1010
1011        let ts_now = self.clock.get_time_ns();
1012
1013        let start = lookback_mins.map(|mins| {
1014            let lookback_ns = mins * 60 * 1_000_000_000;
1015            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1016        });
1017
1018        let order_cmd = GenerateOrderStatusReportsBuilder::default()
1019            .ts_init(ts_now)
1020            .open_only(false)
1021            .start(start)
1022            .build()
1023            .map_err(|e| anyhow::anyhow!("{e}"))?;
1024
1025        let fill_cmd = GenerateFillReportsBuilder::default()
1026            .ts_init(ts_now)
1027            .start(start)
1028            .build()
1029            .map_err(|e| anyhow::anyhow!("{e}"))?;
1030
1031        let position_cmd = GeneratePositionStatusReportsBuilder::default()
1032            .ts_init(ts_now)
1033            .start(start)
1034            .build()
1035            .map_err(|e| anyhow::anyhow!("{e}"))?;
1036
1037        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1038            self.generate_order_status_reports(&order_cmd),
1039            self.generate_fill_reports(fill_cmd),
1040            self.generate_position_status_reports(&position_cmd),
1041        )?;
1042
1043        log::info!("Received {} OrderStatusReports", order_reports.len());
1044        log::info!("Received {} FillReports", fill_reports.len());
1045        log::info!("Received {} PositionReports", position_reports.len());
1046
1047        let mut mass_status = ExecutionMassStatus::new(
1048            self.core.client_id,
1049            self.core.account_id,
1050            *BYBIT_VENUE,
1051            ts_now,
1052            None,
1053        );
1054
1055        mass_status.add_order_reports(order_reports);
1056        mass_status.add_fill_reports(fill_reports);
1057        mass_status.add_position_reports(position_reports);
1058
1059        Ok(Some(mass_status))
1060    }
1061
1062    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
1063        let order = {
1064            let cache = self.core.cache();
1065            let order = cache
1066                .order(&cmd.client_order_id)
1067                .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
1068
1069            if order.is_closed() {
1070                log::warn!("Cannot submit closed order {}", order.client_order_id());
1071                return Ok(());
1072            }
1073
1074            order.clone()
1075        };
1076
1077        // Validate order params before emitting submitted event
1078        if let Err(e) = BybitOrderSide::try_from(order.order_side()) {
1079            self.emitter.emit_order_denied(&order, &e.to_string());
1080            return Ok(());
1081        }
1082
1083        if let Err(e) = Self::map_order_type(order.order_type()) {
1084            self.emitter.emit_order_denied(&order, &e.to_string());
1085            return Ok(());
1086        }
1087
1088        let tp_sl = match parse_bybit_tp_sl_params(cmd.params.as_ref()) {
1089            Ok(p) => p,
1090            Err(e) => {
1091                self.emitter.emit_order_denied(&order, &e.to_string());
1092                return Ok(());
1093            }
1094        };
1095
1096        if self.config.environment == BybitEnvironment::Demo
1097            && (tp_sl.has_tp_sl() || tp_sl.order_iv.is_some() || tp_sl.mmp.is_some())
1098        {
1099            self.emitter.emit_order_denied(
1100                &order,
1101                "Native TP/SL and option params are not supported in demo mode",
1102            );
1103            return Ok(());
1104        }
1105
1106        log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
1107        self.emitter.emit_order_submitted(&order);
1108
1109        let instrument_id = order.instrument_id();
1110        let product_type = self.get_product_type_for_instrument(instrument_id);
1111        let client_order_id = order.client_order_id();
1112        let strategy_id = order.strategy_id();
1113        let emitter = self.emitter.clone();
1114        let clock = self.clock;
1115
1116        // Store identity for WS dispatch to produce proper order events
1117        self.dispatch_state.order_identities.insert(
1118            client_order_id,
1119            OrderIdentity {
1120                instrument_id,
1121                strategy_id,
1122                order_side: order.order_side(),
1123                order_type: order.order_type(),
1124            },
1125        );
1126
1127        let bybit_side =
1128            BybitOrderSide::try_from(order.order_side()).expect("order side validated above");
1129        let position_idx = self.resolve_position_idx(
1130            instrument_id,
1131            bybit_side,
1132            order.is_reduce_only(),
1133            tp_sl.position_idx,
1134        );
1135
1136        if self.config.environment == BybitEnvironment::Demo {
1137            let http_client = self.http_client.clone();
1138            let account_id = self.core.account_id;
1139            let order_side = order.order_side();
1140            let order_type = order.order_type();
1141            let quantity = order.quantity();
1142            let time_in_force = order.time_in_force();
1143            let price = order.price();
1144            let trigger_price = order.trigger_price();
1145            let post_only = order.is_post_only();
1146            let reduce_only = order.is_reduce_only();
1147            let is_quote_quantity = order.is_quote_quantity();
1148            let is_leverage = tp_sl.is_leverage;
1149
1150            self.spawn_task("submit_order_http", async move {
1151                let result = http_client
1152                    .submit_order(
1153                        account_id,
1154                        product_type,
1155                        instrument_id,
1156                        client_order_id,
1157                        order_side,
1158                        order_type,
1159                        quantity,
1160                        Some(time_in_force),
1161                        price,
1162                        trigger_price,
1163                        Some(post_only),
1164                        reduce_only,
1165                        is_quote_quantity,
1166                        is_leverage,
1167                        position_idx,
1168                    )
1169                    .await;
1170
1171                if let Err(e) = result {
1172                    let ts_event = clock.get_time_ns();
1173                    emitter.emit_order_rejected_event(
1174                        strategy_id,
1175                        instrument_id,
1176                        client_order_id,
1177                        &format!("submit-order-error: {e}"),
1178                        ts_event,
1179                        false,
1180                    );
1181                    anyhow::bail!("submit order failed: {e}");
1182                }
1183
1184                Ok(())
1185            });
1186
1187            return Ok(());
1188        }
1189
1190        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1191        let params =
1192            Self::build_ws_place_params(&order, product_type, raw_symbol, &tp_sl, position_idx)?;
1193
1194        let ws_trade = self.ws_trade.clone();
1195        let dispatch_state = Arc::clone(&self.dispatch_state);
1196
1197        self.spawn_task("submit_order", async move {
1198            match ws_trade.place_order(params).await {
1199                Ok(req_id) => {
1200                    dispatch_state.pending_requests.insert(
1201                        req_id,
1202                        (vec![client_order_id], vec![None], PendingOperation::Place),
1203                    );
1204                }
1205                Err(e) => {
1206                    dispatch_state.order_identities.remove(&client_order_id);
1207                    let ts_event = clock.get_time_ns();
1208                    emitter.emit_order_rejected_event(
1209                        strategy_id,
1210                        instrument_id,
1211                        client_order_id,
1212                        &format!("submit-order-error: {e}"),
1213                        ts_event,
1214                        false,
1215                    );
1216                    anyhow::bail!("submit order failed: {e}");
1217                }
1218            }
1219
1220            Ok(())
1221        });
1222
1223        Ok(())
1224    }
1225
1226    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
1227        if cmd.order_list.client_order_ids.is_empty() {
1228            return Ok(());
1229        }
1230
1231        let tp_sl = match parse_bybit_tp_sl_params(cmd.params.as_ref()) {
1232            Ok(p) => p,
1233            Err(e) => {
1234                let cache = self.core.cache();
1235
1236                for cid in &cmd.order_list.client_order_ids {
1237                    if let Some(order) = cache.order(cid) {
1238                        self.emitter.emit_order_denied(order, &e.to_string());
1239                    }
1240                }
1241                return Ok(());
1242            }
1243        };
1244
1245        if self.config.environment == BybitEnvironment::Demo
1246            && (tp_sl.has_tp_sl() || tp_sl.order_iv.is_some() || tp_sl.mmp.is_some())
1247        {
1248            let cache = self.core.cache();
1249
1250            for cid in &cmd.order_list.client_order_ids {
1251                if let Some(order) = cache.order(cid) {
1252                    self.emitter.emit_order_denied(
1253                        order,
1254                        "Native TP/SL and option params are not supported in demo mode",
1255                    );
1256                }
1257            }
1258            return Ok(());
1259        }
1260
1261        let instrument_id = cmd.instrument_id;
1262        let product_type = self.get_product_type_for_instrument(instrument_id);
1263        let strategy_id = cmd.strategy_id;
1264
1265        let mut valid_orders = Vec::with_capacity(cmd.order_list.client_order_ids.len());
1266        {
1267            let cache = self.core.cache();
1268            let mut deny_reason: Option<String> = None;
1269
1270            for cid in &cmd.order_list.client_order_ids {
1271                let Some(order) = cache.order(cid) else {
1272                    deny_reason = Some(format!("Order not found in cache: {cid}"));
1273                    break;
1274                };
1275
1276                if order.is_closed() {
1277                    deny_reason = Some(format!("Cannot submit closed order {cid}"));
1278                    break;
1279                }
1280
1281                if let Err(e) = BybitOrderSide::try_from(order.order_side()) {
1282                    deny_reason = Some(e.to_string());
1283                    break;
1284                }
1285
1286                if let Err(e) = Self::map_order_type(order.order_type()) {
1287                    deny_reason = Some(e.to_string());
1288                    break;
1289                }
1290
1291                valid_orders.push(order.clone());
1292            }
1293
1294            // Deny entire list if any order fails validation
1295            if let Some(reason) = deny_reason {
1296                for cid in &cmd.order_list.client_order_ids {
1297                    if let Some(order) = cache.order(cid) {
1298                        self.emitter.emit_order_denied(order, &reason);
1299                    }
1300                }
1301                return Ok(());
1302            }
1303        }
1304
1305        if valid_orders.is_empty() {
1306            return Ok(());
1307        }
1308
1309        for order in &valid_orders {
1310            self.emitter.emit_order_submitted(order);
1311            self.dispatch_state.order_identities.insert(
1312                order.client_order_id(),
1313                OrderIdentity {
1314                    instrument_id,
1315                    strategy_id,
1316                    order_side: order.order_side(),
1317                    order_type: order.order_type(),
1318                },
1319            );
1320        }
1321
1322        let emitter = self.emitter.clone();
1323        let clock = self.clock;
1324
1325        // Demo mode: submit individually via HTTP
1326        if self.config.environment == BybitEnvironment::Demo {
1327            let http_client = self.http_client.clone();
1328            let account_id = self.core.account_id;
1329            let is_leverage = tp_sl.is_leverage;
1330
1331            let order_data: Vec<_> = valid_orders
1332                .iter()
1333                .map(|o| {
1334                    let bybit_side = BybitOrderSide::try_from(o.order_side())
1335                        .expect("order side validated above");
1336                    let position_idx = self.resolve_position_idx(
1337                        instrument_id,
1338                        bybit_side,
1339                        o.is_reduce_only(),
1340                        tp_sl.position_idx,
1341                    );
1342                    (
1343                        o.client_order_id(),
1344                        o.order_side(),
1345                        o.order_type(),
1346                        o.quantity(),
1347                        o.time_in_force(),
1348                        o.price(),
1349                        o.trigger_price(),
1350                        o.is_post_only(),
1351                        o.is_reduce_only(),
1352                        o.is_quote_quantity(),
1353                        position_idx,
1354                    )
1355                })
1356                .collect();
1357
1358            self.spawn_task("submit_order_list_http", async move {
1359                for (
1360                    cid,
1361                    side,
1362                    otype,
1363                    qty,
1364                    tif,
1365                    price,
1366                    trigger,
1367                    post_only,
1368                    reduce,
1369                    quote_qty,
1370                    position_idx,
1371                ) in order_data
1372                {
1373                    if let Err(e) = http_client
1374                        .submit_order(
1375                            account_id,
1376                            product_type,
1377                            instrument_id,
1378                            cid,
1379                            side,
1380                            otype,
1381                            qty,
1382                            Some(tif),
1383                            price,
1384                            trigger,
1385                            Some(post_only),
1386                            reduce,
1387                            quote_qty,
1388                            is_leverage,
1389                            position_idx,
1390                        )
1391                        .await
1392                    {
1393                        let ts_event = clock.get_time_ns();
1394                        emitter.emit_order_rejected_event(
1395                            strategy_id,
1396                            instrument_id,
1397                            cid,
1398                            &format!("submit-order-error: {e}"),
1399                            ts_event,
1400                            false,
1401                        );
1402                    }
1403                }
1404                Ok(())
1405            });
1406
1407            return Ok(());
1408        }
1409
1410        // Live mode: batch submit via WebSocket
1411        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1412
1413        let mut order_params = Vec::with_capacity(valid_orders.len());
1414        let mut client_order_ids = Vec::with_capacity(valid_orders.len());
1415
1416        for order in &valid_orders {
1417            let bybit_side =
1418                BybitOrderSide::try_from(order.order_side()).expect("order side validated above");
1419            let position_idx = self.resolve_position_idx(
1420                instrument_id,
1421                bybit_side,
1422                order.is_reduce_only(),
1423                tp_sl.position_idx,
1424            );
1425            let params =
1426                Self::build_ws_place_params(order, product_type, raw_symbol, &tp_sl, position_idx)
1427                    .expect("validated above");
1428            order_params.push(params);
1429            client_order_ids.push(order.client_order_id());
1430        }
1431
1432        let ws_trade = self.ws_trade.clone();
1433        let dispatch_state = Arc::clone(&self.dispatch_state);
1434
1435        self.spawn_task("submit_order_list", async move {
1436            match ws_trade.batch_place_orders(order_params).await {
1437                Ok(req_ids) => {
1438                    for (req_id, chunk_cids) in req_ids
1439                        .into_iter()
1440                        .zip(client_order_ids.chunks(20).map(|c| c.to_vec()))
1441                    {
1442                        let chunk_voids = vec![None; chunk_cids.len()];
1443                        dispatch_state
1444                            .pending_requests
1445                            .insert(req_id, (chunk_cids, chunk_voids, PendingOperation::Place));
1446                    }
1447                }
1448                Err(e) => {
1449                    for cid in &client_order_ids {
1450                        dispatch_state.order_identities.remove(cid);
1451                    }
1452
1453                    let ts_event = clock.get_time_ns();
1454
1455                    for cid in &client_order_ids {
1456                        emitter.emit_order_rejected_event(
1457                            strategy_id,
1458                            instrument_id,
1459                            *cid,
1460                            &format!("submit-order-list-error: {e}"),
1461                            ts_event,
1462                            false,
1463                        );
1464                    }
1465                    anyhow::bail!("submit order list failed: {e}");
1466                }
1467            }
1468            Ok(())
1469        });
1470
1471        Ok(())
1472    }
1473
1474    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1475        let instrument_id = cmd.instrument_id;
1476        let product_type = self.get_product_type_for_instrument(instrument_id);
1477        let client_order_id = cmd.client_order_id;
1478        let strategy_id = cmd.strategy_id;
1479        let venue_order_id = cmd.venue_order_id;
1480        let emitter = self.emitter.clone();
1481        let clock = self.clock;
1482
1483        let has_order_iv = cmd
1484            .params
1485            .as_ref()
1486            .and_then(|p| p.get("order_iv"))
1487            .is_some();
1488
1489        if self.config.environment == BybitEnvironment::Demo && has_order_iv {
1490            let ts_event = self.clock.get_time_ns();
1491            self.emitter.emit_order_modify_rejected_event(
1492                strategy_id,
1493                instrument_id,
1494                client_order_id,
1495                venue_order_id,
1496                "Option params (order_iv) are not supported in demo mode",
1497                ts_event,
1498            );
1499            return Ok(());
1500        }
1501
1502        if self.config.environment == BybitEnvironment::Demo {
1503            let http_client = self.http_client.clone();
1504            let account_id = self.core.account_id;
1505            let quantity = cmd.quantity;
1506            let price = cmd.price;
1507
1508            self.spawn_task("modify_order_http", async move {
1509                let result = http_client
1510                    .modify_order(
1511                        account_id,
1512                        product_type,
1513                        instrument_id,
1514                        Some(client_order_id),
1515                        venue_order_id,
1516                        quantity,
1517                        price,
1518                    )
1519                    .await;
1520
1521                if let Err(e) = result {
1522                    let ts_event = clock.get_time_ns();
1523                    emitter.emit_order_modify_rejected_event(
1524                        strategy_id,
1525                        instrument_id,
1526                        client_order_id,
1527                        venue_order_id,
1528                        &format!("modify-order-error: {e}"),
1529                        ts_event,
1530                    );
1531                    anyhow::bail!("modify order failed: {e}");
1532                }
1533
1534                Ok(())
1535            });
1536
1537            return Ok(());
1538        }
1539
1540        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1541
1542        let order_iv = if let Some(value) = cmd.params.as_ref().and_then(|p| p.get("order_iv")) {
1543            match get_price_str(cmd.params.as_ref().unwrap(), "order_iv") {
1544                Some(s) => Some(s),
1545                None => {
1546                    let ts_event = self.clock.get_time_ns();
1547                    self.emitter.emit_order_modify_rejected_event(
1548                        strategy_id,
1549                        instrument_id,
1550                        client_order_id,
1551                        venue_order_id,
1552                        &format!("invalid type for 'order_iv': {value}, expected string or number"),
1553                        ts_event,
1554                    );
1555                    return Ok(());
1556                }
1557            }
1558        } else {
1559            None
1560        };
1561
1562        let params = BybitWsAmendOrderParams {
1563            category: product_type,
1564            symbol: Ustr::from(raw_symbol),
1565            order_id: cmd.venue_order_id.map(|v| v.to_string()),
1566            order_link_id: Some(cmd.client_order_id.to_string()),
1567            qty: cmd.quantity.map(|q| q.to_string()),
1568            price: cmd.price.map(|p| p.to_string()),
1569            trigger_price: None,
1570            take_profit: None,
1571            stop_loss: None,
1572            tp_trigger_by: None,
1573            sl_trigger_by: None,
1574            order_iv,
1575        };
1576
1577        let ws_trade = self.ws_trade.clone();
1578        let dispatch_state = Arc::clone(&self.dispatch_state);
1579
1580        self.spawn_task("modify_order", async move {
1581            match ws_trade.amend_order(params).await {
1582                Ok(req_id) => {
1583                    dispatch_state.pending_requests.insert(
1584                        req_id,
1585                        (
1586                            vec![client_order_id],
1587                            vec![venue_order_id],
1588                            PendingOperation::Amend,
1589                        ),
1590                    );
1591                }
1592                Err(e) => {
1593                    let ts_event = clock.get_time_ns();
1594                    emitter.emit_order_modify_rejected_event(
1595                        strategy_id,
1596                        instrument_id,
1597                        client_order_id,
1598                        venue_order_id,
1599                        &format!("modify-order-error: {e}"),
1600                        ts_event,
1601                    );
1602                    anyhow::bail!("modify order failed: {e}");
1603                }
1604            }
1605
1606            Ok(())
1607        });
1608
1609        Ok(())
1610    }
1611
1612    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1613        let instrument_id = cmd.instrument_id;
1614        let product_type = self.get_product_type_for_instrument(instrument_id);
1615        let client_order_id = cmd.client_order_id;
1616        let strategy_id = cmd.strategy_id;
1617        let venue_order_id = cmd.venue_order_id;
1618        let emitter = self.emitter.clone();
1619        let clock = self.clock;
1620
1621        if self.config.environment == BybitEnvironment::Demo {
1622            let http_client = self.http_client.clone();
1623            let account_id = self.core.account_id;
1624
1625            self.spawn_task("cancel_order_http", async move {
1626                let result = http_client
1627                    .cancel_order(
1628                        account_id,
1629                        product_type,
1630                        instrument_id,
1631                        Some(client_order_id),
1632                        venue_order_id,
1633                    )
1634                    .await;
1635
1636                if let Err(e) = result {
1637                    let ts_event = clock.get_time_ns();
1638                    emitter.emit_order_cancel_rejected_event(
1639                        strategy_id,
1640                        instrument_id,
1641                        client_order_id,
1642                        venue_order_id,
1643                        &format!("cancel-order-error: {e}"),
1644                        ts_event,
1645                    );
1646                    anyhow::bail!("cancel order failed: {e}");
1647                }
1648
1649                Ok(())
1650            });
1651
1652            return Ok(());
1653        }
1654
1655        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1656
1657        let params = BybitWsCancelOrderParams {
1658            category: product_type,
1659            symbol: Ustr::from(raw_symbol),
1660            order_id: cmd.venue_order_id.map(|v| v.to_string()),
1661            order_link_id: Some(cmd.client_order_id.to_string()),
1662        };
1663
1664        let ws_trade = self.ws_trade.clone();
1665        let dispatch_state = Arc::clone(&self.dispatch_state);
1666
1667        self.spawn_task("cancel_order", async move {
1668            match ws_trade.cancel_order(params).await {
1669                Ok(req_id) => {
1670                    dispatch_state.pending_requests.insert(
1671                        req_id,
1672                        (
1673                            vec![client_order_id],
1674                            vec![venue_order_id],
1675                            PendingOperation::Cancel,
1676                        ),
1677                    );
1678                }
1679                Err(e) => {
1680                    let ts_event = clock.get_time_ns();
1681                    emitter.emit_order_cancel_rejected_event(
1682                        strategy_id,
1683                        instrument_id,
1684                        client_order_id,
1685                        venue_order_id,
1686                        &format!("cancel-order-error: {e}"),
1687                        ts_event,
1688                    );
1689                    anyhow::bail!("cancel order failed: {e}");
1690                }
1691            }
1692
1693            Ok(())
1694        });
1695
1696        Ok(())
1697    }
1698
1699    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1700        if cmd.order_side != OrderSide::NoOrderSide {
1701            log::warn!(
1702                "Bybit does not support order_side filtering for cancel all orders; \
1703                ignoring order_side={:?} and canceling all orders",
1704                cmd.order_side,
1705            );
1706        }
1707
1708        let instrument_id = cmd.instrument_id;
1709        let product_type = self.get_product_type_for_instrument(instrument_id);
1710        let account_id = self.core.account_id;
1711        let http_client = self.http_client.clone();
1712
1713        self.spawn_task("cancel_all_orders", async move {
1714            match http_client
1715                .cancel_all_orders(account_id, product_type, instrument_id)
1716                .await
1717            {
1718                Ok(reports) => {
1719                    for report in reports {
1720                        log::debug!("Cancelled order: {report:?}");
1721                    }
1722                }
1723                Err(e) => {
1724                    log::error!("Failed to cancel all orders for {instrument_id}: {e}");
1725                }
1726            }
1727            Ok(())
1728        });
1729
1730        Ok(())
1731    }
1732
1733    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1734        if cmd.cancels.is_empty() {
1735            return Ok(());
1736        }
1737
1738        let instrument_id = cmd.instrument_id;
1739        let product_type = self.get_product_type_for_instrument(instrument_id);
1740
1741        // Demo mode: cancel individually via HTTP (batch not supported)
1742        if self.config.environment == BybitEnvironment::Demo {
1743            let http_client = self.http_client.clone();
1744            let account_id = self.core.account_id;
1745            let strategy_id = cmd.strategy_id;
1746            let emitter = self.emitter.clone();
1747            let clock = self.clock;
1748            let cancels: Vec<_> = cmd
1749                .cancels
1750                .iter()
1751                .map(|c| (c.client_order_id, c.venue_order_id))
1752                .collect();
1753
1754            self.spawn_task("batch_cancel_orders_http", async move {
1755                for (client_order_id, venue_order_id) in cancels {
1756                    if let Err(e) = http_client
1757                        .cancel_order(
1758                            account_id,
1759                            product_type,
1760                            instrument_id,
1761                            Some(client_order_id),
1762                            venue_order_id,
1763                        )
1764                        .await
1765                    {
1766                        let ts_event = clock.get_time_ns();
1767                        emitter.emit_order_cancel_rejected_event(
1768                            strategy_id,
1769                            instrument_id,
1770                            client_order_id,
1771                            venue_order_id,
1772                            &format!("cancel-order-error: {e}"),
1773                            ts_event,
1774                        );
1775                    }
1776                }
1777                Ok(())
1778            });
1779
1780            return Ok(());
1781        }
1782
1783        let raw_symbol = Ustr::from(extract_raw_symbol(instrument_id.symbol.as_str()));
1784
1785        let mut cancel_params = Vec::with_capacity(cmd.cancels.len());
1786        let client_order_ids: Vec<_> = cmd.cancels.iter().map(|c| c.client_order_id).collect();
1787        let venue_order_ids: Vec<_> = cmd.cancels.iter().map(|c| c.venue_order_id).collect();
1788        for cancel in &cmd.cancels {
1789            cancel_params.push(BybitWsCancelOrderParams {
1790                category: product_type,
1791                symbol: raw_symbol,
1792                order_id: cancel.venue_order_id.map(|v| v.to_string()),
1793                order_link_id: Some(cancel.client_order_id.to_string()),
1794            });
1795        }
1796
1797        let ws_trade = self.ws_trade.clone();
1798        let dispatch_state = Arc::clone(&self.dispatch_state);
1799
1800        self.spawn_task("batch_cancel_orders", async move {
1801            match ws_trade.batch_cancel_orders(cancel_params).await {
1802                Ok(req_ids) => {
1803                    for (req_id, (chunk_cids, chunk_voids)) in req_ids.into_iter().zip(
1804                        client_order_ids
1805                            .chunks(20)
1806                            .map(|c| c.to_vec())
1807                            .zip(venue_order_ids.chunks(20).map(|c| c.to_vec())),
1808                    ) {
1809                        dispatch_state
1810                            .pending_requests
1811                            .insert(req_id, (chunk_cids, chunk_voids, PendingOperation::Cancel));
1812                    }
1813                }
1814                Err(e) => {
1815                    anyhow::bail!("batch cancel orders failed: {e}");
1816                }
1817            }
1818            Ok(())
1819        });
1820
1821        Ok(())
1822    }
1823}
1824
1825#[cfg(test)]
1826mod tests {
1827    use rstest::rstest;
1828
1829    use super::*;
1830    use crate::common::enums::BybitMarketUnit;
1831
1832    #[rstest]
1833    #[case::spot_market_base(
1834        BybitProductType::Spot,
1835        BybitOrderType::Market,
1836        false,
1837        Some(BybitMarketUnit::BaseCoin)
1838    )]
1839    #[case::spot_market_quote(
1840        BybitProductType::Spot,
1841        BybitOrderType::Market,
1842        true,
1843        Some(BybitMarketUnit::QuoteCoin)
1844    )]
1845    #[case::spot_limit(BybitProductType::Spot, BybitOrderType::Limit, true, None)]
1846    #[case::linear_market(BybitProductType::Linear, BybitOrderType::Market, true, None)]
1847    fn test_ws_params_market_unit(
1848        #[case] product_type: BybitProductType,
1849        #[case] order_type: BybitOrderType,
1850        #[case] is_quote_quantity: bool,
1851        #[case] expected: Option<BybitMarketUnit>,
1852    ) {
1853        let params = BybitWsPlaceOrderParams {
1854            category: product_type,
1855            symbol: ustr::Ustr::from("BTCUSDT"),
1856            side: BybitOrderSide::Buy,
1857            order_type,
1858            qty: "1.0".to_string(),
1859            is_leverage: None,
1860            market_unit: spot_market_unit(product_type, order_type, is_quote_quantity),
1861            price: None,
1862            time_in_force: None,
1863            order_link_id: None,
1864            reduce_only: None,
1865            close_on_trigger: None,
1866            trigger_price: None,
1867            trigger_by: None,
1868            trigger_direction: None,
1869            tpsl_mode: None,
1870            take_profit: None,
1871            stop_loss: None,
1872            tp_trigger_by: None,
1873            sl_trigger_by: None,
1874            sl_trigger_price: None,
1875            tp_trigger_price: None,
1876            sl_order_type: None,
1877            tp_order_type: None,
1878            sl_limit_price: None,
1879            tp_limit_price: None,
1880            order_iv: None,
1881            mmp: None,
1882            position_idx: None,
1883        };
1884
1885        assert_eq!(params.market_unit, expected);
1886    }
1887
1888    #[rstest]
1889    #[case::market(OrderType::Market, BybitOrderType::Market, false)]
1890    #[case::limit(OrderType::Limit, BybitOrderType::Limit, false)]
1891    #[case::stop_market(OrderType::StopMarket, BybitOrderType::Market, true)]
1892    #[case::stop_limit(OrderType::StopLimit, BybitOrderType::Limit, true)]
1893    #[case::market_if_touched(OrderType::MarketIfTouched, BybitOrderType::Market, true)]
1894    #[case::limit_if_touched(OrderType::LimitIfTouched, BybitOrderType::Limit, true)]
1895    fn test_map_order_type(
1896        #[case] input: OrderType,
1897        #[case] expected_type: BybitOrderType,
1898        #[case] expected_conditional: bool,
1899    ) {
1900        let (bybit_type, is_conditional) = BybitExecutionClient::map_order_type(input).unwrap();
1901        assert_eq!(bybit_type, expected_type);
1902        assert_eq!(is_conditional, expected_conditional);
1903    }
1904
1905    #[rstest]
1906    fn test_map_order_type_rejects_trailing_stop() {
1907        assert!(BybitExecutionClient::map_order_type(OrderType::TrailingStopMarket).is_err());
1908    }
1909
1910    #[rstest]
1911    #[case::buy_open(BybitOrderSide::Buy, false, BybitPositionIdx::BuyHedge)]
1912    #[case::sell_open(BybitOrderSide::Sell, false, BybitPositionIdx::SellHedge)]
1913    #[case::sell_close_long(BybitOrderSide::Sell, true, BybitPositionIdx::BuyHedge)]
1914    #[case::buy_close_short(BybitOrderSide::Buy, true, BybitPositionIdx::SellHedge)]
1915    fn test_resolve_position_idx_hedge_mode(
1916        #[case] side: BybitOrderSide,
1917        #[case] is_reduce_only: bool,
1918        #[case] expected: BybitPositionIdx,
1919    ) {
1920        let idx = resolve_position_idx(
1921            Some(BybitPositionMode::BothSides),
1922            side,
1923            is_reduce_only,
1924            None,
1925        );
1926        assert_eq!(idx, Some(expected));
1927    }
1928
1929    #[rstest]
1930    fn test_resolve_position_idx_one_way_mode() {
1931        let idx = resolve_position_idx(
1932            Some(BybitPositionMode::MergedSingle),
1933            BybitOrderSide::Buy,
1934            false,
1935            None,
1936        );
1937        assert_eq!(idx, Some(BybitPositionIdx::OneWay));
1938    }
1939
1940    #[rstest]
1941    fn test_resolve_position_idx_manual_override_wins() {
1942        let idx = resolve_position_idx(
1943            Some(BybitPositionMode::BothSides),
1944            BybitOrderSide::Buy,
1945            false,
1946            Some(BybitPositionIdx::SellHedge),
1947        );
1948        assert_eq!(idx, Some(BybitPositionIdx::SellHedge));
1949    }
1950
1951    #[rstest]
1952    fn test_resolve_position_idx_returns_none_when_unconfigured() {
1953        let idx = resolve_position_idx(None, BybitOrderSide::Buy, false, None);
1954        assert!(idx.is_none());
1955    }
1956
1957    #[rstest]
1958    #[case::linear("BTCUSDT-LINEAR", true)]
1959    #[case::inverse("BTCUSD-INVERSE", true)]
1960    #[case::spot("BTCUSDT-SPOT", false)]
1961    #[case::option("BTC-30JUN25-100000-C-OPTION", false)]
1962    fn test_parse_derivative_symbol_filters_product_type(
1963        #[case] symbol_str: &str,
1964        #[case] keeps: bool,
1965    ) {
1966        let result = parse_derivative_symbol(symbol_str);
1967        assert_eq!(result.is_some(), keeps);
1968    }
1969
1970    #[rstest]
1971    fn test_parse_derivative_symbol_rejects_malformed() {
1972        assert!(parse_derivative_symbol("not-a-real-symbol").is_none());
1973    }
1974
1975    #[rstest]
1976    #[case::matches_msg("Position mode has not been modified", "110025", true)]
1977    #[case::matches_code("retCode 110025: noop", "110025", true)]
1978    #[case::matches_msg_only("Already not been modified", "", true)]
1979    #[case::wrong_code("retCode 99999: other", "110025", false)]
1980    #[case::empty_no_modified_msg("retCode 99999", "", false)]
1981    fn test_is_unchanged_error(#[case] msg: &str, #[case] code: &str, #[case] expected: bool) {
1982        let err = anyhow::anyhow!("{msg}");
1983        assert_eq!(is_unchanged_error(&err, code), expected);
1984    }
1985
1986    #[rstest]
1987    #[case::matches("Margin needs to be equal to or greater than 0.5", true)]
1988    #[case::no_match("Some other error", false)]
1989    fn test_is_low_margin_error(#[case] msg: &str, #[case] expected: bool) {
1990        let err = anyhow::anyhow!("{msg}");
1991        assert_eq!(is_low_margin_error(&err), expected);
1992    }
1993}