1use std::{
19 future::Future,
20 sync::{
21 Arc, Mutex, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::Duration,
25};
26
27use anyhow::Context;
28use async_trait::async_trait;
29use dashmap::DashMap;
30use nautilus_common::{
31 cache::fifo::FifoCache,
32 clients::ExecutionClient,
33 live::{get_runtime, runner::get_exec_event_sender},
34 messages::execution::{
35 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
36 GenerateOrderStatusReport, GenerateOrderStatusReports, GenerateOrderStatusReportsBuilder,
37 GeneratePositionStatusReports, GeneratePositionStatusReportsBuilder, ModifyOrder,
38 QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
39 },
40};
41use nautilus_core::{
42 AtomicSet, MUTEX_POISONED, UUID4, UnixNanos,
43 datetime::{NANOSECONDS_IN_MILLISECOND, mins_to_nanos},
44 time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
47use nautilus_model::{
48 accounts::AccountAny,
49 enums::{
50 AccountType, OmsType, OrderType, PositionSideSpecified, TrailingOffsetType, TriggerType,
51 },
52 events::{
53 AccountState, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderModifyRejected,
54 OrderRejected, OrderUpdated,
55 },
56 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, Venue, VenueOrderId},
57 instruments::Instrument,
58 orders::Order,
59 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
60 types::{AccountBalance, Currency, MarginBalance, Money, Quantity},
61};
62use rust_decimal::Decimal;
63use tokio::{sync::Mutex as TokioMutex, task::JoinHandle};
64use tokio_util::sync::CancellationToken;
65
66use super::{
67 http::{
68 BinanceFuturesHttpError,
69 client::{BinanceFuturesHttpClient, BinanceFuturesInstrument, is_algo_order_type},
70 models::{BatchOrderResult, BinancePositionRisk},
71 query::{
72 BatchCancelItem, BinanceAllOrdersParamsBuilder, BinanceOpenOrdersParamsBuilder,
73 BinanceOrderQueryParamsBuilder, BinancePositionRiskParamsBuilder,
74 BinanceSetLeverageParams, BinanceSetMarginTypeParams, BinanceUserTradesParamsBuilder,
75 },
76 },
77 websocket::{
78 streams::{
79 client::BinanceFuturesWebSocketClient,
80 dispatch::{DispatchCtx, dispatch_user_stream_message, spawn_user_stream_dispatch},
81 recovery::{
82 RecoveryCtx, WsBuildParams, build_and_connect_user_stream, run_recovery_driver,
83 },
84 },
85 trading::{client::BinanceFuturesWsTradingClient, dispatch::dispatch_ws_trading_message},
86 },
87};
88use crate::{
89 common::{
90 consts::{
91 BINANCE_FUTURES_USD_WS_API_TESTNET_URL, BINANCE_FUTURES_USD_WS_API_URL,
92 BINANCE_GTX_ORDER_REJECT_CODE, BINANCE_NAUTILUS_FUTURES_BROKER_ID, BINANCE_VENUE,
93 },
94 credential::resolve_credentials,
95 dispatch::{OrderIdentity, PendingOperation, PendingRequest, WsDispatchState},
96 encoder::encode_broker_id,
97 enums::{
98 BinanceEnvironment, BinancePriceMatch, BinanceProductType, BinanceSide,
99 BinanceTimeInForce, BinanceWorkingType,
100 },
101 symbol::format_binance_symbol,
102 urls::{get_usdm_ws_route_base_url, get_ws_private_base_url},
103 },
104 config::BinanceExecClientConfig,
105 futures::{
106 conversions::{
107 determine_position_side, trailing_offset_to_callback_rate,
108 trailing_offset_to_callback_rate_string,
109 },
110 http::{
111 client::order_type_to_binance_futures,
112 models::BinanceFuturesAccountInfo,
113 query::{
114 BinanceCancelOrderParamsBuilder, BinanceModifyOrderParamsBuilder,
115 BinanceNewOrderParams,
116 },
117 },
118 },
119};
120
/// Interval, in seconds, between listen key keepalive requests (30 minutes).
const LISTEN_KEY_KEEPALIVE_SECS: u64 = 30 * 60;

/// Number of consecutive keepalive failures at which the keepalive task
/// signals the recovery channel.
const MAX_KEEPALIVE_FAILURES: u32 = 1;
126
/// Execution client for Binance Futures.
///
/// Orders are routed over the WS trading API when it is configured and active,
/// and over HTTP otherwise. Background tasks (user stream dispatch, listen key
/// keepalive, recovery driver) are started by `connect` and stopped by
/// `disconnect`.
#[derive(Debug)]
pub struct BinanceFuturesExecutionClient {
    /// Shared execution client core: identifiers, cache access and connection state.
    core: ExecutionClientCore,
    /// Atomic clock used to timestamp emitted events and reports.
    clock: &'static AtomicTime,
    /// Client configuration supplied at construction.
    config: BinanceExecClientConfig,
    /// Emitter for account and order events.
    emitter: ExecutionEventEmitter,
    /// State shared with the WebSocket dispatchers (order identities, pending WS requests).
    dispatch_state: Arc<WsDispatchState>,
    /// Futures product type selected from the configuration (USD-M when none matches).
    product_type: BinanceProductType,
    /// HTTP client used for REST requests.
    http_client: BinanceFuturesHttpClient,
    /// User data stream WebSocket client; set by `connect`, taken by `disconnect`.
    ws_client: Arc<TokioMutex<Option<BinanceFuturesWebSocketClient>>>,
    /// WS trading API client; only constructed when `use_ws_trading` is set and
    /// the product type is USD-M.
    ws_trading_client: Option<BinanceFuturesWsTradingClient>,
    /// Handle of the task that drains WS trading API messages.
    ws_trading_handle: Mutex<Option<JoinHandle<()>>>,
    /// Current user data stream listen key, if one has been created.
    listen_key: Arc<RwLock<Option<String>>>,
    /// Token used to stop background tasks; replaced on every `connect`.
    cancellation_token: CancellationToken,
    /// Client order IDs recorded as triggered algo orders; cancels for these
    /// use the regular (non-algo) path.
    triggered_algo_order_ids: Arc<AtomicSet<ClientOrderId>>,
    /// Algo client order IDs shared with the user stream dispatcher.
    algo_client_order_ids: Arc<AtomicSet<ClientOrderId>>,
    /// Handle of the user stream dispatch task (shared with the recovery driver).
    ws_task: Arc<Mutex<Option<JoinHandle<()>>>>,
    /// Handle of the listen key keepalive task.
    keepalive_task: Mutex<Option<JoinHandle<()>>>,
    /// Handle of the recovery driver task.
    recovery_task: Mutex<Option<JoinHandle<()>>>,
    /// Lock handed to the recovery driver.
    recovery_lock: Arc<TokioMutex<()>>,
    /// Sender used to request user stream recovery; dropped on `disconnect`.
    recovery_tx: Mutex<Option<tokio::sync::mpsc::UnboundedSender<()>>>,
    /// Handles of spawned request tasks, aborted on `disconnect`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Whether the account is in hedge mode (dual side position), as queried on `connect`.
    is_hedge_mode: AtomicBool,
}
159
160impl BinanceFuturesExecutionClient {
161 pub fn new(core: ExecutionClientCore, config: BinanceExecClientConfig) -> anyhow::Result<Self> {
167 let product_type = config
168 .product_types
169 .iter()
170 .find(|pt| matches!(pt, BinanceProductType::UsdM | BinanceProductType::CoinM))
171 .copied()
172 .unwrap_or(BinanceProductType::UsdM);
173
174 let (api_key, api_secret) = resolve_credentials(
175 config.api_key.clone(),
176 config.api_secret.clone(),
177 config.environment,
178 product_type,
179 )?;
180
181 let clock = get_atomic_clock_realtime();
182
183 let http_client = BinanceFuturesHttpClient::new(
184 product_type,
185 config.environment,
186 clock,
187 Some(api_key.clone()),
188 Some(api_secret.clone()),
189 config.base_url_http.clone(),
190 None, None, None, config.treat_expired_as_canceled,
194 )
195 .context("failed to construct Binance Futures HTTP client")?;
196
197 let ws_trading_client = if config.use_ws_trading && product_type == BinanceProductType::UsdM
198 {
199 let ws_trading_url =
200 config
201 .base_url_ws_trading
202 .clone()
203 .or_else(|| match config.environment {
204 BinanceEnvironment::Testnet => {
205 Some(BINANCE_FUTURES_USD_WS_API_TESTNET_URL.to_string())
206 }
207 _ => Some(BINANCE_FUTURES_USD_WS_API_URL.to_string()),
208 });
209
210 Some(BinanceFuturesWsTradingClient::new(
211 ws_trading_url,
212 api_key,
213 api_secret,
214 None, config.transport_backend,
216 ))
217 } else {
218 None
219 };
220
221 let emitter = ExecutionEventEmitter::new(
222 clock,
223 core.trader_id,
224 core.account_id,
225 core.account_type,
226 core.base_currency,
227 );
228
229 Ok(Self {
230 core,
231 clock,
232 config,
233 emitter,
234 dispatch_state: Arc::new(WsDispatchState::default()),
235 product_type,
236 http_client,
237 ws_client: Arc::new(TokioMutex::new(None)),
238 ws_trading_client,
239 ws_trading_handle: Mutex::new(None),
240 listen_key: Arc::new(RwLock::new(None)),
241 cancellation_token: CancellationToken::new(),
242 triggered_algo_order_ids: Arc::new(AtomicSet::new()),
243 algo_client_order_ids: Arc::new(AtomicSet::new()),
244 ws_task: Arc::new(Mutex::new(None)),
245 keepalive_task: Mutex::new(None),
246 recovery_task: Mutex::new(None),
247 recovery_lock: Arc::new(TokioMutex::new(())),
248 recovery_tx: Mutex::new(None),
249 pending_tasks: Mutex::new(Vec::new()),
250 is_hedge_mode: AtomicBool::new(false),
251 })
252 }
253
254 #[must_use]
256 pub fn is_hedge_mode(&self) -> bool {
257 self.is_hedge_mode.load(Ordering::Acquire)
258 }
259
260 #[doc(hidden)]
262 #[must_use]
263 pub fn instruments_cache(&self) -> Arc<DashMap<ustr::Ustr, BinanceFuturesInstrument>> {
264 self.http_client.instruments_cache()
265 }
266
267 fn create_account_state(&self, account_info: &BinanceFuturesAccountInfo) -> AccountState {
269 Self::create_account_state_from(
270 account_info,
271 self.core.account_id,
272 self.core.account_type,
273 self.clock,
274 )
275 }
276
277 fn create_account_state_from(
278 account_info: &BinanceFuturesAccountInfo,
279 account_id: AccountId,
280 account_type: AccountType,
281 clock: &'static AtomicTime,
282 ) -> AccountState {
283 let ts_now = clock.get_time_ns();
284
285 let balances: Vec<AccountBalance> = account_info
286 .assets
287 .iter()
288 .filter_map(|b| {
289 if b.wallet_balance.is_zero() {
290 return None;
291 }
292
293 let currency = Currency::from(&b.asset);
294 AccountBalance::from_total_and_free(b.wallet_balance, b.available_balance, currency)
295 .ok()
296 })
297 .collect();
298
299 let mut margins: Vec<MarginBalance> = Vec::new();
304
305 for asset in &account_info.assets {
306 let initial_dec = asset.initial_margin.unwrap_or_default();
307 let maint_dec = asset.maint_margin.unwrap_or_default();
308
309 if initial_dec.is_zero() && maint_dec.is_zero() {
310 continue;
311 }
312
313 let currency = Currency::from(&asset.asset);
314 let initial = Money::from_decimal(initial_dec, currency)
315 .unwrap_or_else(|_| Money::zero(currency));
316 let maintenance =
317 Money::from_decimal(maint_dec, currency).unwrap_or_else(|_| Money::zero(currency));
318 margins.push(MarginBalance::new(initial, maintenance, None));
319 }
320
321 AccountState::new(
322 account_id,
323 account_type,
324 balances,
325 margins,
326 true, UUID4::new(),
328 ts_now,
329 ts_now,
330 None, )
332 }
333
334 async fn refresh_account_state(&self) -> anyhow::Result<AccountState> {
335 let account_info = match self.http_client.query_account().await {
336 Ok(info) => info,
337 Err(e) => {
338 log::error!("Binance Futures account state request failed: {e}");
339 anyhow::bail!("Binance Futures account state request failed: {e}");
340 }
341 };
342
343 Ok(self.create_account_state(&account_info))
344 }
345
346 fn update_account_state(&self) {
347 let http_client = self.http_client.clone();
348 let account_id = self.core.account_id;
349 let account_type = self.core.account_type;
350 let emitter = self.emitter.clone();
351 let clock = self.clock;
352
353 self.spawn_task("query_account", async move {
354 let account_info = http_client
355 .query_account()
356 .await
357 .context("Binance Futures account state request failed")?;
358 let account_state =
359 Self::create_account_state_from(&account_info, account_id, account_type, clock);
360 let ts_now = clock.get_time_ns();
361 emitter.emit_account_state(
362 account_state.balances.clone(),
363 account_state.margins.clone(),
364 account_state.is_reported,
365 ts_now,
366 );
367 Ok(())
368 });
369 }
370
371 async fn init_hedge_mode(&self) -> anyhow::Result<bool> {
372 let response = self.http_client.query_hedge_mode().await?;
373 Ok(response.dual_side_position)
374 }
375
376 fn ws_trading_active(&self) -> bool {
378 self.ws_trading_client
379 .as_ref()
380 .is_some_and(|c| c.is_active())
381 }
382
383 fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
384 let order = self
385 .core
386 .cache()
387 .order(&cmd.client_order_id)
388 .cloned()
389 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
390
391 let emitter = self.emitter.clone();
392 let trader_id = self.core.trader_id;
393 let account_id = self.core.account_id;
394 let clock = self.clock;
395 let client_order_id = order.client_order_id();
396 let strategy_id = order.strategy_id();
397 let instrument_id = order.instrument_id();
398 let order_side = order.order_side();
399 let order_type = order.order_type();
400 let quantity = order.quantity();
401 let time_in_force = order.time_in_force();
402 let price = order.price();
403 let trigger_price = order.trigger_price();
404 let reduce_only = order.is_reduce_only();
405 let post_only = order.is_post_only();
406 let activation_price = order.activation_price();
407 let trailing_offset = order.trailing_offset();
408 let trigger_type = order.trigger_type();
409 let position_side = determine_position_side(self.is_hedge_mode(), order_side, reduce_only);
410
411 self.dispatch_state.order_identities.insert(
413 client_order_id,
414 OrderIdentity {
415 instrument_id,
416 strategy_id,
417 order_side,
418 order_type,
419 price,
420 },
421 );
422
423 let use_algo_api = is_algo_order_type(order_type);
424
425 let close_position = cmd
426 .params
427 .as_ref()
428 .and_then(|p| p.get_bool("close_position"))
429 .unwrap_or(false);
430
431 let price_match = cmd
432 .params
433 .as_ref()
434 .and_then(|p| p.get_str("price_match"))
435 .map(BinancePriceMatch::from_param)
436 .transpose()?;
437
438 let callback_rate = trailing_offset
439 .map(trailing_offset_to_callback_rate_string)
440 .transpose()?;
441
442 let working_type = match trigger_type {
443 Some(TriggerType::MarkPrice) => Some(BinanceWorkingType::MarkPrice),
444 Some(TriggerType::LastPrice | TriggerType::Default) => {
445 Some(BinanceWorkingType::ContractPrice)
446 }
447 _ => None,
448 };
449
450 if self.ws_trading_active() && !use_algo_api {
452 let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
453 let dispatch_state = self.dispatch_state.clone();
454 let ts_init = clock.get_time_ns();
455
456 let symbol = format_binance_symbol(&instrument_id);
457 let binance_side = BinanceSide::try_from(order_side)?;
458 let binance_order_type = order_type_to_binance_futures(order_type)?;
459 let binance_tif = if post_only {
460 BinanceTimeInForce::Gtx
461 } else {
462 BinanceTimeInForce::try_from(time_in_force)?
463 };
464
465 let requires_time_in_force = matches!(
466 order_type,
467 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
468 );
469
470 let client_id_str =
471 encode_broker_id(&client_order_id, BINANCE_NAUTILUS_FUTURES_BROKER_ID);
472
473 let params = BinanceNewOrderParams {
474 symbol,
475 side: binance_side,
476 order_type: binance_order_type,
477 time_in_force: if requires_time_in_force {
478 Some(binance_tif)
479 } else {
480 None
481 },
482 quantity: Some(quantity.to_string()),
483 price: if price_match.is_some() {
484 None
485 } else {
486 price.map(|p| p.to_string())
487 },
488 new_client_order_id: Some(client_id_str),
489 stop_price: trigger_price.map(|p| p.to_string()),
490 reduce_only: if reduce_only { Some(true) } else { None },
491 position_side,
492 close_position: None,
493 activation_price: activation_price.map(|p| p.to_string()),
494 callback_rate,
495 working_type,
496 price_protect: None,
497 new_order_resp_type: None,
498 good_till_date: None,
499 recv_window: None,
500 price_match,
501 self_trade_prevention_mode: None,
502 };
503
504 let request_id = ws_client.next_request_id();
506 dispatch_state.pending_requests.insert(
507 request_id.clone(),
508 PendingRequest {
509 client_order_id,
510 venue_order_id: None,
511 operation: PendingOperation::Place,
512 },
513 );
514
515 self.spawn_task("submit_order_ws", async move {
516 if let Err(e) = ws_client
517 .place_order_with_id(request_id.clone(), params)
518 .await
519 {
520 dispatch_state.pending_requests.remove(&request_id);
521 let rejected = OrderRejected::new(
522 trader_id,
523 strategy_id,
524 instrument_id,
525 client_order_id,
526 account_id,
527 format!("ws-submit-order-error: {e}").into(),
528 UUID4::new(),
529 ts_init,
530 clock.get_time_ns(),
531 false,
532 false,
533 );
534 emitter.send_order_event(OrderEventAny::Rejected(rejected));
535 anyhow::bail!("WS submit order failed: {e}");
536 }
537 Ok(())
538 });
539
540 return Ok(());
541 }
542
543 let http_client = self.http_client.clone();
544
545 self.spawn_task("submit_order", async move {
546 let result = if use_algo_api {
547 http_client
548 .submit_algo_order(
549 account_id,
550 instrument_id,
551 client_order_id,
552 order_side,
553 order_type,
554 quantity,
555 time_in_force,
556 price,
557 trigger_price,
558 reduce_only,
559 close_position,
560 position_side,
561 activation_price,
562 callback_rate,
563 working_type,
564 )
565 .await
566 } else {
567 http_client
568 .submit_order(
569 account_id,
570 instrument_id,
571 client_order_id,
572 order_side,
573 order_type,
574 quantity,
575 time_in_force,
576 price,
577 trigger_price,
578 reduce_only,
579 post_only,
580 position_side,
581 price_match,
582 )
583 .await
584 };
585
586 match result {
587 Ok(report) => {
588 log::debug!(
589 "Order submit accepted: client_order_id={}, venue_order_id={}",
590 client_order_id,
591 report.venue_order_id
592 );
593 }
594 Err(e) => {
595 let due_post_only =
599 e.downcast_ref::<BinanceFuturesHttpError>()
600 .is_some_and(|be| {
601 matches!(
602 be,
603 BinanceFuturesHttpError::BinanceError { code, .. }
604 if *code == BINANCE_GTX_ORDER_REJECT_CODE
605 )
606 });
607 let ts_now = clock.get_time_ns();
608 let rejected_event = OrderRejected::new(
609 trader_id,
610 strategy_id,
611 instrument_id,
612 client_order_id,
613 account_id,
614 format!("submit-order-error: {e}").into(),
615 UUID4::new(),
616 ts_now,
617 ts_now,
618 false,
619 due_post_only,
620 );
621
622 emitter.send_order_event(OrderEventAny::Rejected(rejected_event));
623
624 return Err(e);
625 }
626 }
627
628 Ok(())
629 });
630
631 Ok(())
632 }
633
634 fn cancel_order_internal(&self, cmd: &CancelOrder) {
635 let command = cmd.clone();
636
637 let is_algo = self
639 .core
640 .cache()
641 .order(&command.client_order_id)
642 .is_some_and(|order| is_algo_order_type(order.order_type()));
643 let is_triggered = self
644 .triggered_algo_order_ids
645 .contains(&command.client_order_id);
646 let use_algo_cancel = is_algo && !is_triggered;
647
648 let emitter = self.emitter.clone();
649 let trader_id = self.core.trader_id;
650 let account_id = self.core.account_id;
651 let clock = self.clock;
652 let instrument_id = command.instrument_id;
653 let venue_order_id = command.venue_order_id;
654 let client_order_id = command.client_order_id;
655
656 if self.ws_trading_active() && !use_algo_cancel {
658 let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
659 let dispatch_state = self.dispatch_state.clone();
660
661 let mut cancel_builder = BinanceCancelOrderParamsBuilder::default();
662 cancel_builder.symbol(instrument_id.symbol.to_string());
663
664 if let Some(venue_id) = venue_order_id {
665 match venue_id.inner().parse::<i64>() {
666 Ok(order_id) => {
667 cancel_builder.order_id(order_id);
668 }
669 Err(e) => {
670 let ts_now = clock.get_time_ns();
671 let rejected = OrderCancelRejected::new(
672 trader_id,
673 command.strategy_id,
674 instrument_id,
675 client_order_id,
676 format!("failed to parse venue_order_id: {e}").into(),
677 UUID4::new(),
678 ts_now,
679 ts_now,
680 false,
681 venue_order_id,
682 Some(account_id),
683 );
684 emitter.send_order_event(OrderEventAny::CancelRejected(rejected));
685 return;
686 }
687 }
688 }
689
690 cancel_builder.orig_client_order_id(encode_broker_id(
691 &client_order_id,
692 BINANCE_NAUTILUS_FUTURES_BROKER_ID,
693 ));
694
695 let params = cancel_builder.build().unwrap();
696
697 let request_id = ws_client.next_request_id();
699 dispatch_state.pending_requests.insert(
700 request_id.clone(),
701 PendingRequest {
702 client_order_id,
703 venue_order_id,
704 operation: PendingOperation::Cancel,
705 },
706 );
707
708 self.spawn_task("cancel_order_ws", async move {
709 if let Err(e) = ws_client
710 .cancel_order_with_id(request_id.clone(), params)
711 .await
712 {
713 dispatch_state.pending_requests.remove(&request_id);
714 let ts_now = clock.get_time_ns();
715 let rejected = OrderCancelRejected::new(
716 trader_id,
717 command.strategy_id,
718 command.instrument_id,
719 client_order_id,
720 format!("ws-cancel-order-error: {e}").into(),
721 UUID4::new(),
722 ts_now,
723 ts_now,
724 false,
725 command.venue_order_id,
726 Some(account_id),
727 );
728 emitter.send_order_event(OrderEventAny::CancelRejected(rejected));
729 anyhow::bail!("WS cancel order failed: {e}");
730 }
731 Ok(())
732 });
733
734 return;
735 }
736
737 let http_client = self.http_client.clone();
738
739 self.spawn_task("cancel_order", async move {
740 let result = if use_algo_cancel {
741 match http_client.cancel_algo_order(client_order_id).await {
744 Ok(()) => Ok(()),
745 Err(algo_err) => {
746 log::debug!("Algo cancel failed, trying regular cancel: {algo_err}");
747 http_client
748 .cancel_order(instrument_id, venue_order_id, Some(client_order_id))
749 .await
750 .map(|_| ())
751 }
752 }
753 } else {
754 http_client
755 .cancel_order(instrument_id, venue_order_id, Some(client_order_id))
756 .await
757 .map(|_| ())
758 };
759
760 match result {
761 Ok(()) => {
762 log::debug!("Cancel request accepted: client_order_id={client_order_id}");
763 }
764 Err(e) => {
765 let ts_now = clock.get_time_ns();
766 let rejected_event = OrderCancelRejected::new(
767 trader_id,
768 command.strategy_id,
769 command.instrument_id,
770 client_order_id,
771 format!("cancel-order-error: {e}").into(),
772 UUID4::new(),
773 ts_now,
774 ts_now,
775 false,
776 command.venue_order_id,
777 Some(account_id),
778 );
779
780 emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
781
782 return Err(e);
783 }
784 }
785
786 Ok(())
787 });
788 }
789
790 fn spawn_task<F>(&self, description: &'static str, fut: F)
791 where
792 F: Future<Output = anyhow::Result<()>> + Send + 'static,
793 {
794 crate::common::execution::spawn_task(&self.pending_tasks, description, fut);
795 }
796
797 fn abort_pending_tasks(&self) {
798 crate::common::execution::abort_pending_tasks(&self.pending_tasks);
799 }
800
801 fn get_instrument_precision(&self, instrument_id: InstrumentId) -> (u8, u8) {
803 let cache = self.core.cache();
804 cache
805 .instrument(&instrument_id)
806 .map_or((8, 8), |i| (i.price_precision(), i.size_precision()))
807 }
808
809 fn create_position_report(
811 &self,
812 position: &BinancePositionRisk,
813 instrument_id: InstrumentId,
814 size_precision: u8,
815 ) -> anyhow::Result<PositionStatusReport> {
816 let position_amount: Decimal = position
817 .position_amt
818 .parse()
819 .context("invalid position_amt")?;
820
821 if position_amount.is_zero() {
822 anyhow::bail!("Position is flat");
823 }
824
825 let entry_price: Decimal = position
826 .entry_price
827 .parse()
828 .context("invalid entry_price")?;
829
830 let position_side = if position_amount > Decimal::ZERO {
831 PositionSideSpecified::Long
832 } else {
833 PositionSideSpecified::Short
834 };
835
836 let ts_now = self.clock.get_time_ns();
837
838 Ok(PositionStatusReport::new(
839 self.core.account_id,
840 instrument_id,
841 position_side,
842 Quantity::new(position_amount.abs().to_string().parse()?, size_precision),
843 ts_now,
844 ts_now,
845 Some(UUID4::new()),
846 None, Some(entry_price),
848 ))
849 }
850
851 async fn apply_futures_config(&self) -> anyhow::Result<()> {
852 if let Some(ref leverages) = self.config.futures_leverages {
853 for (symbol, leverage) in leverages {
854 let params = BinanceSetLeverageParams {
855 symbol: symbol.clone(),
856 leverage: *leverage,
857 recv_window: None,
858 };
859 let response = self
860 .http_client
861 .set_leverage(¶ms)
862 .await
863 .context(format!("failed to set leverage for {symbol}"))?;
864 log::info!("Set leverage {} {}X", response.symbol, response.leverage);
865 }
866 }
867
868 if let Some(ref margin_types) = self.config.futures_margin_types {
869 for (symbol, margin_type) in margin_types {
870 let params = BinanceSetMarginTypeParams {
871 symbol: symbol.clone(),
872 margin_type: *margin_type,
873 recv_window: None,
874 };
875
876 match self.http_client.set_margin_type(¶ms).await {
877 Ok(_) => {
878 log::info!("Set {symbol} margin type to {margin_type:?}");
879 }
880 Err(e) => {
881 let err_str = format!("{e}");
882 if err_str.contains("-4046") {
883 log::info!("{symbol} margin type already {margin_type:?}");
884 } else {
885 return Err(e)
886 .context(format!("failed to set margin type for {symbol}"));
887 }
888 }
889 }
890 }
891 }
892
893 Ok(())
894 }
895}
896
897#[async_trait(?Send)]
898impl ExecutionClient for BinanceFuturesExecutionClient {
899 fn is_connected(&self) -> bool {
900 self.core.is_connected()
901 }
902
903 fn client_id(&self) -> ClientId {
904 self.core.client_id
905 }
906
907 fn account_id(&self) -> AccountId {
908 self.core.account_id
909 }
910
911 fn venue(&self) -> Venue {
912 *BINANCE_VENUE
913 }
914
915 fn oms_type(&self) -> OmsType {
916 self.core.oms_type
917 }
918
919 fn get_account(&self) -> Option<AccountAny> {
920 self.core.cache().account(&self.core.account_id).cloned()
921 }
922
    /// Connects the client. Does nothing if the core is already connected.
    ///
    /// Steps, in order:
    ///
    /// 1. Replace the cancellation token and query hedge mode.
    /// 2. Request instruments unless the core reports them initialized.
    /// 3. Apply configured leverage and margin type settings.
    /// 4. Create a listen key and connect the user data stream.
    /// 5. Spawn the user stream dispatch, keepalive and recovery tasks.
    /// 6. Request and emit the account state, then wait for the account to be
    ///    registered.
    /// 7. Connect the WS trading API, if configured; a failure here is logged
    ///    and is not fatal.
    ///
    /// # Errors
    ///
    /// Returns an error if any of steps 1-6 fails.
    ///
    /// NOTE(review): an error returned after the background tasks are spawned
    /// (steps 5-6) does not appear to stop them, and `disconnect` returns early
    /// while the core is not connected — confirm whether cleanup is needed on
    /// these paths.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.core.is_connected() {
            return Ok(());
        }

        // `disconnect` cancels the token, so each session starts with a new one
        self.cancellation_token = CancellationToken::new();

        let is_hedge_mode = self
            .init_hedge_mode()
            .await
            .context("failed to query hedge mode")?;
        self.is_hedge_mode.store(is_hedge_mode, Ordering::Release);
        log::info!("Hedge mode (dual side position): {is_hedge_mode}");

        // The returned instruments are not used further in this method
        let _instruments = if self.core.instruments_initialized() {
            Vec::new()
        } else {
            let instruments = self
                .http_client
                .request_instruments()
                .await
                .context("failed to request Binance Futures instruments")?;

            if instruments.is_empty() {
                log::warn!("No instruments returned for Binance Futures");
            } else {
                log::info!("Loaded {} Futures instruments", instruments.len());
            }

            self.core.set_instruments_initialized();
            instruments
        };

        self.apply_futures_config()
            .await
            .context("failed to apply futures config")?;

        log::info!("Creating listen key for user data stream...");
        let listen_key_response = self
            .http_client
            .create_listen_key()
            .await
            .context("failed to create listen key")?;
        let listen_key = listen_key_response.listen_key;
        log::info!("Listen key created successfully");

        // Scoped so the write guard is released before the awaits below
        {
            let mut key_guard = self.listen_key.write().expect(MUTEX_POISONED);
            *key_guard = Some(listen_key.clone());
        }

        let (api_key, api_secret) = resolve_credentials(
            self.config.api_key.clone(),
            self.config.api_secret.clone(),
            self.config.environment,
            self.product_type,
        )?;

        // A configured WS base URL is routed to the "private" path for USD-M
        // mainnet and used as-is otherwise; without one the default applies
        let private_base_url = self.config.base_url_ws.clone().map_or_else(
            || get_ws_private_base_url(self.product_type, self.config.environment).to_string(),
            |url| {
                if self.product_type == BinanceProductType::UsdM
                    && self.config.environment == BinanceEnvironment::Mainnet
                {
                    get_usdm_ws_route_base_url(&url, "private")
                } else {
                    url
                }
            },
        );

        // Channel on which the dispatch and keepalive tasks request recovery
        let (recovery_tx, recovery_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
        *self.recovery_tx.lock().expect(MUTEX_POISONED) = Some(recovery_tx.clone());

        // Bounded cache of (symbol, trade id) pairs handed to the dispatcher
        let seen_trade_ids: Arc<Mutex<FifoCache<(ustr::Ustr, i64), 10_000>>> =
            Arc::new(Mutex::new(FifoCache::new()));

        let dispatch_ctx = Arc::new(DispatchCtx {
            emitter: self.emitter.clone(),
            http_client: self.http_client.clone(),
            account_id: self.core.account_id,
            product_type: self.product_type,
            clock: self.clock,
            dispatch_state: self.dispatch_state.clone(),
            triggered_algo_ids: self.triggered_algo_order_ids.clone(),
            algo_client_ids: self.algo_client_order_ids.clone(),
            use_position_ids: self.config.use_position_ids,
            default_taker_fee: self.config.default_taker_fee,
            treat_expired_as_canceled: self.config.treat_expired_as_canceled,
            use_trade_lite: self.config.use_trade_lite,
            seen_trade_ids,
            cancellation_token: self.cancellation_token.clone(),
        });

        let ws_build_params = WsBuildParams {
            product_type: self.product_type,
            environment: self.config.environment,
            api_key: api_key.clone(),
            api_secret: api_secret.clone(),
            private_base_url: private_base_url.clone(),
            transport_backend: self.config.transport_backend,
        };

        let ws_client = build_and_connect_user_stream(&ws_build_params, &listen_key).await?;
        let stream = ws_client.stream();
        *self.ws_client.lock().await = Some(ws_client);

        let ws_task = spawn_user_stream_dispatch(
            stream,
            dispatch_ctx.clone(),
            recovery_tx.clone(),
            dispatch_user_stream_message,
        );
        *self.ws_task.lock().expect(MUTEX_POISONED) = Some(ws_task);

        // Listen key keepalive task
        {
            let http_client = self.http_client.clone();
            let listen_key_ref = self.listen_key.clone();
            let cancel = self.cancellation_token.clone();
            let recovery_tx = recovery_tx.clone();

            let keepalive_task = get_runtime().spawn(async move {
                let mut interval =
                    tokio::time::interval(Duration::from_secs(LISTEN_KEY_KEEPALIVE_SECS));
                let mut consecutive_failures: u32 = 0;

                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            // Clone the key out so the read guard is not held
                            // across the HTTP request
                            let key = {
                                let guard = listen_key_ref.read().expect(MUTEX_POISONED);
                                guard.clone()
                            };

                            if let Some(ref key) = key {
                                match http_client.keepalive_listen_key(key).await {
                                    Ok(()) => {
                                        log::debug!("Listen key keepalive sent successfully");
                                        consecutive_failures = 0;
                                    }
                                    Err(e) => {
                                        consecutive_failures += 1;
                                        log::warn!(
                                            "Listen key keepalive failed ({consecutive_failures}/{MAX_KEEPALIVE_FAILURES}): {e}",
                                        );

                                        // At the threshold, request recovery; exit
                                        // only if the channel is closed
                                        if consecutive_failures >= MAX_KEEPALIVE_FAILURES
                                            && recovery_tx.send(()).is_err()
                                        {
                                            log::warn!(
                                                "Recovery channel closed, keepalive exiting",
                                            );
                                            break;
                                        }
                                    }
                                }
                            }
                        }
                        () = cancel.cancelled() => {
                            log::debug!("Listen key keepalive task cancelled");
                            break;
                        }
                    }
                }
            });
            *self.keepalive_task.lock().expect(MUTEX_POISONED) = Some(keepalive_task);
        }

        // Recovery driver task, consuming the receiving end of the channel
        {
            let recovery_ctx = RecoveryCtx {
                http_client: self.http_client.clone(),
                listen_key: self.listen_key.clone(),
                ws_client: self.ws_client.clone(),
                ws_task: self.ws_task.clone(),
                recovery_lock: self.recovery_lock.clone(),
                ws_build_params,
                dispatch_ctx,
                recovery_tx: recovery_tx.clone(),
            };
            let cancel = self.cancellation_token.clone();

            let recovery_task = get_runtime().spawn(async move {
                run_recovery_driver(
                    recovery_ctx,
                    recovery_rx,
                    cancel,
                    dispatch_user_stream_message,
                )
                .await;
            });
            *self.recovery_task.lock().expect(MUTEX_POISONED) = Some(recovery_task);
        }

        let account_state = self
            .refresh_account_state()
            .await
            .context("failed to request Binance Futures account state")?;

        if !account_state.balances.is_empty() {
            log::info!(
                "Received account state with {} balance(s) and {} margin(s)",
                account_state.balances.len(),
                account_state.margins.len()
            );
        }

        self.emitter.send_account_state(account_state);

        // Wait for the emitted account state to be registered
        crate::common::execution::await_account_registered(&self.core, self.core.account_id, 30.0)
            .await?;

        if let Some(ref mut ws_trading) = self.ws_trading_client {
            match ws_trading.connect().await {
                Ok(()) => {
                    log::info!("Connected to Binance Futures WS trading API");

                    let ws_trading_clone = ws_trading.clone();
                    let emitter = self.emitter.clone();
                    let account_id = self.core.account_id;
                    let clock = self.clock;
                    let dispatch_state = self.dispatch_state.clone();

                    // Drain WS trading messages until `recv` returns `None`
                    let handle = get_runtime().spawn(async move {
                        while let Some(msg) = ws_trading_clone.recv().await {
                            dispatch_ws_trading_message(
                                msg,
                                &emitter,
                                account_id,
                                clock,
                                &dispatch_state,
                            );
                        }
                    });

                    *self.ws_trading_handle.lock().expect(MUTEX_POISONED) = Some(handle);
                }
                Err(e) => {
                    // Not fatal: order operations fall back to HTTP
                    log::error!(
                        "Failed to connect WS trading API: {e}. \
                         Order operations will use HTTP fallback"
                    );
                }
            }
        }

        self.core.set_connected();
        log::info!("Connected: client_id={}", self.core.client_id);
        Ok(())
    }
