1use std::{
19 future::Future,
20 sync::{Arc, Mutex},
21 time::Duration,
22};
23
24use ahash::AHashMap;
25use anyhow::Context;
26use async_trait::async_trait;
27use nautilus_common::{
28 cache::fifo::FifoCache,
29 clients::ExecutionClient,
30 live::{get_runtime, runner::get_exec_event_sender},
31 messages::execution::{
32 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
33 GenerateOrderStatusReport, GenerateOrderStatusReports, GenerateOrderStatusReportsBuilder,
34 GeneratePositionStatusReports, GeneratePositionStatusReportsBuilder, ModifyOrder,
35 QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
36 },
37};
38use nautilus_core::{
39 MUTEX_POISONED, UUID4, UnixNanos,
40 datetime::mins_to_nanos,
41 time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
44use nautilus_model::{
45 accounts::AccountAny,
46 enums::{LiquiditySide, OmsType, OrderType},
47 events::{
48 AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny,
49 OrderFilled, OrderModifyRejected, OrderRejected, OrderUpdated,
50 },
51 identifiers::{
52 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, Venue, VenueOrderId,
53 },
54 instruments::Instrument,
55 orders::Order,
56 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
57 types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
58};
59use tokio::task::JoinHandle;
60use ustr::Ustr;
61
62use super::websocket::trading::{
63 client::BinanceSpotWsTradingClient,
64 messages::BinanceSpotWsTradingMessage,
65 parse::{
66 parse_spot_account_position, parse_spot_exec_report_to_fill,
67 parse_spot_exec_report_to_order_status,
68 },
69 user_data::{BinanceSpotExecutionReport, BinanceSpotExecutionType},
70};
71use crate::{
72 common::{
73 consts::{
74 BINANCE_GTX_ORDER_REJECT_CODE, BINANCE_NAUTILUS_SPOT_BROKER_ID,
75 BINANCE_NEW_ORDER_REJECTED_CODE, BINANCE_SPOT_POST_ONLY_REJECT_MSG, BINANCE_VENUE,
76 },
77 credential::resolve_credentials,
78 dispatch::{
79 OrderIdentity, PendingOperation, PendingRequest, WsDispatchState,
80 ensure_accepted_emitted,
81 },
82 encoder::{decode_broker_id, encode_broker_id},
83 enums::{BinanceProductType, BinanceSide, BinanceTimeInForce},
84 },
85 config::BinanceExecClientConfig,
86 spot::{
87 enums::{
88 BinanceCancelReplaceMode, BinanceOrderResponseType, BinanceSpotOrderType,
89 order_type_to_binance_spot, time_in_force_to_binance_spot,
90 },
91 http::{
92 client::BinanceSpotHttpClient,
93 models::BatchCancelResult,
94 query::{BatchCancelItem, CancelOrderParams, CancelReplaceOrderParams, NewOrderParams},
95 },
96 },
97};
98
99#[derive(Debug)]
106pub struct BinanceSpotExecutionClient {
107 core: ExecutionClientCore,
108 clock: &'static AtomicTime,
109 config: BinanceExecClientConfig,
110 emitter: ExecutionEventEmitter,
111 dispatch_state: Arc<WsDispatchState>,
112 http_client: BinanceSpotHttpClient,
113 ws_trading_client: Option<BinanceSpotWsTradingClient>,
114 ws_trading_handle: Mutex<Option<JoinHandle<()>>>,
115 ws_authenticated: Arc<tokio::sync::Notify>,
116 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
117}
118
119impl BinanceSpotExecutionClient {
120 pub fn new(core: ExecutionClientCore, config: BinanceExecClientConfig) -> anyhow::Result<Self> {
126 let product_type = config
127 .product_types
128 .first()
129 .copied()
130 .unwrap_or(BinanceProductType::Spot);
131
132 let (api_key, api_secret) = resolve_credentials(
133 config.api_key.clone(),
134 config.api_secret.clone(),
135 config.environment,
136 product_type,
137 )?;
138
139 let clock = get_atomic_clock_realtime();
140
141 let http_client = BinanceSpotHttpClient::new(
142 config.environment,
143 clock,
144 Some(api_key.clone()),
145 Some(api_secret.clone()),
146 config.base_url_http.clone(),
147 None, None, None, )
151 .context("failed to construct Binance Spot HTTP client")?;
152 let emitter = ExecutionEventEmitter::new(
153 clock,
154 core.trader_id,
155 core.account_id,
156 core.account_type,
157 core.base_currency,
158 );
159
160 let ws_trading_client = if config.use_ws_trading {
161 Some(BinanceSpotWsTradingClient::new(
162 config.base_url_ws_trading.clone(),
163 api_key,
164 api_secret,
165 None, config.transport_backend,
167 ))
168 } else {
169 None
170 };
171
172 Ok(Self {
173 core,
174 clock,
175 config,
176 emitter,
177 dispatch_state: Arc::new(WsDispatchState::default()),
178 http_client,
179 ws_trading_client,
180 ws_trading_handle: Mutex::new(None),
181 ws_authenticated: Arc::new(tokio::sync::Notify::new()),
182 pending_tasks: Mutex::new(Vec::new()),
183 })
184 }
185
186 async fn refresh_account_state(&self) -> anyhow::Result<AccountState> {
187 self.http_client
188 .request_account_state(self.core.account_id)
189 .await
190 }
191
192 fn update_account_state(&self) {
193 let http_client = self.http_client.clone();
194 let account_id = self.core.account_id;
195 let emitter = self.emitter.clone();
196 let clock = self.clock;
197
198 self.spawn_task("query_account", async move {
199 let account_state = http_client.request_account_state(account_id).await?;
200 let ts_now = clock.get_time_ns();
201 emitter.emit_account_state(
202 account_state.balances.clone(),
203 account_state.margins.clone(),
204 account_state.is_reported,
205 ts_now,
206 );
207 Ok(())
208 });
209 }
210
211 fn ws_trading_active(&self) -> bool {
213 self.ws_trading_client
214 .as_ref()
215 .is_some_and(|c| c.is_active())
216 }
217
218 fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
219 let order = self
220 .core
221 .cache()
222 .order(&cmd.client_order_id)
223 .cloned()
224 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
225
226 let event_emitter = self.emitter.clone();
227 let trader_id = self.core.trader_id;
228 let account_id = self.core.account_id;
229 let client_order_id = order.client_order_id();
230 let strategy_id = order.strategy_id();
231 let instrument_id = order.instrument_id();
232 let order_side = order.order_side();
233 let order_type = order.order_type();
234 let quantity = order.quantity();
235 let time_in_force = order.time_in_force();
236 let price = order.price();
237 let trigger_price = order.trigger_price();
238 let is_post_only = order.is_post_only();
239 let is_quote_quantity = order.is_quote_quantity();
240 let display_qty = order.display_qty();
241 let clock = self.clock;
242 let ts_init = self.clock.get_time_ns();
243
244 self.dispatch_state.order_identities.insert(
246 client_order_id,
247 OrderIdentity {
248 instrument_id,
249 strategy_id,
250 order_side,
251 order_type,
252 price,
253 },
254 );
255
256 if self.ws_trading_active() {
257 let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
258 let dispatch_state = self.dispatch_state.clone();
259 let params =
260 build_new_order_params(&order, client_order_id, is_post_only, is_quote_quantity)?;
261
262 let request_id = ws_client.next_request_id();
264 dispatch_state.pending_requests.insert(
265 request_id.clone(),
266 PendingRequest {
267 client_order_id,
268 venue_order_id: None,
269 operation: PendingOperation::Place,
270 },
271 );
272
273 self.spawn_task("submit_order_ws", async move {
274 if let Err(e) = ws_client
275 .place_order_with_id(request_id.clone(), params)
276 .await
277 {
278 dispatch_state.pending_requests.remove(&request_id);
279 let rejected = OrderRejected::new(
280 trader_id,
281 strategy_id,
282 instrument_id,
283 client_order_id,
284 account_id,
285 format!("ws-submit-order-error: {e}").into(),
286 UUID4::new(),
287 ts_init,
288 clock.get_time_ns(),
289 false,
290 false,
291 );
292 event_emitter.send_order_event(OrderEventAny::Rejected(rejected));
293 anyhow::bail!("WS submit order failed: {e}");
294 }
295 Ok(())
296 });
297 } else {
298 let http_client = self.http_client.clone();
299 let dispatch_state = self.dispatch_state.clone();
300 log::debug!("WS trading not active, falling back to HTTP for submit_order");
301
302 self.spawn_task("submit_order_http", async move {
303 let result = http_client
304 .submit_order(
305 account_id,
306 instrument_id,
307 client_order_id,
308 order_side,
309 order_type,
310 quantity,
311 time_in_force,
312 price,
313 trigger_price,
314 is_post_only,
315 is_quote_quantity,
316 display_qty,
317 )
318 .await;
319
320 match result {
321 Ok(report) => {
322 dispatch_state.insert_accepted(client_order_id);
323 let accepted = OrderAccepted::new(
324 trader_id,
325 strategy_id,
326 instrument_id,
327 client_order_id,
328 report.venue_order_id,
329 account_id,
330 UUID4::new(),
331 ts_init,
332 ts_init,
333 false,
334 );
335 event_emitter.send_order_event(OrderEventAny::Accepted(accepted));
336 }
337 Err(e) => {
338 let due_post_only = e
339 .downcast_ref::<crate::spot::http::BinanceSpotHttpError>()
340 .is_some_and(is_spot_post_only_rejection);
341 dispatch_state.cleanup_terminal(client_order_id);
342 let rejected = OrderRejected::new(
343 trader_id,
344 strategy_id,
345 instrument_id,
346 client_order_id,
347 account_id,
348 format!("submit-order-error: {e}").into(),
349 UUID4::new(),
350 ts_init,
351 clock.get_time_ns(),
352 false,
353 due_post_only,
354 );
355 event_emitter.send_order_event(OrderEventAny::Rejected(rejected));
356 return Err(e);
357 }
358 }
359 Ok(())
360 });
361 }
362
363 Ok(())
364 }
365
366 fn cancel_order_internal(&self, cmd: &CancelOrder) {
367 let event_emitter = self.emitter.clone();
368 let trader_id = self.core.trader_id;
369 let account_id = self.core.account_id;
370 let clock = self.clock;
371 let command = cmd.clone();
372
373 if self.ws_trading_active() {
374 let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
375 let dispatch_state = self.dispatch_state.clone();
376 let params = build_cancel_order_params(&command);
377
378 let request_id = ws_client.next_request_id();
380 dispatch_state.pending_requests.insert(
381 request_id.clone(),
382 PendingRequest {
383 client_order_id: command.client_order_id,
384 venue_order_id: command.venue_order_id,
385 operation: PendingOperation::Cancel,
386 },
387 );
388
389 self.spawn_task("cancel_order_ws", async move {
390 if let Err(e) = ws_client
391 .cancel_order_with_id(request_id.clone(), params)
392 .await
393 {
394 dispatch_state.pending_requests.remove(&request_id);
395 let ts_now = clock.get_time_ns();
396 let rejected_event = OrderCancelRejected::new(
397 trader_id,
398 command.strategy_id,
399 command.instrument_id,
400 command.client_order_id,
401 format!("ws-cancel-order-error: {e}").into(),
402 UUID4::new(),
403 ts_now,
404 ts_now,
405 false,
406 command.venue_order_id,
407 Some(account_id),
408 );
409 event_emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
410 anyhow::bail!("WS cancel order failed: {e}");
411 }
412 Ok(())
413 });
414 } else {
415 let http_client = self.http_client.clone();
416 let dispatch_state = self.dispatch_state.clone();
417 log::debug!("WS trading not active, falling back to HTTP for cancel_order");
418
419 self.spawn_task("cancel_order_http", async move {
420 let result = http_client
421 .cancel_order(
422 command.instrument_id,
423 command.venue_order_id,
424 Some(command.client_order_id),
425 )
426 .await
427 .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
428
429 match result {
430 Ok(venue_order_id) => {
431 dispatch_state.cleanup_terminal(command.client_order_id);
432 let ts_now = clock.get_time_ns();
433 let canceled_event = OrderCanceled::new(
434 trader_id,
435 command.strategy_id,
436 command.instrument_id,
437 command.client_order_id,
438 UUID4::new(),
439 ts_now,
440 ts_now,
441 false,
442 Some(venue_order_id),
443 Some(account_id),
444 );
445 event_emitter.send_order_event(OrderEventAny::Canceled(canceled_event));
446 }
447 Err(e) => {
448 let ts_now = clock.get_time_ns();
449 let rejected_event = OrderCancelRejected::new(
450 trader_id,
451 command.strategy_id,
452 command.instrument_id,
453 command.client_order_id,
454 format!("cancel-order-error: {e}").into(),
455 UUID4::new(),
456 ts_now,
457 ts_now,
458 false,
459 command.venue_order_id,
460 Some(account_id),
461 );
462 event_emitter
463 .send_order_event(OrderEventAny::CancelRejected(rejected_event));
464 return Err(e);
465 }
466 }
467 Ok(())
468 });
469 }
470 }
471
472 fn spawn_task<F>(&self, description: &'static str, fut: F)
473 where
474 F: Future<Output = anyhow::Result<()>> + Send + 'static,
475 {
476 crate::common::execution::spawn_task(&self.pending_tasks, description, fut);
477 }
478
479 fn abort_pending_tasks(&self) {
480 crate::common::execution::abort_pending_tasks(&self.pending_tasks);
481 }
482}
483
484#[async_trait(?Send)]
485impl ExecutionClient for BinanceSpotExecutionClient {
486 fn is_connected(&self) -> bool {
487 self.core.is_connected()
488 }
489
490 fn client_id(&self) -> ClientId {
491 self.core.client_id
492 }
493
494 fn account_id(&self) -> AccountId {
495 self.core.account_id
496 }
497
498 fn venue(&self) -> Venue {
499 *BINANCE_VENUE
500 }
501
502 fn oms_type(&self) -> OmsType {
503 self.core.oms_type
504 }
505
506 fn get_account(&self) -> Option<AccountAny> {
507 self.core.cache().account(&self.core.account_id).cloned()
508 }
509
510 async fn connect(&mut self) -> anyhow::Result<()> {
511 if self.core.is_connected() {
512 return Ok(());
513 }
514
515 if !self.core.instruments_initialized() {
517 let instruments = self
518 .http_client
519 .request_instruments()
520 .await
521 .context("failed to request Binance Spot instruments")?;
522
523 if instruments.is_empty() {
524 log::warn!("No instruments returned for Binance Spot");
525 } else {
526 log::info!("Loaded {} Spot instruments", instruments.len());
527 self.http_client.cache_instruments(instruments);
528 }
529
530 self.core.set_instruments_initialized();
531 }
532
533 let account_state = self
535 .refresh_account_state()
536 .await
537 .context("failed to request Binance account state")?;
538
539 if !account_state.balances.is_empty() {
540 log::info!(
541 "Received account state with {} balance(s)",
542 account_state.balances.len()
543 );
544 }
545
546 self.emitter.send_account_state(account_state);
547
548 crate::common::execution::await_account_registered(&self.core, self.core.account_id, 30.0)
550 .await?;
551
552 if let Some(ref mut ws_trading) = self.ws_trading_client {
554 match ws_trading.connect().await {
555 Ok(()) => {
556 log::info!("Connected to Binance Spot WS trading API");
557
558 let ws_trading_clone = ws_trading.clone();
559 let emitter = self.emitter.clone();
560 let account_id = self.core.account_id;
561 let clock = self.clock;
562 let http_client = self.http_client.clone();
563 let dispatch_state = self.dispatch_state.clone();
564 let ws_authenticated = self.ws_authenticated.clone();
565 let seen_trade_ids = std::sync::Arc::new(Mutex::new(FifoCache::new()));
566
567 let handle = get_runtime().spawn(async move {
568 loop {
569 match ws_trading_clone.recv().await {
570 Some(msg) => {
571 dispatch_ws_trading_message(
572 msg,
573 &emitter,
574 &http_client,
575 account_id,
576 clock,
577 &dispatch_state,
578 &ws_authenticated,
579 &seen_trade_ids,
580 );
581 }
582 None => {
583 log::warn!("WS trading dispatch loop ended");
584 break;
585 }
586 }
587 }
588 });
589
590 *self.ws_trading_handle.lock().expect(MUTEX_POISONED) = Some(handle);
591
592 if let Err(e) = ws_trading.session_logon().await {
594 log::error!("WS session logon failed: {e}");
595 } else {
596 let auth_result = tokio::time::timeout(
597 Duration::from_secs(10),
598 self.ws_authenticated.notified(),
599 )
600 .await;
601
602 if auth_result.is_err() {
603 log::error!(
604 "WS session authentication timed out, \
605 order operations will use HTTP fallback"
606 );
607
608 if let Some(handle) =
609 self.ws_trading_handle.lock().expect(MUTEX_POISONED).take()
610 {
611 handle.abort();
612 }
613 ws_trading.disconnect().await;
614 self.ws_trading_client = None;
615 } else if let Err(e) = ws_trading.subscribe_user_data().await {
616 log::error!("WS user data subscribe failed: {e}");
617 }
618 }
619 }
620 Err(e) => {
621 log::error!(
622 "Failed to connect WS trading API: {e}. \
623 Order operations will use HTTP fallback"
624 );
625 }
626 }
627 }
628
629 self.core.set_connected();
630 log::info!("Connected: client_id={}", self.core.client_id);
631 Ok(())
632 }
633
634 async fn disconnect(&mut self) -> anyhow::Result<()> {
635 if self.core.is_disconnected() {
636 return Ok(());
637 }
638
639 if let Some(handle) = self.ws_trading_handle.lock().expect(MUTEX_POISONED).take() {
641 handle.abort();
642 }
643
644 if let Some(ref mut ws_trading) = self.ws_trading_client {
645 ws_trading.disconnect().await;
646 }
647
648 self.abort_pending_tasks();
649
650 self.core.set_disconnected();
651 log::info!("Disconnected: client_id={}", self.core.client_id);
652 Ok(())
653 }
654
655 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
656 self.update_account_state();
657 Ok(())
658 }
659
660 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
661 log::debug!("query_order: client_order_id={}", cmd.client_order_id);
662
663 let http_client = self.http_client.clone();
664 let command = cmd;
665 let event_emitter = self.emitter.clone();
666 let account_id = self.core.account_id;
667
668 self.spawn_task("query_order", async move {
669 let result = http_client
670 .request_order_status_report(
671 account_id,
672 command.instrument_id,
673 command.venue_order_id,
674 Some(command.client_order_id),
675 )
676 .await;
677
678 match result {
679 Ok(report) => {
680 event_emitter.send_order_status_report(report);
681 }
682 Err(e) => log::warn!("Failed to query order status: {e}"),
683 }
684
685 Ok(())
686 });
687
688 Ok(())
689 }
690
691 fn generate_account_state(
692 &self,
693 balances: Vec<AccountBalance>,
694 margins: Vec<MarginBalance>,
695 reported: bool,
696 ts_event: UnixNanos,
697 ) -> anyhow::Result<()> {
698 self.emitter
699 .emit_account_state(balances, margins, reported, ts_event);
700 Ok(())
701 }
702
703 fn start(&mut self) -> anyhow::Result<()> {
704 if self.core.is_started() {
705 return Ok(());
706 }
707
708 self.emitter.set_sender(get_exec_event_sender());
709 self.core.set_started();
710
711 let http_client = self.http_client.clone();
713
714 get_runtime().spawn(async move {
715 match http_client.request_instruments().await {
716 Ok(instruments) => {
717 if instruments.is_empty() {
718 log::warn!("No instruments returned for Binance Spot");
719 } else {
720 http_client.cache_instruments(instruments);
721 log::info!("Instruments initialized");
722 }
723 }
724 Err(e) => {
725 log::error!("Failed to request Binance Spot instruments: {e}");
726 }
727 }
728 });
729
730 log::info!(
731 "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}, product_types={:?}",
732 self.core.client_id,
733 self.core.account_id,
734 self.core.account_type,
735 self.config.environment,
736 self.config.product_types,
737 );
738 Ok(())
739 }
740
741 fn stop(&mut self) -> anyhow::Result<()> {
742 if self.core.is_stopped() {
743 return Ok(());
744 }
745
746 if let Some(handle) = self.ws_trading_handle.lock().expect(MUTEX_POISONED).take() {
748 handle.abort();
749 }
750
751 self.core.set_stopped();
752 self.core.set_disconnected();
753 self.abort_pending_tasks();
754 log::info!("Stopped: client_id={}", self.core.client_id);
755 Ok(())
756 }
757
758 async fn generate_order_status_report(
759 &self,
760 cmd: &GenerateOrderStatusReport,
761 ) -> anyhow::Result<Option<OrderStatusReport>> {
762 let Some(instrument_id) = cmd.instrument_id else {
763 log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
764 return Ok(None);
765 };
766
767 let venue_order_id = cmd
769 .venue_order_id
770 .as_ref()
771 .map(|id| VenueOrderId::new(id.inner()));
772
773 let report = self
774 .http_client
775 .request_order_status_report(
776 self.core.account_id,
777 instrument_id,
778 venue_order_id,
779 cmd.client_order_id,
780 )
781 .await?;
782
783 Ok(Some(report))
784 }
785
786 async fn generate_order_status_reports(
787 &self,
788 cmd: &GenerateOrderStatusReports,
789 ) -> anyhow::Result<Vec<OrderStatusReport>> {
790 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
791 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
792
793 let reports = self
794 .http_client
795 .request_order_status_reports(
796 self.core.account_id,
797 cmd.instrument_id,
798 start_dt,
799 end_dt,
800 cmd.open_only,
801 None, )
803 .await?;
804
805 Ok(reports)
806 }
807
808 async fn generate_fill_reports(
809 &self,
810 cmd: GenerateFillReports,
811 ) -> anyhow::Result<Vec<FillReport>> {
812 let Some(instrument_id) = cmd.instrument_id else {
813 log::warn!("generate_fill_reports requires instrument_id for Binance Spot");
814 return Ok(Vec::new());
815 };
816
817 let venue_order_id = cmd
819 .venue_order_id
820 .as_ref()
821 .map(|id| VenueOrderId::new(id.inner()));
822
823 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
824 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
825
826 let reports = self
827 .http_client
828 .request_fill_reports(
829 self.core.account_id,
830 instrument_id,
831 venue_order_id,
832 start_dt,
833 end_dt,
834 None, )
836 .await?;
837
838 Ok(reports)
839 }
840
841 async fn generate_position_status_reports(
842 &self,
843 _cmd: &GeneratePositionStatusReports,
844 ) -> anyhow::Result<Vec<PositionStatusReport>> {
845 Ok(Vec::new())
848 }
849
850 async fn generate_mass_status(
851 &self,
852 lookback_mins: Option<u64>,
853 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
854 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
855
856 let ts_now = self.clock.get_time_ns();
857
858 let start = lookback_mins.map(|mins| {
859 let lookback_ns = mins_to_nanos(mins);
860 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
861 });
862
863 let order_cmd = GenerateOrderStatusReportsBuilder::default()
866 .ts_init(ts_now)
867 .open_only(true)
868 .start(start)
869 .build()
870 .map_err(|e| anyhow::anyhow!("{e}"))?;
871
872 let position_cmd = GeneratePositionStatusReportsBuilder::default()
873 .ts_init(ts_now)
874 .start(start)
875 .build()
876 .map_err(|e| anyhow::anyhow!("{e}"))?;
877
878 let (order_reports, position_reports) = tokio::try_join!(
879 self.generate_order_status_reports(&order_cmd),
880 self.generate_position_status_reports(&position_cmd),
881 )?;
882
883 log::info!("Received {} OrderStatusReports", order_reports.len());
887 log::info!("Received {} PositionReports", position_reports.len());
888
889 let mut mass_status = ExecutionMassStatus::new(
890 self.core.client_id,
891 self.core.account_id,
892 *BINANCE_VENUE,
893 ts_now,
894 None,
895 );
896
897 mass_status.add_order_reports(order_reports);
898 mass_status.add_position_reports(position_reports);
899
900 Ok(Some(mass_status))
901 }
902
903 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
904 let order = self
905 .core
906 .cache()
907 .order(&cmd.client_order_id)
908 .cloned()
909 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
910
911 if order.is_closed() {
912 let client_order_id = order.client_order_id();
913 log::warn!("Cannot submit closed order {client_order_id}");
914 return Ok(());
915 }
916
917 log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
918 self.emitter.emit_order_submitted(&order);
919
920 self.submit_order_internal(&cmd)
921 }
922
923 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
924 log::warn!(
925 "submit_order_list not yet implemented for Binance Spot execution client (received {} orders)",
926 cmd.order_list.client_order_ids.len()
927 );
928 Ok(())
929 }
930
931 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
932 let order = self.core.cache().order(&cmd.client_order_id).cloned();
936
937 let Some(order) = order else {
938 log::warn!(
939 "Cannot modify order {}: not found in cache",
940 cmd.client_order_id
941 );
942 let ts_init = self.clock.get_time_ns();
943 let rejected_event = OrderModifyRejected::new(
944 self.core.trader_id,
945 cmd.strategy_id,
946 cmd.instrument_id,
947 cmd.client_order_id,
948 "Order not found in cache for modify".into(),
949 UUID4::new(),
950 ts_init, ts_init,
952 false,
953 cmd.venue_order_id,
954 Some(self.core.account_id),
955 );
956
957 self.emitter
958 .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
959 return Ok(());
960 };
961
962 let event_emitter = self.emitter.clone();
963 let trader_id = self.core.trader_id;
964 let account_id = self.core.account_id;
965 let clock = self.clock;
966
967 let order_side = order.order_side();
968 let order_type = order.order_type();
969 let time_in_force = order.time_in_force();
970 let quantity = cmd.quantity.unwrap_or_else(|| order.quantity());
971
972 if self.ws_trading_active() {
973 let command = cmd;
974 let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
975 let dispatch_state = self.dispatch_state.clone();
976 let params = build_cancel_replace_params(&command, &order, quantity)?;
977
978 let request_id = ws_client.next_request_id();
980 dispatch_state.pending_requests.insert(
981 request_id.clone(),
982 PendingRequest {
983 client_order_id: command.client_order_id,
984 venue_order_id: command.venue_order_id,
985 operation: PendingOperation::Modify,
986 },
987 );
988
989 self.spawn_task("modify_order_ws", async move {
990 if let Err(e) = ws_client
991 .cancel_replace_order_with_id(request_id.clone(), params)
992 .await
993 {
994 dispatch_state.pending_requests.remove(&request_id);
995 let ts_now = clock.get_time_ns();
996 let rejected_event = OrderModifyRejected::new(
997 trader_id,
998 command.strategy_id,
999 command.instrument_id,
1000 command.client_order_id,
1001 format!("ws-modify-order-error: {e}").into(),
1002 UUID4::new(),
1003 ts_now,
1004 ts_now,
1005 false,
1006 command.venue_order_id,
1007 Some(account_id),
1008 );
1009 event_emitter.send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1010 anyhow::bail!("WS modify order failed: {e}");
1011 }
1012 Ok(())
1013 });
1014 } else {
1015 let command = cmd;
1016 let http_client = self.http_client.clone();
1017 log::debug!("WS trading not active, falling back to HTTP for modify_order");
1018
1019 self.spawn_task("modify_order_http", async move {
1020 let result = http_client
1021 .modify_order(
1022 account_id,
1023 command.instrument_id,
1024 command
1025 .venue_order_id
1026 .ok_or_else(|| anyhow::anyhow!("venue_order_id required for modify"))?,
1027 command.client_order_id,
1028 order_side,
1029 order_type,
1030 quantity,
1031 time_in_force,
1032 command.price,
1033 )
1034 .await
1035 .map_err(|e| anyhow::anyhow!("Modify order failed: {e}"));
1036
1037 match result {
1038 Ok(report) => {
1039 let ts_now = clock.get_time_ns();
1040 let updated_event = OrderUpdated::new(
1041 trader_id,
1042 command.strategy_id,
1043 command.instrument_id,
1044 command.client_order_id,
1045 report.quantity,
1046 UUID4::new(),
1047 ts_now,
1048 ts_now,
1049 false,
1050 Some(report.venue_order_id),
1051 Some(account_id),
1052 report.price,
1053 None, None, false, );
1057 event_emitter.send_order_event(OrderEventAny::Updated(updated_event));
1058 }
1059 Err(e) => {
1060 let ts_now = clock.get_time_ns();
1061 let rejected_event = OrderModifyRejected::new(
1062 trader_id,
1063 command.strategy_id,
1064 command.instrument_id,
1065 command.client_order_id,
1066 format!("modify-order-error: {e}").into(),
1067 UUID4::new(),
1068 ts_now,
1069 ts_now,
1070 false,
1071 command.venue_order_id,
1072 Some(account_id),
1073 );
1074 event_emitter
1075 .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1076 return Err(e);
1077 }
1078 }
1079 Ok(())
1080 });
1081 }
1082
1083 Ok(())
1084 }
1085
1086 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1087 self.cancel_order_internal(&cmd);
1088 Ok(())
1089 }
1090
1091 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1092 let event_emitter = self.emitter.clone();
1093 let trader_id = self.core.trader_id;
1094 let account_id = self.core.account_id;
1095 let clock = self.clock;
1096
1097 if self.ws_trading_active() {
1098 let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
1099 let symbol = cmd.instrument_id.symbol.to_string();
1100
1101 self.spawn_task("cancel_all_orders_ws", async move {
1102 if let Err(e) = ws_client.cancel_all_orders(symbol).await {
1103 log::error!("WS cancel_all_orders failed: {e}");
1104 }
1105 Ok(())
1107 });
1108
1109 return Ok(());
1110 }
1111
1112 log::debug!("WS trading not active, falling back to HTTP for cancel_all_orders");
1113 let http_client = self.http_client.clone();
1114
1115 let strategy_lookup: AHashMap<ClientOrderId, StrategyId> = {
1117 let cache = self.core.cache();
1118 cache
1119 .orders_open(None, Some(&cmd.instrument_id), None, None, None)
1120 .into_iter()
1121 .map(|order| (order.client_order_id(), order.strategy_id()))
1122 .collect()
1123 };
1124
1125 let command = cmd;
1126 self.spawn_task("cancel_all_orders_http", async move {
1127 let canceled_orders = http_client.cancel_all_orders(command.instrument_id).await?;
1128
1129 for (venue_order_id, client_order_id) in canceled_orders {
1130 let strategy_id = strategy_lookup
1131 .get(&client_order_id)
1132 .copied()
1133 .unwrap_or(command.strategy_id);
1134
1135 let canceled_event = OrderCanceled::new(
1136 trader_id,
1137 strategy_id,
1138 command.instrument_id,
1139 client_order_id,
1140 UUID4::new(),
1141 command.ts_init,
1142 clock.get_time_ns(),
1143 false,
1144 Some(venue_order_id),
1145 Some(account_id),
1146 );
1147
1148 event_emitter.send_order_event(OrderEventAny::Canceled(canceled_event));
1149 }
1150
1151 Ok(())
1152 });
1153
1154 Ok(())
1155 }
1156
1157 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1158 const BATCH_SIZE: usize = 5;
1159
1160 if cmd.cancels.is_empty() {
1161 return Ok(());
1162 }
1163
1164 let http_client = self.http_client.clone();
1165 let command = cmd;
1166
1167 let event_emitter = self.emitter.clone();
1168 let trader_id = self.core.trader_id;
1169 let account_id = self.core.account_id;
1170 let clock = self.clock;
1171
1172 self.spawn_task("batch_cancel_orders", async move {
1173 for chunk in command.cancels.chunks(BATCH_SIZE) {
1174 let batch_items: Vec<BatchCancelItem> = chunk
1175 .iter()
1176 .map(|cancel| {
1177 if let Some(venue_order_id) = cancel.venue_order_id {
1178 let order_id = venue_order_id.inner().parse::<i64>().unwrap_or(0);
1179 if order_id != 0 {
1180 BatchCancelItem::by_order_id(
1181 command.instrument_id.symbol.to_string(),
1182 order_id,
1183 )
1184 } else {
1185 BatchCancelItem::by_client_order_id(
1186 command.instrument_id.symbol.to_string(),
1187 encode_broker_id(
1188 &cancel.client_order_id,
1189 BINANCE_NAUTILUS_SPOT_BROKER_ID,
1190 ),
1191 )
1192 }
1193 } else {
1194 BatchCancelItem::by_client_order_id(
1195 command.instrument_id.symbol.to_string(),
1196 encode_broker_id(
1197 &cancel.client_order_id,
1198 BINANCE_NAUTILUS_SPOT_BROKER_ID,
1199 ),
1200 )
1201 }
1202 })
1203 .collect();
1204
1205 match http_client.batch_cancel_orders(&batch_items).await {
1206 Ok(results) => {
1207 for (i, result) in results.iter().enumerate() {
1208 let cancel = &chunk[i];
1209
1210 match result {
1211 BatchCancelResult::Success(success) => {
1212 let venue_order_id =
1213 VenueOrderId::new(success.order_id.to_string());
1214 let canceled_event = OrderCanceled::new(
1215 trader_id,
1216 cancel.strategy_id,
1217 cancel.instrument_id,
1218 cancel.client_order_id,
1219 UUID4::new(),
1220 cancel.ts_init,
1221 clock.get_time_ns(),
1222 false,
1223 Some(venue_order_id),
1224 Some(account_id),
1225 );
1226
1227 event_emitter
1228 .send_order_event(OrderEventAny::Canceled(canceled_event));
1229 }
1230 BatchCancelResult::Error(error) => {
1231 let rejected_event = OrderCancelRejected::new(
1232 trader_id,
1233 cancel.strategy_id,
1234 cancel.instrument_id,
1235 cancel.client_order_id,
1236 format!(
1237 "batch-cancel-error: code={}, msg={}",
1238 error.code, error.msg
1239 )
1240 .into(),
1241 UUID4::new(),
1242 clock.get_time_ns(),
1243 cancel.ts_init,
1244 false,
1245 cancel.venue_order_id,
1246 Some(account_id),
1247 );
1248
1249 event_emitter.send_order_event(OrderEventAny::CancelRejected(
1250 rejected_event,
1251 ));
1252 }
1253 }
1254 }
1255 }
1256 Err(e) => {
1257 for cancel in chunk {
1258 let rejected_event = OrderCancelRejected::new(
1259 trader_id,
1260 cancel.strategy_id,
1261 cancel.instrument_id,
1262 cancel.client_order_id,
1263 format!("batch-cancel-request-failed: {e}").into(),
1264 UUID4::new(),
1265 clock.get_time_ns(),
1266 cancel.ts_init,
1267 false,
1268 cancel.venue_order_id,
1269 Some(account_id),
1270 );
1271
1272 event_emitter
1273 .send_order_event(OrderEventAny::CancelRejected(rejected_event));
1274 }
1275 }
1276 }
1277 }
1278
1279 Ok(())
1280 });
1281
1282 Ok(())
1283 }
1284}
1285
1286#[expect(clippy::too_many_arguments)]
1287fn dispatch_ws_trading_message(
1288 msg: BinanceSpotWsTradingMessage,
1289 emitter: &ExecutionEventEmitter,
1290 http_client: &BinanceSpotHttpClient,
1291 account_id: AccountId,
1292 clock: &'static AtomicTime,
1293 dispatch_state: &WsDispatchState,
1294 ws_authenticated: &tokio::sync::Notify,
1295 seen_trade_ids: &std::sync::Arc<Mutex<FifoCache<(Ustr, i64), 10_000>>>,
1296) {
1297 match msg {
1298 BinanceSpotWsTradingMessage::OrderAccepted {
1299 request_id,
1300 response,
1301 } => {
1302 dispatch_state.pending_requests.remove(&request_id);
1303 log::debug!(
1304 "WS order accepted: request_id={request_id}, order_id={}",
1305 response.order_id
1306 );
1307 }
1309 BinanceSpotWsTradingMessage::OrderRejected {
1310 request_id,
1311 code,
1312 msg,
1313 } => {
1314 log::debug!("WS order rejected: request_id={request_id}, code={code}, msg={msg}");
1315 if let Some((_, pending)) = dispatch_state.pending_requests.remove(&request_id) {
1316 let identity = dispatch_state
1318 .order_identities
1319 .get(&pending.client_order_id)
1320 .map(|r| r.clone());
1321
1322 if let Some(identity) = identity {
1323 let code_i64 = i64::from(code);
1324 let due_post_only = code_i64 == BINANCE_GTX_ORDER_REJECT_CODE
1325 || (code_i64 == BINANCE_NEW_ORDER_REJECTED_CODE
1326 && msg == BINANCE_SPOT_POST_ONLY_REJECT_MSG);
1327 let ts_now = clock.get_time_ns();
1328 let rejected = OrderRejected::new(
1329 emitter.trader_id(),
1330 identity.strategy_id,
1331 identity.instrument_id,
1332 pending.client_order_id,
1333 account_id,
1334 Ustr::from(&format!("code={code}: {msg}")),
1335 UUID4::new(),
1336 ts_now,
1337 ts_now,
1338 false,
1339 due_post_only,
1340 );
1341 dispatch_state.cleanup_terminal(pending.client_order_id);
1342 emitter.send_order_event(OrderEventAny::Rejected(rejected));
1343 } else {
1344 log::warn!(
1345 "No order identity for {}, cannot emit OrderRejected",
1346 pending.client_order_id
1347 );
1348 }
1349 } else {
1350 log::warn!("No pending request for {request_id}, cannot emit OrderRejected");
1351 }
1352 }
1353 BinanceSpotWsTradingMessage::OrderCanceled {
1354 request_id,
1355 response,
1356 } => {
1357 dispatch_state.pending_requests.remove(&request_id);
1358 log::debug!(
1359 "WS order canceled: request_id={request_id}, order_id={}",
1360 response.order_id
1361 );
1362 }
1364 BinanceSpotWsTradingMessage::CancelRejected {
1365 request_id,
1366 code,
1367 msg,
1368 } => {
1369 log::warn!("WS cancel rejected: request_id={request_id}, code={code}, msg={msg}");
1370 if let Some((_, pending)) = dispatch_state.pending_requests.remove(&request_id)
1371 && let Some(identity) = dispatch_state
1372 .order_identities
1373 .get(&pending.client_order_id)
1374 {
1375 let ts_now = clock.get_time_ns();
1376 let rejected = OrderCancelRejected::new(
1377 emitter.trader_id(),
1378 identity.strategy_id,
1379 identity.instrument_id,
1380 pending.client_order_id,
1381 Ustr::from(&format!("code={code}: {msg}")),
1382 UUID4::new(),
1383 ts_now,
1384 ts_now,
1385 false,
1386 pending.venue_order_id,
1387 Some(account_id),
1388 );
1389 emitter.send_order_event(OrderEventAny::CancelRejected(rejected));
1390 }
1391 }
1392 BinanceSpotWsTradingMessage::CancelReplaceAccepted {
1393 request_id,
1394 cancel_response,
1395 new_order_response,
1396 } => {
1397 dispatch_state.pending_requests.remove(&request_id);
1398 log::debug!(
1399 "WS cancel-replace accepted: request_id={request_id}, \
1400 canceled_id={}, new_id={}",
1401 cancel_response.order_id,
1402 new_order_response.order_id,
1403 );
1404 }
1406 BinanceSpotWsTradingMessage::CancelReplaceRejected {
1407 request_id,
1408 code,
1409 msg,
1410 } => {
1411 log::warn!(
1412 "WS cancel-replace rejected: request_id={request_id}, code={code}, msg={msg}"
1413 );
1414
1415 if let Some((_, pending)) = dispatch_state.pending_requests.remove(&request_id)
1416 && let Some(identity) = dispatch_state
1417 .order_identities
1418 .get(&pending.client_order_id)
1419 {
1420 let ts_now = clock.get_time_ns();
1421 let rejected = OrderModifyRejected::new(
1422 emitter.trader_id(),
1423 identity.strategy_id,
1424 identity.instrument_id,
1425 pending.client_order_id,
1426 Ustr::from(&format!("code={code}: {msg}")),
1427 UUID4::new(),
1428 ts_now,
1429 ts_now,
1430 false,
1431 pending.venue_order_id,
1432 Some(account_id),
1433 );
1434 emitter.send_order_event(OrderEventAny::ModifyRejected(rejected));
1435 }
1436 }
1437 BinanceSpotWsTradingMessage::AllOrdersCanceled {
1438 request_id,
1439 responses,
1440 } => {
1441 dispatch_state.pending_requests.remove(&request_id);
1442 log::debug!(
1443 "WS all orders canceled: request_id={request_id}, count={}",
1444 responses.len()
1445 );
1446 }
1448 BinanceSpotWsTradingMessage::UserDataSubscribed { subscription_id } => {
1449 log::info!("User data stream subscribed: id={subscription_id}");
1450 }
1451 BinanceSpotWsTradingMessage::ExecutionReport(report) => {
1452 let ts_init = clock.get_time_ns();
1453 dispatch_execution_report(
1454 &report,
1455 emitter,
1456 http_client,
1457 account_id,
1458 dispatch_state,
1459 seen_trade_ids,
1460 ts_init,
1461 );
1462 }
1463 BinanceSpotWsTradingMessage::AccountPosition(position) => {
1464 let ts_init = clock.get_time_ns();
1465 let state = parse_spot_account_position(&position, account_id, ts_init);
1466 emitter.send_account_state(state);
1467 }
1468 BinanceSpotWsTradingMessage::BalanceUpdate(update) => {
1469 log::info!(
1470 "Balance update: asset={}, delta={}",
1471 update.asset,
1472 update.delta,
1473 );
1474 let http_client = http_client.clone();
1475 let emitter = emitter.clone();
1476
1477 get_runtime().spawn(async move {
1478 match http_client.request_account_state(account_id).await {
1479 Ok(state) => emitter.send_account_state(state),
1480 Err(e) => {
1481 log::error!("Failed to refresh account state after balance update: {e}");
1482 }
1483 }
1484 });
1485 }
1486 BinanceSpotWsTradingMessage::Connected => {
1487 log::info!("WS trading API connected");
1488 }
1489 BinanceSpotWsTradingMessage::Authenticated => {
1490 log::info!("WS trading API authenticated");
1491 ws_authenticated.notify_one();
1492 }
1493 BinanceSpotWsTradingMessage::Reconnected => {
1494 log::info!("WS trading API reconnected");
1495 }
1496 BinanceSpotWsTradingMessage::Error(err) => {
1497 log::error!("WS trading API error: {err}");
1498 }
1499 }
1500}
1501
1502fn build_new_order_params(
1503 order: &impl Order,
1504 client_order_id: ClientOrderId,
1505 is_post_only: bool,
1506 is_quote_quantity: bool,
1507) -> anyhow::Result<NewOrderParams> {
1508 let binance_side = BinanceSide::try_from(order.order_side())?;
1509 let binance_order_type = order_type_to_binance_spot(order.order_type(), is_post_only)?;
1510
1511 let requires_trigger = matches!(
1512 order.order_type(),
1513 OrderType::StopMarket
1514 | OrderType::StopLimit
1515 | OrderType::MarketIfTouched
1516 | OrderType::LimitIfTouched
1517 );
1518
1519 if requires_trigger && order.trigger_price().is_none() {
1520 anyhow::bail!("Conditional orders require a trigger price");
1521 }
1522
1523 let supports_tif = matches!(
1524 binance_order_type,
1525 BinanceSpotOrderType::Limit
1526 | BinanceSpotOrderType::StopLossLimit
1527 | BinanceSpotOrderType::TakeProfitLimit
1528 );
1529 let binance_tif = if supports_tif {
1530 Some(time_in_force_to_binance_spot(order.time_in_force())?)
1531 } else {
1532 None
1533 };
1534
1535 let qty_str = order.quantity().to_string();
1536 let (base_qty, quote_qty) = if is_quote_quantity {
1537 (None, Some(qty_str))
1538 } else {
1539 (Some(qty_str), None)
1540 };
1541
1542 let client_id_str = encode_broker_id(&client_order_id, BINANCE_NAUTILUS_SPOT_BROKER_ID);
1543
1544 Ok(NewOrderParams {
1545 symbol: order.instrument_id().symbol.to_string(),
1546 side: binance_side,
1547 order_type: binance_order_type,
1548 time_in_force: binance_tif,
1549 quantity: base_qty,
1550 quote_order_qty: quote_qty,
1551 price: order.price().map(|p| p.to_string()),
1552 new_client_order_id: Some(client_id_str),
1553 stop_price: order.trigger_price().map(|p| p.to_string()),
1554 trailing_delta: None,
1555 iceberg_qty: order.display_qty().map(|q| q.to_string()),
1556 new_order_resp_type: Some(BinanceOrderResponseType::Full),
1557 self_trade_prevention_mode: None,
1558 strategy_id: None,
1559 strategy_type: None,
1560 })
1561}
1562
1563fn build_cancel_order_params(cmd: &CancelOrder) -> CancelOrderParams {
1564 let order_id = cmd
1565 .venue_order_id
1566 .and_then(|id| id.inner().parse::<i64>().ok());
1567
1568 if let Some(order_id) = order_id {
1569 CancelOrderParams::by_order_id(cmd.instrument_id.symbol.to_string(), order_id)
1570 } else {
1571 let client_id_str = encode_broker_id(&cmd.client_order_id, BINANCE_NAUTILUS_SPOT_BROKER_ID);
1572 CancelOrderParams::by_client_order_id(cmd.instrument_id.symbol.to_string(), client_id_str)
1573 }
1574}
1575
1576fn build_cancel_replace_params(
1577 cmd: &ModifyOrder,
1578 order: &impl Order,
1579 quantity: Quantity,
1580) -> anyhow::Result<CancelReplaceOrderParams> {
1581 let binance_side = BinanceSide::try_from(order.order_side())?;
1582 let binance_order_type = order_type_to_binance_spot(order.order_type(), false)?;
1583 let binance_tif = time_in_force_to_binance_spot(order.time_in_force())?;
1584
1585 let cancel_order_id: Option<i64> = cmd
1586 .venue_order_id
1587 .map(|id| {
1588 id.inner()
1589 .parse::<i64>()
1590 .map_err(|_| anyhow::anyhow!("Invalid venue order ID: {id}"))
1591 })
1592 .transpose()?;
1593
1594 let client_id_str = encode_broker_id(&cmd.client_order_id, BINANCE_NAUTILUS_SPOT_BROKER_ID);
1595
1596 Ok(CancelReplaceOrderParams {
1597 symbol: cmd.instrument_id.symbol.to_string(),
1598 side: binance_side,
1599 order_type: binance_order_type,
1600 cancel_replace_mode: BinanceCancelReplaceMode::StopOnFailure,
1601 time_in_force: Some(binance_tif),
1602 quantity: Some(quantity.to_string()),
1603 quote_order_qty: None,
1604 price: cmd.price.map(|p| p.to_string()),
1605 cancel_order_id,
1606 cancel_orig_client_order_id: if cancel_order_id.is_none() {
1607 Some(client_id_str.clone())
1608 } else {
1609 None
1610 },
1611 new_client_order_id: Some(client_id_str),
1612 stop_price: None,
1613 trailing_delta: None,
1614 iceberg_qty: None,
1615 new_order_resp_type: Some(BinanceOrderResponseType::Full),
1616 self_trade_prevention_mode: None,
1617 })
1618}
1619
1620fn dispatch_execution_report(
1625 report: &BinanceSpotExecutionReport,
1626 emitter: &ExecutionEventEmitter,
1627 http_client: &BinanceSpotHttpClient,
1628 account_id: AccountId,
1629 dispatch_state: &WsDispatchState,
1630 seen_trade_ids: &std::sync::Arc<Mutex<FifoCache<(Ustr, i64), 10_000>>>,
1631 ts_init: UnixNanos,
1632) {
1633 let symbol = report.symbol;
1634 let instrument_id = InstrumentId::new(symbol.into(), *BINANCE_VENUE);
1635 let (price_precision, size_precision) = http_client
1636 .get_instrument(&symbol)
1637 .map_or((8, 8), |i| (i.price_precision(), i.size_precision()));
1638
1639 let client_order_id = ClientOrderId::new(decode_broker_id(
1640 &report.client_order_id,
1641 BINANCE_NAUTILUS_SPOT_BROKER_ID,
1642 ));
1643
1644 let identity = dispatch_state
1645 .order_identities
1646 .get(&client_order_id)
1647 .map(|r| r.clone());
1648
1649 if let Some(identity) = identity {
1650 dispatch_tracked_execution_report(
1651 report,
1652 emitter,
1653 account_id,
1654 dispatch_state,
1655 seen_trade_ids,
1656 client_order_id,
1657 &identity,
1658 instrument_id,
1659 price_precision,
1660 size_precision,
1661 ts_init,
1662 );
1663 } else {
1664 dispatch_untracked_execution_report(
1665 report,
1666 emitter,
1667 http_client,
1668 account_id,
1669 seen_trade_ids,
1670 instrument_id,
1671 price_precision,
1672 size_precision,
1673 ts_init,
1674 );
1675 }
1676}
1677
1678#[expect(clippy::too_many_arguments)]
1680fn dispatch_tracked_execution_report(
1681 report: &BinanceSpotExecutionReport,
1682 emitter: &ExecutionEventEmitter,
1683 account_id: AccountId,
1684 state: &WsDispatchState,
1685 seen_trade_ids: &std::sync::Arc<Mutex<FifoCache<(Ustr, i64), 10_000>>>,
1686 client_order_id: ClientOrderId,
1687 identity: &OrderIdentity,
1688 instrument_id: InstrumentId,
1689 price_precision: u8,
1690 size_precision: u8,
1691 ts_init: UnixNanos,
1692) {
1693 let venue_order_id = VenueOrderId::new(report.order_id.to_string());
1694 let ts_event = UnixNanos::from_millis(report.event_time as u64);
1695
1696 match report.execution_type {
1697 BinanceSpotExecutionType::New => {
1698 if state.has_filled(&client_order_id) {
1699 log::debug!("Skipping New for already-filled {client_order_id}");
1700 return;
1701 }
1702
1703 if state.has_emitted_accepted(&client_order_id) {
1704 let price: f64 = report.price.parse().unwrap_or(0.0);
1706 let quantity: f64 = report.original_qty.parse().unwrap_or(0.0);
1707 let trigger_price: f64 = report.stop_price.parse().unwrap_or(0.0);
1708 let trigger = if trigger_price > 0.0 {
1709 Some(Price::new(trigger_price, price_precision))
1710 } else {
1711 None
1712 };
1713 let updated = OrderUpdated::new(
1714 emitter.trader_id(),
1715 identity.strategy_id,
1716 identity.instrument_id,
1717 client_order_id,
1718 Quantity::new(quantity, size_precision),
1719 UUID4::new(),
1720 ts_event,
1721 ts_init,
1722 false,
1723 Some(venue_order_id),
1724 Some(account_id),
1725 Some(Price::new(price, price_precision)),
1726 trigger,
1727 None, false, );
1730 emitter.send_order_event(OrderEventAny::Updated(updated));
1731 return;
1732 }
1733 state.insert_accepted(client_order_id);
1734 let accepted = OrderAccepted::new(
1735 emitter.trader_id(),
1736 identity.strategy_id,
1737 identity.instrument_id,
1738 client_order_id,
1739 venue_order_id,
1740 account_id,
1741 UUID4::new(),
1742 ts_event,
1743 ts_init,
1744 false,
1745 );
1746 emitter.send_order_event(OrderEventAny::Accepted(accepted));
1747 }
1748 BinanceSpotExecutionType::Trade => {
1749 let dedup_key = (report.symbol, report.trade_id);
1750 let mut guard = seen_trade_ids.lock().expect(MUTEX_POISONED);
1751 let is_duplicate = guard.contains(&dedup_key);
1752 guard.add(dedup_key);
1753 drop(guard);
1754
1755 if is_duplicate {
1756 log::debug!(
1757 "Duplicate trade_id={} for {}, skipping",
1758 report.trade_id,
1759 report.symbol
1760 );
1761 return;
1762 }
1763
1764 ensure_accepted_emitted(
1765 client_order_id,
1766 account_id,
1767 venue_order_id,
1768 identity,
1769 emitter,
1770 state,
1771 ts_init,
1772 );
1773
1774 let last_qty: f64 = report.last_filled_qty.parse().unwrap_or(0.0);
1775 let last_px: f64 = report.last_filled_price.parse().unwrap_or(0.0);
1776 let commission: f64 = report.commission.parse().unwrap_or(0.0);
1777 let commission_currency = report
1778 .commission_asset
1779 .as_ref()
1780 .map_or_else(Currency::USDT, |a| {
1781 Currency::get_or_create_crypto(a.as_str())
1782 });
1783
1784 let liquidity_side = if report.is_maker {
1785 LiquiditySide::Maker
1786 } else {
1787 LiquiditySide::Taker
1788 };
1789
1790 let filled = OrderFilled::new(
1791 emitter.trader_id(),
1792 identity.strategy_id,
1793 instrument_id,
1794 client_order_id,
1795 venue_order_id,
1796 account_id,
1797 TradeId::new(report.trade_id.to_string()),
1798 identity.order_side,
1799 identity.order_type,
1800 Quantity::new(last_qty, size_precision),
1801 Price::new(last_px, price_precision),
1802 commission_currency,
1803 liquidity_side,
1804 UUID4::new(),
1805 ts_event,
1806 ts_init,
1807 false,
1808 None,
1809 Some(Money::new(commission, commission_currency)),
1810 );
1811
1812 state.insert_filled(client_order_id);
1813 emitter.send_order_event(OrderEventAny::Filled(filled));
1814
1815 let cum_qty: f64 = report.cumulative_filled_qty.parse().unwrap_or(0.0);
1816 let orig_qty: f64 = report.original_qty.parse().unwrap_or(0.0);
1817 if (orig_qty - cum_qty) <= 0.0 {
1818 state.cleanup_terminal(client_order_id);
1819 }
1820 }
1821 BinanceSpotExecutionType::Replaced => {
1822 log::debug!(
1825 "Order replaced: client_order_id={client_order_id}, venue_order_id={venue_order_id}"
1826 );
1827 }
1828 BinanceSpotExecutionType::Canceled
1829 | BinanceSpotExecutionType::Expired
1830 | BinanceSpotExecutionType::TradePrevention => {
1831 ensure_accepted_emitted(
1832 client_order_id,
1833 account_id,
1834 venue_order_id,
1835 identity,
1836 emitter,
1837 state,
1838 ts_init,
1839 );
1840 let canceled = OrderCanceled::new(
1841 emitter.trader_id(),
1842 identity.strategy_id,
1843 identity.instrument_id,
1844 client_order_id,
1845 UUID4::new(),
1846 ts_event,
1847 ts_init,
1848 false,
1849 Some(venue_order_id),
1850 Some(account_id),
1851 );
1852 state.cleanup_terminal(client_order_id);
1853 emitter.send_order_event(OrderEventAny::Canceled(canceled));
1854 }
1855 BinanceSpotExecutionType::Rejected => {
1856 let reason = if report.reject_reason.is_empty() {
1857 Ustr::from("Order rejected by venue")
1858 } else {
1859 Ustr::from(&report.reject_reason)
1860 };
1861 let due_post_only = report.time_in_force == BinanceTimeInForce::Gtx
1862 || (report.order_type == "LIMIT_MAKER"
1863 && (report.reject_reason.is_empty() || report.reject_reason == "NONE"));
1864 state.cleanup_terminal(client_order_id);
1865 emitter.emit_order_rejected_event(
1866 identity.strategy_id,
1867 identity.instrument_id,
1868 client_order_id,
1869 reason.as_str(),
1870 ts_init,
1871 due_post_only,
1872 );
1873 }
1874 }
1875}
1876
1877#[expect(clippy::too_many_arguments)]
1879fn dispatch_untracked_execution_report(
1880 report: &BinanceSpotExecutionReport,
1881 emitter: &ExecutionEventEmitter,
1882 _http_client: &BinanceSpotHttpClient,
1883 account_id: AccountId,
1884 seen_trade_ids: &std::sync::Arc<Mutex<FifoCache<(Ustr, i64), 10_000>>>,
1885 instrument_id: InstrumentId,
1886 price_precision: u8,
1887 size_precision: u8,
1888 ts_init: UnixNanos,
1889) {
1890 match report.execution_type {
1891 BinanceSpotExecutionType::Trade => {
1892 let dedup_key = (report.symbol, report.trade_id);
1893 let mut guard = seen_trade_ids.lock().expect(MUTEX_POISONED);
1894 let is_duplicate = guard.contains(&dedup_key);
1895 guard.add(dedup_key);
1896 drop(guard);
1897
1898 if is_duplicate {
1899 log::debug!(
1900 "Duplicate trade_id={} for {}, skipping",
1901 report.trade_id,
1902 report.symbol
1903 );
1904 return;
1905 }
1906
1907 match parse_spot_exec_report_to_order_status(
1908 report,
1909 instrument_id,
1910 price_precision,
1911 size_precision,
1912 account_id,
1913 ts_init,
1914 ) {
1915 Ok(status) => emitter.send_order_status_report(status),
1916 Err(e) => log::error!("Failed to parse order status report: {e}"),
1917 }
1918
1919 match parse_spot_exec_report_to_fill(
1920 report,
1921 instrument_id,
1922 price_precision,
1923 size_precision,
1924 account_id,
1925 ts_init,
1926 ) {
1927 Ok(fill) => emitter.send_fill_report(fill),
1928 Err(e) => log::error!("Failed to parse fill report: {e}"),
1929 }
1930 }
1931 BinanceSpotExecutionType::New
1932 | BinanceSpotExecutionType::Canceled
1933 | BinanceSpotExecutionType::Replaced
1934 | BinanceSpotExecutionType::Rejected
1935 | BinanceSpotExecutionType::Expired
1936 | BinanceSpotExecutionType::TradePrevention => {
1937 match parse_spot_exec_report_to_order_status(
1938 report,
1939 instrument_id,
1940 price_precision,
1941 size_precision,
1942 account_id,
1943 ts_init,
1944 ) {
1945 Ok(status) => emitter.send_order_status_report(status),
1946 Err(e) => log::error!("Failed to parse order status report: {e}"),
1947 }
1948 }
1949 }
1950}
1951
1952fn is_spot_post_only_rejection(error: &crate::spot::http::BinanceSpotHttpError) -> bool {
1954 match error {
1955 crate::spot::http::BinanceSpotHttpError::BinanceError { code, message } => {
1956 *code == BINANCE_GTX_ORDER_REJECT_CODE
1957 || (*code == BINANCE_NEW_ORDER_REJECTED_CODE
1958 && message == BINANCE_SPOT_POST_ONLY_REJECT_MSG)
1959 }
1960 _ => false,
1961 }
1962}
1963
1964#[cfg(test)]
1965mod tests {
1966 use nautilus_common::messages::ExecutionEvent;
1967 use nautilus_core::time::get_atomic_clock_realtime;
1968 use nautilus_model::{
1969 enums::{AccountType, LiquiditySide, OrderSide},
1970 identifiers::{StrategyId, TraderId},
1971 };
1972 use rstest::rstest;
1973
1974 use super::*;
1975 use crate::common::enums::BinanceEnvironment;
1976
1977 #[rstest]
1978 fn test_dispatch_ws_trading_message_emits_cancel_rejected_and_clears_pending_request() {
1979 let clock = get_atomic_clock_realtime();
1980 let (emitter, mut rx) = create_test_emitter(clock);
1981 let http_client = create_test_http_client(clock);
1982 let dispatch_state = create_tracked_dispatch_state(
1983 ClientOrderId::from("TEST"),
1984 InstrumentId::from("BTCUSDT.BINANCE"),
1985 );
1986 let ws_authenticated = tokio::sync::Notify::new();
1987 let seen_trade_ids = Arc::new(Mutex::new(FifoCache::new()));
1988
1989 dispatch_state.pending_requests.insert(
1990 "req-cancel".to_string(),
1991 PendingRequest {
1992 client_order_id: ClientOrderId::from("TEST"),
1993 venue_order_id: Some(VenueOrderId::from("12345")),
1994 operation: PendingOperation::Cancel,
1995 },
1996 );
1997
1998 dispatch_ws_trading_message(
1999 BinanceSpotWsTradingMessage::CancelRejected {
2000 request_id: "req-cancel".to_string(),
2001 code: -2011,
2002 msg: "Unknown order sent".to_string(),
2003 },
2004 &emitter,
2005 &http_client,
2006 AccountId::from("BINANCE-001"),
2007 clock,
2008 &dispatch_state,
2009 &ws_authenticated,
2010 &seen_trade_ids,
2011 );
2012
2013 assert!(dispatch_state.pending_requests.get("req-cancel").is_none());
2014
2015 match rx
2016 .try_recv()
2017 .expect("Cancel rejection event should be emitted")
2018 {
2019 ExecutionEvent::Order(OrderEventAny::CancelRejected(event)) => {
2020 assert_eq!(event.client_order_id, ClientOrderId::from("TEST"));
2021 assert_eq!(event.account_id, Some(AccountId::from("BINANCE-001")));
2022 assert!(event.reason.as_str().contains("code=-2011"));
2023 }
2024 other => panic!("Expected CancelRejected event, was {other:?}"),
2025 }
2026 }
2027
2028 #[rstest]
2029 fn test_dispatch_ws_trading_message_emits_modify_rejected_and_clears_pending_request() {
2030 let clock = get_atomic_clock_realtime();
2031 let (emitter, mut rx) = create_test_emitter(clock);
2032 let http_client = create_test_http_client(clock);
2033 let dispatch_state = create_tracked_dispatch_state(
2034 ClientOrderId::from("TEST"),
2035 InstrumentId::from("BTCUSDT.BINANCE"),
2036 );
2037 let ws_authenticated = tokio::sync::Notify::new();
2038 let seen_trade_ids = Arc::new(Mutex::new(FifoCache::new()));
2039
2040 dispatch_state.pending_requests.insert(
2041 "req-modify".to_string(),
2042 PendingRequest {
2043 client_order_id: ClientOrderId::from("TEST"),
2044 venue_order_id: Some(VenueOrderId::from("12345")),
2045 operation: PendingOperation::Modify,
2046 },
2047 );
2048
2049 dispatch_ws_trading_message(
2050 BinanceSpotWsTradingMessage::CancelReplaceRejected {
2051 request_id: "req-modify".to_string(),
2052 code: -2021,
2053 msg: "Order cancel-replace partially failed".to_string(),
2054 },
2055 &emitter,
2056 &http_client,
2057 AccountId::from("BINANCE-001"),
2058 clock,
2059 &dispatch_state,
2060 &ws_authenticated,
2061 &seen_trade_ids,
2062 );
2063
2064 assert!(dispatch_state.pending_requests.get("req-modify").is_none());
2065
2066 match rx
2067 .try_recv()
2068 .expect("Modify rejection event should be emitted")
2069 {
2070 ExecutionEvent::Order(OrderEventAny::ModifyRejected(event)) => {
2071 assert_eq!(event.client_order_id, ClientOrderId::from("TEST"));
2072 assert_eq!(event.account_id, Some(AccountId::from("BINANCE-001")));
2073 assert!(event.reason.as_str().contains("code=-2021"));
2074 }
2075 other => panic!("Expected ModifyRejected event, was {other:?}"),
2076 }
2077 }
2078
2079 fn create_test_emitter(
2080 clock: &'static AtomicTime,
2081 ) -> (
2082 ExecutionEventEmitter,
2083 tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2084 ) {
2085 let mut emitter = ExecutionEventEmitter::new(
2086 clock,
2087 TraderId::from("TESTER-001"),
2088 AccountId::from("BINANCE-001"),
2089 AccountType::Cash,
2090 None,
2091 );
2092 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2093 emitter.set_sender(tx);
2094 (emitter, rx)
2095 }
2096
2097 fn create_test_http_client(clock: &'static AtomicTime) -> BinanceSpotHttpClient {
2098 BinanceSpotHttpClient::new(
2099 BinanceEnvironment::Mainnet,
2100 clock,
2101 None,
2102 None,
2103 None,
2104 None,
2105 None,
2106 None,
2107 )
2108 .expect("Test HTTP client should be created")
2109 }
2110
2111 fn create_tracked_dispatch_state(
2112 client_order_id: ClientOrderId,
2113 instrument_id: InstrumentId,
2114 ) -> WsDispatchState {
2115 let dispatch_state = WsDispatchState::default();
2116 dispatch_state.order_identities.insert(
2117 client_order_id,
2118 OrderIdentity {
2119 instrument_id,
2120 strategy_id: StrategyId::from("TEST-STRATEGY"),
2121 order_side: OrderSide::Buy,
2122 order_type: OrderType::Limit,
2123 price: None,
2124 },
2125 );
2126 dispatch_state
2127 }
2128
2129 #[rstest]
2130 #[case::gtx(
2131 crate::spot::http::BinanceSpotHttpError::BinanceError {
2132 code: BINANCE_GTX_ORDER_REJECT_CODE,
2133 message: "Order would immediately trigger.".to_string(),
2134 },
2135 true,
2136 )]
2137 #[case::spot_post_only(
2138 crate::spot::http::BinanceSpotHttpError::BinanceError {
2139 code: BINANCE_NEW_ORDER_REJECTED_CODE,
2140 message: BINANCE_SPOT_POST_ONLY_REJECT_MSG.to_string(),
2141 },
2142 true,
2143 )]
2144 #[case::new_order_rejected_other_message(
2145 crate::spot::http::BinanceSpotHttpError::BinanceError {
2146 code: BINANCE_NEW_ORDER_REJECTED_CODE,
2147 message: "Insufficient balance.".to_string(),
2148 },
2149 false,
2150 )]
2151 #[case::unrelated_code(
2152 crate::spot::http::BinanceSpotHttpError::BinanceError {
2153 code: -2011,
2154 message: "Unknown order sent.".to_string(),
2155 },
2156 false,
2157 )]
2158 #[case::non_binance_error(
2159 crate::spot::http::BinanceSpotHttpError::NetworkError("connection reset".to_string()),
2160 false,
2161 )]
2162 fn test_is_spot_post_only_rejection(
2163 #[case] error: crate::spot::http::BinanceSpotHttpError,
2164 #[case] expected: bool,
2165 ) {
2166 assert_eq!(is_spot_post_only_rejection(&error), expected);
2167 }
2168
2169 #[rstest]
2170 fn test_dispatch_tracked_execution_report_trade_dedup() {
2171 let clock = get_atomic_clock_realtime();
2172 let (emitter, mut rx) = create_test_emitter(clock);
2173 let http_client = create_test_http_client(clock);
2174 let client_order_id = ClientOrderId::from("x-TD67BGP9-T0000000000000");
2175 let dispatch_state = create_tracked_dispatch_state(
2176 ClientOrderId::from("O-20200101-000000-000-000-0"),
2177 InstrumentId::from("ETHUSDT.BINANCE"),
2178 );
2179 let ws_authenticated = tokio::sync::Notify::new();
2180 let seen_trade_ids = Arc::new(Mutex::new(FifoCache::new()));
2181
2182 let trade_json = crate::common::testing::load_fixture_string(
2183 "spot/user_data_json/execution_report_trade.json",
2184 );
2185 let report: BinanceSpotExecutionReport = serde_json::from_str(&trade_json).unwrap();
2186
2187 dispatch_ws_trading_message(
2188 BinanceSpotWsTradingMessage::ExecutionReport(Box::new(report.clone())),
2189 &emitter,
2190 &http_client,
2191 AccountId::from("BINANCE-001"),
2192 clock,
2193 &dispatch_state,
2194 &ws_authenticated,
2195 &seen_trade_ids,
2196 );
2197 dispatch_ws_trading_message(
2198 BinanceSpotWsTradingMessage::ExecutionReport(Box::new(report)),
2199 &emitter,
2200 &http_client,
2201 AccountId::from("BINANCE-001"),
2202 clock,
2203 &dispatch_state,
2204 &ws_authenticated,
2205 &seen_trade_ids,
2206 );
2207
2208 let mut events = Vec::new();
2209 while let Ok(event) = rx.try_recv() {
2210 events.push(event);
2211 }
2212
2213 let fills: Vec<_> = events
2214 .iter()
2215 .filter(|e| matches!(e, ExecutionEvent::Order(OrderEventAny::Filled(_))))
2216 .collect();
2217 assert_eq!(fills.len(), 1, "duplicate trade should be deduped");
2218
2219 match fills[0] {
2220 ExecutionEvent::Order(OrderEventAny::Filled(fill)) => {
2221 assert_eq!(
2222 fill.client_order_id,
2223 ClientOrderId::from("O-20200101-000000-000-000-0"),
2224 );
2225 assert_eq!(fill.trade_id, TradeId::new("98765432"));
2226 assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
2227 }
2228 _ => unreachable!(),
2229 }
2230 let _ = client_order_id;
2231 }
2232
2233 #[rstest]
2234 fn test_dispatch_tracked_execution_report_rejected_gtx_sets_post_only() {
2235 let clock = get_atomic_clock_realtime();
2236 let (emitter, mut rx) = create_test_emitter(clock);
2237 let http_client = create_test_http_client(clock);
2238 let client_order_id = ClientOrderId::from("O-20200101-000000-000-000-1");
2239 let dispatch_state =
2240 create_tracked_dispatch_state(client_order_id, InstrumentId::from("ETHUSDT.BINANCE"));
2241 let ws_authenticated = tokio::sync::Notify::new();
2242 let seen_trade_ids = Arc::new(Mutex::new(FifoCache::new()));
2243
2244 let encoded = encode_broker_id(&client_order_id, BINANCE_NAUTILUS_SPOT_BROKER_ID);
2245 let report_json = format!(
2246 r#"{{
2247 "e":"executionReport","E":1709654400000,"s":"ETHUSDT",
2248 "c":"{encoded}","S":"BUY","o":"LIMIT","f":"GTX",
2249 "q":"1.00000000","p":"2500.00000000","P":"0.00000000",
2250 "x":"REJECTED","X":"REJECTED","r":"NONE","i":12345678,
2251 "l":"0.00000000","z":"0.00000000","L":"0.00000000",
2252 "n":"0","N":null,"T":1709654400000,"t":-1,"w":false,"m":false,
2253 "O":1709654400000,"Z":"0.00000000","C":""
2254 }}"#,
2255 );
2256 let report: BinanceSpotExecutionReport = serde_json::from_str(&report_json).unwrap();
2257
2258 dispatch_ws_trading_message(
2259 BinanceSpotWsTradingMessage::ExecutionReport(Box::new(report)),
2260 &emitter,
2261 &http_client,
2262 AccountId::from("BINANCE-001"),
2263 clock,
2264 &dispatch_state,
2265 &ws_authenticated,
2266 &seen_trade_ids,
2267 );
2268
2269 match rx.try_recv().expect("OrderRejected event expected") {
2270 ExecutionEvent::Order(OrderEventAny::Rejected(event)) => {
2271 assert_eq!(event.client_order_id, client_order_id);
2272 assert_eq!(event.account_id, AccountId::from("BINANCE-001"));
2273 assert_ne!(event.due_post_only, 0);
2274 }
2275 other => panic!("Expected OrderRejected event, was {other:?}"),
2276 }
2277 }
2278}