Skip to main content

nautilus_binance/futures/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Binance Futures adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, Mutex, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::Duration,
25};
26
27use anyhow::Context;
28use async_trait::async_trait;
29use dashmap::DashMap;
30use nautilus_common::{
31    cache::fifo::FifoCache,
32    clients::ExecutionClient,
33    live::{get_runtime, runner::get_exec_event_sender},
34    messages::execution::{
35        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
36        GenerateOrderStatusReport, GenerateOrderStatusReports, GenerateOrderStatusReportsBuilder,
37        GeneratePositionStatusReports, GeneratePositionStatusReportsBuilder, ModifyOrder,
38        QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
39    },
40};
41use nautilus_core::{
42    AtomicSet, MUTEX_POISONED, UUID4, UnixNanos,
43    datetime::{NANOSECONDS_IN_MILLISECOND, mins_to_nanos},
44    time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
47use nautilus_model::{
48    accounts::AccountAny,
49    enums::{
50        AccountType, OmsType, OrderType, PositionSideSpecified, TrailingOffsetType, TriggerType,
51    },
52    events::{
53        AccountState, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderModifyRejected,
54        OrderRejected, OrderUpdated,
55    },
56    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, Venue, VenueOrderId},
57    instruments::Instrument,
58    orders::Order,
59    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
60    types::{AccountBalance, Currency, MarginBalance, Money, Quantity},
61};
62use rust_decimal::Decimal;
63use tokio::{sync::Mutex as TokioMutex, task::JoinHandle};
64use tokio_util::sync::CancellationToken;
65
66use super::{
67    http::{
68        BinanceFuturesHttpError,
69        client::{BinanceFuturesHttpClient, BinanceFuturesInstrument, is_algo_order_type},
70        models::{BatchOrderResult, BinancePositionRisk},
71        query::{
72            BatchCancelItem, BinanceAllOrdersParamsBuilder, BinanceOpenOrdersParamsBuilder,
73            BinanceOrderQueryParamsBuilder, BinancePositionRiskParamsBuilder,
74            BinanceSetLeverageParams, BinanceSetMarginTypeParams, BinanceUserTradesParamsBuilder,
75        },
76    },
77    websocket::{
78        streams::{
79            client::BinanceFuturesWebSocketClient,
80            dispatch::{DispatchCtx, dispatch_user_stream_message, spawn_user_stream_dispatch},
81            recovery::{
82                RecoveryCtx, WsBuildParams, build_and_connect_user_stream, run_recovery_driver,
83            },
84        },
85        trading::{client::BinanceFuturesWsTradingClient, dispatch::dispatch_ws_trading_message},
86    },
87};
88use crate::{
89    common::{
90        consts::{
91            BINANCE_FUTURES_USD_WS_API_TESTNET_URL, BINANCE_FUTURES_USD_WS_API_URL,
92            BINANCE_GTX_ORDER_REJECT_CODE, BINANCE_NAUTILUS_FUTURES_BROKER_ID, BINANCE_VENUE,
93        },
94        credential::resolve_credentials,
95        dispatch::{OrderIdentity, PendingOperation, PendingRequest, WsDispatchState},
96        encoder::encode_broker_id,
97        enums::{
98            BinanceEnvironment, BinancePriceMatch, BinanceProductType, BinanceSide,
99            BinanceTimeInForce, BinanceWorkingType,
100        },
101        symbol::format_binance_symbol,
102        urls::{get_usdm_ws_route_base_url, get_ws_private_base_url},
103    },
104    config::BinanceExecClientConfig,
105    futures::{
106        conversions::{
107            determine_position_side, trailing_offset_to_callback_rate,
108            trailing_offset_to_callback_rate_string,
109        },
110        http::{
111            client::order_type_to_binance_futures,
112            models::BinanceFuturesAccountInfo,
113            query::{
114                BinanceCancelOrderParamsBuilder, BinanceModifyOrderParamsBuilder,
115                BinanceNewOrderParams,
116            },
117        },
118    },
119};
120
/// Listen key keepalive interval, in seconds (30 minutes).
const LISTEN_KEY_KEEPALIVE_SECS: u64 = 30 * 60;

