Skip to main content

nautilus_architect_ax/websocket/data/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Market data WebSocket message handler for Ax.
17
18use std::{
19    collections::VecDeque,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use nautilus_network::websocket::{SubscriptionState, WebSocketClient};
28use tokio_tungstenite::tungstenite::Message;
29use ustr::Ustr;
30
31use crate::{
32    common::enums::{AxCandleWidth, AxMarketDataLevel, AxMdRequestType},
33    websocket::{
34        messages::{
35            AxDataWsMessage, AxMdMessage, AxMdSubscribe, AxMdSubscribeCandles, AxMdUnsubscribe,
36            AxMdUnsubscribeCandles,
37        },
38        parse::parse_md_message,
39    },
40};
41
42/// Commands sent from the outer client to the inner message handler.
43#[derive(Debug)]
44pub enum HandlerCommand {
45    /// Set the WebSocket client for this handler.
46    SetClient(WebSocketClient),
47    /// Disconnect the WebSocket connection.
48    Disconnect,
49    /// Replay all subscriptions after a reconnection.
50    ReplaySubscriptions,
51    /// Subscribe to market data for a symbol.
52    Subscribe {
53        /// Request ID for correlation.
54        request_id: i64,
55        /// Instrument symbol.
56        symbol: Ustr,
57        /// Market data level.
58        level: AxMarketDataLevel,
59    },
60    /// Unsubscribe from market data for a symbol.
61    Unsubscribe {
62        /// Request ID for correlation.
63        request_id: i64,
64        /// Instrument symbol.
65        symbol: Ustr,
66    },
67    /// Subscribe to candle data for a symbol.
68    SubscribeCandles {
69        /// Request ID for correlation.
70        request_id: i64,
71        /// Instrument symbol.
72        symbol: Ustr,
73        /// Candle width/interval.
74        width: AxCandleWidth,
75    },
76    /// Unsubscribe from candle data for a symbol.
77    UnsubscribeCandles {
78        /// Request ID for correlation.
79        request_id: i64,
80        /// Instrument symbol.
81        symbol: Ustr,
82        /// Candle width/interval.
83        width: AxCandleWidth,
84    },
85}
86
87/// Market data feed handler that processes WebSocket messages.
88///
89/// Runs in a dedicated Tokio task and owns the WebSocket client exclusively.
90/// Emits raw venue types for downstream consumers to parse.
91pub(crate) struct AxMdWsFeedHandler {
92    signal: Arc<AtomicBool>,
93    inner: Option<WebSocketClient>,
94    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
95    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
96    subscriptions: SubscriptionState,
97    message_queue: VecDeque<AxDataWsMessage>,
98    replay_request_id: i64,
99    needs_subscription_replay: bool,
100    pending_subscribe_requests: AHashMap<i64, String>,
101}
102
103impl AxMdWsFeedHandler {
104    /// Creates a new [`AxMdWsFeedHandler`] instance.
105    #[must_use]
106    pub fn new(
107        signal: Arc<AtomicBool>,
108        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
109        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
110        subscriptions: SubscriptionState,
111    ) -> Self {
112        Self {
113            signal,
114            inner: None,
115            cmd_rx,
116            raw_rx,
117            subscriptions,
118            message_queue: VecDeque::new(),
119            replay_request_id: -1,
120            needs_subscription_replay: false,
121            pending_subscribe_requests: AHashMap::new(),
122        }
123    }
124
125    fn next_replay_request_id(&mut self) -> i64 {
126        self.replay_request_id -= 1;
127        self.replay_request_id
128    }
129
130    async fn replay_subscriptions(&mut self) {
131        let topics = self.subscriptions.all_topics();
132        if topics.is_empty() {
133            log::debug!("No subscriptions to replay after reconnect");
134            return;
135        }
136
137        log::info!("Replaying {} subscriptions after reconnect", topics.len());
138
139        for topic in topics {
140            self.subscriptions.mark_subscribe(&topic);
141
142            // Topic format: "symbol:Level" or "candles:symbol:Width"
143            if let Some(rest) = topic.strip_prefix("candles:") {
144                if let Some((symbol, width_str)) = rest.rsplit_once(':') {
145                    if let Some(width) = Self::parse_candle_width(width_str) {
146                        let request_id = self.next_replay_request_id();
147                        log::debug!(
148                            "Replaying candle subscription: symbol={symbol}, width={width:?}"
149                        );
150                        self.send_subscribe_candles(request_id, Ustr::from(symbol), width)
151                            .await;
152                    } else {
153                        log::warn!("Failed to parse candle width from topic: {topic}");
154                    }
155                } else {
156                    log::warn!("Invalid candle topic format: {topic}");
157                }
158            } else if let Some((symbol, level_str)) = topic.rsplit_once(':') {
159                if let Some(level) = Self::parse_market_data_level(level_str) {
160                    let request_id = self.next_replay_request_id();
161                    log::debug!(
162                        "Replaying market data subscription: symbol={symbol}, level={level:?}"
163                    );
164                    self.send_subscribe(request_id, Ustr::from(symbol), level)
165                        .await;
166                } else {
167                    log::warn!("Failed to parse market data level from topic: {topic}");
168                }
169            } else {
170                log::warn!("Unknown topic format: {topic}");
171            }
172        }
173
174        log::info!("Subscription replay completed");
175    }
176
177    fn parse_market_data_level(s: &str) -> Option<AxMarketDataLevel> {
178        match s {
179            "Level1" => Some(AxMarketDataLevel::Level1),
180            "Level2" => Some(AxMarketDataLevel::Level2),
181            "Level3" => Some(AxMarketDataLevel::Level3),
182            _ => None,
183        }
184    }
185
186    fn parse_candle_width(s: &str) -> Option<AxCandleWidth> {
187        match s {
188            "Seconds1" => Some(AxCandleWidth::Seconds1),
189            "Seconds5" => Some(AxCandleWidth::Seconds5),
190            "Minutes1" => Some(AxCandleWidth::Minutes1),
191            "Minutes5" => Some(AxCandleWidth::Minutes5),
192            "Minutes15" => Some(AxCandleWidth::Minutes15),
193            "Hours1" => Some(AxCandleWidth::Hours1),
194            "Days1" => Some(AxCandleWidth::Days1),
195            _ => None,
196        }
197    }
198
199    /// Returns the next message from the handler.
200    ///
201    /// This method blocks until a message is available or the handler is stopped.
202    pub async fn next(&mut self) -> Option<AxDataWsMessage> {
203        loop {
204            if self.needs_subscription_replay && self.message_queue.is_empty() {
205                self.needs_subscription_replay = false;
206                self.replay_subscriptions().await;
207            }
208
209            if let Some(msg) = self.message_queue.pop_front() {
210                return Some(msg);
211            }
212
213            tokio::select! {
214                Some(cmd) = self.cmd_rx.recv() => {
215                    self.handle_command(cmd).await;
216                }
217
218                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
219                    if self.signal.load(Ordering::Acquire) {
220                        log::debug!("Stop signal received during idle period");
221                        return None;
222                    }
223                }
224
225                msg = self.raw_rx.recv() => {
226                    let msg = match msg {
227                        Some(msg) => msg,
228                        None => {
229                            log::debug!("WebSocket stream closed");
230                            return None;
231                        }
232                    };
233
234                    if let Message::Ping(data) = &msg {
235                        log::trace!("Received ping frame with {} bytes", data.len());
236
237                        if let Some(client) = &self.inner
238                            && let Err(e) = client.send_pong(data.to_vec()).await
239                        {
240                            log::warn!("Failed to send pong frame: {e}");
241                        }
242                        continue;
243                    }
244
245                    if let Some(message) = self.parse_raw_message(msg) {
246                        self.message_queue.push_back(message);
247                    }
248
249                    if self.signal.load(Ordering::Acquire) {
250                        log::debug!("Stop signal received");
251                        return None;
252                    }
253                }
254            }
255        }
256    }
257
258    async fn handle_command(&mut self, cmd: HandlerCommand) {
259        match cmd {
260            HandlerCommand::SetClient(client) => {
261                log::debug!("WebSocketClient received by handler");
262                self.inner = Some(client);
263            }
264            HandlerCommand::Disconnect => {
265                log::debug!("Disconnect command received");
266
267                if let Some(inner) = self.inner.take() {
268                    inner.disconnect().await;
269                }
270            }
271            HandlerCommand::ReplaySubscriptions => {
272                log::debug!("ReplaySubscriptions command received");
273                self.replay_subscriptions().await;
274            }
275            HandlerCommand::Subscribe {
276                request_id,
277                symbol,
278                level,
279            } => {
280                log::debug!(
281                    "Subscribe command received: request_id={request_id}, symbol={symbol}, level={level:?}"
282                );
283                let topic = format!("{symbol}:{level:?}");
284                self.pending_subscribe_requests.insert(request_id, topic);
285                self.send_subscribe(request_id, symbol, level).await;
286            }
287            HandlerCommand::Unsubscribe { request_id, symbol } => {
288                log::debug!(
289                    "Unsubscribe command received: request_id={request_id}, symbol={symbol}"
290                );
291                self.send_unsubscribe(request_id, symbol).await;
292            }
293            HandlerCommand::SubscribeCandles {
294                request_id,
295                symbol,
296                width,
297            } => {
298                log::debug!(
299                    "SubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
300                );
301                let topic = format!("candles:{symbol}:{width:?}");
302                self.pending_subscribe_requests.insert(request_id, topic);
303                self.send_subscribe_candles(request_id, symbol, width).await;
304            }
305            HandlerCommand::UnsubscribeCandles {
306                request_id,
307                symbol,
308                width,
309            } => {
310                log::debug!(
311                    "UnsubscribeCandles command received: request_id={request_id}, symbol={symbol}, width={width:?}"
312                );
313                self.message_queue
314                    .push_back(AxDataWsMessage::CandleUnsubscribed { symbol, width });
315                self.send_unsubscribe_candles(request_id, symbol, width)
316                    .await;
317            }
318        }
319    }
320
321    async fn send_subscribe(&mut self, request_id: i64, symbol: Ustr, level: AxMarketDataLevel) {
322        let msg = AxMdSubscribe {
323            rid: request_id,
324            msg_type: AxMdRequestType::Subscribe,
325            symbol,
326            level,
327        };
328
329        if let Err(e) = self.send_json(&msg).await {
330            self.pending_subscribe_requests.remove(&request_id);
331            log::error!("Failed to send subscribe message: {e}");
332        }
333    }
334
335    async fn send_unsubscribe(&self, request_id: i64, symbol: Ustr) {
336        let msg = AxMdUnsubscribe {
337            rid: request_id,
338            msg_type: AxMdRequestType::Unsubscribe,
339            symbol,
340        };
341
342        if let Err(e) = self.send_json(&msg).await {
343            log::error!("Failed to send unsubscribe message: {e}");
344        }
345    }
346
347    async fn send_subscribe_candles(
348        &mut self,
349        request_id: i64,
350        symbol: Ustr,
351        width: AxCandleWidth,
352    ) {
353        let msg = AxMdSubscribeCandles {
354            rid: request_id,
355            msg_type: AxMdRequestType::SubscribeCandles,
356            symbol,
357            width,
358        };
359
360        if let Err(e) = self.send_json(&msg).await {
361            self.pending_subscribe_requests.remove(&request_id);
362            log::error!("Failed to send subscribe_candles message: {e}");
363        }
364    }
365
366    async fn send_unsubscribe_candles(&self, request_id: i64, symbol: Ustr, width: AxCandleWidth) {
367        let msg = AxMdUnsubscribeCandles {
368            rid: request_id,
369            msg_type: AxMdRequestType::UnsubscribeCandles,
370            symbol,
371            width,
372        };
373
374        if let Err(e) = self.send_json(&msg).await {
375            log::error!("Failed to send unsubscribe_candles message: {e}");
376        }
377    }
378
379    async fn send_json<T: serde::Serialize>(&self, msg: &T) -> Result<(), String> {
380        let Some(inner) = &self.inner else {
381            return Err("No WebSocket client available".to_string());
382        };
383
384        let payload = serde_json::to_string(msg).map_err(|e| e.to_string())?;
385        log::trace!("Sending: {payload}");
386
387        inner
388            .send_text(payload, None)
389            .await
390            .map_err(|e| e.to_string())
391    }
392
393    fn parse_raw_message(&mut self, msg: Message) -> Option<AxDataWsMessage> {
394        match msg {
395            Message::Text(text) => {
396                if text == nautilus_network::RECONNECTED {
397                    log::info!("Received WebSocket reconnected signal");
398                    self.needs_subscription_replay = true;
399                    return Some(AxDataWsMessage::Reconnected);
400                }
401
402                log::trace!("Raw websocket message: {text}");
403
404                match parse_md_message(&text) {
405                    Ok(message) => self.handle_message(message),
406                    Err(e) => {
407                        log::error!("Failed to parse WebSocket message: {e}: {text}");
408                        None
409                    }
410                }
411            }
412            Message::Binary(data) => {
413                log::debug!("Received binary message with {} bytes", data.len());
414                None
415            }
416            Message::Close(_) => {
417                log::debug!("Received close message, waiting for reconnection");
418                None
419            }
420            _ => None,
421        }
422    }
423
424    fn handle_message(&mut self, message: AxMdMessage) -> Option<AxDataWsMessage> {
425        match &message {
426            AxMdMessage::Error(error) => {
427                let is_benign = error.message.contains("already subscribed")
428                    || error.message.contains("not subscribed");
429
430                if is_benign {
431                    if let Some(rid) = error.request_id {
432                        self.pending_subscribe_requests.remove(&rid);
433                    }
434                    log::warn!("Subscription state: {}", error.message);
435                } else {
436                    if let Some(rid) = error.request_id
437                        && let Some(topic) = self.pending_subscribe_requests.remove(&rid)
438                    {
439                        log::warn!(
440                            "Rolling back subscription for topic '{topic}' \
441                             due to error: {}",
442                            error.message
443                        );
444                        self.subscriptions.mark_unsubscribe(&topic);
445                    }
446                    log::error!("Received error from exchange: {}", error.message);
447                }
448            }
449            AxMdMessage::SubscriptionResponse(response) => {
450                self.pending_subscribe_requests.remove(&response.rid);
451
452                if let Some(symbol) = &response.result.subscribed {
453                    log::debug!("Subscription confirmed for symbol: {symbol}");
454                } else if let Some(candle) = &response.result.subscribed_candle {
455                    log::debug!("Candle subscription confirmed: {candle}");
456                } else if let Some(symbol) = &response.result.unsubscribed {
457                    log::debug!("Unsubscription confirmed for symbol: {symbol}");
458                } else if let Some(candle) = &response.result.unsubscribed_candle {
459                    log::debug!("Candle unsubscription confirmed: {candle}");
460                }
461                return None;
462            }
463            _ => {}
464        }
465
466        Some(AxDataWsMessage::MdMessage(message))
467    }
468}