Skip to main content

nautilus_tardis/machine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16pub mod cache;
17pub mod client;
18pub mod message;
19pub mod parse;
20pub mod types;
21
22use std::{
23    sync::{
24        Arc,
25        atomic::{AtomicBool, Ordering},
26    },
27    time::Duration,
28};
29
30use async_stream::stream;
31use futures_util::{SinkExt, Stream, StreamExt, stream::SplitSink};
32use message::WsMessage;
33use nautilus_common::live::get_runtime;
34use nautilus_core::string::urlencoding;
35use tokio::net::TcpStream;
36use tokio_tungstenite::{
37    MaybeTlsStream, WebSocketStream, connect_async,
38    tungstenite::{self, protocol::frame::coding::CloseCode},
39};
40use types::{ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions};
41
42pub use crate::machine::client::TardisMachineClient;
43
/// Result type used throughout this module, with [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
45
46/// The error that could happen while interacting with Tardis Machine Server.
47#[derive(Debug, thiserror::Error)]
48pub enum Error {
49    /// An error that could happen when an empty options array was given.
50    #[error("Options cannot be empty")]
51    EmptyOptions,
52    /// An error when failed to connect to Tardis' websocket connection.
53    #[error("Failed to connect: {0}")]
54    ConnectFailed(#[from] tungstenite::Error),
55    /// An error when WS connection to the machine server was rejected.
56    #[error("Connection rejected: {reason}")]
57    ConnectRejected {
58        /// The status code for the initial WS connection.
59        status: tungstenite::http::StatusCode,
60        /// The reason why the connection was rejected.
61        reason: String,
62    },
63    /// An error where the websocket connection was closed unexpectedly by Tardis.
64    #[error("Connection closed: {reason}")]
65    ConnectionClosed {
66        /// The reason why the connection was closed.
67        reason: String,
68    },
69    /// An error when deserializing the response from Tardis.
70    #[error("Failed to deserialize message: {0}")]
71    Deserialization(#[from] serde_json::Error),
72}
73
74/// Connects to the Tardis Machine WS replay endpoint and returns a stream of WebSocket messages.
75///
76/// # Errors
77///
78/// Returns `Error::EmptyOptions` if no options provided,
79/// or `Error::ConnectFailed`/`Error::ConnectRejected` if connection fails.
80pub async fn replay_normalized(
81    base_url: &str,
82    options: Vec<ReplayNormalizedRequestOptions>,
83    signal: Arc<AtomicBool>,
84) -> Result<impl Stream<Item = Result<WsMessage>>> {
85    if options.is_empty() {
86        return Err(Error::EmptyOptions);
87    }
88
89    let path = format!("{base_url}/ws-replay-normalized?options=");
90    let options = serde_json::to_string(&options)?;
91
92    let plain_url = format!("{path}{options}");
93    log::debug!("Connecting to {plain_url}");
94
95    let url = format!("{path}{}", urlencoding::encode(&options));
96    stream_from_websocket(base_url, url, signal).await
97}
98
99/// Connects to the Tardis Machine WS streaming endpoint and returns a stream of WebSocket messages.
100///
101/// # Errors
102///
103/// Returns `Error::EmptyOptions` if no options provided,
104/// or `Error::ConnectFailed`/`Error::ConnectRejected` if connection fails.
105pub async fn stream_normalized(
106    base_url: &str,
107    options: Vec<StreamNormalizedRequestOptions>,
108    signal: Arc<AtomicBool>,
109) -> Result<impl Stream<Item = Result<WsMessage>>> {
110    if options.is_empty() {
111        return Err(Error::EmptyOptions);
112    }
113
114    let path = format!("{base_url}/ws-stream-normalized?options=");
115    let options = serde_json::to_string(&options)?;
116
117    let plain_url = format!("{path}{options}");
118    log::debug!("Connecting to {plain_url}");
119
120    let url = format!("{path}{}", urlencoding::encode(&options));
121    stream_from_websocket(base_url, url, signal).await
122}
123
124async fn stream_from_websocket(
125    base_url: &str,
126    url: String,
127    signal: Arc<AtomicBool>,
128) -> Result<impl Stream<Item = Result<WsMessage>>> {
129    let (ws_stream, ws_resp) = connect_async(url).await?;
130
131    handle_connection_response(&ws_resp)?;
132    log::info!("Connected to {base_url}");
133
134    Ok(stream! {
135        let (writer, mut reader) = ws_stream.split();
136        get_runtime().spawn(heartbeat(writer));
137
138        // Timeout awaiting the next record before checking signal
139        let timeout = Duration::from_millis(10);
140
141        log::info!("Streaming from websocket...");
142
143        loop {
144            if signal.load(Ordering::Relaxed) {
145                log::debug!("Shutdown signal received");
146                break;
147            }
148
149            let result = tokio::time::timeout(timeout, reader.next()).await;
150            let msg = match result {
151                Ok(msg) => msg,
152                Err(_) => continue, // Timeout
153            };
154
155            match msg {
156                Some(Ok(msg)) => match msg {
157                    tungstenite::Message::Frame(_)
158                    | tungstenite::Message::Binary(_)
159                    | tungstenite::Message::Pong(_)
160                    | tungstenite::Message::Ping(_) => {
161                        log::trace!("Received {msg:?}");
162                    }
163                    tungstenite::Message::Close(Some(frame)) => {
164                        let reason = frame.reason.to_string();
165                        if frame.code == CloseCode::Normal {
166                            log::debug!("Connection closed normally: {reason}");
167                        } else {
168                            log::error!(
169                                "Connection closed abnormally with code: {:?}, reason: {reason}", frame.code
170                            );
171                            yield Err(Error::ConnectionClosed { reason });
172                        }
173                        break;
174                    }
175                    tungstenite::Message::Close(None) => {
176                        log::error!("Connection closed without a frame");
177                        yield Err(Error::ConnectionClosed {
178                            reason: "No close frame provided".to_string()
179                        });
180                        break;
181                    }
182                    tungstenite::Message::Text(msg) => {
183                        match serde_json::from_str::<WsMessage>(&msg) {
184                            Ok(parsed_msg) => yield Ok(parsed_msg),
185                            Err(e) => {
186                                log::error!("Failed to deserialize message: {msg}. Error: {e}");
187                                yield Err(Error::Deserialization(e));
188                            }
189                        }
190                    }
191                },
192                Some(Err(e)) => {
193                    log::error!("WebSocket error: {e}");
194                    yield Err(Error::ConnectFailed(e));
195                    break;
196                }
197                None => {
198                    log::error!("Connection closed unexpectedly");
199                    yield Err(Error::ConnectionClosed {
200                        reason: "Unexpected connection close".to_string(),
201                    });
202                    break;
203                }
204            }
205        }
206
207        log::info!("Shutdown stream");
208    })
209}
210
211fn handle_connection_response(
212    ws_resp: &tungstenite::http::Response<Option<Vec<u8>>>,
213) -> Result<()> {
214    if ws_resp.status() != tungstenite::http::StatusCode::SWITCHING_PROTOCOLS {
215        return match ws_resp.body() {
216            Some(resp) => Err(Error::ConnectRejected {
217                status: ws_resp.status(),
218                reason: String::from_utf8_lossy(resp).to_string(),
219            }),
220            None => Err(Error::ConnectRejected {
221                status: ws_resp.status(),
222                reason: "Unknown reason".to_string(),
223            }),
224        };
225    }
226    Ok(())
227}
228
229async fn heartbeat(
230    mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
231) {
232    let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(10));
233
234    loop {
235        heartbeat_interval.tick().await;
236        log::trace!("Sending PING");
237
238        if let Err(e) = sender.send(tungstenite::Message::Ping(vec![].into())).await {
239            log::debug!("Heartbeat send failed (connection closed): {e}");
240            break;
241        }
242    }
243
244    log::debug!("Heartbeat task exiting");
245}