nautilus_binance/futures/websocket/trading/
handler.rs1use std::{
26 fmt::Debug,
27 sync::{
28 Arc,
29 atomic::{AtomicBool, Ordering},
30 },
31};
32
33use ahash::AHashMap;
34use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
35use tokio_tungstenite::tungstenite::Message;
36
37use super::{
38 client::BINANCE_FUTURES_WS_RATE_LIMIT_KEY_ORDER,
39 error::{BinanceFuturesWsApiError, BinanceFuturesWsApiResult},
40 messages::{
41 BinanceFuturesWsTradingCommand, BinanceFuturesWsTradingMessage,
42 BinanceFuturesWsTradingRequest, BinanceFuturesWsTradingRequestMeta,
43 BinanceFuturesWsTradingResponse, method,
44 },
45};
46use crate::{
47 common::credential::SigningCredential,
48 futures::http::query::{
49 BinanceCancelOrderParams, BinanceModifyOrderParams, BinanceNewOrderParams,
50 },
51};
52
53pub struct BinanceFuturesWsTradingHandler {
58 signal: Arc<AtomicBool>,
59 inner: Option<WebSocketClient>,
60 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsTradingCommand>,
61 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
62 out_tx: tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsTradingMessage>,
63 credential: Arc<SigningCredential>,
64 pending_requests: AHashMap<String, BinanceFuturesWsTradingRequestMeta>,
65}
66
67impl Debug for BinanceFuturesWsTradingHandler {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct(stringify!(BinanceFuturesWsTradingHandler))
70 .field("inner", &self.inner.as_ref().map(|_| "<client>"))
71 .field(
72 "pending_requests",
73 &format!("{} pending", self.pending_requests.len()),
74 )
75 .finish_non_exhaustive()
76 }
77}
78
79impl BinanceFuturesWsTradingHandler {
80 #[must_use]
82 pub fn new(
83 signal: Arc<AtomicBool>,
84 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsTradingCommand>,
85 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
86 out_tx: tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsTradingMessage>,
87 credential: Arc<SigningCredential>,
88 ) -> Self {
89 Self {
90 signal,
91 inner: None,
92 cmd_rx,
93 raw_rx,
94 out_tx,
95 credential,
96 pending_requests: AHashMap::new(),
97 }
98 }
99
100 pub async fn run(&mut self) -> bool {
104 loop {
105 if self.signal.load(Ordering::Relaxed) {
106 return false;
107 }
108
109 tokio::select! {
110 Some(cmd) = self.cmd_rx.recv() => {
111 match cmd {
112 BinanceFuturesWsTradingCommand::SetClient(client) => {
113 log::debug!("Handler received WebSocket client");
114 self.inner = Some(client);
115 self.emit(BinanceFuturesWsTradingMessage::Connected);
116 }
117 BinanceFuturesWsTradingCommand::Disconnect => {
118 log::debug!("Handler disconnecting WebSocket client");
119 self.inner = None;
120 return false;
121 }
122 BinanceFuturesWsTradingCommand::PlaceOrder { id, params } => {
123 if let Err(e) = self.handle_place_order(id.clone(), params).await {
124 log::error!("Failed to handle place order command: {e}");
125 self.emit(BinanceFuturesWsTradingMessage::OrderRejected {
126 request_id: id,
127 code: -1,
128 msg: e.to_string(),
129 });
130 }
131 }
132 BinanceFuturesWsTradingCommand::CancelOrder { id, params } => {
133 if let Err(e) = self.handle_cancel_order(id.clone(), params).await {
134 log::error!("Failed to handle cancel order command: {e}");
135 self.emit(BinanceFuturesWsTradingMessage::CancelRejected {
136 request_id: id,
137 code: -1,
138 msg: e.to_string(),
139 });
140 }
141 }
142 BinanceFuturesWsTradingCommand::ModifyOrder { id, params } => {
143 if let Err(e) = self.handle_modify_order(id.clone(), params).await {
144 log::error!("Failed to handle modify order command: {e}");
145 self.emit(BinanceFuturesWsTradingMessage::ModifyRejected {
146 request_id: id,
147 code: -1,
148 msg: e.to_string(),
149 });
150 }
151 }
152 }
153 }
154 Some(msg) = self.raw_rx.recv() => {
155 if let Message::Text(ref text) = msg
156 && text.as_str() == RECONNECTED
157 {
158 log::info!("Handler received reconnection signal");
159 self.fail_pending_requests();
160 self.emit(BinanceFuturesWsTradingMessage::Reconnected);
161 continue;
162 }
163
164 self.handle_message(msg);
165 }
166 else => {
167 return false;
168 }
169 }
170 }
171 }
172
173 fn emit(&self, msg: BinanceFuturesWsTradingMessage) {
174 if let Err(e) = self.out_tx.send(msg) {
175 log::error!("Failed to send message to output channel: {e}");
176 }
177 }
178
179 fn fail_pending_requests(&mut self) {
180 if self.pending_requests.is_empty() {
181 return;
182 }
183
184 let count = self.pending_requests.len();
185 log::warn!("Failing {count} pending requests after reconnection");
186
187 let pending = std::mem::take(&mut self.pending_requests);
188 for (request_id, meta) in pending {
189 let msg = self.create_rejection(
190 request_id,
191 -1,
192 "Connection lost before response received".to_string(),
193 meta,
194 );
195 self.emit(msg);
196 }
197 }
198
199 async fn handle_place_order(
200 &mut self,
201 id: String,
202 params: BinanceNewOrderParams,
203 ) -> BinanceFuturesWsApiResult<()> {
204 let params_json = serde_json::to_value(¶ms)
205 .map_err(|e| BinanceFuturesWsApiError::JsonError(e.to_string()))?;
206 let signed_params = self.sign_params(params_json)?;
207
208 let request = BinanceFuturesWsTradingRequest::new(&id, method::ORDER_PLACE, signed_params);
209 self.pending_requests
210 .insert(id.clone(), BinanceFuturesWsTradingRequestMeta::PlaceOrder);
211 self.send_request(request).await
212 }
213
214 async fn handle_cancel_order(
215 &mut self,
216 id: String,
217 params: BinanceCancelOrderParams,
218 ) -> BinanceFuturesWsApiResult<()> {
219 let params_json = serde_json::to_value(¶ms)
220 .map_err(|e| BinanceFuturesWsApiError::JsonError(e.to_string()))?;
221 let signed_params = self.sign_params(params_json)?;
222
223 let request = BinanceFuturesWsTradingRequest::new(&id, method::ORDER_CANCEL, signed_params);
224 self.pending_requests
225 .insert(id.clone(), BinanceFuturesWsTradingRequestMeta::CancelOrder);
226 self.send_request(request).await
227 }
228
229 async fn handle_modify_order(
230 &mut self,
231 id: String,
232 params: BinanceModifyOrderParams,
233 ) -> BinanceFuturesWsApiResult<()> {
234 let params_json = serde_json::to_value(¶ms)
235 .map_err(|e| BinanceFuturesWsApiError::JsonError(e.to_string()))?;
236 let signed_params = self.sign_params(params_json)?;
237
238 let request = BinanceFuturesWsTradingRequest::new(&id, method::ORDER_MODIFY, signed_params);
239 self.pending_requests
240 .insert(id.clone(), BinanceFuturesWsTradingRequestMeta::ModifyOrder);
241 self.send_request(request).await
242 }
243
244 fn sign_params(
245 &self,
246 mut params: serde_json::Value,
247 ) -> BinanceFuturesWsApiResult<serde_json::Value> {
248 let timestamp = std::time::SystemTime::now()
249 .duration_since(std::time::UNIX_EPOCH)
250 .map_err(|e| BinanceFuturesWsApiError::ClientError(e.to_string()))?
251 .as_millis() as i64;
252
253 if let Some(obj) = params.as_object_mut() {
254 obj.insert("timestamp".to_string(), serde_json::json!(timestamp));
255 obj.insert(
256 "apiKey".to_string(),
257 serde_json::json!(self.credential.api_key()),
258 );
259 }
260
261 let query_string = serde_urlencoded::to_string(¶ms)
262 .map_err(|e| BinanceFuturesWsApiError::ClientError(e.to_string()))?;
263 let signature = self.credential.sign(&query_string);
264
265 if let Some(obj) = params.as_object_mut() {
266 obj.insert("signature".to_string(), serde_json::json!(signature));
267 }
268
269 Ok(params)
270 }
271
272 async fn send_request(
273 &mut self,
274 request: BinanceFuturesWsTradingRequest,
275 ) -> BinanceFuturesWsApiResult<()> {
276 let client = self.inner.as_mut().ok_or_else(|| {
277 BinanceFuturesWsApiError::ConnectionError("WebSocket not connected".to_string())
278 })?;
279
280 let json = serde_json::to_string(&request)
281 .map_err(|e| BinanceFuturesWsApiError::JsonError(e.to_string()))?;
282
283 log::debug!(
284 "Sending Futures WS Trading API request id={} method={}",
285 request.id,
286 request.method
287 );
288
289 client
290 .send_text(
291 json,
292 Some(BINANCE_FUTURES_WS_RATE_LIMIT_KEY_ORDER.as_slice()),
293 )
294 .await
295 .map_err(|e| {
296 BinanceFuturesWsApiError::ConnectionError(format!("Failed to send request: {e}"))
297 })?;
298
299 Ok(())
300 }
301
302 fn handle_message(&mut self, msg: Message) {
303 match msg {
304 Message::Text(text) => self.handle_text_response(&text),
305 Message::Ping(_) | Message::Pong(_) => {}
306 Message::Close(frame) => {
307 log::debug!("WebSocket closed: {frame:?}");
308 }
309 Message::Binary(_) | Message::Frame(_) => {}
310 }
311 }
312
313 fn handle_text_response(&mut self, text: &str) {
314 let response: BinanceFuturesWsTradingResponse = match serde_json::from_str(text) {
315 Ok(r) => r,
316 Err(e) => {
317 log::warn!("Failed to parse WS Trading API response: {e}");
318 return;
319 }
320 };
321
322 let Some(meta) = self.pending_requests.remove(&response.id) else {
323 log::warn!("Received response for unknown request ID: {}", response.id);
324 return;
325 };
326
327 if response.status != 200 {
328 let (code, msg) = response.error.map(|e| (e.code, e.msg)).unwrap_or((
329 -1,
330 format!("Request failed with status {}", response.status),
331 ));
332 let rejection = self.create_rejection(response.id, code, msg, meta);
333 self.emit(rejection);
334 return;
335 }
336
337 let Some(result) = response.result else {
338 log::warn!(
339 "Missing result in success response for request {}",
340 response.id
341 );
342 return;
343 };
344
345 match meta {
346 BinanceFuturesWsTradingRequestMeta::PlaceOrder => {
347 match serde_json::from_value(result) {
348 Ok(order) => {
349 self.emit(BinanceFuturesWsTradingMessage::OrderAccepted {
350 request_id: response.id,
351 response: Box::new(order),
352 });
353 }
354 Err(e) => {
355 log::error!("Failed to deserialize order response: {e}");
356 self.emit(BinanceFuturesWsTradingMessage::Error(e.to_string()));
357 }
358 }
359 }
360 BinanceFuturesWsTradingRequestMeta::CancelOrder => match serde_json::from_value(result)
361 {
362 Ok(order) => {
363 self.emit(BinanceFuturesWsTradingMessage::OrderCanceled {
364 request_id: response.id,
365 response: Box::new(order),
366 });
367 }
368 Err(e) => {
369 log::error!("Failed to deserialize cancel response: {e}");
370 self.emit(BinanceFuturesWsTradingMessage::Error(e.to_string()));
371 }
372 },
373 BinanceFuturesWsTradingRequestMeta::ModifyOrder => match serde_json::from_value(result)
374 {
375 Ok(order) => {
376 self.emit(BinanceFuturesWsTradingMessage::OrderModified {
377 request_id: response.id,
378 response: Box::new(order),
379 });
380 }
381 Err(e) => {
382 log::error!("Failed to deserialize modify response: {e}");
383 self.emit(BinanceFuturesWsTradingMessage::Error(e.to_string()));
384 }
385 },
386 }
387 }
388
389 fn create_rejection(
390 &self,
391 request_id: String,
392 code: i32,
393 msg: String,
394 meta: BinanceFuturesWsTradingRequestMeta,
395 ) -> BinanceFuturesWsTradingMessage {
396 match meta {
397 BinanceFuturesWsTradingRequestMeta::PlaceOrder => {
398 BinanceFuturesWsTradingMessage::OrderRejected {
399 request_id,
400 code,
401 msg,
402 }
403 }
404 BinanceFuturesWsTradingRequestMeta::CancelOrder => {
405 BinanceFuturesWsTradingMessage::CancelRejected {
406 request_id,
407 code,
408 msg,
409 }
410 }
411 BinanceFuturesWsTradingRequestMeta::ModifyOrder => {
412 BinanceFuturesWsTradingMessage::ModifyRejected {
413 request_id,
414 code,
415 msg,
416 }
417 }
418 }
419 }
420}