Skip to main content

nautilus_binance/spot/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Binance Spot adapter.
17
18use std::{
19    future::Future,
20    sync::{Arc, Mutex},
21    time::Duration,
22};
23
24use ahash::AHashMap;
25use anyhow::Context;
26use async_trait::async_trait;
27use nautilus_common::{
28    cache::fifo::FifoCache,
29    clients::ExecutionClient,
30    live::{get_runtime, runner::get_exec_event_sender},
31    messages::execution::{
32        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
33        GenerateOrderStatusReport, GenerateOrderStatusReports, GenerateOrderStatusReportsBuilder,
34        GeneratePositionStatusReports, GeneratePositionStatusReportsBuilder, ModifyOrder,
35        QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
36    },
37};
38use nautilus_core::{
39    MUTEX_POISONED, UUID4, UnixNanos,
40    datetime::mins_to_nanos,
41    time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
44use nautilus_model::{
45    accounts::AccountAny,
46    enums::{LiquiditySide, OmsType, OrderType},
47    events::{
48        AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny,
49        OrderFilled, OrderModifyRejected, OrderRejected, OrderUpdated,
50    },
51    identifiers::{
52        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, Venue, VenueOrderId,
53    },
54    instruments::Instrument,
55    orders::Order,
56    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
57    types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
58};
59use tokio::task::JoinHandle;
60use ustr::Ustr;
61
62use super::websocket::trading::{
63    client::BinanceSpotWsTradingClient,
64    messages::BinanceSpotWsTradingMessage,
65    parse::{
66        parse_spot_account_position, parse_spot_exec_report_to_fill,
67        parse_spot_exec_report_to_order_status,
68    },
69    user_data::{BinanceSpotExecutionReport, BinanceSpotExecutionType},
70};
71use crate::{
72    common::{
73        consts::{
74            BINANCE_GTX_ORDER_REJECT_CODE, BINANCE_NAUTILUS_SPOT_BROKER_ID,
75            BINANCE_NEW_ORDER_REJECTED_CODE, BINANCE_SPOT_POST_ONLY_REJECT_MSG, BINANCE_VENUE,
76        },
77        credential::resolve_credentials,
78        dispatch::{
79            OrderIdentity, PendingOperation, PendingRequest, WsDispatchState,
80            ensure_accepted_emitted,
81        },
82        encoder::{decode_broker_id, encode_broker_id},
83        enums::{BinanceProductType, BinanceSide, BinanceTimeInForce},
84    },
85    config::BinanceExecClientConfig,
86    spot::{
87        enums::{
88            BinanceCancelReplaceMode, BinanceOrderResponseType, BinanceSpotOrderType,
89            order_type_to_binance_spot, time_in_force_to_binance_spot,
90        },
91        http::{
92            client::BinanceSpotHttpClient,
93            models::BatchCancelResult,
94            query::{BatchCancelItem, CancelOrderParams, CancelReplaceOrderParams, NewOrderParams},
95        },
96    },
97};
98
/// Live execution client for Binance Spot trading.
///
/// Implements the [`ExecutionClient`] trait for order management on Binance Spot
/// and Spot Margin markets. Uses WebSocket API as the primary transport for order
/// operations (lowest latency), with HTTP API fallback when the WS connection is
/// unavailable. The WebSocket User Data Stream provides real-time execution events.
#[derive(Debug)]
pub struct BinanceSpotExecutionClient {
    /// Shared client core: identifiers (`trader_id`, `account_id`, `client_id`), the cache
    /// handle and the started/connected state flags.
    core: ExecutionClientCore,
    /// Realtime atomic clock, obtained from `get_atomic_clock_realtime()` in `new`.
    clock: &'static AtomicTime,
    /// Configuration this client was constructed with.
    config: BinanceExecClientConfig,
    /// Emitter used to publish order events, account state and order status reports.
    emitter: ExecutionEventEmitter,
    /// State shared with the WS dispatch loop: order identities and pending WS requests.
    dispatch_state: Arc<WsDispatchState>,
    /// HTTP client for instruments, account state and reports, and for order operations
    /// whenever the WS trading client is not active.
    http_client: BinanceSpotHttpClient,
    /// WS trading client; `Some` only when `config.use_ws_trading` is set. Reset to `None`
    /// by `connect` if session authentication times out.
    ws_trading_client: Option<BinanceSpotWsTradingClient>,
    /// Handle of the task spawned in `connect` that drains WS trading messages.
    ws_trading_handle: Mutex<Option<JoinHandle<()>>>,
    /// Notifier awaited in `connect` after session logon; also handed to the dispatch loop.
    ws_authenticated: Arc<tokio::sync::Notify>,
    /// Handles of tasks started through `spawn_task`, aborted on disconnect and stop.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
118
119impl BinanceSpotExecutionClient {
120    /// Creates a new [`BinanceSpotExecutionClient`].
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if the HTTP client fails to initialize or credentials are missing.
125    pub fn new(core: ExecutionClientCore, config: BinanceExecClientConfig) -> anyhow::Result<Self> {
126        let product_type = config
127            .product_types
128            .first()
129            .copied()
130            .unwrap_or(BinanceProductType::Spot);
131
132        let (api_key, api_secret) = resolve_credentials(
133            config.api_key.clone(),
134            config.api_secret.clone(),
135            config.environment,
136            product_type,
137        )?;
138
139        let clock = get_atomic_clock_realtime();
140
141        let http_client = BinanceSpotHttpClient::new(
142            config.environment,
143            clock,
144            Some(api_key.clone()),
145            Some(api_secret.clone()),
146            config.base_url_http.clone(),
147            None, // recv_window
148            None, // timeout_secs
149            None, // proxy_url
150        )
151        .context("failed to construct Binance Spot HTTP client")?;
152        let emitter = ExecutionEventEmitter::new(
153            clock,
154            core.trader_id,
155            core.account_id,
156            core.account_type,
157            core.base_currency,
158        );
159
160        let ws_trading_client = if config.use_ws_trading {
161            Some(BinanceSpotWsTradingClient::new(
162                config.base_url_ws_trading.clone(),
163                api_key,
164                api_secret,
165                None, // heartbeat
166                config.transport_backend,
167            ))
168        } else {
169            None
170        };
171
172        Ok(Self {
173            core,
174            clock,
175            config,
176            emitter,
177            dispatch_state: Arc::new(WsDispatchState::default()),
178            http_client,
179            ws_trading_client,
180            ws_trading_handle: Mutex::new(None),
181            ws_authenticated: Arc::new(tokio::sync::Notify::new()),
182            pending_tasks: Mutex::new(Vec::new()),
183        })
184    }
185
186    async fn refresh_account_state(&self) -> anyhow::Result<AccountState> {
187        self.http_client
188            .request_account_state(self.core.account_id)
189            .await
190    }
191
192    fn update_account_state(&self) {
193        let http_client = self.http_client.clone();
194        let account_id = self.core.account_id;
195        let emitter = self.emitter.clone();
196        let clock = self.clock;
197
198        self.spawn_task("query_account", async move {
199            let account_state = http_client.request_account_state(account_id).await?;
200            let ts_now = clock.get_time_ns();
201            emitter.emit_account_state(
202                account_state.balances.clone(),
203                account_state.margins.clone(),
204                account_state.is_reported,
205                ts_now,
206            );
207            Ok(())
208        });
209    }
210
211    /// Returns whether the WS trading client is connected and active.
212    fn ws_trading_active(&self) -> bool {
213        self.ws_trading_client
214            .as_ref()
215            .is_some_and(|c| c.is_active())
216    }
217
218    fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
219        let order = self
220            .core
221            .cache()
222            .order(&cmd.client_order_id)
223            .cloned()
224            .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
225
226        let event_emitter = self.emitter.clone();
227        let trader_id = self.core.trader_id;
228        let account_id = self.core.account_id;
229        let client_order_id = order.client_order_id();
230        let strategy_id = order.strategy_id();
231        let instrument_id = order.instrument_id();
232        let order_side = order.order_side();
233        let order_type = order.order_type();
234        let quantity = order.quantity();
235        let time_in_force = order.time_in_force();
236        let price = order.price();
237        let trigger_price = order.trigger_price();
238        let is_post_only = order.is_post_only();
239        let is_quote_quantity = order.is_quote_quantity();
240        let display_qty = order.display_qty();
241        let clock = self.clock;
242        let ts_init = self.clock.get_time_ns();
243
244        // Register identity for tracked/external dispatch routing
245        self.dispatch_state.order_identities.insert(
246            client_order_id,
247            OrderIdentity {
248                instrument_id,
249                strategy_id,
250                order_side,
251                order_type,
252                price,
253            },
254        );
255
256        if self.ws_trading_active() {
257            let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
258            let dispatch_state = self.dispatch_state.clone();
259            let params =
260                build_new_order_params(&order, client_order_id, is_post_only, is_quote_quantity)?;
261
262            // Pre-register before sending to avoid response racing the insert
263            let request_id = ws_client.next_request_id();
264            dispatch_state.pending_requests.insert(
265                request_id.clone(),
266                PendingRequest {
267                    client_order_id,
268                    venue_order_id: None,
269                    operation: PendingOperation::Place,
270                },
271            );
272
273            self.spawn_task("submit_order_ws", async move {
274                if let Err(e) = ws_client
275                    .place_order_with_id(request_id.clone(), params)
276                    .await
277                {
278                    dispatch_state.pending_requests.remove(&request_id);
279                    let rejected = OrderRejected::new(
280                        trader_id,
281                        strategy_id,
282                        instrument_id,
283                        client_order_id,
284                        account_id,
285                        format!("ws-submit-order-error: {e}").into(),
286                        UUID4::new(),
287                        ts_init,
288                        clock.get_time_ns(),
289                        false,
290                        false,
291                    );
292                    event_emitter.send_order_event(OrderEventAny::Rejected(rejected));
293                    anyhow::bail!("WS submit order failed: {e}");
294                }
295                Ok(())
296            });
297        } else {
298            let http_client = self.http_client.clone();
299            let dispatch_state = self.dispatch_state.clone();
300            log::debug!("WS trading not active, falling back to HTTP for submit_order");
301
302            self.spawn_task("submit_order_http", async move {
303                let result = http_client
304                    .submit_order(
305                        account_id,
306                        instrument_id,
307                        client_order_id,
308                        order_side,
309                        order_type,
310                        quantity,
311                        time_in_force,
312                        price,
313                        trigger_price,
314                        is_post_only,
315                        is_quote_quantity,
316                        display_qty,
317                    )
318                    .await;
319
320                match result {
321                    Ok(report) => {
322                        dispatch_state.insert_accepted(client_order_id);
323                        let accepted = OrderAccepted::new(
324                            trader_id,
325                            strategy_id,
326                            instrument_id,
327                            client_order_id,
328                            report.venue_order_id,
329                            account_id,
330                            UUID4::new(),
331                            ts_init,
332                            ts_init,
333                            false,
334                        );
335                        event_emitter.send_order_event(OrderEventAny::Accepted(accepted));
336                    }
337                    Err(e) => {
338                        let due_post_only = e
339                            .downcast_ref::<crate::spot::http::BinanceSpotHttpError>()
340                            .is_some_and(is_spot_post_only_rejection);
341                        dispatch_state.cleanup_terminal(client_order_id);
342                        let rejected = OrderRejected::new(
343                            trader_id,
344                            strategy_id,
345                            instrument_id,
346                            client_order_id,
347                            account_id,
348                            format!("submit-order-error: {e}").into(),
349                            UUID4::new(),
350                            ts_init,
351                            clock.get_time_ns(),
352                            false,
353                            due_post_only,
354                        );
355                        event_emitter.send_order_event(OrderEventAny::Rejected(rejected));
356                        return Err(e);
357                    }
358                }
359                Ok(())
360            });
361        }
362
363        Ok(())
364    }
365
366    fn cancel_order_internal(&self, cmd: &CancelOrder) {
367        let event_emitter = self.emitter.clone();
368        let trader_id = self.core.trader_id;
369        let account_id = self.core.account_id;
370        let clock = self.clock;
371        let command = cmd.clone();
372
373        if self.ws_trading_active() {
374            let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
375            let dispatch_state = self.dispatch_state.clone();
376            let params = build_cancel_order_params(&command);
377
378            // Pre-register before sending to avoid response racing the insert
379            let request_id = ws_client.next_request_id();
380            dispatch_state.pending_requests.insert(
381                request_id.clone(),
382                PendingRequest {
383                    client_order_id: command.client_order_id,
384                    venue_order_id: command.venue_order_id,
385                    operation: PendingOperation::Cancel,
386                },
387            );
388
389            self.spawn_task("cancel_order_ws", async move {
390                if let Err(e) = ws_client
391                    .cancel_order_with_id(request_id.clone(), params)
392                    .await
393                {
394                    dispatch_state.pending_requests.remove(&request_id);
395                    let ts_now = clock.get_time_ns();
396                    let rejected_event = OrderCancelRejected::new(
397                        trader_id,
398                        command.strategy_id,
399                        command.instrument_id,
400                        command.client_order_id,
401                        format!("ws-cancel-order-error: {e}").into(),
402                        UUID4::new(),
403                        ts_now,
404                        ts_now,
405                        false,
406                        command.venue_order_id,
407                        Some(account_id),
408                    );
409                    event_emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
410                    anyhow::bail!("WS cancel order failed: {e}");
411                }
412                Ok(())
413            });
414        } else {
415            let http_client = self.http_client.clone();
416            let dispatch_state = self.dispatch_state.clone();
417            log::debug!("WS trading not active, falling back to HTTP for cancel_order");
418
419            self.spawn_task("cancel_order_http", async move {
420                let result = http_client
421                    .cancel_order(
422                        command.instrument_id,
423                        command.venue_order_id,
424                        Some(command.client_order_id),
425                    )
426                    .await
427                    .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
428
429                match result {
430                    Ok(venue_order_id) => {
431                        dispatch_state.cleanup_terminal(command.client_order_id);
432                        let ts_now = clock.get_time_ns();
433                        let canceled_event = OrderCanceled::new(
434                            trader_id,
435                            command.strategy_id,
436                            command.instrument_id,
437                            command.client_order_id,
438                            UUID4::new(),
439                            ts_now,
440                            ts_now,
441                            false,
442                            Some(venue_order_id),
443                            Some(account_id),
444                        );
445                        event_emitter.send_order_event(OrderEventAny::Canceled(canceled_event));
446                    }
447                    Err(e) => {
448                        let ts_now = clock.get_time_ns();
449                        let rejected_event = OrderCancelRejected::new(
450                            trader_id,
451                            command.strategy_id,
452                            command.instrument_id,
453                            command.client_order_id,
454                            format!("cancel-order-error: {e}").into(),
455                            UUID4::new(),
456                            ts_now,
457                            ts_now,
458                            false,
459                            command.venue_order_id,
460                            Some(account_id),
461                        );
462                        event_emitter
463                            .send_order_event(OrderEventAny::CancelRejected(rejected_event));
464                        return Err(e);
465                    }
466                }
467                Ok(())
468            });
469        }
470    }
471
472    fn spawn_task<F>(&self, description: &'static str, fut: F)
473    where
474        F: Future<Output = anyhow::Result<()>> + Send + 'static,
475    {
476        crate::common::execution::spawn_task(&self.pending_tasks, description, fut);
477    }
478
479    fn abort_pending_tasks(&self) {
480        crate::common::execution::abort_pending_tasks(&self.pending_tasks);
481    }
482}
483
484#[async_trait(?Send)]
485impl ExecutionClient for BinanceSpotExecutionClient {
486    fn is_connected(&self) -> bool {
487        self.core.is_connected()
488    }
489
490    fn client_id(&self) -> ClientId {
491        self.core.client_id
492    }
493
494    fn account_id(&self) -> AccountId {
495        self.core.account_id
496    }
497
498    fn venue(&self) -> Venue {
499        *BINANCE_VENUE
500    }
501
502    fn oms_type(&self) -> OmsType {
503        self.core.oms_type
504    }
505
506    fn get_account(&self) -> Option<AccountAny> {
507        self.core.cache().account(&self.core.account_id).cloned()
508    }
509
510    async fn connect(&mut self) -> anyhow::Result<()> {
511        if self.core.is_connected() {
512            return Ok(());
513        }
514
515        // Load instruments if not already done
516        if !self.core.instruments_initialized() {
517            let instruments = self
518                .http_client
519                .request_instruments()
520                .await
521                .context("failed to request Binance Spot instruments")?;
522
523            if instruments.is_empty() {
524                log::warn!("No instruments returned for Binance Spot");
525            } else {
526                log::info!("Loaded {} Spot instruments", instruments.len());
527                self.http_client.cache_instruments(instruments);
528            }
529
530            self.core.set_instruments_initialized();
531        }
532
533        // Request initial account state
534        let account_state = self
535            .refresh_account_state()
536            .await
537            .context("failed to request Binance account state")?;
538
539        if !account_state.balances.is_empty() {
540            log::info!(
541                "Received account state with {} balance(s)",
542                account_state.balances.len()
543            );
544        }
545
546        self.emitter.send_account_state(account_state);
547
548        // Wait for account to be registered in cache before completing connect
549        crate::common::execution::await_account_registered(&self.core, self.core.account_id, 30.0)
550            .await?;
551
552        // Connect WS trading client (primary order transport)
553        if let Some(ref mut ws_trading) = self.ws_trading_client {
554            match ws_trading.connect().await {
555                Ok(()) => {
556                    log::info!("Connected to Binance Spot WS trading API");
557
558                    let ws_trading_clone = ws_trading.clone();
559                    let emitter = self.emitter.clone();
560                    let account_id = self.core.account_id;
561                    let clock = self.clock;
562                    let http_client = self.http_client.clone();
563                    let dispatch_state = self.dispatch_state.clone();
564                    let ws_authenticated = self.ws_authenticated.clone();
565                    let seen_trade_ids = std::sync::Arc::new(Mutex::new(FifoCache::new()));
566
567                    let handle = get_runtime().spawn(async move {
568                        loop {
569                            match ws_trading_clone.recv().await {
570                                Some(msg) => {
571                                    dispatch_ws_trading_message(
572                                        msg,
573                                        &emitter,
574                                        &http_client,
575                                        account_id,
576                                        clock,
577                                        &dispatch_state,
578                                        &ws_authenticated,
579                                        &seen_trade_ids,
580                                    );
581                                }
582                                None => {
583                                    log::warn!("WS trading dispatch loop ended");
584                                    break;
585                                }
586                            }
587                        }
588                    });
589
590                    *self.ws_trading_handle.lock().expect(MUTEX_POISONED) = Some(handle);
591
592                    // Block until session is authenticated before signaling connected
593                    if let Err(e) = ws_trading.session_logon().await {
594                        log::error!("WS session logon failed: {e}");
595                    } else {
596                        let auth_result = tokio::time::timeout(
597                            Duration::from_secs(10),
598                            self.ws_authenticated.notified(),
599                        )
600                        .await;
601
602                        if auth_result.is_err() {
603                            log::error!(
604                                "WS session authentication timed out, \
605                                 order operations will use HTTP fallback"
606                            );
607
608                            if let Some(handle) =
609                                self.ws_trading_handle.lock().expect(MUTEX_POISONED).take()
610                            {
611                                handle.abort();
612                            }
613                            ws_trading.disconnect().await;
614                            self.ws_trading_client = None;
615                        } else if let Err(e) = ws_trading.subscribe_user_data().await {
616                            log::error!("WS user data subscribe failed: {e}");
617                        }
618                    }
619                }
620                Err(e) => {
621                    log::error!(
622                        "Failed to connect WS trading API: {e}. \
623                         Order operations will use HTTP fallback"
624                    );
625                }
626            }
627        }
628
629        self.core.set_connected();
630        log::info!("Connected: client_id={}", self.core.client_id);
631        Ok(())
632    }
633
634    async fn disconnect(&mut self) -> anyhow::Result<()> {
635        if self.core.is_disconnected() {
636            return Ok(());
637        }
638
639        // Abort WS trading task and disconnect
640        if let Some(handle) = self.ws_trading_handle.lock().expect(MUTEX_POISONED).take() {
641            handle.abort();
642        }
643
644        if let Some(ref mut ws_trading) = self.ws_trading_client {
645            ws_trading.disconnect().await;
646        }
647
648        self.abort_pending_tasks();
649
650        self.core.set_disconnected();
651        log::info!("Disconnected: client_id={}", self.core.client_id);
652        Ok(())
653    }
654
655    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
656        self.update_account_state();
657        Ok(())
658    }
659
660    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
661        log::debug!("query_order: client_order_id={}", cmd.client_order_id);
662
663        let http_client = self.http_client.clone();
664        let command = cmd;
665        let event_emitter = self.emitter.clone();
666        let account_id = self.core.account_id;
667
668        self.spawn_task("query_order", async move {
669            let result = http_client
670                .request_order_status_report(
671                    account_id,
672                    command.instrument_id,
673                    command.venue_order_id,
674                    Some(command.client_order_id),
675                )
676                .await;
677
678            match result {
679                Ok(report) => {
680                    event_emitter.send_order_status_report(report);
681                }
682                Err(e) => log::warn!("Failed to query order status: {e}"),
683            }
684
685            Ok(())
686        });
687
688        Ok(())
689    }
690
691    fn generate_account_state(
692        &self,
693        balances: Vec<AccountBalance>,
694        margins: Vec<MarginBalance>,
695        reported: bool,
696        ts_event: UnixNanos,
697    ) -> anyhow::Result<()> {
698        self.emitter
699            .emit_account_state(balances, margins, reported, ts_event);
700        Ok(())
701    }
702
703    fn start(&mut self) -> anyhow::Result<()> {
704        if self.core.is_started() {
705            return Ok(());
706        }
707
708        self.emitter.set_sender(get_exec_event_sender());
709        self.core.set_started();
710
711        // Spawn instrument bootstrap task
712        let http_client = self.http_client.clone();
713
714        get_runtime().spawn(async move {
715            match http_client.request_instruments().await {
716                Ok(instruments) => {
717                    if instruments.is_empty() {
718                        log::warn!("No instruments returned for Binance Spot");
719                    } else {
720                        http_client.cache_instruments(instruments);
721                        log::info!("Instruments initialized");
722                    }
723                }
724                Err(e) => {
725                    log::error!("Failed to request Binance Spot instruments: {e}");
726                }
727            }
728        });
729
730        log::info!(
731            "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}, product_types={:?}",
732            self.core.client_id,
733            self.core.account_id,
734            self.core.account_type,
735            self.config.environment,
736            self.config.product_types,
737        );
738        Ok(())
739    }
740
741    fn stop(&mut self) -> anyhow::Result<()> {
742        if self.core.is_stopped() {
743            return Ok(());
744        }
745
746        // Abort WS trading task
747        if let Some(handle) = self.ws_trading_handle.lock().expect(MUTEX_POISONED).take() {
748            handle.abort();
749        }
750
751        self.core.set_stopped();
752        self.core.set_disconnected();
753        self.abort_pending_tasks();
754        log::info!("Stopped: client_id={}", self.core.client_id);
755        Ok(())
756    }
757
758    async fn generate_order_status_report(
759        &self,
760        cmd: &GenerateOrderStatusReport,
761    ) -> anyhow::Result<Option<OrderStatusReport>> {
762        let Some(instrument_id) = cmd.instrument_id else {
763            log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
764            return Ok(None);
765        };
766
767        // Convert ClientOrderId to VenueOrderId if provided (API naming quirk)
768        let venue_order_id = cmd
769            .venue_order_id
770            .as_ref()
771            .map(|id| VenueOrderId::new(id.inner()));
772
773        let report = self
774            .http_client
775            .request_order_status_report(
776                self.core.account_id,
777                instrument_id,
778                venue_order_id,
779                cmd.client_order_id,
780            )
781            .await?;
782
783        Ok(Some(report))
784    }
785
786    async fn generate_order_status_reports(
787        &self,
788        cmd: &GenerateOrderStatusReports,
789    ) -> anyhow::Result<Vec<OrderStatusReport>> {
790        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
791        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
792
793        let reports = self
794            .http_client
795            .request_order_status_reports(
796                self.core.account_id,
797                cmd.instrument_id,
798                start_dt,
799                end_dt,
800                cmd.open_only,
801                None, // limit
802            )
803            .await?;
804
805        Ok(reports)
806    }
807
808    async fn generate_fill_reports(
809        &self,
810        cmd: GenerateFillReports,
811    ) -> anyhow::Result<Vec<FillReport>> {
812        let Some(instrument_id) = cmd.instrument_id else {
813            log::warn!("generate_fill_reports requires instrument_id for Binance Spot");
814            return Ok(Vec::new());
815        };
816
817        // Convert ClientOrderId to VenueOrderId if provided (API naming quirk)
818        let venue_order_id = cmd
819            .venue_order_id
820            .as_ref()
821            .map(|id| VenueOrderId::new(id.inner()));
822
823        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
824        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
825
826        let reports = self
827            .http_client
828            .request_fill_reports(
829                self.core.account_id,
830                instrument_id,
831                venue_order_id,
832                start_dt,
833                end_dt,
834                None, // limit
835            )
836            .await?;
837
838        Ok(reports)
839    }
840
841    async fn generate_position_status_reports(
842        &self,
843        _cmd: &GeneratePositionStatusReports,
844    ) -> anyhow::Result<Vec<PositionStatusReport>> {
845        // Spot trading doesn't have positions in the traditional sense
846        // Returns empty for spot, could be extended for margin positions
847        Ok(Vec::new())
848    }
849
850    async fn generate_mass_status(
851        &self,
852        lookback_mins: Option<u64>,
853    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
854        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
855
856        let ts_now = self.clock.get_time_ns();
857
858        let start = lookback_mins.map(|mins| {
859            let lookback_ns = mins_to_nanos(mins);
860            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
861        });
862
863        // Binance requires instrument_id for historical orders (open_only=false).
864        // Use open_only=true for mass status to get all open orders across instruments.
865        let order_cmd = GenerateOrderStatusReportsBuilder::default()
866            .ts_init(ts_now)
867            .open_only(true)
868            .start(start)
869            .build()
870            .map_err(|e| anyhow::anyhow!("{e}"))?;
871
872        let position_cmd = GeneratePositionStatusReportsBuilder::default()
873            .ts_init(ts_now)
874            .start(start)
875            .build()
876            .map_err(|e| anyhow::anyhow!("{e}"))?;
877
878        let (order_reports, position_reports) = tokio::try_join!(
879            self.generate_order_status_reports(&order_cmd),
880            self.generate_position_status_reports(&position_cmd),
881        )?;
882
883        // Note: Fill reports require instrument_id for Binance, so we skip them in mass status
884        // They would need to be fetched per-instrument if needed
885
886        log::info!("Received {} OrderStatusReports", order_reports.len());
887        log::info!("Received {} PositionReports", position_reports.len());
888
889        let mut mass_status = ExecutionMassStatus::new(
890            self.core.client_id,
891            self.core.account_id,
892            *BINANCE_VENUE,
893            ts_now,
894            None,
895        );
896
897        mass_status.add_order_reports(order_reports);
898        mass_status.add_position_reports(position_reports);
899
900        Ok(Some(mass_status))
901    }
902
903    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
904        let order = self
905            .core
906            .cache()
907            .order(&cmd.client_order_id)
908            .cloned()
909            .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
910
911        if order.is_closed() {
912            let client_order_id = order.client_order_id();
913            log::warn!("Cannot submit closed order {client_order_id}");
914            return Ok(());
915        }
916
917        log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
918        self.emitter.emit_order_submitted(&order);
919
920        self.submit_order_internal(&cmd)
921    }
922
923    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
924        log::warn!(
925            "submit_order_list not yet implemented for Binance Spot execution client (received {} orders)",
926            cmd.order_list.client_order_ids.len()
927        );
928        Ok(())
929    }
930
931    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
932        // Binance Spot uses cancel-replace for order modification, which requires
933        // the full order specification (side, type, time_in_force). Since ModifyOrder
934        // doesn't include these fields, we need to look up the original order from cache.
935        let order = self.core.cache().order(&cmd.client_order_id).cloned();
936
937        let Some(order) = order else {
938            log::warn!(
939                "Cannot modify order {}: not found in cache",
940                cmd.client_order_id
941            );
942            let ts_init = self.clock.get_time_ns();
943            let rejected_event = OrderModifyRejected::new(
944                self.core.trader_id,
945                cmd.strategy_id,
946                cmd.instrument_id,
947                cmd.client_order_id,
948                "Order not found in cache for modify".into(),
949                UUID4::new(),
950                ts_init, // no venue timestamp, rejected locally
951                ts_init,
952                false,
953                cmd.venue_order_id,
954                Some(self.core.account_id),
955            );
956
957            self.emitter
958                .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
959            return Ok(());
960        };
961
962        let event_emitter = self.emitter.clone();
963        let trader_id = self.core.trader_id;
964        let account_id = self.core.account_id;
965        let clock = self.clock;
966
967        let order_side = order.order_side();
968        let order_type = order.order_type();
969        let time_in_force = order.time_in_force();
970        let quantity = cmd.quantity.unwrap_or_else(|| order.quantity());
971
972        if self.ws_trading_active() {
973            let command = cmd;
974            let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
975            let dispatch_state = self.dispatch_state.clone();
976            let params = build_cancel_replace_params(&command, &order, quantity)?;
977
978            // Pre-register before sending to avoid response racing the insert
979            let request_id = ws_client.next_request_id();
980            dispatch_state.pending_requests.insert(
981                request_id.clone(),
982                PendingRequest {
983                    client_order_id: command.client_order_id,
984                    venue_order_id: command.venue_order_id,
985                    operation: PendingOperation::Modify,
986                },
987            );
988
989            self.spawn_task("modify_order_ws", async move {
990                if let Err(e) = ws_client
991                    .cancel_replace_order_with_id(request_id.clone(), params)
992                    .await
993                {
994                    dispatch_state.pending_requests.remove(&request_id);
995                    let ts_now = clock.get_time_ns();
996                    let rejected_event = OrderModifyRejected::new(
997                        trader_id,
998                        command.strategy_id,
999                        command.instrument_id,
1000                        command.client_order_id,
1001                        format!("ws-modify-order-error: {e}").into(),
1002                        UUID4::new(),
1003                        ts_now,
1004                        ts_now,
1005                        false,
1006                        command.venue_order_id,
1007                        Some(account_id),
1008                    );
1009                    event_emitter.send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1010                    anyhow::bail!("WS modify order failed: {e}");
1011                }
1012                Ok(())
1013            });
1014        } else {
1015            let command = cmd;
1016            let http_client = self.http_client.clone();
1017            log::debug!("WS trading not active, falling back to HTTP for modify_order");
1018
1019            self.spawn_task("modify_order_http", async move {
1020                let result = http_client
1021                    .modify_order(
1022                        account_id,
1023                        command.instrument_id,
1024                        command
1025                            .venue_order_id
1026                            .ok_or_else(|| anyhow::anyhow!("venue_order_id required for modify"))?,
1027                        command.client_order_id,
1028                        order_side,
1029                        order_type,
1030                        quantity,
1031                        time_in_force,
1032                        command.price,
1033                    )
1034                    .await
1035                    .map_err(|e| anyhow::anyhow!("Modify order failed: {e}"));
1036
1037                match result {
1038                    Ok(report) => {
1039                        let ts_now = clock.get_time_ns();
1040                        let updated_event = OrderUpdated::new(
1041                            trader_id,
1042                            command.strategy_id,
1043                            command.instrument_id,
1044                            command.client_order_id,
1045                            report.quantity,
1046                            UUID4::new(),
1047                            ts_now,
1048                            ts_now,
1049                            false,
1050                            Some(report.venue_order_id),
1051                            Some(account_id),
1052                            report.price,
1053                            None,  // trigger_price
1054                            None,  // protection_price
1055                            false, // is_quote_quantity
1056                        );
1057                        event_emitter.send_order_event(OrderEventAny::Updated(updated_event));
1058                    }
1059                    Err(e) => {
1060                        let ts_now = clock.get_time_ns();
1061                        let rejected_event = OrderModifyRejected::new(
1062                            trader_id,
1063                            command.strategy_id,
1064                            command.instrument_id,
1065                            command.client_order_id,
1066                            format!("modify-order-error: {e}").into(),
1067                            UUID4::new(),
1068                            ts_now,
1069                            ts_now,
1070                            false,
1071                            command.venue_order_id,
1072                            Some(account_id),
1073                        );
1074                        event_emitter
1075                            .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1076                        return Err(e);
1077                    }
1078                }
1079                Ok(())
1080            });
1081        }
1082
1083        Ok(())
1084    }
1085
1086    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1087        self.cancel_order_internal(&cmd);
1088        Ok(())
1089    }
1090
1091    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1092        let event_emitter = self.emitter.clone();
1093        let trader_id = self.core.trader_id;
1094        let account_id = self.core.account_id;
1095        let clock = self.clock;
1096
1097        if self.ws_trading_active() {
1098            let ws_client = self.ws_trading_client.as_ref().unwrap().clone();
1099            let symbol = cmd.instrument_id.symbol.to_string();
1100
1101            self.spawn_task("cancel_all_orders_ws", async move {
1102                if let Err(e) = ws_client.cancel_all_orders(symbol).await {
1103                    log::error!("WS cancel_all_orders failed: {e}");
1104                }
1105                // Individual cancel confirmations dispatched via WS trading message loop
1106                Ok(())
1107            });
1108
1109            return Ok(());
1110        }
1111
1112        log::debug!("WS trading not active, falling back to HTTP for cancel_all_orders");
1113        let http_client = self.http_client.clone();
1114
1115        // Build strategy lookup from cache before spawning (cache is not Send)
1116        let strategy_lookup: AHashMap<ClientOrderId, StrategyId> = {
1117            let cache = self.core.cache();
1118            cache
1119                .orders_open(None, Some(&cmd.instrument_id), None, None, None)
1120                .into_iter()
1121                .map(|order| (order.client_order_id(), order.strategy_id()))
1122                .collect()
1123        };
1124
1125        let command = cmd;
1126        self.spawn_task("cancel_all_orders_http", async move {
1127            let canceled_orders = http_client.cancel_all_orders(command.instrument_id).await?;
1128
1129            for (venue_order_id, client_order_id) in canceled_orders {
1130                let strategy_id = strategy_lookup
1131                    .get(&client_order_id)
1132                    .copied()
1133                    .unwrap_or(command.strategy_id);
1134
1135                let canceled_event = OrderCanceled::new(
1136                    trader_id,
1137                    strategy_id,
1138                    command.instrument_id,
1139                    client_order_id,
1140                    UUID4::new(),
1141                    command.ts_init,
1142                    clock.get_time_ns(),
1143                    false,
1144                    Some(venue_order_id),
1145                    Some(account_id),
1146                );
1147
1148                event_emitter.send_order_event(OrderEventAny::Canceled(canceled_event));
1149            }
1150
1151            Ok(())
1152        });
1153
1154        Ok(())
1155    }
1156
1157    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1158        const BATCH_SIZE: usize = 5;
1159
1160        if cmd.cancels.is_empty() {
1161            return Ok(());
1162        }
1163
1164        let http_client = self.http_client.clone();
1165        let command = cmd;
1166
1167        let event_emitter = self.emitter.clone();
1168        let trader_id = self.core.trader_id;
1169        let account_id = self.core.account_id;
1170        let clock = self.clock;
1171
1172        self.spawn_task("batch_cancel_orders", async move {
1173            for chunk in command.cancels.chunks(BATCH_SIZE) {
1174                let batch_items: Vec<BatchCancelItem> = chunk
1175                    .iter()
1176                    .map(|cancel| {
1177                        if let Some(venue_order_id) = cancel.venue_order_id {
1178                            let order_id = venue_order_id.inner().parse::<i64>().unwrap_or(0);
1179                            if order_id != 0 {
1180                                BatchCancelItem::by_order_id(
1181                                    command.instrument_id.symbol.to_string(),
1182                                    order_id,
1183                                )
1184                            } else {
1185                                BatchCancelItem::by_client_order_id(
1186                                    command.instrument_id.symbol.to_string(),
1187                                    encode_broker_id(
1188                                        &cancel.client_order_id,
1189                                        BINANCE_NAUTILUS_SPOT_BROKER_ID,
1190                                    ),
1191                                )
1192                            }
1193                        } else {
1194                            BatchCancelItem::by_client_order_id(
1195                                command.instrument_id.symbol.to_string(),
1196                                encode_broker_id(
1197                                    &cancel.client_order_id,
1198                                    BINANCE_NAUTILUS_SPOT_BROKER_ID,
1199                                ),
1200                            )
1201                        }
1202                    })
1203                    .collect();
1204
1205                match http_client.batch_cancel_orders(&batch_items).await {
1206                    Ok(results) => {
1207                        for (i, result) in results.iter().enumerate() {
1208                            let cancel = &chunk[i];
1209
1210                            match result {
1211                                BatchCancelResult::Success(success) => {
1212                                    let venue_order_id =
1213                                        VenueOrderId::new(success.order_id.to_string());
1214                                    let canceled_event = OrderCanceled::new(
1215                                        trader_id,
1216                                        cancel.strategy_id,
1217                                        cancel.instrument_id,
1218                                        cancel.client_order_id,
1219                                        UUID4::new(),
1220                                        cancel.ts_init,
1221                                        clock.get_time_ns(),
1222                                        false,
1223                                        Some(venue_order_id),
1224                                        Some(account_id),
1225                                    );
1226
1227                                    event_emitter
1228                                        .send_order_event(OrderEventAny::Canceled(canceled_event));
1229                                }
1230                                BatchCancelResult::Error(error) => {
1231                                    let rejected_event = OrderCancelRejected::new(
1232                                        trader_id,
1233                                        cancel.strategy_id,
1234                                        cancel.instrument_id,
1235                                        cancel.client_order_id,
1236                                        format!(
1237                                            "batch-cancel-error: code={}, msg={}",
1238                                            error.code, error.msg
1239                                        )
1240                                        .into(),
1241                                        UUID4::new(),
1242                                        clock.get_time_ns(),
1243                                        cancel.ts_init,
1244                                        false,
1245                                        cancel.venue_order_id,
1246                                        Some(account_id),
1247                                    );
1248
1249                                    event_emitter.send_order_event(OrderEventAny::CancelRejected(
1250                                        rejected_event,
1251                                    ));
1252                                }
1253                            }
1254                        }
1255                    }
1256                    Err(e) => {
1257                        for cancel in chunk {
1258                            let rejected_event = OrderCancelRejected::new(
1259                                trader_id,
1260                                cancel.strategy_id,
1261                                cancel.instrument_id,
1262                                cancel.client_order_id,
1263                                format!("batch-cancel-request-failed: {e}").into(),
1264                                UUID4::new(),
1265                                clock.get_time_ns(),
1266                                cancel.ts_init,
1267                                false,
1268                                cancel.venue_order_id,
1269                                Some(account_id),
1270                            );
1271
1272                            event_emitter
1273                                .send_order_event(OrderEventAny::CancelRejected(rejected_event));
1274                        }
1275                    }
1276                }
1277            }
1278
1279            Ok(())
1280        });
1281
1282        Ok(())
1283    }
1284}
1285
1286#[expect(clippy::too_many_arguments)]
1287fn dispatch_ws_trading_message(
1288    msg: BinanceSpotWsTradingMessage,
1289    emitter: &ExecutionEventEmitter,
1290    http_client: &BinanceSpotHttpClient,
1291    account_id: AccountId,
1292    clock: &'static AtomicTime,
1293    dispatch_state: &WsDispatchState,
1294    ws_authenticated: &tokio::sync::Notify,
1295    seen_trade_ids: &std::sync::Arc<Mutex<FifoCache<(Ustr, i64), 10_000>>>,
1296) {
1297    match msg {
1298        BinanceSpotWsTradingMessage::OrderAccepted {
1299            request_id,
1300            response,
1301        } => {
1302            dispatch_state.pending_requests.remove(&request_id);
1303            log::debug!(
1304                "WS order accepted: request_id={request_id}, order_id={}",
1305                response.order_id
1306            );
1307            // OrderAccepted event is synthesized from UDS executionReport (New)
1308        }
1309        BinanceSpotWsTradingMessage::OrderRejected {
1310            request_id,
1311            code,
1312            msg,
1313        } => {
1314            log::debug!("WS order rejected: request_id={request_id}, code={code}, msg={msg}");
1315            if let Some((_, pending)) = dispatch_state.pending_requests.remove(&request_id) {
1316                // Clone to drop the DashMap read guard before cleanup_terminal
1317                let identity = dispatch_state
1318                    .order_identities
1319                    .get(&pending.client_order_id)
1320                    .map(|r| r.clone());
1321
1322                if let Some(identity) = identity {
1323                    let code_i64 = i64::from(code);
1324                    let due_post_only = code_i64 == BINANCE_GTX_ORDER_REJECT_CODE
1325                        || (code_i64 == BINANCE_NEW_ORDER_REJECTED_CODE
1326                            && msg == BINANCE_SPOT_POST_ONLY_REJECT_MSG);
1327                    let ts_now = clock.get_time_ns();
1328                    let rejected = OrderRejected::new(
1329                        emitter.trader_id(),
1330                        identity.strategy_id,
1331                        identity.instrument_id,
1332                        pending.client_order_id,
1333                        account_id,
1334                        Ustr::from(&format!("code={code}: {msg}")),
1335                        UUID4::new(),
1336                        ts_now,
1337                        ts_now,
1338                        false,
1339                        due_post_only,
1340                    );
1341                    dispatch_state.cleanup_terminal(pending.client_order_id);
1342                    emitter.send_order_event(OrderEventAny::Rejected(rejected));
1343                } else {
1344                    log::warn!(
1345                        "No order identity for {}, cannot emit OrderRejected",
1346                        pending.client_order_id
1347                    );
1348                }
1349            } else {
1350                log::warn!("No pending request for {request_id}, cannot emit OrderRejected");
1351            }
1352        }
1353        BinanceSpotWsTradingMessage::OrderCanceled {
1354            request_id,
1355            response,
1356        } => {
1357            dispatch_state.pending_requests.remove(&request_id);
1358            log::debug!(
1359                "WS order canceled: request_id={request_id}, order_id={}",
1360                response.order_id
1361            );
1362            // OrderCanceled event is synthesized from UDS executionReport (Canceled)
1363        }
1364        BinanceSpotWsTradingMessage::CancelRejected {
1365            request_id,
1366            code,
1367            msg,
1368        } => {
1369            log::warn!("WS cancel rejected: request_id={request_id}, code={code}, msg={msg}");
1370            if let Some((_, pending)) = dispatch_state.pending_requests.remove(&request_id)
1371                && let Some(identity) = dispatch_state
1372                    .order_identities
1373                    .get(&pending.client_order_id)
1374            {
1375                let ts_now = clock.get_time_ns();
1376                let rejected = OrderCancelRejected::new(
1377                    emitter.trader_id(),
1378                    identity.strategy_id,
1379                    identity.instrument_id,
1380                    pending.client_order_id,
1381                    Ustr::from(&format!("code={code}: {msg}")),
1382                    UUID4::new(),
1383                    ts_now,
1384                    ts_now,
1385                    false,
1386                    pending.venue_order_id,
1387                    Some(account_id),
1388                );
1389                emitter.send_order_event(OrderEventAny::CancelRejected(rejected));
1390            }
1391        }
1392        BinanceSpotWsTradingMessage::CancelReplaceAccepted {
1393            request_id,
1394            cancel_response,
1395            new_order_response,
1396        } => {
1397            dispatch_state.pending_requests.remove(&request_id);
1398            log::debug!(
1399                "WS cancel-replace accepted: request_id={request_id}, \
1400                 canceled_id={}, new_id={}",
1401                cancel_response.order_id,
1402                new_order_response.order_id,
1403            );
1404            // OrderUpdated event is synthesized from UDS executionReport (Replaced)
1405        }
1406        BinanceSpotWsTradingMessage::CancelReplaceRejected {
1407            request_id,
1408            code,
1409            msg,
1410        } => {
1411            log::warn!(
1412                "WS cancel-replace rejected: request_id={request_id}, code={code}, msg={msg}"
1413            );
1414
1415            if let Some((_, pending)) = dispatch_state.pending_requests.remove(&request_id)
1416                && let Some(identity) = dispatch_state
1417                    .order_identities
1418                    .get(&pending.client_order_id)
1419            {
1420                let ts_now = clock.get_time_ns();
1421                let rejected = OrderModifyRejected::new(
1422                    emitter.trader_id(),
1423                    identity.strategy_id,
1424                    identity.instrument_id,
1425                    pending.client_order_id,
1426                    Ustr::from(&format!("code={code}: {msg}")),
1427                    UUID4::new(),
1428                    ts_now,
1429                    ts_now,
1430                    false,
1431                    pending.venue_order_id,
1432                    Some(account_id),
1433                );
1434                emitter.send_order_event(OrderEventAny::ModifyRejected(rejected));
1435            }
1436        }
1437        BinanceSpotWsTradingMessage::AllOrdersCanceled {
1438            request_id,
1439            responses,
1440        } => {
1441            dispatch_state.pending_requests.remove(&request_id);
1442            log::debug!(
1443                "WS all orders canceled: request_id={request_id}, count={}",
1444                responses.len()
1445            );
1446            // Individual OrderCanceled events arrive via UDS executionReport
1447        }
1448        BinanceSpotWsTradingMessage::UserDataSubscribed { subscription_id } => {
1449            log::info!("User data stream subscribed: id={subscription_id}");
1450        }
1451        BinanceSpotWsTradingMessage::ExecutionReport(report) => {
1452            let ts_init = clock.get_time_ns();
1453            dispatch_execution_report(
1454                &report,
1455                emitter,
1456                http_client,
1457                account_id,
1458                dispatch_state,
1459                seen_trade_ids,
1460                ts_init,
1461            );
1462        }
1463        BinanceSpotWsTradingMessage::AccountPosition(position) => {
1464            let ts_init = clock.get_time_ns();
1465            let state = parse_spot_account_position(&position, account_id, ts_init);
1466            emitter.send_account_state(state);
1467        }
1468        BinanceSpotWsTradingMessage::BalanceUpdate(update) => {
1469            log::info!(
1470                "Balance update: asset={}, delta={}",
1471                update.asset,
1472                update.delta,
1473            );
1474            let http_client = http_client.clone();
1475            let emitter = emitter.clone();
1476
1477            get_runtime().spawn(async move {
1478                match http_client.request_account_state(account_id).await {
1479                    Ok(state) => emitter.send_account_state(state),
1480                    Err(e) => {
1481                        log::error!("Failed to refresh account state after balance update: {e}");
1482                    }
1483                }
1484            });
1485        }
1486        BinanceSpotWsTradingMessage::Connected => {
1487            log::info!("WS trading API connected");
1488        }
1489        BinanceSpotWsTradingMessage::Authenticated => {
1490            log::info!("WS trading API authenticated");
1491            ws_authenticated.notify_one();
1492        }
1493        BinanceSpotWsTradingMessage::Reconnected => {
1494            log::info!("WS trading API reconnected");
1495        }
1496        BinanceSpotWsTradingMessage::Error(err) => {
1497            log::error!("WS trading API error: {err}");
1498        }
1499    }
1500}
1501
1502fn build_new_order_params(
1503    order: &impl Order,
1504    client_order_id: ClientOrderId,
1505    is_post_only: bool,
1506    is_quote_quantity: bool,
1507) -> anyhow::Result<NewOrderParams> {
1508    let binance_side = BinanceSide::try_from(order.order_side())?;
1509    let binance_order_type = order_type_to_binance_spot(order.order_type(), is_post_only)?;
1510
1511    let requires_trigger = matches!(
1512        order.order_type(),
1513        OrderType::StopMarket
1514            | OrderType::StopLimit
1515            | OrderType::MarketIfTouched
1516            | OrderType::LimitIfTouched
1517    );
1518
1519    if requires_trigger && order.trigger_price().is_none() {
1520        anyhow::bail!("Conditional orders require a trigger price");
1521    }
1522
1523    let supports_tif = matches!(
1524        binance_order_type,
1525        BinanceSpotOrderType::Limit
1526            | BinanceSpotOrderType::StopLossLimit
1527            | BinanceSpotOrderType::TakeProfitLimit
1528    );
1529    let binance_tif = if supports_tif {
1530        Some(time_in_force_to_binance_spot(order.time_in_force())?)
1531    } else {
1532        None
1533    };
1534
1535    let qty_str = order.quantity().to_string();
1536    let (base_qty, quote_qty) = if is_quote_quantity {
1537        (None, Some(qty_str))
1538    } else {
1539        (Some(qty_str), None)
1540    };
1541
1542    let client_id_str = encode_broker_id(&client_order_id, BINANCE_NAUTILUS_SPOT_BROKER_ID);
1543
1544    Ok(NewOrderParams {
1545        symbol: order.instrument_id().symbol.to_string(),
1546        side: binance_side,
1547        order_type: binance_order_type,
1548        time_in_force: binance_tif,
1549        quantity: base_qty,
1550        quote_order_qty: quote_qty,
1551        price: order.price().map(|p| p.to_string()),
1552        new_client_order_id: Some(client_id_str),
1553        stop_price: order.trigger_price().map(|p| p.to_string()),
1554        trailing_delta: None,
1555        iceberg_qty: order.display_qty().map(|q| q.to_string()),
1556        new_order_resp_type: Some(BinanceOrderResponseType::Full),
1557        self_trade_prevention_mode: None,
1558        strategy_id: None,
1559        strategy_type: None,
1560    })
1561}
1562
1563fn build_cancel_order_params(cmd: &CancelOrder) -> CancelOrderParams {
1564    let order_id = cmd
1565        .venue_order_id
1566        .and_then(|id| id.inner().parse::<i64>().ok());
1567
1568    if let Some(order_id) = order_id {
1569        CancelOrderParams::by_order_id(cmd.instrument_id.symbol.to_string(), order_id)
1570    } else {
1571        let client_id_str = encode_broker_id(&cmd.client_order_id, BINANCE_NAUTILUS_SPOT_BROKER_ID);
1572        CancelOrderParams::by_client_order_id(cmd.instrument_id.symbol.to_string(), client_id_str)
1573    }
1574}
1575
1576fn build_cancel_replace_params(
1577    cmd: &ModifyOrder,
1578    order: &impl Order,
1579    quantity: Quantity,
1580) -> anyhow::Result<CancelReplaceOrderParams> {
1581    let binance_side = BinanceSide::try_from(order.order_side())?;
1582    let binance_order_type = order_type_to_binance_spot(order.order_type(), false)?;
1583    let binance_tif = time_in_force_to_binance_spot(order.time_in_force())?;
1584
1585    let cancel_order_id: Option<i64> = cmd
1586        .venue_order_id
1587        .map(|id| {
1588            id.inner()
1589                .parse::<i64>()
1590                .map_err(|_| anyhow::anyhow!("Invalid venue order ID: {id}"))
1591        })
1592        .transpose()?;
1593
1594    let client_id_str = encode_broker_id(&cmd.client_order_id, BINANCE_NAUTILUS_SPOT_BROKER_ID);
1595
1596    Ok(CancelReplaceOrderParams {
1597        symbol: cmd.instrument_id.symbol.to_string(),
1598        side: binance_side,
1599        order_type: binance_order_type,
1600        cancel_replace_mode: BinanceCancelReplaceMode::StopOnFailure,
1601        time_in_force: Some(binance_tif),
1602        quantity: Some(quantity.to_string()),
1603        quote_order_qty: None,
1604        price: cmd.price.map(|p| p.to_string()),
1605        cancel_order_id,
1606        cancel_orig_client_order_id: if cancel_order_id.is_none() {
1607            Some(client_id_str.clone())
1608        } else {
1609            None
1610        },
1611        new_client_order_id: Some(client_id_str),
1612        stop_price: None,
1613        trailing_delta: None,
1614        iceberg_qty: None,
1615        new_order_resp_type: Some(BinanceOrderResponseType::Full),
1616        self_trade_prevention_mode: None,
1617    })
1618}
1619
1620/// Dispatches a Spot execution report with tracked/untracked routing.
1621///
1622/// Tracked orders (with registered identity) produce proper order events.
1623/// Untracked orders fall back to execution reports for reconciliation.
1624fn dispatch_execution_report(
1625    report: &BinanceSpotExecutionReport,
1626    emitter: &ExecutionEventEmitter,
1627    http_client: &BinanceSpotHttpClient,
1628    account_id: AccountId,
1629    dispatch_state: &WsDispatchState,
1630    seen_trade_ids: &std::sync::Arc<Mutex<FifoCache<(Ustr, i64), 10_000>>>,
1631    ts_init: UnixNanos,
1632) {
1633    let symbol = report.symbol;
1634    let instrument_id = InstrumentId::new(symbol.into(), *BINANCE_VENUE);
1635    let (price_precision, size_precision) = http_client
1636        .get_instrument(&symbol)
1637        .map_or((8, 8), |i| (i.price_precision(), i.size_precision()));
1638
1639    let client_order_id = ClientOrderId::new(decode_broker_id(
1640        &report.client_order_id,
1641        BINANCE_NAUTILUS_SPOT_BROKER_ID,
1642    ));
1643
1644    let identity = dispatch_state
1645        .order_identities
1646        .get(&client_order_id)
1647        .map(|r| r.clone());
1648
1649    if let Some(identity) = identity {
1650        dispatch_tracked_execution_report(
1651            report,
1652            emitter,
1653            account_id,
1654            dispatch_state,
1655            seen_trade_ids,
1656            client_order_id,
1657            &identity,
1658            instrument_id,
1659            price_precision,
1660            size_precision,
1661            ts_init,
1662        );
1663    } else {
1664        dispatch_untracked_execution_report(
1665            report,
1666            emitter,
1667            http_client,
1668            account_id,
1669            seen_trade_ids,
1670            instrument_id,
1671            price_precision,
1672            size_precision,
1673            ts_init,
1674        );
1675    }
1676}
1677
1678/// Dispatches a tracked execution report as proper order events.
1679#[expect(clippy::too_many_arguments)]
1680fn dispatch_tracked_execution_report(
1681    report: &BinanceSpotExecutionReport,
1682    emitter: &ExecutionEventEmitter,
1683    account_id: AccountId,
1684    state: &WsDispatchState,
1685    seen_trade_ids: &std::sync::Arc<Mutex<FifoCache<(Ustr, i64), 10_000>>>,
1686    client_order_id: ClientOrderId,
1687    identity: &OrderIdentity,
1688    instrument_id: InstrumentId,
1689    price_precision: u8,
1690    size_precision: u8,
1691    ts_init: UnixNanos,
1692) {
1693    let venue_order_id = VenueOrderId::new(report.order_id.to_string());
1694    let ts_event = UnixNanos::from_millis(report.event_time as u64);
1695
1696    match report.execution_type {
1697        BinanceSpotExecutionType::New => {
1698            if state.has_filled(&client_order_id) {
1699                log::debug!("Skipping New for already-filled {client_order_id}");
1700                return;
1701            }
1702
1703            if state.has_emitted_accepted(&client_order_id) {
1704                // Already accepted: this New is a cancel-replace result
1705                let price: f64 = report.price.parse().unwrap_or(0.0);
1706                let quantity: f64 = report.original_qty.parse().unwrap_or(0.0);
1707                let trigger_price: f64 = report.stop_price.parse().unwrap_or(0.0);
1708                let trigger = if trigger_price > 0.0 {
1709                    Some(Price::new(trigger_price, price_precision))
1710                } else {
1711                    None
1712                };
1713                let updated = OrderUpdated::new(
1714                    emitter.trader_id(),
1715                    identity.strategy_id,
1716                    identity.instrument_id,
1717                    client_order_id,
1718                    Quantity::new(quantity, size_precision),
1719                    UUID4::new(),
1720                    ts_event,
1721                    ts_init,
1722                    false,
1723                    Some(venue_order_id),
1724                    Some(account_id),
1725                    Some(Price::new(price, price_precision)),
1726                    trigger,
1727                    None,  // protection_price
1728                    false, // is_quote_quantity
1729                );
1730                emitter.send_order_event(OrderEventAny::Updated(updated));
1731                return;
1732            }
1733            state.insert_accepted(client_order_id);
1734            let accepted = OrderAccepted::new(
1735                emitter.trader_id(),
1736                identity.strategy_id,
1737                identity.instrument_id,
1738                client_order_id,
1739                venue_order_id,
1740                account_id,
1741                UUID4::new(),
1742                ts_event,
1743                ts_init,
1744                false,
1745            );
1746            emitter.send_order_event(OrderEventAny::Accepted(accepted));
1747        }
1748        BinanceSpotExecutionType::Trade => {
1749            let dedup_key = (report.symbol, report.trade_id);
1750            let mut guard = seen_trade_ids.lock().expect(MUTEX_POISONED);
1751            let is_duplicate = guard.contains(&dedup_key);
1752            guard.add(dedup_key);
1753            drop(guard);
1754
1755            if is_duplicate {
1756                log::debug!(
1757                    "Duplicate trade_id={} for {}, skipping",
1758                    report.trade_id,
1759                    report.symbol
1760                );
1761                return;
1762            }
1763
1764            ensure_accepted_emitted(
1765                client_order_id,
1766                account_id,
1767                venue_order_id,
1768                identity,
1769                emitter,
1770                state,
1771                ts_init,
1772            );
1773
1774            let last_qty: f64 = report.last_filled_qty.parse().unwrap_or(0.0);
1775            let last_px: f64 = report.last_filled_price.parse().unwrap_or(0.0);
1776            let commission: f64 = report.commission.parse().unwrap_or(0.0);
1777            let commission_currency = report
1778                .commission_asset
1779                .as_ref()
1780                .map_or_else(Currency::USDT, |a| {
1781                    Currency::get_or_create_crypto(a.as_str())
1782                });
1783
1784            let liquidity_side = if report.is_maker {
1785                LiquiditySide::Maker
1786            } else {
1787                LiquiditySide::Taker
1788            };
1789
1790            let filled = OrderFilled::new(
1791                emitter.trader_id(),
1792                identity.strategy_id,
1793                instrument_id,
1794                client_order_id,
1795                venue_order_id,
1796                account_id,
1797                TradeId::new(report.trade_id.to_string()),
1798                identity.order_side,
1799                identity.order_type,
1800                Quantity::new(last_qty, size_precision),
1801                Price::new(last_px, price_precision),
1802                commission_currency,
1803                liquidity_side,
1804                UUID4::new(),
1805                ts_event,
1806                ts_init,
1807                false,
1808                None,
1809                Some(Money::new(commission, commission_currency)),
1810            );
1811
1812            state.insert_filled(client_order_id);
1813            emitter.send_order_event(OrderEventAny::Filled(filled));
1814
1815            let cum_qty: f64 = report.cumulative_filled_qty.parse().unwrap_or(0.0);
1816            let orig_qty: f64 = report.original_qty.parse().unwrap_or(0.0);
1817            if (orig_qty - cum_qty) <= 0.0 {
1818                state.cleanup_terminal(client_order_id);
1819            }
1820        }
1821        BinanceSpotExecutionType::Replaced => {
1822            // Cancel-replace succeeded: the old order is being replaced.
1823            // The replacement NEW event follows with the new price/qty.
1824            log::debug!(
1825                "Order replaced: client_order_id={client_order_id}, venue_order_id={venue_order_id}"
1826            );
1827        }
1828        BinanceSpotExecutionType::Canceled
1829        | BinanceSpotExecutionType::Expired
1830        | BinanceSpotExecutionType::TradePrevention => {
1831            ensure_accepted_emitted(
1832                client_order_id,
1833                account_id,
1834                venue_order_id,
1835                identity,
1836                emitter,
1837                state,
1838                ts_init,
1839            );
1840            let canceled = OrderCanceled::new(
1841                emitter.trader_id(),
1842                identity.strategy_id,
1843                identity.instrument_id,
1844                client_order_id,
1845                UUID4::new(),
1846                ts_event,
1847                ts_init,
1848                false,
1849                Some(venue_order_id),
1850                Some(account_id),
1851            );
1852            state.cleanup_terminal(client_order_id);
1853            emitter.send_order_event(OrderEventAny::Canceled(canceled));
1854        }
1855        BinanceSpotExecutionType::Rejected => {
1856            let reason = if report.reject_reason.is_empty() {
1857                Ustr::from("Order rejected by venue")
1858            } else {
1859                Ustr::from(&report.reject_reason)
1860            };
1861            let due_post_only = report.time_in_force == BinanceTimeInForce::Gtx
1862                || (report.order_type == "LIMIT_MAKER"
1863                    && (report.reject_reason.is_empty() || report.reject_reason == "NONE"));
1864            state.cleanup_terminal(client_order_id);
1865            emitter.emit_order_rejected_event(
1866                identity.strategy_id,
1867                identity.instrument_id,
1868                client_order_id,
1869                reason.as_str(),
1870                ts_init,
1871                due_post_only,
1872            );
1873        }
1874    }
1875}
1876
1877/// Dispatches an untracked execution report as execution reports for reconciliation.
1878#[expect(clippy::too_many_arguments)]
1879fn dispatch_untracked_execution_report(
1880    report: &BinanceSpotExecutionReport,
1881    emitter: &ExecutionEventEmitter,
1882    _http_client: &BinanceSpotHttpClient,
1883    account_id: AccountId,
1884    seen_trade_ids: &std::sync::Arc<Mutex<FifoCache<(Ustr, i64), 10_000>>>,
1885    instrument_id: InstrumentId,
1886    price_precision: u8,
1887    size_precision: u8,
1888    ts_init: UnixNanos,
1889) {
1890    match report.execution_type {
1891        BinanceSpotExecutionType::Trade => {
1892            let dedup_key = (report.symbol, report.trade_id);
1893            let mut guard = seen_trade_ids.lock().expect(MUTEX_POISONED);
1894            let is_duplicate = guard.contains(&dedup_key);
1895            guard.add(dedup_key);
1896            drop(guard);
1897
1898            if is_duplicate {
1899                log::debug!(
1900                    "Duplicate trade_id={} for {}, skipping",
1901                    report.trade_id,
1902                    report.symbol
1903                );
1904                return;
1905            }
1906
1907            match parse_spot_exec_report_to_order_status(
1908                report,
1909                instrument_id,
1910                price_precision,
1911                size_precision,
1912                account_id,
1913                ts_init,
1914            ) {
1915                Ok(status) => emitter.send_order_status_report(status),
1916                Err(e) => log::error!("Failed to parse order status report: {e}"),
1917            }
1918
1919            match parse_spot_exec_report_to_fill(
1920                report,
1921                instrument_id,
1922                price_precision,
1923                size_precision,
1924                account_id,
1925                ts_init,
1926            ) {
1927                Ok(fill) => emitter.send_fill_report(fill),
1928                Err(e) => log::error!("Failed to parse fill report: {e}"),
1929            }
1930        }
1931        BinanceSpotExecutionType::New
1932        | BinanceSpotExecutionType::Canceled
1933        | BinanceSpotExecutionType::Replaced
1934        | BinanceSpotExecutionType::Rejected
1935        | BinanceSpotExecutionType::Expired
1936        | BinanceSpotExecutionType::TradePrevention => {
1937            match parse_spot_exec_report_to_order_status(
1938                report,
1939                instrument_id,
1940                price_precision,
1941                size_precision,
1942                account_id,
1943                ts_init,
1944            ) {
1945                Ok(status) => emitter.send_order_status_report(status),
1946                Err(e) => log::error!("Failed to parse order status report: {e}"),
1947            }
1948        }
1949    }
1950}
1951
1952// Checks for GTX (-5022) and spot LIMIT_MAKER (-2010 + specific message)
1953fn is_spot_post_only_rejection(error: &crate::spot::http::BinanceSpotHttpError) -> bool {
1954    match error {
1955        crate::spot::http::BinanceSpotHttpError::BinanceError { code, message } => {
1956            *code == BINANCE_GTX_ORDER_REJECT_CODE
1957                || (*code == BINANCE_NEW_ORDER_REJECTED_CODE
1958                    && message == BINANCE_SPOT_POST_ONLY_REJECT_MSG)
1959        }
1960        _ => false,
1961    }
1962}
1963
1964#[cfg(test)]
1965mod tests {
1966    use nautilus_common::messages::ExecutionEvent;
1967    use nautilus_core::time::get_atomic_clock_realtime;
1968    use nautilus_model::{
1969        enums::{AccountType, LiquiditySide, OrderSide},
1970        identifiers::{StrategyId, TraderId},
1971    };
1972    use rstest::rstest;
1973
1974    use super::*;
1975    use crate::common::enums::BinanceEnvironment;
1976
1977    #[rstest]
1978    fn test_dispatch_ws_trading_message_emits_cancel_rejected_and_clears_pending_request() {
1979        let clock = get_atomic_clock_realtime();
1980        let (emitter, mut rx) = create_test_emitter(clock);
1981        let http_client = create_test_http_client(clock);
1982        let dispatch_state = create_tracked_dispatch_state(
1983            ClientOrderId::from("TEST"),
1984            InstrumentId::from("BTCUSDT.BINANCE"),
1985        );
1986        let ws_authenticated = tokio::sync::Notify::new();
1987        let seen_trade_ids = Arc::new(Mutex::new(FifoCache::new()));
1988
1989        dispatch_state.pending_requests.insert(
1990            "req-cancel".to_string(),
1991            PendingRequest {
1992                client_order_id: ClientOrderId::from("TEST"),
1993                venue_order_id: Some(VenueOrderId::from("12345")),
1994                operation: PendingOperation::Cancel,
1995            },
1996        );
1997
1998        dispatch_ws_trading_message(
1999            BinanceSpotWsTradingMessage::CancelRejected {
2000                request_id: "req-cancel".to_string(),
2001                code: -2011,
2002                msg: "Unknown order sent".to_string(),
2003            },
2004            &emitter,
2005            &http_client,
2006            AccountId::from("BINANCE-001"),
2007            clock,
2008            &dispatch_state,
2009            &ws_authenticated,
2010            &seen_trade_ids,
2011        );
2012
2013        assert!(dispatch_state.pending_requests.get("req-cancel").is_none());
2014
2015        match rx
2016            .try_recv()
2017            .expect("Cancel rejection event should be emitted")
2018        {
2019            ExecutionEvent::Order(OrderEventAny::CancelRejected(event)) => {
2020                assert_eq!(event.client_order_id, ClientOrderId::from("TEST"));
2021                assert_eq!(event.account_id, Some(AccountId::from("BINANCE-001")));
2022                assert!(event.reason.as_str().contains("code=-2011"));
2023            }
2024            other => panic!("Expected CancelRejected event, was {other:?}"),
2025        }
2026    }
2027
2028    #[rstest]
2029    fn test_dispatch_ws_trading_message_emits_modify_rejected_and_clears_pending_request() {
2030        let clock = get_atomic_clock_realtime();
2031        let (emitter, mut rx) = create_test_emitter(clock);
2032        let http_client = create_test_http_client(clock);
2033        let dispatch_state = create_tracked_dispatch_state(
2034            ClientOrderId::from("TEST"),
2035            InstrumentId::from("BTCUSDT.BINANCE"),
2036        );
2037        let ws_authenticated = tokio::sync::Notify::new();
2038        let seen_trade_ids = Arc::new(Mutex::new(FifoCache::new()));
2039
2040        dispatch_state.pending_requests.insert(
2041            "req-modify".to_string(),
2042            PendingRequest {
2043                client_order_id: ClientOrderId::from("TEST"),
2044                venue_order_id: Some(VenueOrderId::from("12345")),
2045                operation: PendingOperation::Modify,
2046            },
2047        );
2048
2049        dispatch_ws_trading_message(
2050            BinanceSpotWsTradingMessage::CancelReplaceRejected {
2051                request_id: "req-modify".to_string(),
2052                code: -2021,
2053                msg: "Order cancel-replace partially failed".to_string(),
2054            },
2055            &emitter,
2056            &http_client,
2057            AccountId::from("BINANCE-001"),
2058            clock,
2059            &dispatch_state,
2060            &ws_authenticated,
2061            &seen_trade_ids,
2062        );
2063
2064        assert!(dispatch_state.pending_requests.get("req-modify").is_none());
2065
2066        match rx
2067            .try_recv()
2068            .expect("Modify rejection event should be emitted")
2069        {
2070            ExecutionEvent::Order(OrderEventAny::ModifyRejected(event)) => {
2071                assert_eq!(event.client_order_id, ClientOrderId::from("TEST"));
2072                assert_eq!(event.account_id, Some(AccountId::from("BINANCE-001")));
2073                assert!(event.reason.as_str().contains("code=-2021"));
2074            }
2075            other => panic!("Expected ModifyRejected event, was {other:?}"),
2076        }
2077    }
2078
2079    fn create_test_emitter(
2080        clock: &'static AtomicTime,
2081    ) -> (
2082        ExecutionEventEmitter,
2083        tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2084    ) {
2085        let mut emitter = ExecutionEventEmitter::new(
2086            clock,
2087            TraderId::from("TESTER-001"),
2088            AccountId::from("BINANCE-001"),
2089            AccountType::Cash,
2090            None,
2091        );
2092        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2093        emitter.set_sender(tx);
2094        (emitter, rx)
2095    }
2096
2097    fn create_test_http_client(clock: &'static AtomicTime) -> BinanceSpotHttpClient {
2098        BinanceSpotHttpClient::new(
2099            BinanceEnvironment::Mainnet,
2100            clock,
2101            None,
2102            None,
2103            None,
2104            None,
2105            None,
2106            None,
2107        )
2108        .expect("Test HTTP client should be created")
2109    }
2110
2111    fn create_tracked_dispatch_state(
2112        client_order_id: ClientOrderId,
2113        instrument_id: InstrumentId,
2114    ) -> WsDispatchState {
2115        let dispatch_state = WsDispatchState::default();
2116        dispatch_state.order_identities.insert(
2117            client_order_id,
2118            OrderIdentity {
2119                instrument_id,
2120                strategy_id: StrategyId::from("TEST-STRATEGY"),
2121                order_side: OrderSide::Buy,
2122                order_type: OrderType::Limit,
2123                price: None,
2124            },
2125        );
2126        dispatch_state
2127    }
2128
2129    #[rstest]
2130    #[case::gtx(
2131        crate::spot::http::BinanceSpotHttpError::BinanceError {
2132            code: BINANCE_GTX_ORDER_REJECT_CODE,
2133            message: "Order would immediately trigger.".to_string(),
2134        },
2135        true,
2136    )]
2137    #[case::spot_post_only(
2138        crate::spot::http::BinanceSpotHttpError::BinanceError {
2139            code: BINANCE_NEW_ORDER_REJECTED_CODE,
2140            message: BINANCE_SPOT_POST_ONLY_REJECT_MSG.to_string(),
2141        },
2142        true,
2143    )]
2144    #[case::new_order_rejected_other_message(
2145        crate::spot::http::BinanceSpotHttpError::BinanceError {
2146            code: BINANCE_NEW_ORDER_REJECTED_CODE,
2147            message: "Insufficient balance.".to_string(),
2148        },
2149        false,
2150    )]
2151    #[case::unrelated_code(
2152        crate::spot::http::BinanceSpotHttpError::BinanceError {
2153            code: -2011,
2154            message: "Unknown order sent.".to_string(),
2155        },
2156        false,
2157    )]
2158    #[case::non_binance_error(
2159        crate::spot::http::BinanceSpotHttpError::NetworkError("connection reset".to_string()),
2160        false,
2161    )]
2162    fn test_is_spot_post_only_rejection(
2163        #[case] error: crate::spot::http::BinanceSpotHttpError,
2164        #[case] expected: bool,
2165    ) {
2166        assert_eq!(is_spot_post_only_rejection(&error), expected);
2167    }
2168
2169    #[rstest]
2170    fn test_dispatch_tracked_execution_report_trade_dedup() {
2171        let clock = get_atomic_clock_realtime();
2172        let (emitter, mut rx) = create_test_emitter(clock);
2173        let http_client = create_test_http_client(clock);
2174        let client_order_id = ClientOrderId::from("x-TD67BGP9-T0000000000000");
2175        let dispatch_state = create_tracked_dispatch_state(
2176            ClientOrderId::from("O-20200101-000000-000-000-0"),
2177            InstrumentId::from("ETHUSDT.BINANCE"),
2178        );
2179        let ws_authenticated = tokio::sync::Notify::new();
2180        let seen_trade_ids = Arc::new(Mutex::new(FifoCache::new()));
2181
2182        let trade_json = crate::common::testing::load_fixture_string(
2183            "spot/user_data_json/execution_report_trade.json",
2184        );
2185        let report: BinanceSpotExecutionReport = serde_json::from_str(&trade_json).unwrap();
2186
2187        dispatch_ws_trading_message(
2188            BinanceSpotWsTradingMessage::ExecutionReport(Box::new(report.clone())),
2189            &emitter,
2190            &http_client,
2191            AccountId::from("BINANCE-001"),
2192            clock,
2193            &dispatch_state,
2194            &ws_authenticated,
2195            &seen_trade_ids,
2196        );
2197        dispatch_ws_trading_message(
2198            BinanceSpotWsTradingMessage::ExecutionReport(Box::new(report)),
2199            &emitter,
2200            &http_client,
2201            AccountId::from("BINANCE-001"),
2202            clock,
2203            &dispatch_state,
2204            &ws_authenticated,
2205            &seen_trade_ids,
2206        );
2207
2208        let mut events = Vec::new();
2209        while let Ok(event) = rx.try_recv() {
2210            events.push(event);
2211        }
2212
2213        let fills: Vec<_> = events
2214            .iter()
2215            .filter(|e| matches!(e, ExecutionEvent::Order(OrderEventAny::Filled(_))))
2216            .collect();
2217        assert_eq!(fills.len(), 1, "duplicate trade should be deduped");
2218
2219        match fills[0] {
2220            ExecutionEvent::Order(OrderEventAny::Filled(fill)) => {
2221                assert_eq!(
2222                    fill.client_order_id,
2223                    ClientOrderId::from("O-20200101-000000-000-000-0"),
2224                );
2225                assert_eq!(fill.trade_id, TradeId::new("98765432"));
2226                assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
2227            }
2228            _ => unreachable!(),
2229        }
2230        let _ = client_order_id;
2231    }
2232
2233    #[rstest]
2234    fn test_dispatch_tracked_execution_report_rejected_gtx_sets_post_only() {
2235        let clock = get_atomic_clock_realtime();
2236        let (emitter, mut rx) = create_test_emitter(clock);
2237        let http_client = create_test_http_client(clock);
2238        let client_order_id = ClientOrderId::from("O-20200101-000000-000-000-1");
2239        let dispatch_state =
2240            create_tracked_dispatch_state(client_order_id, InstrumentId::from("ETHUSDT.BINANCE"));
2241        let ws_authenticated = tokio::sync::Notify::new();
2242        let seen_trade_ids = Arc::new(Mutex::new(FifoCache::new()));
2243
2244        let encoded = encode_broker_id(&client_order_id, BINANCE_NAUTILUS_SPOT_BROKER_ID);
2245        let report_json = format!(
2246            r#"{{
2247                "e":"executionReport","E":1709654400000,"s":"ETHUSDT",
2248                "c":"{encoded}","S":"BUY","o":"LIMIT","f":"GTX",
2249                "q":"1.00000000","p":"2500.00000000","P":"0.00000000",
2250                "x":"REJECTED","X":"REJECTED","r":"NONE","i":12345678,
2251                "l":"0.00000000","z":"0.00000000","L":"0.00000000",
2252                "n":"0","N":null,"T":1709654400000,"t":-1,"w":false,"m":false,
2253                "O":1709654400000,"Z":"0.00000000","C":""
2254            }}"#,
2255        );
2256        let report: BinanceSpotExecutionReport = serde_json::from_str(&report_json).unwrap();
2257
2258        dispatch_ws_trading_message(
2259            BinanceSpotWsTradingMessage::ExecutionReport(Box::new(report)),
2260            &emitter,
2261            &http_client,
2262            AccountId::from("BINANCE-001"),
2263            clock,
2264            &dispatch_state,
2265            &ws_authenticated,
2266            &seen_trade_ids,
2267        );
2268
2269        match rx.try_recv().expect("OrderRejected event expected") {
2270            ExecutionEvent::Order(OrderEventAny::Rejected(event)) => {
2271                assert_eq!(event.client_order_id, client_order_id);
2272                assert_eq!(event.account_id, AccountId::from("BINANCE-001"));
2273                assert_ne!(event.due_post_only, 0);
2274            }
2275            other => panic!("Expected OrderRejected event, was {other:?}"),
2276        }
2277    }
2278}