Skip to main content

nautilus_binance/spot/websocket/trading/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot WebSocket API message handler.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21//!
22//! ## Responsibilities
23//!
24//! - Command processing: Receives `BinanceSpotWsTradingCommand` from client, serializes to JSON requests.
25//! - Response decoding: Parses SBE binary responses using schema 3 decoders.
26//! - Request correlation: Matches responses to pending requests by ID.
27//! - Message transformation: Emits `BinanceSpotWsTradingMessage` events to client via channel.
28
29use std::{
30    fmt::Debug,
31    sync::{
32        Arc,
33        atomic::{AtomicBool, AtomicU64, Ordering},
34    },
35};
36
37use ahash::AHashMap;
38use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
39use tokio_tungstenite::tungstenite::Message;
40
41use super::{
42    client::BINANCE_WS_RATE_LIMIT_KEY_ORDER,
43    error::{BinanceWsApiError, BinanceWsApiResult},
44    messages::{
45        BinanceSpotWsTradingCommand, BinanceSpotWsTradingMessage, BinanceSpotWsTradingRequest,
46        BinanceSpotWsTradingRequestMeta, method,
47    },
48};
49use crate::{
50    common::credential::SigningCredential,
51    spot::{
52        http::{models::BinanceCancelOrderResponse, parse},
53        sbe::spot::{
54            ReadBuf,
55            error_response_codec::ErrorResponseDecoder,
56            message_header_codec,
57            web_socket_response_codec::{SBE_TEMPLATE_ID, WebSocketResponseDecoder},
58        },
59    },
60};
61
62/// Binance Spot WebSocket API handler.
63///
64/// Runs in a dedicated Tokio task, processing commands from the client
65/// and transforming raw WebSocket messages into Nautilus domain events.
66/// Messages are sent to the client via the output channel.
67pub struct BinanceSpotWsTradingHandler {
68    signal: Arc<AtomicBool>,
69    inner: Option<WebSocketClient>,
70    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsTradingCommand>,
71    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
72    out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsTradingMessage>,
73    credential: Arc<SigningCredential>,
74    pending_requests: AHashMap<String, BinanceSpotWsTradingRequestMeta>,
75    request_id_counter: AtomicU64,
76}
77
78impl Debug for BinanceSpotWsTradingHandler {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        f.debug_struct(stringify!(BinanceSpotWsTradingHandler))
81            .field("inner", &self.inner.as_ref().map(|_| "<client>"))
82            .field(
83                "pending_requests",
84                &format!("{} pending", self.pending_requests.len()),
85            )
86            .finish_non_exhaustive()
87    }
88}
89
90impl BinanceSpotWsTradingHandler {
91    /// Creates a new handler instance.
92    #[must_use]
93    pub fn new(
94        signal: Arc<AtomicBool>,
95        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsTradingCommand>,
96        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
97        out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsTradingMessage>,
98        credential: Arc<SigningCredential>,
99    ) -> Self {
100        Self {
101            signal,
102            inner: None,
103            cmd_rx,
104            raw_rx,
105            out_tx,
106            credential,
107            pending_requests: AHashMap::new(),
108            request_id_counter: AtomicU64::new(1000),
109        }
110    }
111
112    /// Runs the main event loop for commands and raw messages.
113    ///
114    /// Sends output messages via `out_tx` channel. Returns `false` when disconnected
115    /// or the signal is set, indicating the handler should exit.
116    pub async fn run(&mut self) -> bool {
117        loop {
118            if self.signal.load(Ordering::Relaxed) {
119                return false;
120            }
121
122            tokio::select! {
123                Some(cmd) = self.cmd_rx.recv() => {
124                    match cmd {
125                        BinanceSpotWsTradingCommand::SetClient(client) => {
126                            log::debug!("Handler received WebSocket client");
127                            self.inner = Some(client);
128                            self.emit(BinanceSpotWsTradingMessage::Connected);
129                        }
130                        BinanceSpotWsTradingCommand::Disconnect => {
131                            log::debug!("Handler disconnecting WebSocket client");
132                            self.inner = None;
133                            return false;
134                        }
135                        BinanceSpotWsTradingCommand::PlaceOrder { id, params } => {
136                            if let Err(e) = self.handle_place_order(id.clone(), params).await {
137                                log::error!("Failed to handle place order command: {e}");
138                                self.emit(BinanceSpotWsTradingMessage::OrderRejected {
139                                    request_id: id,
140                                    code: -1,
141                                    msg: e.to_string(),
142                                });
143                            }
144                        }
145                        BinanceSpotWsTradingCommand::CancelOrder { id, params } => {
146                            if let Err(e) = self.handle_cancel_order(id.clone(), params).await {
147                                log::error!("Failed to handle cancel order command: {e}");
148                                self.emit(BinanceSpotWsTradingMessage::CancelRejected {
149                                    request_id: id,
150                                    code: -1,
151                                    msg: e.to_string(),
152                                });
153                            }
154                        }
155                        BinanceSpotWsTradingCommand::CancelReplaceOrder { id, params } => {
156                            if let Err(e) = self.handle_cancel_replace_order(id.clone(), params).await {
157                                log::error!("Failed to handle cancel replace command: {e}");
158                                self.emit(BinanceSpotWsTradingMessage::CancelReplaceRejected {
159                                    request_id: id,
160                                    code: -1,
161                                    msg: e.to_string(),
162                                });
163                            }
164                        }
165                        BinanceSpotWsTradingCommand::CancelAllOrders { id, symbol } => {
166                            if let Err(e) = self.handle_cancel_all_orders(id.clone(), symbol).await {
167                                log::error!("Failed to handle cancel all command: {e}");
168                                self.emit(BinanceSpotWsTradingMessage::CancelRejected {
169                                    request_id: id,
170                                    code: -1,
171                                    msg: e.to_string(),
172                                });
173                            }
174                        }
175                        BinanceSpotWsTradingCommand::SessionLogon => {
176                            if let Err(e) = self.handle_session_logon().await {
177                                log::error!("Session logon failed: {e}");
178                                self.emit(BinanceSpotWsTradingMessage::Error(
179                                    format!("Session logon failed: {e}"),
180                                ));
181                            }
182                        }
183                        BinanceSpotWsTradingCommand::SubscribeUserData => {
184                            if let Err(e) = self.handle_subscribe_user_data().await {
185                                log::error!("User data subscribe failed: {e}");
186                                self.emit(BinanceSpotWsTradingMessage::Error(
187                                    format!("User data subscribe failed: {e}"),
188                                ));
189                            }
190                        }
191                    }
192                }
193                Some(msg) = self.raw_rx.recv() => {
194                    if let Message::Text(ref text) = msg
195                        && text.as_str() == RECONNECTED
196                    {
197                        log::info!("Handler received reconnection signal");
198
199                        // Fail any pending requests - they won't get responses on new connection
200                        self.fail_pending_requests();
201
202                        self.emit(BinanceSpotWsTradingMessage::Reconnected);
203                        continue;
204                    }
205
206                    self.handle_message(msg);
207                }
208                else => {
209                    // Both channels closed
210                    return false;
211                }
212            }
213        }
214    }
215
216    /// Sends a message to the output channel.
217    fn emit(&self, msg: BinanceSpotWsTradingMessage) {
218        if let Err(e) = self.out_tx.send(msg) {
219            log::error!("Failed to send message to output channel: {e}");
220        }
221    }
222
223    /// Fails all pending requests after a reconnection.
224    fn fail_pending_requests(&mut self) {
225        if self.pending_requests.is_empty() {
226            return;
227        }
228
229        let count = self.pending_requests.len();
230        log::warn!("Failing {count} pending requests after reconnection");
231
232        let pending = std::mem::take(&mut self.pending_requests);
233        for (request_id, meta) in pending {
234            let msg = self.create_rejection(
235                request_id,
236                -1,
237                "Connection lost before response received".to_string(),
238                meta,
239            );
240            self.emit(msg);
241        }
242    }
243
244    async fn handle_place_order(
245        &mut self,
246        id: String,
247        params: crate::spot::http::query::NewOrderParams,
248    ) -> BinanceWsApiResult<()> {
249        let params_json = serde_json::to_value(&params)
250            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
251        let signed_params = self.sign_params(params_json)?;
252
253        let request = BinanceSpotWsTradingRequest::new(&id, method::ORDER_PLACE, signed_params);
254        self.pending_requests
255            .insert(id.clone(), BinanceSpotWsTradingRequestMeta::PlaceOrder);
256        self.send_request(request).await
257    }
258
259    async fn handle_cancel_order(
260        &mut self,
261        id: String,
262        params: crate::spot::http::query::CancelOrderParams,
263    ) -> BinanceWsApiResult<()> {
264        let params_json = serde_json::to_value(&params)
265            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
266        let signed_params = self.sign_params(params_json)?;
267
268        let request = BinanceSpotWsTradingRequest::new(&id, method::ORDER_CANCEL, signed_params);
269        self.pending_requests
270            .insert(id.clone(), BinanceSpotWsTradingRequestMeta::CancelOrder);
271        self.send_request(request).await
272    }
273
274    async fn handle_cancel_replace_order(
275        &mut self,
276        id: String,
277        params: crate::spot::http::query::CancelReplaceOrderParams,
278    ) -> BinanceWsApiResult<()> {
279        let params_json = serde_json::to_value(&params)
280            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
281        let signed_params = self.sign_params(params_json)?;
282
283        let request =
284            BinanceSpotWsTradingRequest::new(&id, method::ORDER_CANCEL_REPLACE, signed_params);
285        self.pending_requests.insert(
286            id.clone(),
287            BinanceSpotWsTradingRequestMeta::CancelReplaceOrder,
288        );
289        self.send_request(request).await
290    }
291
292    async fn handle_cancel_all_orders(
293        &mut self,
294        id: String,
295        symbol: String,
296    ) -> BinanceWsApiResult<()> {
297        let params_json = serde_json::json!({ "symbol": symbol });
298        let signed_params = self.sign_params(params_json)?;
299
300        let request =
301            BinanceSpotWsTradingRequest::new(&id, method::OPEN_ORDERS_CANCEL_ALL, signed_params);
302        self.pending_requests
303            .insert(id.clone(), BinanceSpotWsTradingRequestMeta::CancelAllOrders);
304        self.send_request(request).await
305    }
306
307    async fn handle_session_logon(&mut self) -> BinanceWsApiResult<()> {
308        let id = self.next_request_id();
309        let params_json = serde_json::json!({});
310        let signed_params = self.sign_params(params_json)?;
311
312        let request = BinanceSpotWsTradingRequest::new(&id, "session.logon", signed_params);
313        self.pending_requests
314            .insert(id, BinanceSpotWsTradingRequestMeta::SessionLogon);
315        self.send_request(request).await
316    }
317
318    async fn handle_subscribe_user_data(&mut self) -> BinanceWsApiResult<()> {
319        let id = self.next_request_id();
320        let request = BinanceSpotWsTradingRequest::new(
321            &id,
322            "userDataStream.subscribe",
323            serde_json::json!({}),
324        );
325        self.pending_requests
326            .insert(id, BinanceSpotWsTradingRequestMeta::SubscribeUserData);
327        self.send_request(request).await
328    }
329
330    fn next_request_id(&self) -> String {
331        let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
332        format!("ws-{id}")
333    }
334
335    fn sign_params(&self, mut params: serde_json::Value) -> BinanceWsApiResult<serde_json::Value> {
336        let timestamp = std::time::SystemTime::now()
337            .duration_since(std::time::UNIX_EPOCH)
338            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?
339            .as_millis() as i64;
340
341        if let Some(obj) = params.as_object_mut() {
342            obj.insert("timestamp".to_string(), serde_json::json!(timestamp));
343            obj.insert(
344                "apiKey".to_string(),
345                serde_json::json!(self.credential.api_key()),
346            );
347        }
348
349        let query_string = serde_urlencoded::to_string(&params)
350            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
351        let signature = self.credential.sign(&query_string);
352
353        if let Some(obj) = params.as_object_mut() {
354            obj.insert("signature".to_string(), serde_json::json!(signature));
355        }
356
357        Ok(params)
358    }
359
360    async fn send_request(
361        &mut self,
362        request: BinanceSpotWsTradingRequest,
363    ) -> BinanceWsApiResult<()> {
364        let client = self.inner.as_mut().ok_or_else(|| {
365            BinanceWsApiError::ConnectionError("WebSocket not connected".to_string())
366        })?;
367
368        let json = serde_json::to_string(&request)
369            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
370
371        log::debug!(
372            "Sending WebSocket API request id={} method={}",
373            request.id,
374            request.method
375        );
376
377        // Apply rate limiting for order operations
378        client
379            .send_text(json, Some(BINANCE_WS_RATE_LIMIT_KEY_ORDER.as_slice()))
380            .await
381            .map_err(|e| {
382                BinanceWsApiError::ConnectionError(format!("Failed to send request: {e}"))
383            })?;
384
385        Ok(())
386    }
387
388    fn handle_message(&mut self, msg: Message) {
389        match msg {
390            Message::Binary(data) => self.handle_binary_response(&data),
391            Message::Text(text) => self.handle_text_response(&text),
392            Message::Ping(_) | Message::Pong(_) => {}
393            Message::Close(frame) => {
394                log::debug!("WebSocket closed: {frame:?}");
395            }
396            Message::Frame(_) => {}
397        }
398    }
399
400    fn handle_binary_response(&mut self, data: &[u8]) {
401        match self.decode_ws_api_response(data) {
402            Ok(response) => self.emit(response),
403            Err(e) => {
404                log::error!("Failed to decode WebSocket API response: {e}");
405                self.emit(BinanceSpotWsTradingMessage::Error(e.to_string()));
406            }
407        }
408    }
409
410    fn handle_text_response(&mut self, text: &str) {
411        let json: serde_json::Value = match serde_json::from_str(text) {
412            Ok(j) => j,
413            Err(e) => {
414                log::warn!("Failed to parse text response as JSON: {e}");
415                return;
416            }
417        };
418
419        // User data events arrive wrapped: {"subscriptionId": N, "event": {...}}
420        if let Some(event) = json.get("event") {
421            self.handle_user_data_event(event);
422            return;
423        }
424
425        // WS API responses have an "id" field for request correlation
426        if let Some(id) = json.get("id") {
427            let id_str = match id {
428                serde_json::Value::String(s) => s.clone(),
429                serde_json::Value::Number(n) => n.to_string(),
430                _ => return,
431            };
432
433            if let Some(meta) = self.pending_requests.remove(&id_str) {
434                // Check for error: nested {"error": {"code": N, "msg": "..."}}
435                // or top-level {"code": N, "msg": "..."}
436                let error_info = json
437                    .get("error")
438                    .map(|e| {
439                        (
440                            e.get("code").and_then(|v| v.as_i64()).unwrap_or(-1),
441                            e.get("msg")
442                                .and_then(|v| v.as_str())
443                                .unwrap_or("Unknown error")
444                                .to_string(),
445                        )
446                    })
447                    .or_else(|| {
448                        json.get("code").and_then(|c| c.as_i64()).map(|code| {
449                            let msg = json
450                                .get("msg")
451                                .and_then(|v| v.as_str())
452                                .unwrap_or("Unknown error")
453                                .to_string();
454                            (code, msg)
455                        })
456                    });
457
458                if let Some((code, msg)) = error_info {
459                    let rejection = self.create_rejection(id_str, code as i32, msg, meta);
460                    self.emit(rejection);
461                    return;
462                }
463
464                // Success response
465                match meta {
466                    BinanceSpotWsTradingRequestMeta::SessionLogon => {
467                        log::info!("Session authenticated");
468                        self.emit(BinanceSpotWsTradingMessage::Authenticated);
469                    }
470                    BinanceSpotWsTradingRequestMeta::SubscribeUserData => {
471                        let subscription_id = json
472                            .get("result")
473                            .and_then(|r| r.get("subscriptionId"))
474                            .map(|v| v.to_string())
475                            .unwrap_or_default();
476                        log::info!("User data stream subscribed: id={subscription_id}");
477                        self.emit(BinanceSpotWsTradingMessage::UserDataSubscribed {
478                            subscription_id,
479                        });
480                    }
481                    _ => {
482                        // Order operation responses come as SBE binary, not JSON text.
483                        // If we get a JSON success for an order operation, log it.
484                        log::debug!("Unexpected JSON success for request {id_str}: {json}");
485                    }
486                }
487                return;
488            }
489
490            // Error response without matching pending request
491            if let Some(code) = json.get("code").and_then(|v| v.as_i64()) {
492                let msg = json
493                    .get("msg")
494                    .and_then(|v| v.as_str())
495                    .unwrap_or("Unknown error");
496                log::warn!(
497                    "Received error response without matching request ID: code={code} msg={msg}"
498                );
499            }
500            return;
501        }
502
503        // Stream termination event
504        if json.get("eventStreamTerminated").is_some() {
505            log::warn!("User data stream terminated, resubscribe needed");
506            return;
507        }
508
509        log::debug!("Unhandled text message: {text}");
510    }
511
512    fn handle_user_data_event(&self, event: &serde_json::Value) {
513        let event_type = event.get("e").and_then(|v| v.as_str()).unwrap_or("");
514
515        match event_type {
516            "executionReport" => {
517                match serde_json::from_value::<super::user_data::BinanceSpotExecutionReport>(
518                    event.clone(),
519                ) {
520                    Ok(report) => {
521                        log::debug!(
522                            "Execution report: symbol={}, order_id={}, exec={:?}, status={:?}",
523                            report.symbol,
524                            report.order_id,
525                            report.execution_type,
526                            report.order_status
527                        );
528                        self.emit(BinanceSpotWsTradingMessage::ExecutionReport(Box::new(
529                            report,
530                        )));
531                    }
532                    Err(e) => log::warn!("Failed to parse execution report: {e}"),
533                }
534            }
535            "outboundAccountPosition" => {
536                match serde_json::from_value::<super::user_data::BinanceSpotAccountPositionMsg>(
537                    event.clone(),
538                ) {
539                    Ok(msg) => {
540                        log::debug!("Account position update: {} balance(s)", msg.balances.len());
541                        self.emit(BinanceSpotWsTradingMessage::AccountPosition(msg));
542                    }
543                    Err(e) => log::warn!("Failed to parse account position: {e}"),
544                }
545            }
546            "balanceUpdate" => {
547                match serde_json::from_value::<super::user_data::BinanceSpotBalanceUpdateMsg>(
548                    event.clone(),
549                ) {
550                    Ok(msg) => {
551                        log::debug!("Balance update: asset={}, delta={}", msg.asset, msg.delta);
552                        self.emit(BinanceSpotWsTradingMessage::BalanceUpdate(msg));
553                    }
554                    Err(e) => log::warn!("Failed to parse balance update: {e}"),
555                }
556            }
557            _ => {
558                log::debug!("Unhandled user data event type: {event_type}");
559            }
560        }
561    }
562
563    fn decode_ws_api_response(
564        &mut self,
565        data: &[u8],
566    ) -> Result<BinanceSpotWsTradingMessage, BinanceWsApiError> {
567        // Check template ID before parsing
568        if data.len() >= message_header_codec::ENCODED_LENGTH {
569            let buf = ReadBuf::new(data);
570            let template_id = buf.get_u16_at(2);
571
572            // User data stream events arrive as SBE with their own template IDs
573            // (not wrapped in WebSocketResponse template 50).
574            match template_id {
575                601 => {
576                    log::debug!("Received SBE BalanceUpdateEvent ({} bytes)", data.len());
577                    match super::decode_sbe::decode_balance_update(data) {
578                        Ok(msg) => {
579                            log::debug!(
580                                "SBE balance update: asset={}, delta={}",
581                                msg.asset,
582                                msg.delta
583                            );
584                            return Ok(BinanceSpotWsTradingMessage::BalanceUpdate(msg));
585                        }
586                        Err(e) => {
587                            log::error!("Failed to decode SBE BalanceUpdateEvent: {e}");
588                            return Ok(BinanceSpotWsTradingMessage::Error(format!(
589                                "SBE BalanceUpdateEvent decode failed: {e}"
590                            )));
591                        }
592                    }
593                }
594                603 => {
595                    log::debug!("Received SBE ExecutionReportEvent ({} bytes)", data.len());
596                    match super::decode_sbe::decode_execution_report(data) {
597                        Ok(report) => {
598                            log::debug!(
599                                "SBE execution report: symbol={}, order_id={}, exec={:?}, status={:?}",
600                                report.symbol,
601                                report.order_id,
602                                report.execution_type,
603                                report.order_status
604                            );
605                            return Ok(BinanceSpotWsTradingMessage::ExecutionReport(Box::new(
606                                report,
607                            )));
608                        }
609                        Err(e) => {
610                            log::error!("Failed to decode SBE ExecutionReportEvent: {e}");
611                            return Ok(BinanceSpotWsTradingMessage::Error(format!(
612                                "SBE ExecutionReportEvent decode failed: {e}"
613                            )));
614                        }
615                    }
616                }
617                606 => {
618                    log::debug!(
619                        "Received SBE ListStatusEvent ({} bytes), not yet decoded",
620                        data.len()
621                    );
622                    return Ok(BinanceSpotWsTradingMessage::Error(
623                        "SBE ListStatusEvent decoding not yet implemented".to_string(),
624                    ));
625                }
626                607 => {
627                    log::debug!(
628                        "Received SBE OutboundAccountPositionEvent ({} bytes)",
629                        data.len()
630                    );
631
632                    match super::decode_sbe::decode_account_position(data) {
633                        Ok(msg) => {
634                            log::debug!("SBE account position: {} balance(s)", msg.balances.len());
635                            return Ok(BinanceSpotWsTradingMessage::AccountPosition(msg));
636                        }
637                        Err(e) => {
638                            log::error!("Failed to decode SBE OutboundAccountPositionEvent: {e}");
639                            return Ok(BinanceSpotWsTradingMessage::Error(format!(
640                                "SBE OutboundAccountPositionEvent decode failed: {e}"
641                            )));
642                        }
643                    }
644                }
645                _ => {} // Fall through to WebSocketResponse parsing
646            }
647        }
648
649        // Standard WebSocketResponse envelope (template 50)
650        let (request_id, status, result_data) = self.parse_envelope(data)?;
651
652        // Look up the pending request by ID
653        let meta = self.pending_requests.remove(&request_id).ok_or_else(|| {
654            BinanceWsApiError::UnknownRequestId(format!("No pending request for ID: {request_id}"))
655        })?;
656
657        // Check for error status (non-200)
658        if status != 200 {
659            let (code, msg) = Self::try_decode_sbe_error(&result_data).unwrap_or((
660                status as i32,
661                format!("Request failed with status {status}"),
662            ));
663            return Ok(self.create_rejection(request_id, code, msg, meta));
664        }
665
666        // Decode the inner payload based on request type
667        match meta {
668            BinanceSpotWsTradingRequestMeta::PlaceOrder => {
669                let response = parse::decode_new_order_full(&result_data)?;
670                Ok(BinanceSpotWsTradingMessage::OrderAccepted {
671                    request_id,
672                    response,
673                })
674            }
675            BinanceSpotWsTradingRequestMeta::CancelOrder => {
676                let response = parse::decode_cancel_order(&result_data)?;
677                Ok(BinanceSpotWsTradingMessage::OrderCanceled {
678                    request_id,
679                    response,
680                })
681            }
682            BinanceSpotWsTradingRequestMeta::CancelReplaceOrder => {
683                // Cancel-replace returns both cancel and new order info
684                let new_order_response = parse::decode_new_order_full(&result_data)?;
685                let cancel_response = BinanceCancelOrderResponse {
686                    price_exponent: new_order_response.price_exponent,
687                    qty_exponent: new_order_response.qty_exponent,
688                    order_id: 0,
689                    order_list_id: None,
690                    transact_time: new_order_response.transact_time,
691                    price_mantissa: 0,
692                    orig_qty_mantissa: 0,
693                    executed_qty_mantissa: 0,
694                    cummulative_quote_qty_mantissa: 0,
695                    status: crate::spot::sbe::spot::order_status::OrderStatus::Canceled,
696                    time_in_force: new_order_response.time_in_force,
697                    order_type: new_order_response.order_type,
698                    side: new_order_response.side,
699                    self_trade_prevention_mode: new_order_response.self_trade_prevention_mode,
700                    client_order_id: String::new(),
701                    orig_client_order_id: String::new(),
702                    symbol: new_order_response.symbol.clone(),
703                };
704                Ok(BinanceSpotWsTradingMessage::CancelReplaceAccepted {
705                    request_id,
706                    cancel_response,
707                    new_order_response,
708                })
709            }
710            BinanceSpotWsTradingRequestMeta::CancelAllOrders => {
711                let responses = parse::decode_cancel_open_orders(&result_data)?;
712                Ok(BinanceSpotWsTradingMessage::AllOrdersCanceled {
713                    request_id,
714                    responses,
715                })
716            }
717            BinanceSpotWsTradingRequestMeta::SessionLogon => {
718                log::info!("Session authenticated (SBE response)");
719                Ok(BinanceSpotWsTradingMessage::Authenticated)
720            }
721            BinanceSpotWsTradingRequestMeta::SubscribeUserData => {
722                log::info!("User data stream subscribed (SBE response)");
723                Ok(BinanceSpotWsTradingMessage::UserDataSubscribed {
724                    subscription_id: request_id,
725                })
726            }
727        }
728    }
729
730    /// Parses the WebSocketResponse SBE envelope.
731    ///
732    /// Returns (request_id, status, result_payload).
733    fn parse_envelope(&self, data: &[u8]) -> Result<(String, u16, Vec<u8>), BinanceWsApiError> {
734        if data.len() < message_header_codec::ENCODED_LENGTH {
735            return Err(BinanceWsApiError::DecodeError(
736                crate::spot::sbe::error::SbeDecodeError::BufferTooShort {
737                    expected: message_header_codec::ENCODED_LENGTH,
738                    actual: data.len(),
739                },
740            ));
741        }
742
743        let buf = ReadBuf::new(data);
744
745        // Parse message header
746        let block_length = buf.get_u16_at(0);
747        let template_id = buf.get_u16_at(2);
748
749        if template_id != SBE_TEMPLATE_ID {
750            return Err(BinanceWsApiError::DecodeError(
751                crate::spot::sbe::error::SbeDecodeError::UnknownTemplateId(template_id),
752            ));
753        }
754
755        let version = buf.get_u16_at(6);
756
757        // Create decoder at offset after message header
758        let decoder = WebSocketResponseDecoder::default().wrap(
759            buf,
760            message_header_codec::ENCODED_LENGTH,
761            block_length,
762            version,
763        );
764
765        // Read status from fixed block (offset 1 within block)
766        let status = decoder.status();
767
768        // Skip rate_limits group
769        let mut rate_limits = decoder.rate_limits_decoder();
770        while rate_limits.advance().unwrap_or(None).is_some() {}
771        let mut decoder = rate_limits.parent().map_err(|_| {
772            BinanceWsApiError::ClientError("Failed to get parent from rate_limits".to_string())
773        })?;
774
775        // Extract request ID
776        let id_coords = decoder.id_decoder();
777        let id_bytes = decoder.id_slice(id_coords);
778        let request_id = String::from_utf8_lossy(id_bytes).to_string();
779
780        // Extract result payload - copy to owned Vec to avoid lifetime issues
781        let result_coords = decoder.result_decoder();
782        let result_data = decoder.result_slice(result_coords).to_vec();
783
784        Ok((request_id, status, result_data))
785    }
786
787    fn create_rejection(
788        &self,
789        request_id: String,
790        code: i32,
791        msg: String,
792        meta: BinanceSpotWsTradingRequestMeta,
793    ) -> BinanceSpotWsTradingMessage {
794        match meta {
795            BinanceSpotWsTradingRequestMeta::PlaceOrder => {
796                BinanceSpotWsTradingMessage::OrderRejected {
797                    request_id,
798                    code,
799                    msg,
800                }
801            }
802            BinanceSpotWsTradingRequestMeta::CancelOrder => {
803                BinanceSpotWsTradingMessage::CancelRejected {
804                    request_id,
805                    code,
806                    msg,
807                }
808            }
809            BinanceSpotWsTradingRequestMeta::CancelReplaceOrder => {
810                BinanceSpotWsTradingMessage::CancelReplaceRejected {
811                    request_id,
812                    code,
813                    msg,
814                }
815            }
816            BinanceSpotWsTradingRequestMeta::CancelAllOrders => {
817                BinanceSpotWsTradingMessage::CancelRejected {
818                    request_id,
819                    code,
820                    msg,
821                }
822            }
823            BinanceSpotWsTradingRequestMeta::SessionLogon
824            | BinanceSpotWsTradingRequestMeta::SubscribeUserData => {
825                BinanceSpotWsTradingMessage::Error(format!("code={code}: {msg}"))
826            }
827        }
828    }
829
830    // Decodes the SBE error response to extract the Binance error code and message
831    fn try_decode_sbe_error(data: &[u8]) -> Option<(i32, String)> {
832        const HEADER_LEN: usize = 8;
833
834        if data.len()
835            < HEADER_LEN + crate::spot::sbe::spot::error_response_codec::SBE_BLOCK_LENGTH as usize
836        {
837            return None;
838        }
839
840        let buf = ReadBuf::new(data);
841        let header = message_header_codec::MessageHeaderDecoder::default().wrap(buf, 0);
842        if header.template_id() != crate::spot::sbe::spot::error_response_codec::SBE_TEMPLATE_ID {
843            return None;
844        }
845
846        let mut decoder = ErrorResponseDecoder::default().header(header, 0);
847        let code = i32::from(decoder.code());
848        let msg_coords = decoder.msg_decoder();
849        let msg_bytes = decoder.msg_slice(msg_coords);
850        let msg = String::from_utf8_lossy(msg_bytes).into_owned();
851
852        Some((code, msg))
853    }
854}