/// Consecutive keepalive failures before a listenKey rotation is triggered.
///
/// With the current value of `1`, the first failed keepalive reaches the threshold.
const MAX_KEEPALIVE_FAILURES: u32 = 1;
126
/// Live execution client for Binance Futures trading.
///
/// Implements the [`ExecutionClient`] trait for order management on Binance
/// USD-M and COIN-M Futures markets. Uses HTTP API for order operations and
/// WebSocket for real-time order updates via user data stream.
///
/// When WS trading is enabled in the config and the product type is USD-M, a
/// WebSocket trading client is also constructed; non-algo order submission and
/// cancellation are routed through it while it reports itself active.
///
/// Uses a two-tier architecture with an execution handler that maintains
/// pending order maps for correlating WebSocket updates with order context.
#[derive(Debug)]
pub struct BinanceFuturesExecutionClient {
    // Shared client core: provides trader/account identifiers and cache access.
    core: ExecutionClientCore,
    // Realtime atomic clock used to timestamp emitted events.
    clock: &'static AtomicTime,
    config: BinanceExecClientConfig,
    // Emits order and account events produced by this client.
    emitter: ExecutionEventEmitter,
    // Holds registered order identities and pending WS trading requests.
    dispatch_state: Arc<WsDispatchState>,
    // Futures product type selected from the config at construction (USD-M by default).
    product_type: BinanceProductType,
    http_client: BinanceFuturesHttpClient,
    // NOTE(review): presumably the user data stream client - confirm against connect logic.
    ws_client: Arc<TokioMutex<Option<BinanceFuturesWebSocketClient>>>,
    // `Some` only when `config.use_ws_trading` is set and the product type is USD-M.
    ws_trading_client: Option<BinanceFuturesWsTradingClient>,
    ws_trading_handle: Mutex<Option<JoinHandle<()>>>,
    listen_key: Arc<RwLock<Option<String>>>,
    cancellation_token: CancellationToken,
    // Consulted on cancel: algo orders present here use the regular cancel endpoint.
    triggered_algo_order_ids: Arc<AtomicSet<ClientOrderId>>,
    algo_client_order_ids: Arc<AtomicSet<ClientOrderId>>,
    ws_task: Arc<Mutex<Option<JoinHandle<()>>>>,
    keepalive_task: Mutex<Option<JoinHandle<()>>>,
    recovery_task: Mutex<Option<JoinHandle<()>>>,
    recovery_lock: Arc<TokioMutex<()>>,
    recovery_tx: Mutex<Option<tokio::sync::mpsc::UnboundedSender<()>>>,
    // Join handles of tasks started through `spawn_task`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    // Whether the account uses hedge mode (dual side position); starts as `false`.
    is_hedge_mode: AtomicBool,
}
159
160impl BinanceFuturesExecutionClient {
161    /// Creates a new [`BinanceFuturesExecutionClient`].
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the HTTP client fails to initialize or credentials are missing.
166    pub fn new(core: ExecutionClientCore, config: BinanceExecClientConfig) -> anyhow::Result<Self> {
167        let product_type = config
168            .product_types
169            .iter()
170            .find(|pt| matches!(pt, BinanceProductType::UsdM | BinanceProductType::CoinM))
171            .copied()
172            .unwrap_or(BinanceProductType::UsdM);
173
174        let (api_key, api_secret) = resolve_credentials(
175            config.api_key.clone(),
176            config.api_secret.clone(),
177            config.environment,
178            product_type,
179        )?;
180
181        let clock = get_atomic_clock_realtime();
182
183        let http_client = BinanceFuturesHttpClient::new(
184            product_type,
185            config.environment,
186            clock,
187            Some(api_key.clone()),
188            Some(api_secret.clone()),
189            config.base_url_http.clone(),
190            None, // recv_window
191            None, // timeout_secs
192            None, // proxy_url
193            config.treat_expired_as_canceled,
194        )
195        .context("failed to construct Binance Futures HTTP client")?;
196
197        let ws_trading_client = if config.use_ws_trading && product_type == BinanceProductType::UsdM
198        {
199            let ws_trading_url =
200                config
201                    .base_url_ws_trading
202                    .clone()
203                    .or_else(|| match config.environment {
204                        BinanceEnvironment::Testnet => {
205                            Some(BINANCE_FUTURES_USD_WS_API_TESTNET_URL.to_string())
206                        }
207                        _ => Some(BINANCE_FUTURES_USD_WS_API_URL.to_string()),
208                    });
209
210            Some(BinanceFuturesWsTradingClient::new(
211                ws_trading_url,
212                api_key,
213                api_secret,
214                None, // heartbeat
215                config.transport_backend,
216            ))
217        } else {
218            None
219        };
220
221        let emitter = ExecutionEventEmitter::new(
222            clock,
223            core.trader_id,
224            core.account_id,
225            core.account_type,
226            core.base_currency,
227        );
228
229        Ok(Self {
230            core,
231            clock,
232            config,
233            emitter,
234            dispatch_state: Arc::new(WsDispatchState::default()),
235            product_type,
236            http_client,
237            ws_client: Arc::new(TokioMutex::new(None)),
238            ws_trading_client,
239            ws_trading_handle: Mutex::new(None),
240            listen_key: Arc::new(RwLock::new(None)),
241            cancellation_token: CancellationToken::new(),
242            triggered_algo_order_ids: Arc::new(AtomicSet::new()),
243            algo_client_order_ids: Arc::new(AtomicSet::new()),
244            ws_task: Arc::new(Mutex::new(None)),
245            keepalive_task: Mutex::new(None),
246            recovery_task: Mutex::new(None),
247            recovery_lock: Arc::new(TokioMutex::new(())),
248            recovery_tx: Mutex::new(None),
249            pending_tasks: Mutex::new(Vec::new()),
250            is_hedge_mode: AtomicBool::new(false),
251        })
252    }
253
254    /// Returns whether the account is in hedge mode (dual side position).
255    #[must_use]
256    pub fn is_hedge_mode(&self) -> bool {
257        self.is_hedge_mode.load(Ordering::Acquire)
258    }
259
    /// Returns a clone of the HTTP client's instruments cache Arc.
    ///
    /// The map is keyed by `ustr::Ustr` and holds [`BinanceFuturesInstrument`] values;
    /// this simply forwards to the HTTP client's `instruments_cache()`.
    #[doc(hidden)]
    #[must_use]
    pub fn instruments_cache(&self) -> Arc<DashMap<ustr::Ustr, BinanceFuturesInstrument>> {
        self.http_client.instruments_cache()
    }
266
267    /// Converts Binance futures account info to Nautilus account state.
268    fn create_account_state(&self, account_info: &BinanceFuturesAccountInfo) -> AccountState {
269        Self::create_account_state_from(
270            account_info,
271            self.core.account_id,
272            self.core.account_type,
273            self.clock,
274        )
275    }
276
277    fn create_account_state_from(
278        account_info: &BinanceFuturesAccountInfo,
279        account_id: AccountId,
280        account_type: AccountType,
281        clock: &'static AtomicTime,
282    ) -> AccountState {
283        let ts_now = clock.get_time_ns();
284
285        let balances: Vec<AccountBalance> = account_info
286            .assets
287            .iter()
288            .filter_map(|b| {
289                if b.wallet_balance.is_zero() {
290                    return None;
291                }
292
293                let currency = Currency::from(&b.asset);
294                AccountBalance::from_total_and_free(b.wallet_balance, b.available_balance, currency)
295                    .ok()
296            })
297            .collect();
298
299        // Emit account-wide (cross-margin) margin balances per collateral asset.
300        // Binance reports per-asset `initialMargin` / `maintMargin` which covers both
301        // USDT-M (typically USDT, or USDT+BNB under multi-assets mode) and COIN-M
302        // (one entry per base coin, e.g. BTC / ETH).
303        let mut margins: Vec<MarginBalance> = Vec::new();
304
305        for asset in &account_info.assets {
306            let initial_dec = asset.initial_margin.unwrap_or_default();
307            let maint_dec = asset.maint_margin.unwrap_or_default();
308
309            if initial_dec.is_zero() && maint_dec.is_zero() {
310                continue;
311            }
312
313            let currency = Currency::from(&asset.asset);
314            let initial = Money::from_decimal(initial_dec, currency)
315                .unwrap_or_else(|_| Money::zero(currency));
316            let maintenance =
317                Money::from_decimal(maint_dec, currency).unwrap_or_else(|_| Money::zero(currency));
318            margins.push(MarginBalance::new(initial, maintenance, None));
319        }
320
321        AccountState::new(
322            account_id,
323            account_type,
324            balances,
325            margins,
326            true, // reported
327            UUID4::new(),
328            ts_now,
329            ts_now,
330            None, // base currency
331        )
332    }
333
334    async fn refresh_account_state(&self) -> anyhow::Result<AccountState> {
335        let account_info = match self.http_client.query_account().await {
336            Ok(info) => info,
337            Err(e) => {
338                log::error!("Binance Futures account state request failed: {e}");
339                anyhow::bail!("Binance Futures account state request failed: {e}");
340            }
341        };
342
343        Ok(self.create_account_state(&account_info))
344    }
345
346    fn update_account_state(&self) {
347        let http_client = self.http_client.clone();
348        let account_id = self.core.account_id;
349        let account_type = self.core.account_type;
350        let emitter = self.emitter.clone();
351        let clock = self.clock;
352
353        self.spawn_task("query_account", async move {
354            let account_info = http_client
355                .query_account()
356                .await
357                .context("Binance Futures account state request failed")?;
358            let account_state =
359                Self::create_account_state_from(&account_info, account_id, account_type, clock);
360            let ts_now = clock.get_time_ns();
361            emitter.emit_account_state(
362                account_state.balances.clone(),
363                account_state.margins.clone(),
364                account_state.is_reported,
365                ts_now,
366            );
367            Ok(())
368        });
369    }
370
371    async fn init_hedge_mode(&self) -> anyhow::Result<bool> {
372        let response = self.http_client.query_hedge_mode().await?;
373        Ok(response.dual_side_position)
374    }
375
376    /// Returns whether the WS trading client is connected and active.
377    fn ws_trading_active(&self) -> bool {
378        self.ws_trading_client
379            .as_ref()
380            .is_some_and(|c| c.is_active())
381    }
382
383    fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
384        let order = self
385            .core
386            .cache()
387            .order(&cmd.client_order_id)
388            .cloned()
389            .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
390
391        let emitter = self.emitter.clone();
392        let trader_id = self.core.trader_id;
393        let account_id = self.core.account_id;
394        let clock = self.clock;
395        let client_order_id = order.client_order_id();
396        let strategy_id = order.strategy_id();
397        let instrument_id = order.instrument_id();
398        let order_side = order.order_side();
399        let order_type = order.order_type();
400        let quantity = order.quantity();
401        let time_in_force = order.time_in_force();
402        let price = order.price();
403        let trigger_price = order.trigger_price();
404        let reduce_only = order.is_reduce_only();
405        let post_only = order.is_post_only();
406        let activation_price = order.activation_price();
407        let trailing_offset = order.trailing_offset();
408        let trigger_type = order.trigger_type();
409        let position_side = determine_position_side(self.is_hedge_mode(), order_side, reduce_only);
410
411        // Register identity for tracked/external dispatch routing
412        self.dispatch_state.order_identities.insert(
413            client_order_id,
414            OrderIdentity {
415                instrument_id,
416                strategy_id,
417                order_side,
418                order_type,
419                price,
420            },
421        );
422
423        let use_algo_api = is_algo_order_type(order_type);
424
425        let close_position = cmd
426            .params
427            .as_ref()
428            .and_then(|p| p.get_bool("close_position"))
429            .unwrap_or(false);
430
431        let price_match = cmd
432            .params
433            .as_ref()
434            .and_then(|p| p.get_str("price_match"))
435            .map(BinancePriceMatch::from_param)
436            .transpose()?;
437
438        let callback_rate = trailing_offset
439            .map(trailing_offset_to_callback_rate_string)
440            .transpose()?;
441
442        let working_type = match trigger_type {
443            Some(TriggerType::MarkPrice) => Some(BinanceWorkingType::MarkPrice),
444            Some(TriggerType::LastPrice | TriggerType::Default) => {
445                Some(BinanceWorkingType::ContractPrice)
446            }
447            _ => None,
448        };
449
450        // Non-algo orders can route through WS trading API when active
451        if self.ws_trading_active() && !use_algo_api {
452            let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
453            let dispatch_state = self.dispatch_state.clone();
454            let ts_init = clock.get_time_ns();
455
456            let symbol = format_binance_symbol(&instrument_id);
457            let binance_side = BinanceSide::try_from(order_side)?;
458            let binance_order_type = order_type_to_binance_futures(order_type)?;
459            let binance_tif = if post_only {
460                BinanceTimeInForce::Gtx
461            } else {
462                BinanceTimeInForce::try_from(time_in_force)?
463            };
464
465            let requires_time_in_force = matches!(
466                order_type,
467                OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
468            );
469
470            let client_id_str =
471                encode_broker_id(&client_order_id, BINANCE_NAUTILUS_FUTURES_BROKER_ID);
472
473            let params = BinanceNewOrderParams {
474                symbol,
475                side: binance_side,
476                order_type: binance_order_type,
477                time_in_force: if requires_time_in_force {
478                    Some(binance_tif)
479                } else {
480                    None
481                },
482                quantity: Some(quantity.to_string()),
483                price: if price_match.is_some() {
484                    None
485                } else {
486                    price.map(|p| p.to_string())
487                },
488                new_client_order_id: Some(client_id_str),
489                stop_price: trigger_price.map(|p| p.to_string()),
490                reduce_only: if reduce_only { Some(true) } else { None },
491                position_side,
492                close_position: None,
493                activation_price: activation_price.map(|p| p.to_string()),
494                callback_rate,
495                working_type,
496                price_protect: None,
497                new_order_resp_type: None,
498                good_till_date: None,
499                recv_window: None,
500                price_match,
501                self_trade_prevention_mode: None,
502            };
503
504            // Pre-register before sending to avoid response racing the insert
505            let request_id = ws_client.next_request_id();
506            dispatch_state.pending_requests.insert(
507                request_id.clone(),
508                PendingRequest {
509                    client_order_id,
510                    venue_order_id: None,
511                    operation: PendingOperation::Place,
512                },
513            );
514
515            self.spawn_task("submit_order_ws", async move {
516                if let Err(e) = ws_client
517                    .place_order_with_id(request_id.clone(), params)
518                    .await
519                {
520                    dispatch_state.pending_requests.remove(&request_id);
521                    let rejected = OrderRejected::new(
522                        trader_id,
523                        strategy_id,
524                        instrument_id,
525                        client_order_id,
526                        account_id,
527                        format!("ws-submit-order-error: {e}").into(),
528                        UUID4::new(),
529                        ts_init,
530                        clock.get_time_ns(),
531                        false,
532                        false,
533                    );
534                    emitter.send_order_event(OrderEventAny::Rejected(rejected));
535                    anyhow::bail!("WS submit order failed: {e}");
536                }
537                Ok(())
538            });
539
540            return Ok(());
541        }
542
543        let http_client = self.http_client.clone();
544
545        self.spawn_task("submit_order", async move {
546            let result = if use_algo_api {
547                http_client
548                    .submit_algo_order(
549                        account_id,
550                        instrument_id,
551                        client_order_id,
552                        order_side,
553                        order_type,
554                        quantity,
555                        time_in_force,
556                        price,
557                        trigger_price,
558                        reduce_only,
559                        close_position,
560                        position_side,
561                        activation_price,
562                        callback_rate,
563                        working_type,
564                    )
565                    .await
566            } else {
567                http_client
568                    .submit_order(
569                        account_id,
570                        instrument_id,
571                        client_order_id,
572                        order_side,
573                        order_type,
574                        quantity,
575                        time_in_force,
576                        price,
577                        trigger_price,
578                        reduce_only,
579                        post_only,
580                        position_side,
581                        price_match,
582                    )
583                    .await
584            };
585
586            match result {
587                Ok(report) => {
588                    log::debug!(
589                        "Order submit accepted: client_order_id={}, venue_order_id={}",
590                        client_order_id,
591                        report.venue_order_id
592                    );
593                }
594                Err(e) => {
595                    // Keep order registered - if HTTP failed due to timeout but order
596                    // reached Binance, WebSocket updates will still arrive. The order
597                    // will be cleaned up via WebSocket rejection or reconciliation.
598                    let due_post_only =
599                        e.downcast_ref::<BinanceFuturesHttpError>()
600                            .is_some_and(|be| {
601                                matches!(
602                                    be,
603                                    BinanceFuturesHttpError::BinanceError { code, .. }
604                                        if *code == BINANCE_GTX_ORDER_REJECT_CODE
605                                )
606                            });
607                    let ts_now = clock.get_time_ns();
608                    let rejected_event = OrderRejected::new(
609                        trader_id,
610                        strategy_id,
611                        instrument_id,
612                        client_order_id,
613                        account_id,
614                        format!("submit-order-error: {e}").into(),
615                        UUID4::new(),
616                        ts_now,
617                        ts_now,
618                        false,
619                        due_post_only,
620                    );
621
622                    emitter.send_order_event(OrderEventAny::Rejected(rejected_event));
623
624                    return Err(e);
625                }
626            }
627
628            Ok(())
629        });
630
631        Ok(())
632    }
633
634    fn cancel_order_internal(&self, cmd: &CancelOrder) {
635        let command = cmd.clone();
636
637        // Non-triggered algo orders use algo cancel endpoint, triggered use regular
638        let is_algo = self
639            .core
640            .cache()
641            .order(&command.client_order_id)
642            .is_some_and(|order| is_algo_order_type(order.order_type()));
643        let is_triggered = self
644            .triggered_algo_order_ids
645            .contains(&command.client_order_id);
646        let use_algo_cancel = is_algo && !is_triggered;
647
648        let emitter = self.emitter.clone();
649        let trader_id = self.core.trader_id;
650        let account_id = self.core.account_id;
651        let clock = self.clock;
652        let instrument_id = command.instrument_id;
653        let venue_order_id = command.venue_order_id;
654        let client_order_id = command.client_order_id;
655
656        // Non-algo cancels can route through WS trading API when active
657        if self.ws_trading_active() && !use_algo_cancel {
658            let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
659            let dispatch_state = self.dispatch_state.clone();
660
661            let mut cancel_builder = BinanceCancelOrderParamsBuilder::default();
662            cancel_builder.symbol(instrument_id.symbol.to_string());
663
664            if let Some(venue_id) = venue_order_id {
665                match venue_id.inner().parse::<i64>() {
666                    Ok(order_id) => {
667                        cancel_builder.order_id(order_id);
668                    }
669                    Err(e) => {
670                        let ts_now = clock.get_time_ns();
671                        let rejected = OrderCancelRejected::new(
672                            trader_id,
673                            command.strategy_id,
674                            instrument_id,
675                            client_order_id,
676                            format!("failed to parse venue_order_id: {e}").into(),
677                            UUID4::new(),
678                            ts_now,
679                            ts_now,
680                            false,
681                            venue_order_id,
682                            Some(account_id),
683                        );
684                        emitter.send_order_event(OrderEventAny::CancelRejected(rejected));
685                        return;
686                    }
687                }
688            }
689
690            cancel_builder.orig_client_order_id(encode_broker_id(
691                &client_order_id,
692                BINANCE_NAUTILUS_FUTURES_BROKER_ID,
693            ));
694
695            let params = cancel_builder.build().unwrap();
696
697            // Pre-register before sending to avoid response racing the insert
698            let request_id = ws_client.next_request_id();
699            dispatch_state.pending_requests.insert(
700                request_id.clone(),
701                PendingRequest {
702                    client_order_id,
703                    venue_order_id,
704                    operation: PendingOperation::Cancel,
705                },
706            );
707
708            self.spawn_task("cancel_order_ws", async move {
709                if let Err(e) = ws_client
710                    .cancel_order_with_id(request_id.clone(), params)
711                    .await
712                {
713                    dispatch_state.pending_requests.remove(&request_id);
714                    let ts_now = clock.get_time_ns();
715                    let rejected = OrderCancelRejected::new(
716                        trader_id,
717                        command.strategy_id,
718                        command.instrument_id,
719                        client_order_id,
720                        format!("ws-cancel-order-error: {e}").into(),
721                        UUID4::new(),
722                        ts_now,
723                        ts_now,
724                        false,
725                        command.venue_order_id,
726                        Some(account_id),
727                    );
728                    emitter.send_order_event(OrderEventAny::CancelRejected(rejected));
729                    anyhow::bail!("WS cancel order failed: {e}");
730                }
731                Ok(())
732            });
733
734            return;
735        }
736
737        let http_client = self.http_client.clone();
738
739        self.spawn_task("cancel_order", async move {
740            let result = if use_algo_cancel {
741                // Try algo cancel first; if it fails, the order may have been triggered
742                // before this session started, so fall back to regular cancel
743                match http_client.cancel_algo_order(client_order_id).await {
744                    Ok(()) => Ok(()),
745                    Err(algo_err) => {
746                        log::debug!("Algo cancel failed, trying regular cancel: {algo_err}");
747                        http_client
748                            .cancel_order(instrument_id, venue_order_id, Some(client_order_id))
749                            .await
750                            .map(|_| ())
751                    }
752                }
753            } else {
754                http_client
755                    .cancel_order(instrument_id, venue_order_id, Some(client_order_id))
756                    .await
757                    .map(|_| ())
758            };
759
760            match result {
761                Ok(()) => {
762                    log::debug!("Cancel request accepted: client_order_id={client_order_id}");
763                }
764                Err(e) => {
765                    let ts_now = clock.get_time_ns();
766                    let rejected_event = OrderCancelRejected::new(
767                        trader_id,
768                        command.strategy_id,
769                        command.instrument_id,
770                        client_order_id,
771                        format!("cancel-order-error: {e}").into(),
772                        UUID4::new(),
773                        ts_now,
774                        ts_now,
775                        false,
776                        command.venue_order_id,
777                        Some(account_id),
778                    );
779
780                    emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
781
782                    return Err(e);
783                }
784            }
785
786            Ok(())
787        });
788    }
789
790    fn spawn_task<F>(&self, description: &'static str, fut: F)
791    where
792        F: Future<Output = anyhow::Result<()>> + Send + 'static,
793    {
794        crate::common::execution::spawn_task(&self.pending_tasks, description, fut);
795    }
796
797    fn abort_pending_tasks(&self) {
798        crate::common::execution::abort_pending_tasks(&self.pending_tasks);
799    }
800
801    /// Returns the (price_precision, size_precision) for an instrument.
802    fn get_instrument_precision(&self, instrument_id: InstrumentId) -> (u8, u8) {
803        let cache = self.core.cache();
804        cache
805            .instrument(&instrument_id)
806            .map_or((8, 8), |i| (i.price_precision(), i.size_precision()))
807    }
808
809    /// Creates a position status report from Binance position risk data.
810    fn create_position_report(
811        &self,
812        position: &BinancePositionRisk,
813        instrument_id: InstrumentId,
814        size_precision: u8,
815    ) -> anyhow::Result<PositionStatusReport> {
816        let position_amount: Decimal = position
817            .position_amt
818            .parse()
819            .context("invalid position_amt")?;
820
821        if position_amount.is_zero() {
822            anyhow::bail!("Position is flat");
823        }
824
825        let entry_price: Decimal = position
826            .entry_price
827            .parse()
828            .context("invalid entry_price")?;
829
830        let position_side = if position_amount > Decimal::ZERO {
831            PositionSideSpecified::Long
832        } else {
833            PositionSideSpecified::Short
834        };
835
836        let ts_now = self.clock.get_time_ns();
837
838        Ok(PositionStatusReport::new(
839            self.core.account_id,
840            instrument_id,
841            position_side,
842            Quantity::new(position_amount.abs().to_string().parse()?, size_precision),
843            ts_now,
844            ts_now,
845            Some(UUID4::new()),
846            None, // venue_position_id
847            Some(entry_price),
848        ))
849    }
850
851    async fn apply_futures_config(&self) -> anyhow::Result<()> {
852        if let Some(ref leverages) = self.config.futures_leverages {
853            for (symbol, leverage) in leverages {
854                let params = BinanceSetLeverageParams {
855                    symbol: symbol.clone(),
856                    leverage: *leverage,
857                    recv_window: None,
858                };
859                let response = self
860                    .http_client
861                    .set_leverage(&params)
862                    .await
863                    .context(format!("failed to set leverage for {symbol}"))?;
864                log::info!("Set leverage {} {}X", response.symbol, response.leverage);
865            }
866        }
867
868        if let Some(ref margin_types) = self.config.futures_margin_types {
869            for (symbol, margin_type) in margin_types {
870                let params = BinanceSetMarginTypeParams {
871                    symbol: symbol.clone(),
872                    margin_type: *margin_type,
873                    recv_window: None,
874                };
875
876                match self.http_client.set_margin_type(&params).await {
877                    Ok(_) => {
878                        log::info!("Set {symbol} margin type to {margin_type:?}");
879                    }
880                    Err(e) => {
881                        let err_str = format!("{e}");
882                        if err_str.contains("-4046") {
883                            log::info!("{symbol} margin type already {margin_type:?}");
884                        } else {
885                            return Err(e)
886                                .context(format!("failed to set margin type for {symbol}"));
887                        }
888                    }
889                }
890            }
891        }
892
893        Ok(())
894    }
895}
896
897#[async_trait(?Send)]
898impl ExecutionClient for BinanceFuturesExecutionClient {
899    fn is_connected(&self) -> bool {
900        self.core.is_connected()
901    }
902
903    fn client_id(&self) -> ClientId {
904        self.core.client_id
905    }
906
907    fn account_id(&self) -> AccountId {
908        self.core.account_id
909    }
910
911    fn venue(&self) -> Venue {
912        *BINANCE_VENUE
913    }
914
915    fn oms_type(&self) -> OmsType {
916        self.core.oms_type
917    }
918
919    fn get_account(&self) -> Option<AccountAny> {
920        self.core.cache().account(&self.core.account_id).cloned()
921    }
922
923    async fn connect(&mut self) -> anyhow::Result<()> {
924        if self.core.is_connected() {
925            return Ok(());
926        }
927
928        // Reinitialize cancellation token in case of reconnection
929        self.cancellation_token = CancellationToken::new();
930
931        // Check hedge mode
932        let is_hedge_mode = self
933            .init_hedge_mode()
934            .await
935            .context("failed to query hedge mode")?;
936        self.is_hedge_mode.store(is_hedge_mode, Ordering::Release);
937        log::info!("Hedge mode (dual side position): {is_hedge_mode}");
938
939        // Load instruments if not already done
940        let _instruments = if self.core.instruments_initialized() {
941            Vec::new()
942        } else {
943            let instruments = self
944                .http_client
945                .request_instruments()
946                .await
947                .context("failed to request Binance Futures instruments")?;
948
949            if instruments.is_empty() {
950                log::warn!("No instruments returned for Binance Futures");
951            } else {
952                log::info!("Loaded {} Futures instruments", instruments.len());
953            }
954
955            self.core.set_instruments_initialized();
956            instruments
957        };
958
959        // Apply configured leverage and margin types
960        self.apply_futures_config()
961            .await
962            .context("failed to apply futures config")?;
963
964        // Create listen key for user data stream
965        log::info!("Creating listen key for user data stream...");
966        let listen_key_response = self
967            .http_client
968            .create_listen_key()
969            .await
970            .context("failed to create listen key")?;
971        let listen_key = listen_key_response.listen_key;
972        log::info!("Listen key created successfully");
973
974        {
975            let mut key_guard = self.listen_key.write().expect(MUTEX_POISONED);
976            *key_guard = Some(listen_key.clone());
977        }
978
979        let (api_key, api_secret) = resolve_credentials(
980            self.config.api_key.clone(),
981            self.config.api_secret.clone(),
982            self.config.environment,
983            self.product_type,
984        )?;
985
986        let private_base_url = self.config.base_url_ws.clone().map_or_else(
987            || get_ws_private_base_url(self.product_type, self.config.environment).to_string(),
988            |url| {
989                if self.product_type == BinanceProductType::UsdM
990                    && self.config.environment == BinanceEnvironment::Mainnet
991                {
992                    get_usdm_ws_route_base_url(&url, "private")
993                } else {
994                    url
995                }
996            },
997        );
998
999        let (recovery_tx, recovery_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1000        *self.recovery_tx.lock().expect(MUTEX_POISONED) = Some(recovery_tx.clone());
1001
1002        let seen_trade_ids: Arc<Mutex<FifoCache<(ustr::Ustr, i64), 10_000>>> =
1003            Arc::new(Mutex::new(FifoCache::new()));
1004
1005        let dispatch_ctx = Arc::new(DispatchCtx {
1006            emitter: self.emitter.clone(),
1007            http_client: self.http_client.clone(),
1008            account_id: self.core.account_id,
1009            product_type: self.product_type,
1010            clock: self.clock,
1011            dispatch_state: self.dispatch_state.clone(),
1012            triggered_algo_ids: self.triggered_algo_order_ids.clone(),
1013            algo_client_ids: self.algo_client_order_ids.clone(),
1014            use_position_ids: self.config.use_position_ids,
1015            default_taker_fee: self.config.default_taker_fee,
1016            treat_expired_as_canceled: self.config.treat_expired_as_canceled,
1017            use_trade_lite: self.config.use_trade_lite,
1018            seen_trade_ids,
1019            cancellation_token: self.cancellation_token.clone(),
1020        });
1021
1022        let ws_build_params = WsBuildParams {
1023            product_type: self.product_type,
1024            environment: self.config.environment,
1025            api_key: api_key.clone(),
1026            api_secret: api_secret.clone(),
1027            private_base_url: private_base_url.clone(),
1028            transport_backend: self.config.transport_backend,
1029        };
1030
1031        let ws_client = build_and_connect_user_stream(&ws_build_params, &listen_key).await?;
1032        let stream = ws_client.stream();
1033        *self.ws_client.lock().await = Some(ws_client);
1034
1035        let ws_task = spawn_user_stream_dispatch(
1036            stream,
1037            dispatch_ctx.clone(),
1038            recovery_tx.clone(),
1039            dispatch_user_stream_message,
1040        );
1041        *self.ws_task.lock().expect(MUTEX_POISONED) = Some(ws_task);
1042
1043        // Start listen key keepalive task
1044        {
1045            let http_client = self.http_client.clone();
1046            let listen_key_ref = self.listen_key.clone();
1047            let cancel = self.cancellation_token.clone();
1048            let recovery_tx = recovery_tx.clone();
1049
1050            let keepalive_task = get_runtime().spawn(async move {
1051                let mut interval =
1052                    tokio::time::interval(Duration::from_secs(LISTEN_KEY_KEEPALIVE_SECS));
1053                let mut consecutive_failures: u32 = 0;
1054
1055                loop {
1056                    tokio::select! {
1057                        _ = interval.tick() => {
1058                            let key = {
1059                                let guard = listen_key_ref.read().expect(MUTEX_POISONED);
1060                                guard.clone()
1061                            };
1062
1063                            if let Some(ref key) = key {
1064                                match http_client.keepalive_listen_key(key).await {
1065                                    Ok(()) => {
1066                                        log::debug!("Listen key keepalive sent successfully");
1067                                        consecutive_failures = 0;
1068                                    }
1069                                    Err(e) => {
1070                                        consecutive_failures += 1;
1071                                        log::warn!(
1072                                            "Listen key keepalive failed ({consecutive_failures}/{MAX_KEEPALIVE_FAILURES}): {e}",
1073                                        );
1074
1075                                        if consecutive_failures >= MAX_KEEPALIVE_FAILURES
1076                                            && recovery_tx.send(()).is_err()
1077                                        {
1078                                            log::warn!(
1079                                                "Recovery channel closed, keepalive exiting",
1080                                            );
1081                                            break;
1082                                        }
1083                                    }
1084                                }
1085                            }
1086                        }
1087                        () = cancel.cancelled() => {
1088                            log::debug!("Listen key keepalive task cancelled");
1089                            break;
1090                        }
1091                    }
1092                }
1093            });
1094            *self.keepalive_task.lock().expect(MUTEX_POISONED) = Some(keepalive_task);
1095        }
1096
1097        // Start listen key recovery driver task
1098        {
1099            let recovery_ctx = RecoveryCtx {
1100                http_client: self.http_client.clone(),
1101                listen_key: self.listen_key.clone(),
1102                ws_client: self.ws_client.clone(),
1103                ws_task: self.ws_task.clone(),
1104                recovery_lock: self.recovery_lock.clone(),
1105                ws_build_params,
1106                dispatch_ctx,
1107                recovery_tx: recovery_tx.clone(),
1108            };
1109            let cancel = self.cancellation_token.clone();
1110
1111            let recovery_task = get_runtime().spawn(async move {
1112                run_recovery_driver(
1113                    recovery_ctx,
1114                    recovery_rx,
1115                    cancel,
1116                    dispatch_user_stream_message,
1117                )
1118                .await;
1119            });
1120            *self.recovery_task.lock().expect(MUTEX_POISONED) = Some(recovery_task);
1121        }
1122
1123        // Request initial account state
1124        let account_state = self
1125            .refresh_account_state()
1126            .await
1127            .context("failed to request Binance Futures account state")?;
1128
1129        if !account_state.balances.is_empty() {
1130            log::info!(
1131                "Received account state with {} balance(s) and {} margin(s)",
1132                account_state.balances.len(),
1133                account_state.margins.len()
1134            );
1135        }
1136
1137        self.emitter.send_account_state(account_state);
1138
1139        crate::common::execution::await_account_registered(&self.core, self.core.account_id, 30.0)
1140            .await?;
1141
1142        // Connect WS trading client (primary order transport for USD-M)
1143        if let Some(ref mut ws_trading) = self.ws_trading_client {
1144            match ws_trading.connect().await {
1145                Ok(()) => {
1146                    log::info!("Connected to Binance Futures WS trading API");
1147
1148                    let ws_trading_clone = ws_trading.clone();
1149                    let emitter = self.emitter.clone();
1150                    let account_id = self.core.account_id;
1151                    let clock = self.clock;
1152                    let dispatch_state = self.dispatch_state.clone();
1153
1154                    let handle = get_runtime().spawn(async move {
1155                        while let Some(msg) = ws_trading_clone.recv().await {
1156                            dispatch_ws_trading_message(
1157                                msg,
1158                                &emitter,
1159                                account_id,
1160                                clock,
1161                                &dispatch_state,
1162                            );
1163                        }
1164                    });
1165
1166                    *self.ws_trading_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1167                }
1168                Err(e) => {
1169                    log::error!(
1170                        "Failed to connect WS trading API: {e}. \
1171                         Order operations will use HTTP fallback"
1172                    );
1173                }
1174            }
1175        }
1176
1177        self.core.set_connected();
1178        log::info!("Connected: client_id={}", self.core.client_id);
1179        Ok(())
1180    }
1181
1182    async fn disconnect(&mut self) -> anyhow::Result<()> {
1183        if self.core.is_disconnected() {
1184            return Ok(());
1185        }
1186
1187        // Drop the recovery tx so the driver exits its recv loop
1188        self.recovery_tx.lock().expect(MUTEX_POISONED).take();
1189
1190        // Cancel all background tasks
1191        self.cancellation_token.cancel();
1192
1193        // Abort WS trading task and disconnect
1194        if let Some(handle) = self.ws_trading_handle.lock().expect(MUTEX_POISONED).take() {
1195            handle.abort();
1196        }
1197
1198        if let Some(ref mut ws_trading) = self.ws_trading_client {
1199            ws_trading.disconnect().await;
1200        }
1201
1202        // Wait for WebSocket task to complete
1203        let ws_task = self.ws_task.lock().expect(MUTEX_POISONED).take();
1204        if let Some(task) = ws_task {
1205            let _ = task.await;
1206        }
1207
1208        // Abort the keepalive task. An in-flight keepalive_listen_key HTTP
1209        // call ignores the cancellation token until it returns, so awaiting
1210        // without aborting can stall disconnect for the full HTTP timeout.
1211        let keepalive_task = self.keepalive_task.lock().expect(MUTEX_POISONED).take();
1212        if let Some(task) = keepalive_task {
1213            task.abort();
1214            let _ = task.await;
1215        }
1216
1217        // Abort the recovery driver task. Waiting would block disconnect until
1218        // any in-flight HTTP or WebSocket call inside recover_user_data_stream
1219        // returns, which can be many seconds under a network outage.
1220        let recovery_task = self.recovery_task.lock().expect(MUTEX_POISONED).take();
1221        if let Some(task) = recovery_task {
1222            task.abort();
1223            let _ = task.await;
1224        }
1225
1226        // Close WebSocket
1227        if let Some(mut ws_client) = self.ws_client.lock().await.take() {
1228            let _ = ws_client.close().await;
1229        }
1230
1231        // Close listen key
1232        let listen_key = self.listen_key.read().expect(MUTEX_POISONED).clone();
1233        if let Some(ref key) = listen_key
1234            && let Err(e) = self.http_client.close_listen_key(key).await
1235        {
1236            log::warn!("Failed to close listen key: {e}");
1237        }
1238        *self.listen_key.write().expect(MUTEX_POISONED) = None;
1239
1240        self.abort_pending_tasks();
1241
1242        self.core.set_disconnected();
1243        log::info!("Disconnected: client_id={}", self.core.client_id);
1244        Ok(())
1245    }
1246
1247    async fn generate_order_status_report(
1248        &self,
1249        cmd: &GenerateOrderStatusReport,
1250    ) -> anyhow::Result<Option<OrderStatusReport>> {
1251        let Some(instrument_id) = cmd.instrument_id else {
1252            log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
1253            return Ok(None);
1254        };
1255
1256        let symbol = instrument_id.symbol.to_string();
1257        let order_id = cmd
1258            .venue_order_id
1259            .as_ref()
1260            .map(|id| {
1261                id.inner()
1262                    .parse::<i64>()
1263                    .context("failed to parse venue_order_id as numeric")
1264            })
1265            .transpose()?;
1266        let orig_client_order_id = cmd
1267            .client_order_id
1268            .map(|id| encode_broker_id(&id, BINANCE_NAUTILUS_FUTURES_BROKER_ID));
1269
1270        let mut builder = BinanceOrderQueryParamsBuilder::default();
1271        builder.symbol(symbol);
1272
1273        if let Some(oid) = order_id {
1274            builder.order_id(oid);
1275        }
1276
1277        if let Some(ref coid) = orig_client_order_id {
1278            builder.orig_client_order_id(coid.clone());
1279        }
1280        let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1281
1282        let (_, size_precision) = self.get_instrument_precision(instrument_id);
1283        let ts_init = self.clock.get_time_ns();
1284
1285        match self.http_client.query_order(&params).await {
1286            Ok(order) => {
1287                let report = order.to_order_status_report(
1288                    self.core.account_id,
1289                    instrument_id,
1290                    size_precision,
1291                    self.config.treat_expired_as_canceled,
1292                    ts_init,
1293                )?;
1294                Ok(Some(report))
1295            }
1296            Err(BinanceFuturesHttpError::BinanceError { code: -2013, .. }) => {
1297                // Order not found in regular API, try algo order API
1298                let Some(client_order_id) = cmd.client_order_id else {
1299                    return Ok(None);
1300                };
1301
1302                match self.http_client.query_algo_order(client_order_id).await {
1303                    Ok(algo_order) => {
1304                        let report = algo_order.to_order_status_report(
1305                            self.core.account_id,
1306                            instrument_id,
1307                            size_precision,
1308                            ts_init,
1309                        )?;
1310                        Ok(Some(report))
1311                    }
1312                    Err(e) => {
1313                        log::debug!("Algo order query also failed: {e}");
1314                        Ok(None)
1315                    }
1316                }
1317            }
1318            Err(e) => Err(e.into()),
1319        }
1320    }
1321
1322    async fn generate_order_status_reports(
1323        &self,
1324        cmd: &GenerateOrderStatusReports,
1325    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1326        let ts_init = self.clock.get_time_ns();
1327        let mut reports = Vec::new();
1328
1329        if cmd.open_only {
1330            let symbol = cmd.instrument_id.map(|id| id.symbol.to_string());
1331            let mut builder = BinanceOpenOrdersParamsBuilder::default();
1332
1333            if let Some(s) = symbol {
1334                builder.symbol(s);
1335            }
1336            let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1337
1338            let (orders, algo_orders) = tokio::try_join!(
1339                self.http_client.query_open_orders(&params),
1340                self.http_client.query_open_algo_orders(cmd.instrument_id),
1341            )?;
1342
1343            for order in orders {
1344                if let Some(instrument_id) = cmd.instrument_id {
1345                    let (_, size_precision) = self.get_instrument_precision(instrument_id);
1346
1347                    if let Ok(report) = order.to_order_status_report(
1348                        self.core.account_id,
1349                        instrument_id,
1350                        size_precision,
1351                        self.config.treat_expired_as_canceled,
1352                        ts_init,
1353                    ) {
1354                        reports.push(report);
1355                    }
1356                } else {
1357                    let cache = self.core.cache();
1358                    if let Some(instrument) = cache
1359                        .instruments(&BINANCE_VENUE, None)
1360                        .into_iter()
1361                        .find(|i| i.symbol().as_str() == order.symbol.as_str())
1362                        && let Ok(report) = order.to_order_status_report(
1363                            self.core.account_id,
1364                            instrument.id(),
1365                            instrument.size_precision(),
1366                            self.config.treat_expired_as_canceled,
1367                            ts_init,
1368                        )
1369                    {
1370                        reports.push(report);
1371                    }
1372                }
1373            }
1374
1375            for algo_order in algo_orders {
1376                if let Some(instrument_id) = cmd.instrument_id {
1377                    let (_, size_precision) = self.get_instrument_precision(instrument_id);
1378
1379                    if let Ok(report) = algo_order.to_order_status_report(
1380                        self.core.account_id,
1381                        instrument_id,
1382                        size_precision,
1383                        ts_init,
1384                    ) {
1385                        reports.push(report);
1386                    }
1387                } else {
1388                    let cache = self.core.cache();
1389                    if let Some(instrument) = cache
1390                        .instruments(&BINANCE_VENUE, None)
1391                        .into_iter()
1392                        .find(|i| i.symbol().as_str() == algo_order.symbol.as_str())
1393                        && let Ok(report) = algo_order.to_order_status_report(
1394                            self.core.account_id,
1395                            instrument.id(),
1396                            instrument.size_precision(),
1397                            ts_init,
1398                        )
1399                    {
1400                        reports.push(report);
1401                    }
1402                }
1403            }
1404        } else if let Some(instrument_id) = cmd.instrument_id {
1405            let symbol = instrument_id.symbol.to_string();
1406            let start_time = cmd
1407                .start
1408                .map(|t| t.as_i64() / NANOSECONDS_IN_MILLISECOND as i64);
1409            let end_time = cmd
1410                .end
1411                .map(|t| t.as_i64() / NANOSECONDS_IN_MILLISECOND as i64);
1412
1413            let mut builder = BinanceAllOrdersParamsBuilder::default();
1414            builder.symbol(symbol);
1415
1416            if let Some(st) = start_time {
1417                builder.start_time(st);
1418            }
1419
1420            if let Some(et) = end_time {
1421                builder.end_time(et);
1422            }
1423            let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1424
1425            let orders = self.http_client.query_all_orders(&params).await?;
1426            let (_, size_precision) = self.get_instrument_precision(instrument_id);
1427
1428            for order in orders {
1429                if let Ok(report) = order.to_order_status_report(
1430                    self.core.account_id,
1431                    instrument_id,
1432                    size_precision,
1433                    self.config.treat_expired_as_canceled,
1434                    ts_init,
1435                ) {
1436                    reports.push(report);
1437                }
1438            }
1439        }
1440
1441        Ok(reports)
1442    }
1443
1444    async fn generate_fill_reports(
1445        &self,
1446        cmd: GenerateFillReports,
1447    ) -> anyhow::Result<Vec<FillReport>> {
1448        let Some(instrument_id) = cmd.instrument_id else {
1449            log::warn!("generate_fill_reports requires instrument_id for Binance Futures");
1450            return Ok(Vec::new());
1451        };
1452
1453        let symbol = instrument_id.symbol.to_string();
1454        let start_time = cmd
1455            .start
1456            .map(|t| t.as_i64() / NANOSECONDS_IN_MILLISECOND as i64);
1457        let end_time = cmd
1458            .end
1459            .map(|t| t.as_i64() / NANOSECONDS_IN_MILLISECOND as i64);
1460
1461        let mut builder = BinanceUserTradesParamsBuilder::default();
1462        builder.symbol(symbol);
1463
1464        if let Some(st) = start_time {
1465            builder.start_time(st);
1466        }
1467
1468        if let Some(et) = end_time {
1469            builder.end_time(et);
1470        }
1471        let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1472
1473        let trades = self.http_client.query_user_trades(&params).await?;
1474        let (price_precision, size_precision) = self.get_instrument_precision(instrument_id);
1475        let ts_init = self.clock.get_time_ns();
1476
1477        let mut reports = Vec::new();
1478
1479        for trade in trades {
1480            if let Ok(report) = trade.to_fill_report(
1481                self.core.account_id,
1482                instrument_id,
1483                price_precision,
1484                size_precision,
1485                ts_init,
1486            ) {
1487                reports.push(report);
1488            }
1489        }
1490
1491        Ok(reports)
1492    }
1493
1494    async fn generate_position_status_reports(
1495        &self,
1496        cmd: &GeneratePositionStatusReports,
1497    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1498        let symbol = cmd.instrument_id.map(|id| id.symbol.to_string());
1499
1500        let mut builder = BinancePositionRiskParamsBuilder::default();
1501
1502        if let Some(s) = symbol {
1503            builder.symbol(s);
1504        }
1505        let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1506
1507        let positions = self.http_client.query_positions(&params).await?;
1508
1509        let mut reports = Vec::new();
1510
1511        for position in positions {
1512            let position_amt: f64 = position.position_amt.parse().unwrap_or(0.0);
1513            if position_amt == 0.0 {
1514                continue;
1515            }
1516
1517            let cache = self.core.cache();
1518            if let Some(instrument) = cache
1519                .instruments(&BINANCE_VENUE, None)
1520                .into_iter()
1521                .find(|i| i.symbol().as_str() == position.symbol.as_str())
1522                && let Ok(report) = self.create_position_report(
1523                    &position,
1524                    instrument.id(),
1525                    instrument.size_precision(),
1526                )
1527            {
1528                reports.push(report);
1529            }
1530        }
1531
1532        Ok(reports)
1533    }
1534
1535    async fn generate_mass_status(
1536        &self,
1537        lookback_mins: Option<u64>,
1538    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1539        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1540
1541        let ts_now = self.clock.get_time_ns();
1542
1543        let start = lookback_mins.map(|mins| {
1544            let lookback_ns = mins_to_nanos(mins);
1545            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1546        });
1547
1548        let order_cmd = GenerateOrderStatusReportsBuilder::default()
1549            .ts_init(ts_now)
1550            .open_only(true)
1551            .start(start)
1552            .build()
1553            .map_err(|e| anyhow::anyhow!("{e}"))?;
1554
1555        let position_cmd = GeneratePositionStatusReportsBuilder::default()
1556            .ts_init(ts_now)
1557            .start(start)
1558            .build()
1559            .map_err(|e| anyhow::anyhow!("{e}"))?;
1560
1561        let (order_reports, position_reports) = tokio::try_join!(
1562            self.generate_order_status_reports(&order_cmd),
1563            self.generate_position_status_reports(&position_cmd),
1564        )?;
1565
1566        log::info!("Received {} OrderStatusReports", order_reports.len());
1567        log::info!("Received {} PositionReports", position_reports.len());
1568
1569        let mut mass_status = ExecutionMassStatus::new(
1570            self.core.client_id,
1571            self.core.account_id,
1572            *BINANCE_VENUE,
1573            ts_now,
1574            None,
1575        );
1576
1577        mass_status.add_order_reports(order_reports);
1578        mass_status.add_position_reports(position_reports);
1579
1580        Ok(Some(mass_status))
1581    }
1582
1583    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1584        self.update_account_state();
1585        Ok(())
1586    }
1587
1588    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1589        log::debug!("query_order: client_order_id={}", cmd.client_order_id);
1590
1591        let http_client = self.http_client.clone();
1592        let command = cmd;
1593        let emitter = self.emitter.clone();
1594        let account_id = self.core.account_id;
1595        let clock = self.clock;
1596
1597        let symbol = command.instrument_id.symbol.to_string();
1598        let order_id = command
1599            .venue_order_id
1600            .map(|id| {
1601                id.inner()
1602                    .parse::<i64>()
1603                    .map_err(|e| anyhow::anyhow!("failed to parse venue_order_id: {e}"))
1604            })
1605            .transpose()?;
1606        let orig_client_order_id = Some(encode_broker_id(
1607            &command.client_order_id,
1608            BINANCE_NAUTILUS_FUTURES_BROKER_ID,
1609        ));
1610        let (_, size_precision) = self.get_instrument_precision(command.instrument_id);
1611        let treat_expired_as_canceled = self.config.treat_expired_as_canceled;
1612
1613        self.spawn_task("query_order", async move {
1614            let mut builder = BinanceOrderQueryParamsBuilder::default();
1615            builder.symbol(symbol.clone());
1616
1617            if let Some(oid) = order_id {
1618                builder.order_id(oid);
1619            }
1620
1621            if let Some(coid) = orig_client_order_id {
1622                builder.orig_client_order_id(coid);
1623            }
1624            let params = builder
1625                .build()
1626                .map_err(|e| anyhow::anyhow!("failed to build order query params: {e}"))?;
1627
1628            let result = http_client.query_order(&params).await;
1629
1630            match result {
1631                Ok(order) => {
1632                    let ts_init = clock.get_time_ns();
1633                    let report = order.to_order_status_report(
1634                        account_id,
1635                        command.instrument_id,
1636                        size_precision,
1637                        treat_expired_as_canceled,
1638                        ts_init,
1639                    )?;
1640
1641                    emitter.send_order_status_report(report);
1642                }
1643                Err(e) => log::warn!("Failed to query order status: {e}"),
1644            }
1645
1646            Ok(())
1647        });
1648
1649        Ok(())
1650    }
1651
1652    fn generate_account_state(
1653        &self,
1654        balances: Vec<AccountBalance>,
1655        margins: Vec<MarginBalance>,
1656        reported: bool,
1657        ts_event: UnixNanos,
1658    ) -> anyhow::Result<()> {
1659        self.emitter
1660            .emit_account_state(balances, margins, reported, ts_event);
1661        Ok(())
1662    }
1663
1664    fn start(&mut self) -> anyhow::Result<()> {
1665        if self.core.is_started() {
1666            return Ok(());
1667        }
1668
1669        self.emitter.set_sender(get_exec_event_sender());
1670        self.core.set_started();
1671
1672        let http_client = self.http_client.clone();
1673
1674        get_runtime().spawn(async move {
1675            match http_client.request_instruments().await {
1676                Ok(instruments) => {
1677                    if instruments.is_empty() {
1678                        log::warn!("No instruments returned for Binance Futures");
1679                    } else {
1680                        log::info!("Loaded {} Futures instruments", instruments.len());
1681                    }
1682                }
1683                Err(e) => {
1684                    log::error!("Failed to request Binance Futures instruments: {e}");
1685                }
1686            }
1687        });
1688
1689        log::info!(
1690            "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}",
1691            self.core.client_id,
1692            self.core.account_id,
1693            self.core.account_type,
1694            self.config.environment,
1695        );
1696        Ok(())
1697    }
1698
1699    fn stop(&mut self) -> anyhow::Result<()> {
1700        if self.core.is_stopped() {
1701            return Ok(());
1702        }
1703
1704        self.cancellation_token.cancel();
1705
1706        if let Some(handle) = self.ws_trading_handle.lock().expect(MUTEX_POISONED).take() {
1707            handle.abort();
1708        }
1709
1710        if let Some(handle) = self.ws_task.lock().expect(MUTEX_POISONED).take() {
1711            handle.abort();
1712        }
1713
1714        if let Some(handle) = self.keepalive_task.lock().expect(MUTEX_POISONED).take() {
1715            handle.abort();
1716        }
1717
1718        self.recovery_tx.lock().expect(MUTEX_POISONED).take();
1719        if let Some(handle) = self.recovery_task.lock().expect(MUTEX_POISONED).take() {
1720            handle.abort();
1721        }
1722
1723        self.abort_pending_tasks();
1724        self.core.set_stopped();
1725        self.core.set_disconnected();
1726        log::info!("Stopped: client_id={}", self.core.client_id);
1727        Ok(())
1728    }
1729
1730    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
1731        let order = self
1732            .core
1733            .cache()
1734            .order(&cmd.client_order_id)
1735            .cloned()
1736            .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
1737
1738        if order.is_closed() {
1739            let client_order_id = order.client_order_id();
1740            log::warn!("Cannot submit closed order {client_order_id}");
1741            return Ok(());
1742        }
1743
1744        // Validate before submission (Initialized -> Denied is valid,
1745        // but Submitted -> Denied is not, so validate before emitting OrderSubmitted)
1746        if let Some(offset_type) = order.trailing_offset_type() {
1747            if offset_type != TrailingOffsetType::BasisPoints {
1748                anyhow::bail!(
1749                    "Binance only supports TrailingOffsetType::BasisPoints, received {offset_type:?}"
1750                );
1751            }
1752
1753            if let Some(offset) = order.trailing_offset() {
1754                trailing_offset_to_callback_rate(offset)?;
1755            }
1756        }
1757
1758        let close_position = cmd
1759            .params
1760            .as_ref()
1761            .and_then(|p| p.get_bool("close_position"))
1762            .unwrap_or(false);
1763
1764        if close_position {
1765            let order_type = order.order_type();
1766
1767            if !matches!(
1768                order_type,
1769                OrderType::StopMarket | OrderType::MarketIfTouched
1770            ) {
1771                anyhow::bail!(
1772                    "`close_position` is not supported for order type {order_type:?} on Binance"
1773                );
1774            }
1775
1776            if order.is_reduce_only() {
1777                anyhow::bail!("`close_position` cannot be combined with `reduce_only` on Binance");
1778            }
1779        }
1780
1781        if let Some(pm_str) = cmd.params.as_ref().and_then(|p| p.get_str("price_match")) {
1782            BinancePriceMatch::from_param(pm_str)?;
1783            let order_type = order.order_type();
1784            anyhow::ensure!(
1785                !order.is_post_only(),
1786                "price_match cannot be combined with post-only orders"
1787            );
1788            anyhow::ensure!(
1789                order_type == OrderType::Limit,
1790                "price_match is not supported for order type {order_type:?}"
1791            );
1792        }
1793
1794        log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
1795        self.emitter.emit_order_submitted(&order);
1796
1797        self.submit_order_internal(&cmd)
1798    }
1799
1800    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
1801        log::warn!(
1802            "submit_order_list not yet implemented for Binance Futures (received {} orders)",
1803            cmd.order_list.client_order_ids.len()
1804        );
1805        Ok(())
1806    }
1807
1808    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1809        let order = {
1810            let cache = self.core.cache();
1811            cache.order(&cmd.client_order_id).cloned()
1812        };
1813
1814        let Some(order) = order else {
1815            log::warn!(
1816                "Cannot modify order {}: not found in cache",
1817                cmd.client_order_id
1818            );
1819            let ts_init = self.clock.get_time_ns();
1820            let rejected_event = OrderModifyRejected::new(
1821                self.core.trader_id,
1822                cmd.strategy_id,
1823                cmd.instrument_id,
1824                cmd.client_order_id,
1825                "Order not found in cache for modify".into(),
1826                UUID4::new(),
1827                ts_init, // no venue timestamp, rejected locally
1828                ts_init,
1829                false,
1830                cmd.venue_order_id,
1831                Some(self.core.account_id),
1832            );
1833
1834            self.emitter
1835                .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1836            return Ok(());
1837        };
1838
1839        let http_client = self.http_client.clone();
1840        let emitter = self.emitter.clone();
1841        let trader_id = self.core.trader_id;
1842        let account_id = self.core.account_id;
1843        let instrument_id = cmd.instrument_id;
1844        let venue_order_id = cmd.venue_order_id;
1845        let client_order_id = Some(cmd.client_order_id);
1846        let order_side = order.order_side();
1847        let quantity = cmd.quantity.unwrap_or_else(|| order.quantity());
1848        let price = cmd.price.or_else(|| order.price());
1849
1850        let Some(price) = price else {
1851            log::warn!(
1852                "Cannot modify order {}: price required",
1853                cmd.client_order_id
1854            );
1855            let ts_init = self.clock.get_time_ns();
1856            let rejected_event = OrderModifyRejected::new(
1857                self.core.trader_id,
1858                cmd.strategy_id,
1859                cmd.instrument_id,
1860                cmd.client_order_id,
1861                "Price required for order modification".into(),
1862                UUID4::new(),
1863                ts_init, // no venue timestamp, rejected locally
1864                ts_init,
1865                false,
1866                cmd.venue_order_id,
1867                Some(self.core.account_id),
1868            );
1869
1870            self.emitter
1871                .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1872            return Ok(());
1873        };
1874        let command = cmd;
1875        let clock = self.clock;
1876
1877        if self.ws_trading_active() {
1878            let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
1879            let dispatch_state = self.dispatch_state.clone();
1880
1881            let binance_side = BinanceSide::try_from(order_side)?;
1882            let orig_client_order_id =
1883                client_order_id.map(|id| encode_broker_id(&id, BINANCE_NAUTILUS_FUTURES_BROKER_ID));
1884
1885            let mut modify_builder = BinanceModifyOrderParamsBuilder::default();
1886            modify_builder
1887                .symbol(format_binance_symbol(&instrument_id))
1888                .side(binance_side)
1889                .quantity(quantity.to_string())
1890                .price(price.to_string());
1891
1892            if let Some(venue_id) = venue_order_id {
1893                let order_id: i64 = venue_id
1894                    .inner()
1895                    .parse()
1896                    .context("failed to parse venue_order_id as numeric")?;
1897                modify_builder.order_id(order_id);
1898            }
1899
1900            if let Some(client_id) = orig_client_order_id {
1901                modify_builder.orig_client_order_id(client_id);
1902            }
1903
1904            let params = modify_builder
1905                .build()
1906                .context("failed to build modify params")?;
1907
1908            // Pre-register before sending to avoid response racing the insert
1909            let request_id = ws_client.next_request_id();
1910            dispatch_state.pending_requests.insert(
1911                request_id.clone(),
1912                PendingRequest {
1913                    client_order_id: command.client_order_id,
1914                    venue_order_id,
1915                    operation: PendingOperation::Modify,
1916                },
1917            );
1918
1919            self.spawn_task("modify_order_ws", async move {
1920                if let Err(e) = ws_client
1921                    .modify_order_with_id(request_id.clone(), params)
1922                    .await
1923                {
1924                    dispatch_state.pending_requests.remove(&request_id);
1925                    let ts_now = clock.get_time_ns();
1926                    let rejected = OrderModifyRejected::new(
1927                        trader_id,
1928                        command.strategy_id,
1929                        command.instrument_id,
1930                        command.client_order_id,
1931                        format!("ws-modify-order-error: {e}").into(),
1932                        UUID4::new(),
1933                        ts_now,
1934                        ts_now,
1935                        false,
1936                        command.venue_order_id,
1937                        Some(account_id),
1938                    );
1939                    emitter.send_order_event(OrderEventAny::ModifyRejected(rejected));
1940                    anyhow::bail!("WS modify order failed: {e}");
1941                }
1942                Ok(())
1943            });
1944
1945            return Ok(());
1946        }
1947
1948        self.spawn_task("modify_order", async move {
1949            let result = http_client
1950                .modify_order(
1951                    account_id,
1952                    instrument_id,
1953                    venue_order_id,
1954                    client_order_id,
1955                    order_side,
1956                    quantity,
1957                    price,
1958                )
1959                .await;
1960
1961            match result {
1962                Ok(report) => {
1963                    let ts_now = clock.get_time_ns();
1964                    let updated_event = OrderUpdated::new(
1965                        trader_id,
1966                        command.strategy_id,
1967                        command.instrument_id,
1968                        command.client_order_id,
1969                        quantity,
1970                        UUID4::new(),
1971                        ts_now,
1972                        ts_now,
1973                        false,
1974                        Some(report.venue_order_id),
1975                        Some(account_id),
1976                        Some(price),
1977                        None,
1978                        None,
1979                        false, // is_quote_quantity
1980                    );
1981
1982                    emitter.send_order_event(OrderEventAny::Updated(updated_event));
1983                }
1984                Err(e) => {
1985                    let ts_now = clock.get_time_ns();
1986                    let rejected_event = OrderModifyRejected::new(
1987                        trader_id,
1988                        command.strategy_id,
1989                        command.instrument_id,
1990                        command.client_order_id,
1991                        format!("modify-order-failed: {e}").into(),
1992                        UUID4::new(),
1993                        ts_now,
1994                        ts_now,
1995                        false,
1996                        command.venue_order_id,
1997                        Some(account_id),
1998                    );
1999
2000                    emitter.send_order_event(OrderEventAny::ModifyRejected(rejected_event));
2001
2002                    anyhow::bail!("Modify order failed: {e}");
2003                }
2004            }
2005
2006            Ok(())
2007        });
2008
2009        Ok(())
2010    }
2011
2012    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
2013        self.cancel_order_internal(&cmd);
2014        Ok(())
2015    }
2016
2017    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
2018        let http_client = self.http_client.clone();
2019        let instrument_id = cmd.instrument_id;
2020
2021        // USD-M Futures WS Trading API does not expose an openOrders.cancelAll
2022        // method, so regular and algo cancel-all both go through HTTP.
2023        self.spawn_task("cancel_all_orders", async move {
2024            match http_client.cancel_all_orders(instrument_id).await {
2025                Ok(_) => {
2026                    log::info!("Cancel all regular orders request accepted for {instrument_id}");
2027                }
2028                Err(e) => {
2029                    log::error!("Failed to cancel all regular orders for {instrument_id}: {e}");
2030                }
2031            }
2032
2033            match http_client.cancel_all_algo_orders(instrument_id).await {
2034                Ok(()) => {
2035                    log::info!("Cancel all algo orders request accepted for {instrument_id}");
2036                }
2037                Err(e) => {
2038                    log::error!("Failed to cancel all algo orders for {instrument_id}: {e}");
2039                }
2040            }
2041
2042            Ok(())
2043        });
2044
2045        Ok(())
2046    }
2047
2048    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
2049        const BATCH_SIZE: usize = 5;
2050
2051        if cmd.cancels.is_empty() {
2052            return Ok(());
2053        }
2054
2055        let http_client = self.http_client.clone();
2056        let command = cmd;
2057
2058        let emitter = self.emitter.clone();
2059        let trader_id = self.core.trader_id;
2060        let account_id = self.core.account_id;
2061        let clock = self.clock;
2062
2063        self.spawn_task("batch_cancel_orders", async move {
2064            for chunk in command.cancels.chunks(BATCH_SIZE) {
2065                let batch_items: Vec<BatchCancelItem> = chunk
2066                    .iter()
2067                    .map(|cancel| {
2068                        if let Some(venue_order_id) = cancel.venue_order_id {
2069                            let order_id = venue_order_id.inner().parse::<i64>().unwrap_or(0);
2070                            if order_id != 0 {
2071                                BatchCancelItem::by_order_id(
2072                                    command.instrument_id.symbol.to_string(),
2073                                    order_id,
2074                                )
2075                            } else {
2076                                BatchCancelItem::by_client_order_id(
2077                                    command.instrument_id.symbol.to_string(),
2078                                    encode_broker_id(
2079                                        &cancel.client_order_id,
2080                                        BINANCE_NAUTILUS_FUTURES_BROKER_ID,
2081                                    ),
2082                                )
2083                            }
2084                        } else {
2085                            BatchCancelItem::by_client_order_id(
2086                                command.instrument_id.symbol.to_string(),
2087                                encode_broker_id(
2088                                    &cancel.client_order_id,
2089                                    BINANCE_NAUTILUS_FUTURES_BROKER_ID,
2090                                ),
2091                            )
2092                        }
2093                    })
2094                    .collect();
2095
2096                match http_client.batch_cancel_orders(&batch_items).await {
2097                    Ok(results) => {
2098                        for (i, result) in results.iter().enumerate() {
2099                            let cancel = &chunk[i];
2100
2101                            match result {
2102                                BatchOrderResult::Success(response) => {
2103                                    let venue_order_id =
2104                                        VenueOrderId::new(response.order_id.to_string());
2105                                    let canceled_event = OrderCanceled::new(
2106                                        trader_id,
2107                                        cancel.strategy_id,
2108                                        cancel.instrument_id,
2109                                        cancel.client_order_id,
2110                                        UUID4::new(),
2111                                        cancel.ts_init,
2112                                        clock.get_time_ns(),
2113                                        false,
2114                                        Some(venue_order_id),
2115                                        Some(account_id),
2116                                    );
2117
2118                                    emitter
2119                                        .send_order_event(OrderEventAny::Canceled(canceled_event));
2120                                }
2121                                BatchOrderResult::Error(error) => {
2122                                    let rejected_event = OrderCancelRejected::new(
2123                                        trader_id,
2124                                        cancel.strategy_id,
2125                                        cancel.instrument_id,
2126                                        cancel.client_order_id,
2127                                        format!(
2128                                            "batch-cancel-error: code={}, msg={}",
2129                                            error.code, error.msg
2130                                        )
2131                                        .into(),
2132                                        UUID4::new(),
2133                                        clock.get_time_ns(),
2134                                        cancel.ts_init,
2135                                        false,
2136                                        cancel.venue_order_id,
2137                                        Some(account_id),
2138                                    );
2139
2140                                    emitter.send_order_event(OrderEventAny::CancelRejected(
2141                                        rejected_event,
2142                                    ));
2143                                }
2144                            }
2145                        }
2146                    }
2147                    Err(e) => {
2148                        for cancel in chunk {
2149                            let rejected_event = OrderCancelRejected::new(
2150                                trader_id,
2151                                cancel.strategy_id,
2152                                cancel.instrument_id,
2153                                cancel.client_order_id,
2154                                format!("batch-cancel-request-failed: {e}").into(),
2155                                UUID4::new(),
2156                                clock.get_time_ns(),
2157                                cancel.ts_init,
2158                                false,
2159                                cancel.venue_order_id,
2160                                Some(account_id),
2161                            );
2162
2163                            emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
2164                        }
2165                    }
2166                }
2167            }
2168
2169            Ok(())
2170        });
2171
2172        Ok(())
2173    }
2174}