1181
1182 async fn disconnect(&mut self) -> anyhow::Result<()> {
1183 if self.core.is_disconnected() {
1184 return Ok(());
1185 }
1186
1187 self.recovery_tx.lock().expect(MUTEX_POISONED).take();
1189
1190 self.cancellation_token.cancel();
1192
1193 if let Some(handle) = self.ws_trading_handle.lock().expect(MUTEX_POISONED).take() {
1195 handle.abort();
1196 }
1197
1198 if let Some(ref mut ws_trading) = self.ws_trading_client {
1199 ws_trading.disconnect().await;
1200 }
1201
1202 let ws_task = self.ws_task.lock().expect(MUTEX_POISONED).take();
1204 if let Some(task) = ws_task {
1205 let _ = task.await;
1206 }
1207
1208 let keepalive_task = self.keepalive_task.lock().expect(MUTEX_POISONED).take();
1212 if let Some(task) = keepalive_task {
1213 task.abort();
1214 let _ = task.await;
1215 }
1216
1217 let recovery_task = self.recovery_task.lock().expect(MUTEX_POISONED).take();
1221 if let Some(task) = recovery_task {
1222 task.abort();
1223 let _ = task.await;
1224 }
1225
1226 if let Some(mut ws_client) = self.ws_client.lock().await.take() {
1228 let _ = ws_client.close().await;
1229 }
1230
1231 let listen_key = self.listen_key.read().expect(MUTEX_POISONED).clone();
1233 if let Some(ref key) = listen_key
1234 && let Err(e) = self.http_client.close_listen_key(key).await
1235 {
1236 log::warn!("Failed to close listen key: {e}");
1237 }
1238 *self.listen_key.write().expect(MUTEX_POISONED) = None;
1239
1240 self.abort_pending_tasks();
1241
1242 self.core.set_disconnected();
1243 log::info!("Disconnected: client_id={}", self.core.client_id);
1244 Ok(())
1245 }
1246
1247 async fn generate_order_status_report(
1248 &self,
1249 cmd: &GenerateOrderStatusReport,
1250 ) -> anyhow::Result<Option<OrderStatusReport>> {
1251 let Some(instrument_id) = cmd.instrument_id else {
1252 log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
1253 return Ok(None);
1254 };
1255
1256 let symbol = instrument_id.symbol.to_string();
1257 let order_id = cmd
1258 .venue_order_id
1259 .as_ref()
1260 .map(|id| {
1261 id.inner()
1262 .parse::<i64>()
1263 .context("failed to parse venue_order_id as numeric")
1264 })
1265 .transpose()?;
1266 let orig_client_order_id = cmd
1267 .client_order_id
1268 .map(|id| encode_broker_id(&id, BINANCE_NAUTILUS_FUTURES_BROKER_ID));
1269
1270 let mut builder = BinanceOrderQueryParamsBuilder::default();
1271 builder.symbol(symbol);
1272
1273 if let Some(oid) = order_id {
1274 builder.order_id(oid);
1275 }
1276
1277 if let Some(ref coid) = orig_client_order_id {
1278 builder.orig_client_order_id(coid.clone());
1279 }
1280 let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1281
1282 let (_, size_precision) = self.get_instrument_precision(instrument_id);
1283 let ts_init = self.clock.get_time_ns();
1284
1285 match self.http_client.query_order(¶ms).await {
1286 Ok(order) => {
1287 let report = order.to_order_status_report(
1288 self.core.account_id,
1289 instrument_id,
1290 size_precision,
1291 self.config.treat_expired_as_canceled,
1292 ts_init,
1293 )?;
1294 Ok(Some(report))
1295 }
1296 Err(BinanceFuturesHttpError::BinanceError { code: -2013, .. }) => {
1297 let Some(client_order_id) = cmd.client_order_id else {
1299 return Ok(None);
1300 };
1301
1302 match self.http_client.query_algo_order(client_order_id).await {
1303 Ok(algo_order) => {
1304 let report = algo_order.to_order_status_report(
1305 self.core.account_id,
1306 instrument_id,
1307 size_precision,
1308 ts_init,
1309 )?;
1310 Ok(Some(report))
1311 }
1312 Err(e) => {
1313 log::debug!("Algo order query also failed: {e}");
1314 Ok(None)
1315 }
1316 }
1317 }
1318 Err(e) => Err(e.into()),
1319 }
1320 }
1321
1322 async fn generate_order_status_reports(
1323 &self,
1324 cmd: &GenerateOrderStatusReports,
1325 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1326 let ts_init = self.clock.get_time_ns();
1327 let mut reports = Vec::new();
1328
1329 if cmd.open_only {
1330 let symbol = cmd.instrument_id.map(|id| id.symbol.to_string());
1331 let mut builder = BinanceOpenOrdersParamsBuilder::default();
1332
1333 if let Some(s) = symbol {
1334 builder.symbol(s);
1335 }
1336 let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1337
1338 let (orders, algo_orders) = tokio::try_join!(
1339 self.http_client.query_open_orders(¶ms),
1340 self.http_client.query_open_algo_orders(cmd.instrument_id),
1341 )?;
1342
1343 for order in orders {
1344 if let Some(instrument_id) = cmd.instrument_id {
1345 let (_, size_precision) = self.get_instrument_precision(instrument_id);
1346
1347 if let Ok(report) = order.to_order_status_report(
1348 self.core.account_id,
1349 instrument_id,
1350 size_precision,
1351 self.config.treat_expired_as_canceled,
1352 ts_init,
1353 ) {
1354 reports.push(report);
1355 }
1356 } else {
1357 let cache = self.core.cache();
1358 if let Some(instrument) = cache
1359 .instruments(&BINANCE_VENUE, None)
1360 .into_iter()
1361 .find(|i| i.symbol().as_str() == order.symbol.as_str())
1362 && let Ok(report) = order.to_order_status_report(
1363 self.core.account_id,
1364 instrument.id(),
1365 instrument.size_precision(),
1366 self.config.treat_expired_as_canceled,
1367 ts_init,
1368 )
1369 {
1370 reports.push(report);
1371 }
1372 }
1373 }
1374
1375 for algo_order in algo_orders {
1376 if let Some(instrument_id) = cmd.instrument_id {
1377 let (_, size_precision) = self.get_instrument_precision(instrument_id);
1378
1379 if let Ok(report) = algo_order.to_order_status_report(
1380 self.core.account_id,
1381 instrument_id,
1382 size_precision,
1383 ts_init,
1384 ) {
1385 reports.push(report);
1386 }
1387 } else {
1388 let cache = self.core.cache();
1389 if let Some(instrument) = cache
1390 .instruments(&BINANCE_VENUE, None)
1391 .into_iter()
1392 .find(|i| i.symbol().as_str() == algo_order.symbol.as_str())
1393 && let Ok(report) = algo_order.to_order_status_report(
1394 self.core.account_id,
1395 instrument.id(),
1396 instrument.size_precision(),
1397 ts_init,
1398 )
1399 {
1400 reports.push(report);
1401 }
1402 }
1403 }
1404 } else if let Some(instrument_id) = cmd.instrument_id {
1405 let symbol = instrument_id.symbol.to_string();
1406 let start_time = cmd
1407 .start
1408 .map(|t| t.as_i64() / NANOSECONDS_IN_MILLISECOND as i64);
1409 let end_time = cmd
1410 .end
1411 .map(|t| t.as_i64() / NANOSECONDS_IN_MILLISECOND as i64);
1412
1413 let mut builder = BinanceAllOrdersParamsBuilder::default();
1414 builder.symbol(symbol);
1415
1416 if let Some(st) = start_time {
1417 builder.start_time(st);
1418 }
1419
1420 if let Some(et) = end_time {
1421 builder.end_time(et);
1422 }
1423 let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1424
1425 let orders = self.http_client.query_all_orders(¶ms).await?;
1426 let (_, size_precision) = self.get_instrument_precision(instrument_id);
1427
1428 for order in orders {
1429 if let Ok(report) = order.to_order_status_report(
1430 self.core.account_id,
1431 instrument_id,
1432 size_precision,
1433 self.config.treat_expired_as_canceled,
1434 ts_init,
1435 ) {
1436 reports.push(report);
1437 }
1438 }
1439 }
1440
1441 Ok(reports)
1442 }
1443
1444 async fn generate_fill_reports(
1445 &self,
1446 cmd: GenerateFillReports,
1447 ) -> anyhow::Result<Vec<FillReport>> {
1448 let Some(instrument_id) = cmd.instrument_id else {
1449 log::warn!("generate_fill_reports requires instrument_id for Binance Futures");
1450 return Ok(Vec::new());
1451 };
1452
1453 let symbol = instrument_id.symbol.to_string();
1454 let start_time = cmd
1455 .start
1456 .map(|t| t.as_i64() / NANOSECONDS_IN_MILLISECOND as i64);
1457 let end_time = cmd
1458 .end
1459 .map(|t| t.as_i64() / NANOSECONDS_IN_MILLISECOND as i64);
1460
1461 let mut builder = BinanceUserTradesParamsBuilder::default();
1462 builder.symbol(symbol);
1463
1464 if let Some(st) = start_time {
1465 builder.start_time(st);
1466 }
1467
1468 if let Some(et) = end_time {
1469 builder.end_time(et);
1470 }
1471 let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1472
1473 let trades = self.http_client.query_user_trades(¶ms).await?;
1474 let (price_precision, size_precision) = self.get_instrument_precision(instrument_id);
1475 let ts_init = self.clock.get_time_ns();
1476
1477 let mut reports = Vec::new();
1478
1479 for trade in trades {
1480 if let Ok(report) = trade.to_fill_report(
1481 self.core.account_id,
1482 instrument_id,
1483 price_precision,
1484 size_precision,
1485 ts_init,
1486 ) {
1487 reports.push(report);
1488 }
1489 }
1490
1491 Ok(reports)
1492 }
1493
1494 async fn generate_position_status_reports(
1495 &self,
1496 cmd: &GeneratePositionStatusReports,
1497 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1498 let symbol = cmd.instrument_id.map(|id| id.symbol.to_string());
1499
1500 let mut builder = BinancePositionRiskParamsBuilder::default();
1501
1502 if let Some(s) = symbol {
1503 builder.symbol(s);
1504 }
1505 let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1506
1507 let positions = self.http_client.query_positions(¶ms).await?;
1508
1509 let mut reports = Vec::new();
1510
1511 for position in positions {
1512 let position_amt: f64 = position.position_amt.parse().unwrap_or(0.0);
1513 if position_amt == 0.0 {
1514 continue;
1515 }
1516
1517 let cache = self.core.cache();
1518 if let Some(instrument) = cache
1519 .instruments(&BINANCE_VENUE, None)
1520 .into_iter()
1521 .find(|i| i.symbol().as_str() == position.symbol.as_str())
1522 && let Ok(report) = self.create_position_report(
1523 &position,
1524 instrument.id(),
1525 instrument.size_precision(),
1526 )
1527 {
1528 reports.push(report);
1529 }
1530 }
1531
1532 Ok(reports)
1533 }
1534
1535 async fn generate_mass_status(
1536 &self,
1537 lookback_mins: Option<u64>,
1538 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1539 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1540
1541 let ts_now = self.clock.get_time_ns();
1542
1543 let start = lookback_mins.map(|mins| {
1544 let lookback_ns = mins_to_nanos(mins);
1545 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1546 });
1547
1548 let order_cmd = GenerateOrderStatusReportsBuilder::default()
1549 .ts_init(ts_now)
1550 .open_only(true)
1551 .start(start)
1552 .build()
1553 .map_err(|e| anyhow::anyhow!("{e}"))?;
1554
1555 let position_cmd = GeneratePositionStatusReportsBuilder::default()
1556 .ts_init(ts_now)
1557 .start(start)
1558 .build()
1559 .map_err(|e| anyhow::anyhow!("{e}"))?;
1560
1561 let (order_reports, position_reports) = tokio::try_join!(
1562 self.generate_order_status_reports(&order_cmd),
1563 self.generate_position_status_reports(&position_cmd),
1564 )?;
1565
1566 log::info!("Received {} OrderStatusReports", order_reports.len());
1567 log::info!("Received {} PositionReports", position_reports.len());
1568
1569 let mut mass_status = ExecutionMassStatus::new(
1570 self.core.client_id,
1571 self.core.account_id,
1572 *BINANCE_VENUE,
1573 ts_now,
1574 None,
1575 );
1576
1577 mass_status.add_order_reports(order_reports);
1578 mass_status.add_position_reports(position_reports);
1579
1580 Ok(Some(mass_status))
1581 }
1582
/// Handles a `QueryAccount` command by calling `update_account_state`.
///
/// The command payload is not inspected, and this always returns `Ok(())`.
fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
    self.update_account_state();
    Ok(())
}
1587
1588 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1589 log::debug!("query_order: client_order_id={}", cmd.client_order_id);
1590
1591 let http_client = self.http_client.clone();
1592 let command = cmd;
1593 let emitter = self.emitter.clone();
1594 let account_id = self.core.account_id;
1595 let clock = self.clock;
1596
1597 let symbol = command.instrument_id.symbol.to_string();
1598 let order_id = command
1599 .venue_order_id
1600 .map(|id| {
1601 id.inner()
1602 .parse::<i64>()
1603 .map_err(|e| anyhow::anyhow!("failed to parse venue_order_id: {e}"))
1604 })
1605 .transpose()?;
1606 let orig_client_order_id = Some(encode_broker_id(
1607 &command.client_order_id,
1608 BINANCE_NAUTILUS_FUTURES_BROKER_ID,
1609 ));
1610 let (_, size_precision) = self.get_instrument_precision(command.instrument_id);
1611 let treat_expired_as_canceled = self.config.treat_expired_as_canceled;
1612
1613 self.spawn_task("query_order", async move {
1614 let mut builder = BinanceOrderQueryParamsBuilder::default();
1615 builder.symbol(symbol.clone());
1616
1617 if let Some(oid) = order_id {
1618 builder.order_id(oid);
1619 }
1620
1621 if let Some(coid) = orig_client_order_id {
1622 builder.orig_client_order_id(coid);
1623 }
1624 let params = builder
1625 .build()
1626 .map_err(|e| anyhow::anyhow!("failed to build order query params: {e}"))?;
1627
1628 let result = http_client.query_order(¶ms).await;
1629
1630 match result {
1631 Ok(order) => {
1632 let ts_init = clock.get_time_ns();
1633 let report = order.to_order_status_report(
1634 account_id,
1635 command.instrument_id,
1636 size_precision,
1637 treat_expired_as_canceled,
1638 ts_init,
1639 )?;
1640
1641 emitter.send_order_status_report(report);
1642 }
1643 Err(e) => log::warn!("Failed to query order status: {e}"),
1644 }
1645
1646 Ok(())
1647 });
1648
1649 Ok(())
1650 }
1651
/// Forwards the given balances, margins, `reported` flag and event timestamp
/// to the emitter's `emit_account_state`.
///
/// Always returns `Ok(())`.
fn generate_account_state(
    &self,
    balances: Vec<AccountBalance>,
    margins: Vec<MarginBalance>,
    reported: bool,
    ts_event: UnixNanos,
) -> anyhow::Result<()> {
    self.emitter
        .emit_account_state(balances, margins, reported, ts_event);
    Ok(())
}
1663
1664 fn start(&mut self) -> anyhow::Result<()> {
1665 if self.core.is_started() {
1666 return Ok(());
1667 }
1668
1669 self.emitter.set_sender(get_exec_event_sender());
1670 self.core.set_started();
1671
1672 let http_client = self.http_client.clone();
1673
1674 get_runtime().spawn(async move {
1675 match http_client.request_instruments().await {
1676 Ok(instruments) => {
1677 if instruments.is_empty() {
1678 log::warn!("No instruments returned for Binance Futures");
1679 } else {
1680 log::info!("Loaded {} Futures instruments", instruments.len());
1681 }
1682 }
1683 Err(e) => {
1684 log::error!("Failed to request Binance Futures instruments: {e}");
1685 }
1686 }
1687 });
1688
1689 log::info!(
1690 "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}",
1691 self.core.client_id,
1692 self.core.account_id,
1693 self.core.account_type,
1694 self.config.environment,
1695 );
1696 Ok(())
1697 }
1698
1699 fn stop(&mut self) -> anyhow::Result<()> {
1700 if self.core.is_stopped() {
1701 return Ok(());
1702 }
1703
1704 self.cancellation_token.cancel();
1705
1706 if let Some(handle) = self.ws_trading_handle.lock().expect(MUTEX_POISONED).take() {
1707 handle.abort();
1708 }
1709
1710 if let Some(handle) = self.ws_task.lock().expect(MUTEX_POISONED).take() {
1711 handle.abort();
1712 }
1713
1714 if let Some(handle) = self.keepalive_task.lock().expect(MUTEX_POISONED).take() {
1715 handle.abort();
1716 }
1717
1718 self.recovery_tx.lock().expect(MUTEX_POISONED).take();
1719 if let Some(handle) = self.recovery_task.lock().expect(MUTEX_POISONED).take() {
1720 handle.abort();
1721 }
1722
1723 self.abort_pending_tasks();
1724 self.core.set_stopped();
1725 self.core.set_disconnected();
1726 log::info!("Stopped: client_id={}", self.core.client_id);
1727 Ok(())
1728 }
1729
1730 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
1731 let order = self
1732 .core
1733 .cache()
1734 .order(&cmd.client_order_id)
1735 .cloned()
1736 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
1737
1738 if order.is_closed() {
1739 let client_order_id = order.client_order_id();
1740 log::warn!("Cannot submit closed order {client_order_id}");
1741 return Ok(());
1742 }
1743
1744 if let Some(offset_type) = order.trailing_offset_type() {
1747 if offset_type != TrailingOffsetType::BasisPoints {
1748 anyhow::bail!(
1749 "Binance only supports TrailingOffsetType::BasisPoints, received {offset_type:?}"
1750 );
1751 }
1752
1753 if let Some(offset) = order.trailing_offset() {
1754 trailing_offset_to_callback_rate(offset)?;
1755 }
1756 }
1757
1758 let close_position = cmd
1759 .params
1760 .as_ref()
1761 .and_then(|p| p.get_bool("close_position"))
1762 .unwrap_or(false);
1763
1764 if close_position {
1765 let order_type = order.order_type();
1766
1767 if !matches!(
1768 order_type,
1769 OrderType::StopMarket | OrderType::MarketIfTouched
1770 ) {
1771 anyhow::bail!(
1772 "`close_position` is not supported for order type {order_type:?} on Binance"
1773 );
1774 }
1775
1776 if order.is_reduce_only() {
1777 anyhow::bail!("`close_position` cannot be combined with `reduce_only` on Binance");
1778 }
1779 }
1780
1781 if let Some(pm_str) = cmd.params.as_ref().and_then(|p| p.get_str("price_match")) {
1782 BinancePriceMatch::from_param(pm_str)?;
1783 let order_type = order.order_type();
1784 anyhow::ensure!(
1785 !order.is_post_only(),
1786 "price_match cannot be combined with post-only orders"
1787 );
1788 anyhow::ensure!(
1789 order_type == OrderType::Limit,
1790 "price_match is not supported for order type {order_type:?}"
1791 );
1792 }
1793
1794 log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
1795 self.emitter.emit_order_submitted(&order);
1796
1797 self.submit_order_internal(&cmd)
1798 }
1799
1800 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
1801 log::warn!(
1802 "submit_order_list not yet implemented for Binance Futures (received {} orders)",
1803 cmd.order_list.client_order_ids.len()
1804 );
1805 Ok(())
1806 }
1807
1808 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1809 let order = {
1810 let cache = self.core.cache();
1811 cache.order(&cmd.client_order_id).cloned()
1812 };
1813
1814 let Some(order) = order else {
1815 log::warn!(
1816 "Cannot modify order {}: not found in cache",
1817 cmd.client_order_id
1818 );
1819 let ts_init = self.clock.get_time_ns();
1820 let rejected_event = OrderModifyRejected::new(
1821 self.core.trader_id,
1822 cmd.strategy_id,
1823 cmd.instrument_id,
1824 cmd.client_order_id,
1825 "Order not found in cache for modify".into(),
1826 UUID4::new(),
1827 ts_init, ts_init,
1829 false,
1830 cmd.venue_order_id,
1831 Some(self.core.account_id),
1832 );
1833
1834 self.emitter
1835 .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1836 return Ok(());
1837 };
1838
1839 let http_client = self.http_client.clone();
1840 let emitter = self.emitter.clone();
1841 let trader_id = self.core.trader_id;
1842 let account_id = self.core.account_id;
1843 let instrument_id = cmd.instrument_id;
1844 let venue_order_id = cmd.venue_order_id;
1845 let client_order_id = Some(cmd.client_order_id);
1846 let order_side = order.order_side();
1847 let quantity = cmd.quantity.unwrap_or_else(|| order.quantity());
1848 let price = cmd.price.or_else(|| order.price());
1849
1850 let Some(price) = price else {
1851 log::warn!(
1852 "Cannot modify order {}: price required",
1853 cmd.client_order_id
1854 );
1855 let ts_init = self.clock.get_time_ns();
1856 let rejected_event = OrderModifyRejected::new(
1857 self.core.trader_id,
1858 cmd.strategy_id,
1859 cmd.instrument_id,
1860 cmd.client_order_id,
1861 "Price required for order modification".into(),
1862 UUID4::new(),
1863 ts_init, ts_init,
1865 false,
1866 cmd.venue_order_id,
1867 Some(self.core.account_id),
1868 );
1869
1870 self.emitter
1871 .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1872 return Ok(());
1873 };
1874 let command = cmd;
1875 let clock = self.clock;
1876
1877 if self.ws_trading_active() {
1878 let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
1879 let dispatch_state = self.dispatch_state.clone();
1880
1881 let binance_side = BinanceSide::try_from(order_side)?;
1882 let orig_client_order_id =
1883 client_order_id.map(|id| encode_broker_id(&id, BINANCE_NAUTILUS_FUTURES_BROKER_ID));
1884
1885 let mut modify_builder = BinanceModifyOrderParamsBuilder::default();
1886 modify_builder
1887 .symbol(format_binance_symbol(&instrument_id))
1888 .side(binance_side)
1889 .quantity(quantity.to_string())
1890 .price(price.to_string());
1891
1892 if let Some(venue_id) = venue_order_id {
1893 let order_id: i64 = venue_id
1894 .inner()
1895 .parse()
1896 .context("failed to parse venue_order_id as numeric")?;
1897 modify_builder.order_id(order_id);
1898 }
1899
1900 if let Some(client_id) = orig_client_order_id {
1901 modify_builder.orig_client_order_id(client_id);
1902 }
1903
1904 let params = modify_builder
1905 .build()
1906 .context("failed to build modify params")?;
1907
1908 let request_id = ws_client.next_request_id();
1910 dispatch_state.pending_requests.insert(
1911 request_id.clone(),
1912 PendingRequest {
1913 client_order_id: command.client_order_id,
1914 venue_order_id,
1915 operation: PendingOperation::Modify,
1916 },
1917 );
1918
1919 self.spawn_task("modify_order_ws", async move {
1920 if let Err(e) = ws_client
1921 .modify_order_with_id(request_id.clone(), params)
1922 .await
1923 {
1924 dispatch_state.pending_requests.remove(&request_id);
1925 let ts_now = clock.get_time_ns();
1926 let rejected = OrderModifyRejected::new(
1927 trader_id,
1928 command.strategy_id,
1929 command.instrument_id,
1930 command.client_order_id,
1931 format!("ws-modify-order-error: {e}").into(),
1932 UUID4::new(),
1933 ts_now,
1934 ts_now,
1935 false,
1936 command.venue_order_id,
1937 Some(account_id),
1938 );
1939 emitter.send_order_event(OrderEventAny::ModifyRejected(rejected));
1940 anyhow::bail!("WS modify order failed: {e}");
1941 }
1942 Ok(())
1943 });
1944
1945 return Ok(());
1946 }
1947
1948 self.spawn_task("modify_order", async move {
1949 let result = http_client
1950 .modify_order(
1951 account_id,
1952 instrument_id,
1953 venue_order_id,
1954 client_order_id,
1955 order_side,
1956 quantity,
1957 price,
1958 )
1959 .await;
1960
1961 match result {
1962 Ok(report) => {
1963 let ts_now = clock.get_time_ns();
1964 let updated_event = OrderUpdated::new(
1965 trader_id,
1966 command.strategy_id,
1967 command.instrument_id,
1968 command.client_order_id,
1969 quantity,
1970 UUID4::new(),
1971 ts_now,
1972 ts_now,
1973 false,
1974 Some(report.venue_order_id),
1975 Some(account_id),
1976 Some(price),
1977 None,
1978 None,
1979 false, );
1981
1982 emitter.send_order_event(OrderEventAny::Updated(updated_event));
1983 }
1984 Err(e) => {
1985 let ts_now = clock.get_time_ns();
1986 let rejected_event = OrderModifyRejected::new(
1987 trader_id,
1988 command.strategy_id,
1989 command.instrument_id,
1990 command.client_order_id,
1991 format!("modify-order-failed: {e}").into(),
1992 UUID4::new(),
1993 ts_now,
1994 ts_now,
1995 false,
1996 command.venue_order_id,
1997 Some(account_id),
1998 );
1999
2000 emitter.send_order_event(OrderEventAny::ModifyRejected(rejected_event));
2001
2002 anyhow::bail!("Modify order failed: {e}");
2003 }
2004 }
2005
2006 Ok(())
2007 });
2008
2009 Ok(())
2010 }
2011
/// Handles a `CancelOrder` command by delegating to `cancel_order_internal`.
///
/// Always returns `Ok(())`; nothing is returned from the internal call.
fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
    self.cancel_order_internal(&cmd);
    Ok(())
}
2016
2017 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
2018 let http_client = self.http_client.clone();
2019 let instrument_id = cmd.instrument_id;
2020
2021 self.spawn_task("cancel_all_orders", async move {
2024 match http_client.cancel_all_orders(instrument_id).await {
2025 Ok(_) => {
2026 log::info!("Cancel all regular orders request accepted for {instrument_id}");
2027 }
2028 Err(e) => {
2029 log::error!("Failed to cancel all regular orders for {instrument_id}: {e}");
2030 }
2031 }
2032
2033 match http_client.cancel_all_algo_orders(instrument_id).await {
2034 Ok(()) => {
2035 log::info!("Cancel all algo orders request accepted for {instrument_id}");
2036 }
2037 Err(e) => {
2038 log::error!("Failed to cancel all algo orders for {instrument_id}: {e}");
2039 }
2040 }
2041
2042 Ok(())
2043 });
2044
2045 Ok(())
2046 }
2047
2048 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
2049 const BATCH_SIZE: usize = 5;
2050
2051 if cmd.cancels.is_empty() {
2052 return Ok(());
2053 }
2054
2055 let http_client = self.http_client.clone();
2056 let command = cmd;
2057
2058 let emitter = self.emitter.clone();
2059 let trader_id = self.core.trader_id;
2060 let account_id = self.core.account_id;
2061 let clock = self.clock;
2062
2063 self.spawn_task("batch_cancel_orders", async move {
2064 for chunk in command.cancels.chunks(BATCH_SIZE) {
2065 let batch_items: Vec<BatchCancelItem> = chunk
2066 .iter()
2067 .map(|cancel| {
2068 if let Some(venue_order_id) = cancel.venue_order_id {
2069 let order_id = venue_order_id.inner().parse::<i64>().unwrap_or(0);
2070 if order_id != 0 {
2071 BatchCancelItem::by_order_id(
2072 command.instrument_id.symbol.to_string(),
2073 order_id,
2074 )
2075 } else {
2076 BatchCancelItem::by_client_order_id(
2077 command.instrument_id.symbol.to_string(),
2078 encode_broker_id(
2079 &cancel.client_order_id,
2080 BINANCE_NAUTILUS_FUTURES_BROKER_ID,
2081 ),
2082 )
2083 }
2084 } else {
2085 BatchCancelItem::by_client_order_id(
2086 command.instrument_id.symbol.to_string(),
2087 encode_broker_id(
2088 &cancel.client_order_id,
2089 BINANCE_NAUTILUS_FUTURES_BROKER_ID,
2090 ),
2091 )
2092 }
2093 })
2094 .collect();
2095
2096 match http_client.batch_cancel_orders(&batch_items).await {
2097 Ok(results) => {
2098 for (i, result) in results.iter().enumerate() {
2099 let cancel = &chunk[i];
2100
2101 match result {
2102 BatchOrderResult::Success(response) => {
2103 let venue_order_id =
2104 VenueOrderId::new(response.order_id.to_string());
2105 let canceled_event = OrderCanceled::new(
2106 trader_id,
2107 cancel.strategy_id,
2108 cancel.instrument_id,
2109 cancel.client_order_id,
2110 UUID4::new(),
2111 cancel.ts_init,
2112 clock.get_time_ns(),
2113 false,
2114 Some(venue_order_id),
2115 Some(account_id),
2116 );
2117
2118 emitter
2119 .send_order_event(OrderEventAny::Canceled(canceled_event));
2120 }
2121 BatchOrderResult::Error(error) => {
2122 let rejected_event = OrderCancelRejected::new(
2123 trader_id,
2124 cancel.strategy_id,
2125 cancel.instrument_id,
2126 cancel.client_order_id,
2127 format!(
2128 "batch-cancel-error: code={}, msg={}",
2129 error.code, error.msg
2130 )
2131 .into(),
2132 UUID4::new(),
2133 clock.get_time_ns(),
2134 cancel.ts_init,
2135 false,
2136 cancel.venue_order_id,
2137 Some(account_id),
2138 );
2139
2140 emitter.send_order_event(OrderEventAny::CancelRejected(
2141 rejected_event,
2142 ));
2143 }
2144 }
2145 }
2146 }
2147 Err(e) => {
2148 for cancel in chunk {
2149 let rejected_event = OrderCancelRejected::new(
2150 trader_id,
2151 cancel.strategy_id,
2152 cancel.instrument_id,
2153 cancel.client_order_id,
2154 format!("batch-cancel-request-failed: {e}").into(),
2155 UUID4::new(),
2156 clock.get_time_ns(),
2157 cancel.ts_init,
2158 false,
2159 cancel.venue_order_id,
2160 Some(account_id),
2161 );
2162
2163 emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
2164 }
2165 }
2166 }
2167 }
2168
2169 Ok(())
2170 });
2171
2172 Ok(())
2173 }
2174}