1use std::{
30 fmt::Debug,
31 sync::{
32 Arc,
33 atomic::{AtomicBool, AtomicU64, Ordering},
34 },
35};
36
37use ahash::AHashMap;
38use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
39use tokio_tungstenite::tungstenite::Message;
40
41use super::{
42 client::BINANCE_WS_RATE_LIMIT_KEY_ORDER,
43 error::{BinanceWsApiError, BinanceWsApiResult},
44 messages::{
45 BinanceSpotWsTradingCommand, BinanceSpotWsTradingMessage, BinanceSpotWsTradingRequest,
46 BinanceSpotWsTradingRequestMeta, method,
47 },
48};
49use crate::{
50 common::credential::SigningCredential,
51 spot::{
52 http::{models::BinanceCancelOrderResponse, parse},
53 sbe::spot::{
54 ReadBuf,
55 error_response_codec::ErrorResponseDecoder,
56 message_header_codec,
57 web_socket_response_codec::{SBE_TEMPLATE_ID, WebSocketResponseDecoder},
58 },
59 },
60};
61
62pub struct BinanceSpotWsTradingHandler {
68 signal: Arc<AtomicBool>,
69 inner: Option<WebSocketClient>,
70 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsTradingCommand>,
71 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
72 out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsTradingMessage>,
73 credential: Arc<SigningCredential>,
74 pending_requests: AHashMap<String, BinanceSpotWsTradingRequestMeta>,
75 request_id_counter: AtomicU64,
76}
77
78impl Debug for BinanceSpotWsTradingHandler {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 f.debug_struct(stringify!(BinanceSpotWsTradingHandler))
81 .field("inner", &self.inner.as_ref().map(|_| "<client>"))
82 .field(
83 "pending_requests",
84 &format!("{} pending", self.pending_requests.len()),
85 )
86 .finish_non_exhaustive()
87 }
88}
89
90impl BinanceSpotWsTradingHandler {
91 #[must_use]
93 pub fn new(
94 signal: Arc<AtomicBool>,
95 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsTradingCommand>,
96 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
97 out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsTradingMessage>,
98 credential: Arc<SigningCredential>,
99 ) -> Self {
100 Self {
101 signal,
102 inner: None,
103 cmd_rx,
104 raw_rx,
105 out_tx,
106 credential,
107 pending_requests: AHashMap::new(),
108 request_id_counter: AtomicU64::new(1000),
109 }
110 }
111
112 pub async fn run(&mut self) -> bool {
117 loop {
118 if self.signal.load(Ordering::Relaxed) {
119 return false;
120 }
121
122 tokio::select! {
123 Some(cmd) = self.cmd_rx.recv() => {
124 match cmd {
125 BinanceSpotWsTradingCommand::SetClient(client) => {
126 log::debug!("Handler received WebSocket client");
127 self.inner = Some(client);
128 self.emit(BinanceSpotWsTradingMessage::Connected);
129 }
130 BinanceSpotWsTradingCommand::Disconnect => {
131 log::debug!("Handler disconnecting WebSocket client");
132 self.inner = None;
133 return false;
134 }
135 BinanceSpotWsTradingCommand::PlaceOrder { id, params } => {
136 if let Err(e) = self.handle_place_order(id.clone(), params).await {
137 log::error!("Failed to handle place order command: {e}");
138 self.emit(BinanceSpotWsTradingMessage::OrderRejected {
139 request_id: id,
140 code: -1,
141 msg: e.to_string(),
142 });
143 }
144 }
145 BinanceSpotWsTradingCommand::CancelOrder { id, params } => {
146 if let Err(e) = self.handle_cancel_order(id.clone(), params).await {
147 log::error!("Failed to handle cancel order command: {e}");
148 self.emit(BinanceSpotWsTradingMessage::CancelRejected {
149 request_id: id,
150 code: -1,
151 msg: e.to_string(),
152 });
153 }
154 }
155 BinanceSpotWsTradingCommand::CancelReplaceOrder { id, params } => {
156 if let Err(e) = self.handle_cancel_replace_order(id.clone(), params).await {
157 log::error!("Failed to handle cancel replace command: {e}");
158 self.emit(BinanceSpotWsTradingMessage::CancelReplaceRejected {
159 request_id: id,
160 code: -1,
161 msg: e.to_string(),
162 });
163 }
164 }
165 BinanceSpotWsTradingCommand::CancelAllOrders { id, symbol } => {
166 if let Err(e) = self.handle_cancel_all_orders(id.clone(), symbol).await {
167 log::error!("Failed to handle cancel all command: {e}");
168 self.emit(BinanceSpotWsTradingMessage::CancelRejected {
169 request_id: id,
170 code: -1,
171 msg: e.to_string(),
172 });
173 }
174 }
175 BinanceSpotWsTradingCommand::SessionLogon => {
176 if let Err(e) = self.handle_session_logon().await {
177 log::error!("Session logon failed: {e}");
178 self.emit(BinanceSpotWsTradingMessage::Error(
179 format!("Session logon failed: {e}"),
180 ));
181 }
182 }
183 BinanceSpotWsTradingCommand::SubscribeUserData => {
184 if let Err(e) = self.handle_subscribe_user_data().await {
185 log::error!("User data subscribe failed: {e}");
186 self.emit(BinanceSpotWsTradingMessage::Error(
187 format!("User data subscribe failed: {e}"),
188 ));
189 }
190 }
191 }
192 }
193 Some(msg) = self.raw_rx.recv() => {
194 if let Message::Text(ref text) = msg
195 && text.as_str() == RECONNECTED
196 {
197 log::info!("Handler received reconnection signal");
198
199 self.fail_pending_requests();
201
202 self.emit(BinanceSpotWsTradingMessage::Reconnected);
203 continue;
204 }
205
206 self.handle_message(msg);
207 }
208 else => {
209 return false;
211 }
212 }
213 }
214 }
215
216 fn emit(&self, msg: BinanceSpotWsTradingMessage) {
218 if let Err(e) = self.out_tx.send(msg) {
219 log::error!("Failed to send message to output channel: {e}");
220 }
221 }
222
223 fn fail_pending_requests(&mut self) {
225 if self.pending_requests.is_empty() {
226 return;
227 }
228
229 let count = self.pending_requests.len();
230 log::warn!("Failing {count} pending requests after reconnection");
231
232 let pending = std::mem::take(&mut self.pending_requests);
233 for (request_id, meta) in pending {
234 let msg = self.create_rejection(
235 request_id,
236 -1,
237 "Connection lost before response received".to_string(),
238 meta,
239 );
240 self.emit(msg);
241 }
242 }
243
244 async fn handle_place_order(
245 &mut self,
246 id: String,
247 params: crate::spot::http::query::NewOrderParams,
248 ) -> BinanceWsApiResult<()> {
249 let params_json = serde_json::to_value(¶ms)
250 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
251 let signed_params = self.sign_params(params_json)?;
252
253 let request = BinanceSpotWsTradingRequest::new(&id, method::ORDER_PLACE, signed_params);
254 self.pending_requests
255 .insert(id.clone(), BinanceSpotWsTradingRequestMeta::PlaceOrder);
256 self.send_request(request).await
257 }
258
259 async fn handle_cancel_order(
260 &mut self,
261 id: String,
262 params: crate::spot::http::query::CancelOrderParams,
263 ) -> BinanceWsApiResult<()> {
264 let params_json = serde_json::to_value(¶ms)
265 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
266 let signed_params = self.sign_params(params_json)?;
267
268 let request = BinanceSpotWsTradingRequest::new(&id, method::ORDER_CANCEL, signed_params);
269 self.pending_requests
270 .insert(id.clone(), BinanceSpotWsTradingRequestMeta::CancelOrder);
271 self.send_request(request).await
272 }
273
274 async fn handle_cancel_replace_order(
275 &mut self,
276 id: String,
277 params: crate::spot::http::query::CancelReplaceOrderParams,
278 ) -> BinanceWsApiResult<()> {
279 let params_json = serde_json::to_value(¶ms)
280 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
281 let signed_params = self.sign_params(params_json)?;
282
283 let request =
284 BinanceSpotWsTradingRequest::new(&id, method::ORDER_CANCEL_REPLACE, signed_params);
285 self.pending_requests.insert(
286 id.clone(),
287 BinanceSpotWsTradingRequestMeta::CancelReplaceOrder,
288 );
289 self.send_request(request).await
290 }
291
292 async fn handle_cancel_all_orders(
293 &mut self,
294 id: String,
295 symbol: String,
296 ) -> BinanceWsApiResult<()> {
297 let params_json = serde_json::json!({ "symbol": symbol });
298 let signed_params = self.sign_params(params_json)?;
299
300 let request =
301 BinanceSpotWsTradingRequest::new(&id, method::OPEN_ORDERS_CANCEL_ALL, signed_params);
302 self.pending_requests
303 .insert(id.clone(), BinanceSpotWsTradingRequestMeta::CancelAllOrders);
304 self.send_request(request).await
305 }
306
307 async fn handle_session_logon(&mut self) -> BinanceWsApiResult<()> {
308 let id = self.next_request_id();
309 let params_json = serde_json::json!({});
310 let signed_params = self.sign_params(params_json)?;
311
312 let request = BinanceSpotWsTradingRequest::new(&id, "session.logon", signed_params);
313 self.pending_requests
314 .insert(id, BinanceSpotWsTradingRequestMeta::SessionLogon);
315 self.send_request(request).await
316 }
317
318 async fn handle_subscribe_user_data(&mut self) -> BinanceWsApiResult<()> {
319 let id = self.next_request_id();
320 let request = BinanceSpotWsTradingRequest::new(
321 &id,
322 "userDataStream.subscribe",
323 serde_json::json!({}),
324 );
325 self.pending_requests
326 .insert(id, BinanceSpotWsTradingRequestMeta::SubscribeUserData);
327 self.send_request(request).await
328 }
329
330 fn next_request_id(&self) -> String {
331 let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
332 format!("ws-{id}")
333 }
334
335 fn sign_params(&self, mut params: serde_json::Value) -> BinanceWsApiResult<serde_json::Value> {
336 let timestamp = std::time::SystemTime::now()
337 .duration_since(std::time::UNIX_EPOCH)
338 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?
339 .as_millis() as i64;
340
341 if let Some(obj) = params.as_object_mut() {
342 obj.insert("timestamp".to_string(), serde_json::json!(timestamp));
343 obj.insert(
344 "apiKey".to_string(),
345 serde_json::json!(self.credential.api_key()),
346 );
347 }
348
349 let query_string = serde_urlencoded::to_string(¶ms)
350 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
351 let signature = self.credential.sign(&query_string);
352
353 if let Some(obj) = params.as_object_mut() {
354 obj.insert("signature".to_string(), serde_json::json!(signature));
355 }
356
357 Ok(params)
358 }
359
360 async fn send_request(
361 &mut self,
362 request: BinanceSpotWsTradingRequest,
363 ) -> BinanceWsApiResult<()> {
364 let client = self.inner.as_mut().ok_or_else(|| {
365 BinanceWsApiError::ConnectionError("WebSocket not connected".to_string())
366 })?;
367
368 let json = serde_json::to_string(&request)
369 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
370
371 log::debug!(
372 "Sending WebSocket API request id={} method={}",
373 request.id,
374 request.method
375 );
376
377 client
379 .send_text(json, Some(BINANCE_WS_RATE_LIMIT_KEY_ORDER.as_slice()))
380 .await
381 .map_err(|e| {
382 BinanceWsApiError::ConnectionError(format!("Failed to send request: {e}"))
383 })?;
384
385 Ok(())
386 }
387
388 fn handle_message(&mut self, msg: Message) {
389 match msg {
390 Message::Binary(data) => self.handle_binary_response(&data),
391 Message::Text(text) => self.handle_text_response(&text),
392 Message::Ping(_) | Message::Pong(_) => {}
393 Message::Close(frame) => {
394 log::debug!("WebSocket closed: {frame:?}");
395 }
396 Message::Frame(_) => {}
397 }
398 }
399
400 fn handle_binary_response(&mut self, data: &[u8]) {
401 match self.decode_ws_api_response(data) {
402 Ok(response) => self.emit(response),
403 Err(e) => {
404 log::error!("Failed to decode WebSocket API response: {e}");
405 self.emit(BinanceSpotWsTradingMessage::Error(e.to_string()));
406 }
407 }
408 }
409
410 fn handle_text_response(&mut self, text: &str) {
411 let json: serde_json::Value = match serde_json::from_str(text) {
412 Ok(j) => j,
413 Err(e) => {
414 log::warn!("Failed to parse text response as JSON: {e}");
415 return;
416 }
417 };
418
419 if let Some(event) = json.get("event") {
421 self.handle_user_data_event(event);
422 return;
423 }
424
425 if let Some(id) = json.get("id") {
427 let id_str = match id {
428 serde_json::Value::String(s) => s.clone(),
429 serde_json::Value::Number(n) => n.to_string(),
430 _ => return,
431 };
432
433 if let Some(meta) = self.pending_requests.remove(&id_str) {
434 let error_info = json
437 .get("error")
438 .map(|e| {
439 (
440 e.get("code").and_then(|v| v.as_i64()).unwrap_or(-1),
441 e.get("msg")
442 .and_then(|v| v.as_str())
443 .unwrap_or("Unknown error")
444 .to_string(),
445 )
446 })
447 .or_else(|| {
448 json.get("code").and_then(|c| c.as_i64()).map(|code| {
449 let msg = json
450 .get("msg")
451 .and_then(|v| v.as_str())
452 .unwrap_or("Unknown error")
453 .to_string();
454 (code, msg)
455 })
456 });
457
458 if let Some((code, msg)) = error_info {
459 let rejection = self.create_rejection(id_str, code as i32, msg, meta);
460 self.emit(rejection);
461 return;
462 }
463
464 match meta {
466 BinanceSpotWsTradingRequestMeta::SessionLogon => {
467 log::info!("Session authenticated");
468 self.emit(BinanceSpotWsTradingMessage::Authenticated);
469 }
470 BinanceSpotWsTradingRequestMeta::SubscribeUserData => {
471 let subscription_id = json
472 .get("result")
473 .and_then(|r| r.get("subscriptionId"))
474 .map(|v| v.to_string())
475 .unwrap_or_default();
476 log::info!("User data stream subscribed: id={subscription_id}");
477 self.emit(BinanceSpotWsTradingMessage::UserDataSubscribed {
478 subscription_id,
479 });
480 }
481 _ => {
482 log::debug!("Unexpected JSON success for request {id_str}: {json}");
485 }
486 }
487 return;
488 }
489
490 if let Some(code) = json.get("code").and_then(|v| v.as_i64()) {
492 let msg = json
493 .get("msg")
494 .and_then(|v| v.as_str())
495 .unwrap_or("Unknown error");
496 log::warn!(
497 "Received error response without matching request ID: code={code} msg={msg}"
498 );
499 }
500 return;
501 }
502
503 if json.get("eventStreamTerminated").is_some() {
505 log::warn!("User data stream terminated, resubscribe needed");
506 return;
507 }
508
509 log::debug!("Unhandled text message: {text}");
510 }
511
512 fn handle_user_data_event(&self, event: &serde_json::Value) {
513 let event_type = event.get("e").and_then(|v| v.as_str()).unwrap_or("");
514
515 match event_type {
516 "executionReport" => {
517 match serde_json::from_value::<super::user_data::BinanceSpotExecutionReport>(
518 event.clone(),
519 ) {
520 Ok(report) => {
521 log::debug!(
522 "Execution report: symbol={}, order_id={}, exec={:?}, status={:?}",
523 report.symbol,
524 report.order_id,
525 report.execution_type,
526 report.order_status
527 );
528 self.emit(BinanceSpotWsTradingMessage::ExecutionReport(Box::new(
529 report,
530 )));
531 }
532 Err(e) => log::warn!("Failed to parse execution report: {e}"),
533 }
534 }
535 "outboundAccountPosition" => {
536 match serde_json::from_value::<super::user_data::BinanceSpotAccountPositionMsg>(
537 event.clone(),
538 ) {
539 Ok(msg) => {
540 log::debug!("Account position update: {} balance(s)", msg.balances.len());
541 self.emit(BinanceSpotWsTradingMessage::AccountPosition(msg));
542 }
543 Err(e) => log::warn!("Failed to parse account position: {e}"),
544 }
545 }
546 "balanceUpdate" => {
547 match serde_json::from_value::<super::user_data::BinanceSpotBalanceUpdateMsg>(
548 event.clone(),
549 ) {
550 Ok(msg) => {
551 log::debug!("Balance update: asset={}, delta={}", msg.asset, msg.delta);
552 self.emit(BinanceSpotWsTradingMessage::BalanceUpdate(msg));
553 }
554 Err(e) => log::warn!("Failed to parse balance update: {e}"),
555 }
556 }
557 _ => {
558 log::debug!("Unhandled user data event type: {event_type}");
559 }
560 }
561 }
562
563 fn decode_ws_api_response(
564 &mut self,
565 data: &[u8],
566 ) -> Result<BinanceSpotWsTradingMessage, BinanceWsApiError> {
567 if data.len() >= message_header_codec::ENCODED_LENGTH {
569 let buf = ReadBuf::new(data);
570 let template_id = buf.get_u16_at(2);
571
572 match template_id {
575 601 => {
576 log::debug!("Received SBE BalanceUpdateEvent ({} bytes)", data.len());
577 match super::decode_sbe::decode_balance_update(data) {
578 Ok(msg) => {
579 log::debug!(
580 "SBE balance update: asset={}, delta={}",
581 msg.asset,
582 msg.delta
583 );
584 return Ok(BinanceSpotWsTradingMessage::BalanceUpdate(msg));
585 }
586 Err(e) => {
587 log::error!("Failed to decode SBE BalanceUpdateEvent: {e}");
588 return Ok(BinanceSpotWsTradingMessage::Error(format!(
589 "SBE BalanceUpdateEvent decode failed: {e}"
590 )));
591 }
592 }
593 }
594 603 => {
595 log::debug!("Received SBE ExecutionReportEvent ({} bytes)", data.len());
596 match super::decode_sbe::decode_execution_report(data) {
597 Ok(report) => {
598 log::debug!(
599 "SBE execution report: symbol={}, order_id={}, exec={:?}, status={:?}",
600 report.symbol,
601 report.order_id,
602 report.execution_type,
603 report.order_status
604 );
605 return Ok(BinanceSpotWsTradingMessage::ExecutionReport(Box::new(
606 report,
607 )));
608 }
609 Err(e) => {
610 log::error!("Failed to decode SBE ExecutionReportEvent: {e}");
611 return Ok(BinanceSpotWsTradingMessage::Error(format!(
612 "SBE ExecutionReportEvent decode failed: {e}"
613 )));
614 }
615 }
616 }
617 606 => {
618 log::debug!(
619 "Received SBE ListStatusEvent ({} bytes), not yet decoded",
620 data.len()
621 );
622 return Ok(BinanceSpotWsTradingMessage::Error(
623 "SBE ListStatusEvent decoding not yet implemented".to_string(),
624 ));
625 }
626 607 => {
627 log::debug!(
628 "Received SBE OutboundAccountPositionEvent ({} bytes)",
629 data.len()
630 );
631
632 match super::decode_sbe::decode_account_position(data) {
633 Ok(msg) => {
634 log::debug!("SBE account position: {} balance(s)", msg.balances.len());
635 return Ok(BinanceSpotWsTradingMessage::AccountPosition(msg));
636 }
637 Err(e) => {
638 log::error!("Failed to decode SBE OutboundAccountPositionEvent: {e}");
639 return Ok(BinanceSpotWsTradingMessage::Error(format!(
640 "SBE OutboundAccountPositionEvent decode failed: {e}"
641 )));
642 }
643 }
644 }
645 _ => {} }
647 }
648
649 let (request_id, status, result_data) = self.parse_envelope(data)?;
651
652 let meta = self.pending_requests.remove(&request_id).ok_or_else(|| {
654 BinanceWsApiError::UnknownRequestId(format!("No pending request for ID: {request_id}"))
655 })?;
656
657 if status != 200 {
659 let (code, msg) = Self::try_decode_sbe_error(&result_data).unwrap_or((
660 status as i32,
661 format!("Request failed with status {status}"),
662 ));
663 return Ok(self.create_rejection(request_id, code, msg, meta));
664 }
665
666 match meta {
668 BinanceSpotWsTradingRequestMeta::PlaceOrder => {
669 let response = parse::decode_new_order_full(&result_data)?;
670 Ok(BinanceSpotWsTradingMessage::OrderAccepted {
671 request_id,
672 response,
673 })
674 }
675 BinanceSpotWsTradingRequestMeta::CancelOrder => {
676 let response = parse::decode_cancel_order(&result_data)?;
677 Ok(BinanceSpotWsTradingMessage::OrderCanceled {
678 request_id,
679 response,
680 })
681 }
682 BinanceSpotWsTradingRequestMeta::CancelReplaceOrder => {
683 let new_order_response = parse::decode_new_order_full(&result_data)?;
685 let cancel_response = BinanceCancelOrderResponse {
686 price_exponent: new_order_response.price_exponent,
687 qty_exponent: new_order_response.qty_exponent,
688 order_id: 0,
689 order_list_id: None,
690 transact_time: new_order_response.transact_time,
691 price_mantissa: 0,
692 orig_qty_mantissa: 0,
693 executed_qty_mantissa: 0,
694 cummulative_quote_qty_mantissa: 0,
695 status: crate::spot::sbe::spot::order_status::OrderStatus::Canceled,
696 time_in_force: new_order_response.time_in_force,
697 order_type: new_order_response.order_type,
698 side: new_order_response.side,
699 self_trade_prevention_mode: new_order_response.self_trade_prevention_mode,
700 client_order_id: String::new(),
701 orig_client_order_id: String::new(),
702 symbol: new_order_response.symbol.clone(),
703 };
704 Ok(BinanceSpotWsTradingMessage::CancelReplaceAccepted {
705 request_id,
706 cancel_response,
707 new_order_response,
708 })
709 }
710 BinanceSpotWsTradingRequestMeta::CancelAllOrders => {
711 let responses = parse::decode_cancel_open_orders(&result_data)?;
712 Ok(BinanceSpotWsTradingMessage::AllOrdersCanceled {
713 request_id,
714 responses,
715 })
716 }
717 BinanceSpotWsTradingRequestMeta::SessionLogon => {
718 log::info!("Session authenticated (SBE response)");
719 Ok(BinanceSpotWsTradingMessage::Authenticated)
720 }
721 BinanceSpotWsTradingRequestMeta::SubscribeUserData => {
722 log::info!("User data stream subscribed (SBE response)");
723 Ok(BinanceSpotWsTradingMessage::UserDataSubscribed {
724 subscription_id: request_id,
725 })
726 }
727 }
728 }
729
730 fn parse_envelope(&self, data: &[u8]) -> Result<(String, u16, Vec<u8>), BinanceWsApiError> {
734 if data.len() < message_header_codec::ENCODED_LENGTH {
735 return Err(BinanceWsApiError::DecodeError(
736 crate::spot::sbe::error::SbeDecodeError::BufferTooShort {
737 expected: message_header_codec::ENCODED_LENGTH,
738 actual: data.len(),
739 },
740 ));
741 }
742
743 let buf = ReadBuf::new(data);
744
745 let block_length = buf.get_u16_at(0);
747 let template_id = buf.get_u16_at(2);
748
749 if template_id != SBE_TEMPLATE_ID {
750 return Err(BinanceWsApiError::DecodeError(
751 crate::spot::sbe::error::SbeDecodeError::UnknownTemplateId(template_id),
752 ));
753 }
754
755 let version = buf.get_u16_at(6);
756
757 let decoder = WebSocketResponseDecoder::default().wrap(
759 buf,
760 message_header_codec::ENCODED_LENGTH,
761 block_length,
762 version,
763 );
764
765 let status = decoder.status();
767
768 let mut rate_limits = decoder.rate_limits_decoder();
770 while rate_limits.advance().unwrap_or(None).is_some() {}
771 let mut decoder = rate_limits.parent().map_err(|_| {
772 BinanceWsApiError::ClientError("Failed to get parent from rate_limits".to_string())
773 })?;
774
775 let id_coords = decoder.id_decoder();
777 let id_bytes = decoder.id_slice(id_coords);
778 let request_id = String::from_utf8_lossy(id_bytes).to_string();
779
780 let result_coords = decoder.result_decoder();
782 let result_data = decoder.result_slice(result_coords).to_vec();
783
784 Ok((request_id, status, result_data))
785 }
786
787 fn create_rejection(
788 &self,
789 request_id: String,
790 code: i32,
791 msg: String,
792 meta: BinanceSpotWsTradingRequestMeta,
793 ) -> BinanceSpotWsTradingMessage {
794 match meta {
795 BinanceSpotWsTradingRequestMeta::PlaceOrder => {
796 BinanceSpotWsTradingMessage::OrderRejected {
797 request_id,
798 code,
799 msg,
800 }
801 }
802 BinanceSpotWsTradingRequestMeta::CancelOrder => {
803 BinanceSpotWsTradingMessage::CancelRejected {
804 request_id,
805 code,
806 msg,
807 }
808 }
809 BinanceSpotWsTradingRequestMeta::CancelReplaceOrder => {
810 BinanceSpotWsTradingMessage::CancelReplaceRejected {
811 request_id,
812 code,
813 msg,
814 }
815 }
816 BinanceSpotWsTradingRequestMeta::CancelAllOrders => {
817 BinanceSpotWsTradingMessage::CancelRejected {
818 request_id,
819 code,
820 msg,
821 }
822 }
823 BinanceSpotWsTradingRequestMeta::SessionLogon
824 | BinanceSpotWsTradingRequestMeta::SubscribeUserData => {
825 BinanceSpotWsTradingMessage::Error(format!("code={code}: {msg}"))
826 }
827 }
828 }
829
830 fn try_decode_sbe_error(data: &[u8]) -> Option<(i32, String)> {
832 const HEADER_LEN: usize = 8;
833
834 if data.len()
835 < HEADER_LEN + crate::spot::sbe::spot::error_response_codec::SBE_BLOCK_LENGTH as usize
836 {
837 return None;
838 }
839
840 let buf = ReadBuf::new(data);
841 let header = message_header_codec::MessageHeaderDecoder::default().wrap(buf, 0);
842 if header.template_id() != crate::spot::sbe::spot::error_response_codec::SBE_TEMPLATE_ID {
843 return None;
844 }
845
846 let mut decoder = ErrorResponseDecoder::default().header(header, 0);
847 let code = i32::from(decoder.code());
848 let msg_coords = decoder.msg_decoder();
849 let msg_bytes = decoder.msg_slice(msg_coords);
850 let msg = String::from_utf8_lossy(msg_bytes).into_owned();
851
852 Some((code, msg))
853 }
854}