// nautilus_architect_ax/websocket/data/handler.rs
use std::{
19 collections::VecDeque,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use nautilus_network::websocket::{SubscriptionState, WebSocketClient};
28use tokio_tungstenite::tungstenite::Message;
29use ustr::Ustr;
30
31use crate::{
32 common::enums::{AxCandleWidth, AxMarketDataLevel, AxMdRequestType},
33 websocket::{
34 messages::{
35 AxDataWsMessage, AxMdMessage, AxMdSubscribe, AxMdSubscribeCandles, AxMdUnsubscribe,
36 AxMdUnsubscribeCandles,
37 },
38 parse::parse_md_message,
39 },
40};
41
42#[derive(Debug)]
44pub enum HandlerCommand {
45 SetClient(WebSocketClient),
47 Disconnect,
49 ReplaySubscriptions,
51 Subscribe {
53 request_id: i64,
55 symbol: Ustr,
57 level: AxMarketDataLevel,
59 },
60 Unsubscribe {
62 request_id: i64,
64 symbol: Ustr,
66 },
67 SubscribeCandles {
69 request_id: i64,
71 symbol: Ustr,
73 width: AxCandleWidth,
75 },
76 UnsubscribeCandles {
78 request_id: i64,
80 symbol: Ustr,
82 width: AxCandleWidth,
84 },
85}
86
87pub(crate) struct AxMdWsFeedHandler {
92 signal: Arc<AtomicBool>,
93 inner: Option<WebSocketClient>,
94 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
95 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
96 subscriptions: SubscriptionState,
97 message_queue: VecDeque<AxDataWsMessage>,
98 replay_request_id: i64,
99 needs_subscription_replay: bool,
100 pending_subscribe_requests: AHashMap<i64, String>,
101}
102
103impl AxMdWsFeedHandler {
104 #[must_use]
106 pub fn new(
107 signal: Arc<AtomicBool>,
108 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
109 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
110 subscriptions: SubscriptionState,
111 ) -> Self {
112 Self {
113 signal,
114 inner: None,
115 cmd_rx,
116 raw_rx,
117 subscriptions,
118 message_queue: VecDeque::new(),
119 replay_request_id: -1,
120 needs_subscription_replay: false,
121 pending_subscribe_requests: AHashMap::new(),
122 }
123 }
124
125 fn next_replay_request_id(&mut self) -> i64 {
126 self.replay_request_id -= 1;
127 self.replay_request_id
128 }
129
130 async fn replay_subscriptions(&mut self) {
131 let topics = self.subscriptions.all_topics();
132 if topics.is_empty() {
133 log::debug!("No subscriptions to replay after reconnect");
134 return;
135 }
136
137 log::info!("Replaying {} subscriptions after reconnect", topics.len());
138
139 for topic in topics {
140 self.subscriptions.mark_subscribe(&topic);
141
142 if let Some(rest) = topic.strip_prefix("candles:") {
144 if let Some((symbol, width_str)) = rest.rsplit_once(':') {
145 if let Some(width) = Self::parse_candle_width(width_str) {
146 let request_id = self.next_replay_request_id();
147 log::debug!(
148 "Replaying candle subscription: symbol={symbol}, width={width:?}"
149 );
150 self.send_subscribe_candles(request_id, Ustr::from(symbol), width)
151 .await;
152 } else {
153 log::warn!("Failed to parse candle width from topic: {topic}");
154 }
155 } else {
156 log::warn!("Invalid candle topic format: {topic}");
157 }
158 } else if let Some((symbol, level_str)) = topic.rsplit_once(':') {
159 if let Some(level) = Self::parse_market_data_level(level_str) {
160 let request_id = self.next_replay_request_id();
161 log::debug!(
162 "Replaying market data subscription: symbol={symbol}, level={level:?}"
163 );
164 self.send_subscribe(request_id, Ustr::from(symbol), level)
165 .await;
166 } else {
167 log::warn!("Failed to parse market data level from topic: {topic}");
168 }
169 } else {
170 log::warn!("Unknown topic format: {topic}");
171 }
172 }
173
174 log::info!("Subscription replay completed");
175 }
176
177 fn parse_market_data_level(s: &str) -> Option<AxMarketDataLevel> {
178 match s {
179 "Level1" => Some(AxMarketDataLevel::Level1),
180 "Level2" => Some(AxMarketDataLevel::Level2),
181 "Level3" => Some(AxMarketDataLevel::Level3),
182 _ => None,
183 }
184 }
185
186 fn parse_candle_width(s: &str) -> Option<AxCandleWidth> {
187 match s {
188 "Seconds1" => Some(AxCandleWidth::Seconds1),
189 "Seconds5" => Some(AxCandleWidth::Seconds5),
190 "Minutes1" => Some(AxCandleWidth::Minutes1),
191 "Minutes5" => Some(AxCandleWidth::Minutes5),
192 "Minutes15" => Some(AxCandleWidth::Minutes15),
193 "Hours1" => Some(AxCandleWidth::Hours1),
194 "Days1" => Some(AxCandleWidth::Days1),
195 _ => None,
196 }
197 }
198
199 pub async fn next(&mut self) -> Option<AxDataWsMessage> {
203 loop {
204 if self.needs_subscription_replay && self.message_queue.is_empty() {
205 self.needs_subscription_replay = false;
206 self.replay_subscriptions().await;
207 }
208
209 if let Some(msg) = self.message_queue.pop_front() {
210 return Some(msg);
211 }
212
213 tokio::select! {
214 Some(cmd) = self.cmd_rx.recv() => {
215 self.handle_command(cmd).await;
216 }
217
218 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
219 if self.signal.load(Ordering::Acquire) {
220 log::debug!("Stop signal received during idle period");
221 return None;
222 }
223 }
224
225 msg = self.raw_rx.recv() => {
226 let msg = match msg {
227 Some(msg) => msg,
228 None => {
229 log::debug!("WebSocket stream closed");
230 return None;
231 }
232 };
233
234 if let Message::Ping(data) = &msg {
235 log::trace!("Received ping frame with {} bytes", data.len());
236
237 if let Some(client) = &self.inner
238 && let Err(e) = client.send_pong(data.to_vec()).await
239 {
240 log::warn!("Failed to send pong frame: {e}");
241 }
242 continue;
243 }
244
245 if let Some(message) = self.parse_raw_message(msg) {
246 self.message_queue.push_back(message);
247 }
248
249 if self.signal.load(Ordering::Acquire) {
250 log::debug!("Stop signal received");
251 return None;
252 }
253 }
254 }
255 }
256 }
257
258 async fn handle_command(&mut self, cmd: HandlerCommand) {
259 match cmd {
260 HandlerCommand::SetClient(client) => {
261 log::debug!("WebSocketClient received by handler");
262 self.inner = Some(client);
263 }
264 HandlerCommand::Disconnect => {
265 log::debug!("Disconnect command received");
266
267 if let Some(inner) = self.inner.take() {
268 inner.disconnect().await;
269 }
270 }
271 HandlerCommand::ReplaySubscriptions => {
272 log::debug!("ReplaySubscriptions command received");
273 self.replay_subscriptions().await;
274 }
275 HandlerCommand::Subscribe {
276 request_id,
277 symbol,
278 level,
279 } => {
280 log::debug!(
281 "Subscribe command received: request_id={request_id}, symbol={symbol}, level={level:?}"
282 );
283 let topic = format!("{symbol}:{level:?}");
284 self.pending_subscribe_requests.insert(request_id, topic);
285 self.send_subscribe(request_id, symbol, level).await;
286 }
287 HandlerCommand::Unsubscribe { request_id, symbol } => {
288 log::debug!(
289 "Unsubscribe command received: request_id={request_id}, symbol={symbol}"
290 );
291 self.send_unsubscribe(request_id, symbol).await;
292 }
293 HandlerCommand::SubscribeCandles {
294 request_id,
295 symbol,
296 width,
297 } => {
298 log::debug!(
299 "SubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
300 );
301 let topic = format!("candles:{symbol}:{width:?}");
302 self.pending_subscribe_requests.insert(request_id, topic);
303 self.send_subscribe_candles(request_id, symbol, width).await;
304 }
305 HandlerCommand::UnsubscribeCandles {
306 request_id,
307 symbol,
308 width,
309 } => {
310 log::debug!(
311 "UnsubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
312 );
313 self.message_queue
314 .push_back(AxDataWsMessage::CandleUnsubscribed { symbol, width });
315 self.send_unsubscribe_candles(request_id, symbol, width)
316 .await;
317 }
318 }
319 }
320
321 async fn send_subscribe(&mut self, request_id: i64, symbol: Ustr, level: AxMarketDataLevel) {
322 let msg = AxMdSubscribe {
323 rid: request_id,
324 msg_type: AxMdRequestType::Subscribe,
325 symbol,
326 level,
327 };
328
329 if let Err(e) = self.send_json(&msg).await {
330 self.pending_subscribe_requests.remove(&request_id);
331 log::error!("Failed to send subscribe message: {e}");
332 }
333 }
334
335 async fn send_unsubscribe(&self, request_id: i64, symbol: Ustr) {
336 let msg = AxMdUnsubscribe {
337 rid: request_id,
338 msg_type: AxMdRequestType::Unsubscribe,
339 symbol,
340 };
341
342 if let Err(e) = self.send_json(&msg).await {
343 log::error!("Failed to send unsubscribe message: {e}");
344 }
345 }
346
347 async fn send_subscribe_candles(
348 &mut self,
349 request_id: i64,
350 symbol: Ustr,
351 width: AxCandleWidth,
352 ) {
353 let msg = AxMdSubscribeCandles {
354 rid: request_id,
355 msg_type: AxMdRequestType::SubscribeCandles,
356 symbol,
357 width,
358 };
359
360 if let Err(e) = self.send_json(&msg).await {
361 self.pending_subscribe_requests.remove(&request_id);
362 log::error!("Failed to send subscribe_candles message: {e}");
363 }
364 }
365
366 async fn send_unsubscribe_candles(&self, request_id: i64, symbol: Ustr, width: AxCandleWidth) {
367 let msg = AxMdUnsubscribeCandles {
368 rid: request_id,
369 msg_type: AxMdRequestType::UnsubscribeCandles,
370 symbol,
371 width,
372 };
373
374 if let Err(e) = self.send_json(&msg).await {
375 log::error!("Failed to send unsubscribe_candles message: {e}");
376 }
377 }
378
379 async fn send_json<T: serde::Serialize>(&self, msg: &T) -> Result<(), String> {
380 let Some(inner) = &self.inner else {
381 return Err("No WebSocket client available".to_string());
382 };
383
384 let payload = serde_json::to_string(msg).map_err(|e| e.to_string())?;
385 log::trace!("Sending: {payload}");
386
387 inner
388 .send_text(payload, None)
389 .await
390 .map_err(|e| e.to_string())
391 }
392
393 fn parse_raw_message(&mut self, msg: Message) -> Option<AxDataWsMessage> {
394 match msg {
395 Message::Text(text) => {
396 if text == nautilus_network::RECONNECTED {
397 log::info!("Received WebSocket reconnected signal");
398 self.needs_subscription_replay = true;
399 return Some(AxDataWsMessage::Reconnected);
400 }
401
402 log::trace!("Raw websocket message: {text}");
403
404 match parse_md_message(&text) {
405 Ok(message) => self.handle_message(message),
406 Err(e) => {
407 log::error!("Failed to parse WebSocket message: {e}: {text}");
408 None
409 }
410 }
411 }
412 Message::Binary(data) => {
413 log::debug!("Received binary message with {} bytes", data.len());
414 None
415 }
416 Message::Close(_) => {
417 log::debug!("Received close message, waiting for reconnection");
418 None
419 }
420 _ => None,
421 }
422 }
423
424 fn handle_message(&mut self, message: AxMdMessage) -> Option<AxDataWsMessage> {
425 match &message {
426 AxMdMessage::Error(error) => {
427 let is_benign = error.message.contains("already subscribed")
428 || error.message.contains("not subscribed");
429
430 if is_benign {
431 if let Some(rid) = error.request_id {
432 self.pending_subscribe_requests.remove(&rid);
433 }
434 log::warn!("Subscription state: {}", error.message);
435 } else {
436 if let Some(rid) = error.request_id
437 && let Some(topic) = self.pending_subscribe_requests.remove(&rid)
438 {
439 log::warn!(
440 "Rolling back subscription for topic '{topic}' \
441 due to error: {}",
442 error.message
443 );
444 self.subscriptions.mark_unsubscribe(&topic);
445 }
446 log::error!("Received error from exchange: {}", error.message);
447 }
448 }
449 AxMdMessage::SubscriptionResponse(response) => {
450 self.pending_subscribe_requests.remove(&response.rid);
451
452 if let Some(symbol) = &response.result.subscribed {
453 log::debug!("Subscription confirmed for symbol: {symbol}");
454 } else if let Some(candle) = &response.result.subscribed_candle {
455 log::debug!("Candle subscription confirmed: {candle}");
456 } else if let Some(symbol) = &response.result.unsubscribed {
457 log::debug!("Unsubscription confirmed for symbol: {symbol}");
458 } else if let Some(candle) = &response.result.unsubscribed_candle {
459 log::debug!("Candle unsubscription confirmed: {candle}");
460 }
461 return None;
462 }
463 _ => {}
464 }
465
466 Some(AxDataWsMessage::MdMessage(message))
467 }
468}