nautilus_tardis/machine/
mod.rs1pub mod cache;
17pub mod client;
18pub mod message;
19pub mod parse;
20pub mod types;
21
22use std::{
23 sync::{
24 Arc,
25 atomic::{AtomicBool, Ordering},
26 },
27 time::Duration,
28};
29
30use async_stream::stream;
31use futures_util::{SinkExt, Stream, StreamExt, stream::SplitSink};
32use message::WsMessage;
33use nautilus_common::live::get_runtime;
34use nautilus_core::string::urlencoding;
35use tokio::net::TcpStream;
36use tokio_tungstenite::{
37 MaybeTlsStream, WebSocketStream, connect_async,
38 tungstenite::{self, protocol::frame::coding::CloseCode},
39};
40use types::{ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions};
41
42pub use crate::machine::client::TardisMachineClient;
43
44pub type Result<T> = std::result::Result<T, Error>;
45
46#[derive(Debug, thiserror::Error)]
48pub enum Error {
49 #[error("Options cannot be empty")]
51 EmptyOptions,
52 #[error("Failed to connect: {0}")]
54 ConnectFailed(#[from] tungstenite::Error),
55 #[error("Connection rejected: {reason}")]
57 ConnectRejected {
58 status: tungstenite::http::StatusCode,
60 reason: String,
62 },
63 #[error("Connection closed: {reason}")]
65 ConnectionClosed {
66 reason: String,
68 },
69 #[error("Failed to deserialize message: {0}")]
71 Deserialization(#[from] serde_json::Error),
72}
73
74pub async fn replay_normalized(
81 base_url: &str,
82 options: Vec<ReplayNormalizedRequestOptions>,
83 signal: Arc<AtomicBool>,
84) -> Result<impl Stream<Item = Result<WsMessage>>> {
85 if options.is_empty() {
86 return Err(Error::EmptyOptions);
87 }
88
89 let path = format!("{base_url}/ws-replay-normalized?options=");
90 let options = serde_json::to_string(&options)?;
91
92 let plain_url = format!("{path}{options}");
93 log::debug!("Connecting to {plain_url}");
94
95 let url = format!("{path}{}", urlencoding::encode(&options));
96 stream_from_websocket(base_url, url, signal).await
97}
98
99pub async fn stream_normalized(
106 base_url: &str,
107 options: Vec<StreamNormalizedRequestOptions>,
108 signal: Arc<AtomicBool>,
109) -> Result<impl Stream<Item = Result<WsMessage>>> {
110 if options.is_empty() {
111 return Err(Error::EmptyOptions);
112 }
113
114 let path = format!("{base_url}/ws-stream-normalized?options=");
115 let options = serde_json::to_string(&options)?;
116
117 let plain_url = format!("{path}{options}");
118 log::debug!("Connecting to {plain_url}");
119
120 let url = format!("{path}{}", urlencoding::encode(&options));
121 stream_from_websocket(base_url, url, signal).await
122}
123
124async fn stream_from_websocket(
125 base_url: &str,
126 url: String,
127 signal: Arc<AtomicBool>,
128) -> Result<impl Stream<Item = Result<WsMessage>>> {
129 let (ws_stream, ws_resp) = connect_async(url).await?;
130
131 handle_connection_response(&ws_resp)?;
132 log::info!("Connected to {base_url}");
133
134 Ok(stream! {
135 let (writer, mut reader) = ws_stream.split();
136 get_runtime().spawn(heartbeat(writer));
137
138 let timeout = Duration::from_millis(10);
140
141 log::info!("Streaming from websocket...");
142
143 loop {
144 if signal.load(Ordering::Relaxed) {
145 log::debug!("Shutdown signal received");
146 break;
147 }
148
149 let result = tokio::time::timeout(timeout, reader.next()).await;
150 let msg = match result {
151 Ok(msg) => msg,
152 Err(_) => continue, };
154
155 match msg {
156 Some(Ok(msg)) => match msg {
157 tungstenite::Message::Frame(_)
158 | tungstenite::Message::Binary(_)
159 | tungstenite::Message::Pong(_)
160 | tungstenite::Message::Ping(_) => {
161 log::trace!("Received {msg:?}");
162 }
163 tungstenite::Message::Close(Some(frame)) => {
164 let reason = frame.reason.to_string();
165 if frame.code == CloseCode::Normal {
166 log::debug!("Connection closed normally: {reason}");
167 } else {
168 log::error!(
169 "Connection closed abnormally with code: {:?}, reason: {reason}", frame.code
170 );
171 yield Err(Error::ConnectionClosed { reason });
172 }
173 break;
174 }
175 tungstenite::Message::Close(None) => {
176 log::error!("Connection closed without a frame");
177 yield Err(Error::ConnectionClosed {
178 reason: "No close frame provided".to_string()
179 });
180 break;
181 }
182 tungstenite::Message::Text(msg) => {
183 match serde_json::from_str::<WsMessage>(&msg) {
184 Ok(parsed_msg) => yield Ok(parsed_msg),
185 Err(e) => {
186 log::error!("Failed to deserialize message: {msg}. Error: {e}");
187 yield Err(Error::Deserialization(e));
188 }
189 }
190 }
191 },
192 Some(Err(e)) => {
193 log::error!("WebSocket error: {e}");
194 yield Err(Error::ConnectFailed(e));
195 break;
196 }
197 None => {
198 log::error!("Connection closed unexpectedly");
199 yield Err(Error::ConnectionClosed {
200 reason: "Unexpected connection close".to_string(),
201 });
202 break;
203 }
204 }
205 }
206
207 log::info!("Shutdown stream");
208 })
209}
210
211fn handle_connection_response(
212 ws_resp: &tungstenite::http::Response<Option<Vec<u8>>>,
213) -> Result<()> {
214 if ws_resp.status() != tungstenite::http::StatusCode::SWITCHING_PROTOCOLS {
215 return match ws_resp.body() {
216 Some(resp) => Err(Error::ConnectRejected {
217 status: ws_resp.status(),
218 reason: String::from_utf8_lossy(resp).to_string(),
219 }),
220 None => Err(Error::ConnectRejected {
221 status: ws_resp.status(),
222 reason: "Unknown reason".to_string(),
223 }),
224 };
225 }
226 Ok(())
227}
228
229async fn heartbeat(
230 mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
231) {
232 let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(10));
233
234 loop {
235 heartbeat_interval.tick().await;
236 log::trace!("Sending PING");
237
238 if let Err(e) = sender.send(tungstenite::Message::Ping(vec![].into())).await {
239 log::debug!("Heartbeat send failed (connection closed): {e}");
240 break;
241 }
242 }
243
244 log::debug!("Heartbeat task exiting");
245}