1use std::{
19 sync::{
20 Arc,
21 atomic::{AtomicBool, Ordering},
22 },
23 time::Duration,
24};
25
26use ahash::{AHashMap, AHashSet};
27use futures_util::{SinkExt, StreamExt};
28use nautilus_common::{
29 clients::DataClient,
30 live::{runner::get_data_event_sender, runtime::get_runtime},
31 messages::{
32 DataEvent,
33 data::{
34 subscribe::{SubscribeFundingRates, SubscribeIndexPrices, SubscribeMarkPrices},
35 unsubscribe::{UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeMarkPrices},
36 },
37 },
38};
39use nautilus_core::string::urlencoding;
40use nautilus_model::{
41 data::Data,
42 identifiers::{ClientId, Venue},
43};
44use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
45use tokio_tungstenite::{connect_async, tungstenite};
46use tokio_util::sync::CancellationToken;
47
48use crate::{
49 common::{
50 consts::{
51 WS_HEARTBEAT_INTERVAL_SECS, WS_INITIAL_RECONNECT_DELAY_SECS,
52 WS_MAX_RECONNECT_DELAY_SECS,
53 },
54 enums::TardisDataType,
55 urls::resolve_ws_base_url,
56 },
57 config::{BookSnapshotOutput, TardisDataClientConfig},
58 http::TardisHttpClient,
59 machine::{
60 cache::DerivativeTickerCache,
61 client::determine_instrument_info,
62 message::WsMessage,
63 parse::{
64 parse_derivative_ticker_index_price, parse_derivative_ticker_mark_price,
65 parse_tardis_ws_message, parse_tardis_ws_message_funding_rate,
66 },
67 types::{TardisInstrumentKey, TardisInstrumentMiniInfo},
68 },
69};
70
71#[derive(Debug)]
73pub struct TardisDataClient {
74 client_id: ClientId,
75 config: TardisDataClientConfig,
76 is_connected: Arc<AtomicBool>,
77 cancellation_token: CancellationToken,
78 tasks: Vec<JoinHandle<()>>,
79 data_sender: UnboundedSender<DataEvent>,
80}
81
82impl TardisDataClient {
83 pub fn new(client_id: ClientId, config: TardisDataClientConfig) -> anyhow::Result<Self> {
89 let data_sender = get_data_event_sender();
90
91 Ok(Self {
92 client_id,
93 config,
94 is_connected: Arc::new(AtomicBool::new(false)),
95 cancellation_token: CancellationToken::new(),
96 tasks: Vec::new(),
97 data_sender,
98 })
99 }
100
101 fn is_stream_mode(&self) -> bool {
103 self.config.options.is_empty() && !self.config.stream_options.is_empty()
104 }
105
106 fn build_ws_url(&self, base_url: &str) -> anyhow::Result<String> {
112 let deriv = TardisDataType::DerivativeTicker.as_tardis_str();
113
114 if self.is_stream_mode() {
115 let mut options = self.config.stream_options.clone();
116 for opt in &mut options {
117 if !opt.data_types.iter().any(|dt| dt == deriv) {
118 opt.data_types.push(deriv.to_string());
119 }
120 }
121 let options_json = serde_json::to_string(&options)?;
122 Ok(format!(
123 "{base_url}/ws-stream-normalized?options={}",
124 urlencoding::encode(&options_json)
125 ))
126 } else {
127 let mut options = self.config.options.clone();
128 for opt in &mut options {
129 if !opt.data_types.iter().any(|dt| dt == deriv) {
130 opt.data_types.push(deriv.to_string());
131 }
132 }
133 let options_json = serde_json::to_string(&options)?;
134 Ok(format!(
135 "{base_url}/ws-replay-normalized?options={}",
136 urlencoding::encode(&options_json)
137 ))
138 }
139 }
140
141 fn spawn_ws_task(
146 &mut self,
147 ws_stream: tokio_tungstenite::WebSocketStream<
148 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
149 >,
150 url: String,
151 instrument_map: AHashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>>,
152 book_snapshot_output: BookSnapshotOutput,
153 is_stream_mode: bool,
154 ) {
155 let sender = self.data_sender.clone();
156 let cancel = self.cancellation_token.clone();
157 let connected = self.is_connected.clone();
158
159 let handle = get_runtime().spawn(async move {
160 let mut reconnect_delay = Duration::from_secs(WS_INITIAL_RECONNECT_DELAY_SECS);
161 let instrument_map = instrument_map;
162
163 let should_reconnect = Self::run_ws_session(
165 ws_stream,
166 &cancel,
167 &sender,
168 &instrument_map,
169 &book_snapshot_output,
170 )
171 .await;
172
173 if !should_reconnect || !is_stream_mode || cancel.is_cancelled() {
174 connected.store(false, Ordering::Release);
175 return;
176 }
177
178 connected.store(false, Ordering::Release);
180
181 loop {
183 log::warn!(
184 "Stream disconnected, reconnecting in {}s",
185 reconnect_delay.as_secs()
186 );
187
188 tokio::select! {
189 () = tokio::time::sleep(reconnect_delay) => {}
190 () = cancel.cancelled() => break,
191 }
192
193 reconnect_delay = std::cmp::min(
194 reconnect_delay * 2,
195 Duration::from_secs(WS_MAX_RECONNECT_DELAY_SECS),
196 );
197
198 let ws_result = tokio::select! {
200 result = connect_async(&url) => Some(result),
201 () = cancel.cancelled() => None,
202 };
203
204 let Some(ws_result) = ws_result else {
205 break;
206 };
207
208 match ws_result {
209 Ok((ws_stream, _)) => {
210 log::info!("Reconnected to Tardis Machine");
211 connected.store(true, Ordering::Release);
212 reconnect_delay = Duration::from_secs(WS_INITIAL_RECONNECT_DELAY_SECS);
213
214 let should_reconnect = Self::run_ws_session(
215 ws_stream,
216 &cancel,
217 &sender,
218 &instrument_map,
219 &book_snapshot_output,
220 )
221 .await;
222
223 if !should_reconnect || cancel.is_cancelled() {
224 break;
225 }
226
227 connected.store(false, Ordering::Release);
228 }
229 Err(e) => {
230 if cancel.is_cancelled() {
231 break;
232 }
233
234 log::warn!(
235 "Failed to reconnect to Tardis Machine: {e}, retrying in {}s",
236 reconnect_delay.as_secs()
237 );
238 }
239 }
240 }
241
242 connected.store(false, Ordering::Release);
243 });
244
245 self.tasks.push(handle);
246 }
247
248 async fn run_ws_session(
251 ws_stream: tokio_tungstenite::WebSocketStream<
252 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
253 >,
254 cancel: &CancellationToken,
255 sender: &UnboundedSender<DataEvent>,
256 instrument_map: &AHashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>>,
257 book_snapshot_output: &BookSnapshotOutput,
258 ) -> bool {
259 let (mut writer, mut reader) = ws_stream.split();
260
261 let heartbeat_token = cancel.child_token();
262 let heartbeat_signal = heartbeat_token.clone();
263
264 get_runtime().spawn(async move {
265 let mut interval =
266 tokio::time::interval(Duration::from_secs(WS_HEARTBEAT_INTERVAL_SECS));
267 loop {
268 tokio::select! {
269 _ = interval.tick() => {
270 log::trace!("Sending PING");
271
272 if let Err(e) = writer.send(tungstenite::Message::Ping(vec![].into())).await {
273 log::debug!("Heartbeat send failed: {e}");
274 break;
275 }
276 }
277 () = heartbeat_signal.cancelled() => break,
278 }
279 }
280 });
281
282 let should_reconnect = Self::run_ws_loop(
283 &mut reader,
284 cancel,
285 sender,
286 instrument_map,
287 book_snapshot_output,
288 )
289 .await;
290
291 heartbeat_token.cancel();
292 should_reconnect
293 }
294
295 fn send_derivative_ticker_events(
300 ws_msg: &WsMessage,
301 info: &Arc<TardisInstrumentMiniInfo>,
302 sender: &UnboundedSender<DataEvent>,
303 cache: &mut DerivativeTickerCache,
304 ) -> bool {
305 if let Some(funding) = parse_tardis_ws_message_funding_rate(ws_msg.clone(), info)
306 && cache.should_emit_funding_rate(&funding)
307 && sender.send(DataEvent::FundingRate(funding)).is_err()
308 {
309 return false;
310 }
311
312 if let WsMessage::DerivativeTicker(msg) = ws_msg {
313 if let Ok(Some(mark_price)) =
314 parse_derivative_ticker_mark_price(msg, info.instrument_id, info.price_precision)
315 && cache.should_emit_mark_price(&mark_price)
316 && sender
317 .send(DataEvent::Data(Data::MarkPriceUpdate(mark_price)))
318 .is_err()
319 {
320 return false;
321 }
322
323 if let Ok(Some(index_price)) =
324 parse_derivative_ticker_index_price(msg, info.instrument_id, info.price_precision)
325 && cache.should_emit_index_price(&index_price)
326 && sender
327 .send(DataEvent::Data(Data::IndexPriceUpdate(index_price)))
328 .is_err()
329 {
330 return false;
331 }
332 }
333
334 true
335 }
336
337 async fn run_ws_loop(
341 reader: &mut futures_util::stream::SplitStream<
342 tokio_tungstenite::WebSocketStream<
343 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
344 >,
345 >,
346 cancel: &CancellationToken,
347 sender: &UnboundedSender<DataEvent>,
348 instrument_map: &AHashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>>,
349 book_snapshot_output: &BookSnapshotOutput,
350 ) -> bool {
351 let mut ticker_cache = DerivativeTickerCache::default();
352
353 loop {
354 let msg = tokio::select! {
355 msg = reader.next() => msg,
356 () = cancel.cancelled() => {
357 log::debug!("Stream task cancelled");
358 return false;
359 }
360 };
361
362 match msg {
363 Some(Ok(tungstenite::Message::Text(text))) => {
364 match serde_json::from_str::<WsMessage>(&text) {
365 Ok(ws_msg) => {
366 if matches!(ws_msg, WsMessage::Disconnect(_)) {
367 log::debug!("Received disconnect message");
368 continue;
369 }
370
371 let info = determine_instrument_info(&ws_msg, instrument_map);
372
373 if let Some(info) = info {
374 if matches!(ws_msg, WsMessage::DerivativeTicker(_)) {
375 if !Self::send_derivative_ticker_events(
376 &ws_msg,
377 &info,
378 sender,
379 &mut ticker_cache,
380 ) {
381 return false;
382 }
383 } else {
384 let event = parse_tardis_ws_message(
385 ws_msg,
386 &info,
387 book_snapshot_output,
388 )
389 .map(DataEvent::Data);
390
391 if let Some(event) = event
392 && let Err(e) = sender.send(event)
393 {
394 log::error!("Failed to send data event: {e}");
395 return false;
396 }
397 }
398 }
399 }
400 Err(e) => {
401 log::error!("Failed to deserialize message: {e}");
402 }
403 }
404 }
405 Some(Ok(tungstenite::Message::Close(frame))) => {
406 if let Some(frame) = frame {
407 log::info!("WebSocket closed: {} {}", frame.code, frame.reason);
408 } else {
409 log::info!("WebSocket closed");
410 }
411 return true;
412 }
413 Some(Ok(_)) => {}
414 Some(Err(e)) => {
415 log::error!("WebSocket error: {e}");
416 return true;
417 }
418 None => {
419 log::info!("Stream ended");
420 return true;
421 }
422 }
423 }
424 }
425}
426
427#[async_trait::async_trait(?Send)]
428impl DataClient for TardisDataClient {
429 fn client_id(&self) -> ClientId {
430 self.client_id
431 }
432
433 fn venue(&self) -> Option<Venue> {
434 None }
436
437 fn start(&mut self) -> anyhow::Result<()> {
438 log::info!("Starting {}", self.client_id);
439 Ok(())
440 }
441
442 fn stop(&mut self) -> anyhow::Result<()> {
443 log::info!("Stopping {}", self.client_id);
444 self.cancellation_token.cancel();
445
446 for handle in self.tasks.drain(..) {
447 handle.abort();
448 }
449 self.is_connected.store(false, Ordering::Release);
450 Ok(())
451 }
452
453 fn reset(&mut self) -> anyhow::Result<()> {
454 self.cancellation_token.cancel();
455
456 for handle in self.tasks.drain(..) {
457 handle.abort();
458 }
459 self.cancellation_token = CancellationToken::new();
460 self.is_connected.store(false, Ordering::Release);
461 Ok(())
462 }
463
464 fn dispose(&mut self) -> anyhow::Result<()> {
465 self.stop()
466 }
467
468 fn is_connected(&self) -> bool {
469 self.is_connected.load(Ordering::Acquire)
470 }
471
472 fn is_disconnected(&self) -> bool {
473 !self.is_connected()
474 }
475
476 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
477 log::info!("Subscribed mark prices for {}", cmd.instrument_id);
478 Ok(())
479 }
480
481 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
482 log::info!("Subscribed index prices for {}", cmd.instrument_id);
483 Ok(())
484 }
485
486 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
487 log::info!("Subscribed funding rates for {}", cmd.instrument_id);
488 Ok(())
489 }
490
491 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
492 log::info!("Unsubscribed mark prices for {}", cmd.instrument_id);
493 Ok(())
494 }
495
496 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
497 log::info!("Unsubscribed index prices for {}", cmd.instrument_id);
498 Ok(())
499 }
500
501 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
502 log::info!("Unsubscribed funding rates for {}", cmd.instrument_id);
503 Ok(())
504 }
505
506 async fn connect(&mut self) -> anyhow::Result<()> {
507 if self.is_connected() {
508 return Ok(());
509 }
510
511 if self.config.options.is_empty() && self.config.stream_options.is_empty() {
512 anyhow::bail!("Either replay `options` or `stream_options` must be provided");
513 }
514
515 let is_stream_mode = self.is_stream_mode();
516 let book_snapshot_output = self.config.book_snapshot_output.clone();
517
518 let http_client = TardisHttpClient::new(
519 self.config.api_key.as_deref(),
520 None,
521 None,
522 self.config.normalize_symbols,
523 self.config.proxy_url.clone(),
524 )?;
525
526 let exchanges: AHashSet<_> = if is_stream_mode {
527 self.config
528 .stream_options
529 .iter()
530 .map(|opt| opt.exchange)
531 .collect()
532 } else {
533 self.config.options.iter().map(|opt| opt.exchange).collect()
534 };
535
536 let base_url = resolve_ws_base_url(self.config.tardis_ws_url.as_deref())?;
537 let (instrument_map, instruments) = http_client
538 .bootstrap_instruments(&exchanges)
539 .await
540 .map_err(|e| anyhow::anyhow!("Failed to bootstrap instruments: {e}"))?;
541
542 for instrument in instruments {
543 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
544 log::error!("Failed to send instrument event: {e}");
545 }
546 }
547
548 let url = self.build_ws_url(&base_url)?;
549
550 let mode_label = if is_stream_mode { "stream" } else { "replay" };
551 log::info!("Connecting to Tardis Machine {mode_label}");
552 log::debug!("URL: {url}");
553
554 self.cancellation_token = CancellationToken::new();
555
556 let (ws_stream, _) = connect_async(&url)
557 .await
558 .map_err(|e| anyhow::anyhow!("Failed to connect to Tardis Machine: {e}"))?;
559
560 log::info!("Connected to Tardis Machine");
561
562 self.spawn_ws_task(
563 ws_stream,
564 url,
565 instrument_map,
566 book_snapshot_output,
567 is_stream_mode,
568 );
569 self.is_connected.store(true, Ordering::Release);
570
571 log::info!("Connected: {}", self.client_id);
572 Ok(())
573 }
574
575 async fn disconnect(&mut self) -> anyhow::Result<()> {
576 self.cancellation_token.cancel();
577 self.cancellation_token = CancellationToken::new();
578
579 let handles: Vec<_> = self.tasks.drain(..).collect();
580 if !handles.is_empty() {
581 for handle in handles {
582 if let Err(e) = handle.await {
583 log::error!("Error joining task: {e}");
584 }
585 }
586 log::info!("Disconnected: {}", self.client_id);
587 }
588
589 self.is_connected.store(false, Ordering::Release);
590
591 Ok(())
592 }
593}
594
595#[cfg(test)]
596mod tests {
597 use chrono::NaiveDate;
598 use nautilus_common::live::runner::set_data_event_sender;
599 use nautilus_model::identifiers::ClientId;
600 use rstest::rstest;
601
602 use super::*;
603 use crate::{
604 common::enums::TardisExchange, config::TardisDataClientConfig,
605 machine::types::ReplayNormalizedRequestOptions,
606 };
607
608 fn setup_test_env() {
609 use std::cell::OnceCell;
610
611 thread_local! {
612 static INIT: OnceCell<()> = const { OnceCell::new() };
613 }
614
615 INIT.with(|cell| {
616 cell.get_or_init(|| {
617 let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
618 set_data_event_sender(sender);
619 });
620 });
621 }
622
623 #[rstest]
624 fn test_build_ws_url_injects_derivative_ticker() {
625 setup_test_env();
626
627 let config = TardisDataClientConfig {
628 options: vec![ReplayNormalizedRequestOptions {
629 exchange: TardisExchange::BinanceFutures,
630 symbols: Some(vec!["BTCUSDT".to_string()]),
631 from: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
632 to: NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
633 data_types: vec!["trade".to_string()],
634 with_disconnect_messages: Some(false),
635 }],
636 ..Default::default()
637 };
638
639 let client = TardisDataClient::new(ClientId::new("TARDIS"), config).unwrap();
640 let url = client.build_ws_url("ws://localhost:8001").unwrap();
641
642 assert!(
643 url.contains("derivative_ticker"),
644 "URL should contain derivative_ticker but was: {url}"
645 );
646 assert!(url.contains("trade"), "URL should still contain trade");
647 }
648
649 #[rstest]
650 fn test_build_ws_url_does_not_duplicate_derivative_ticker() {
651 setup_test_env();
652
653 let config = TardisDataClientConfig {
654 options: vec![ReplayNormalizedRequestOptions {
655 exchange: TardisExchange::BinanceFutures,
656 symbols: Some(vec!["BTCUSDT".to_string()]),
657 from: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
658 to: NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
659 data_types: vec!["trade".to_string(), "derivative_ticker".to_string()],
660 with_disconnect_messages: Some(false),
661 }],
662 ..Default::default()
663 };
664
665 let client = TardisDataClient::new(ClientId::new("TARDIS"), config).unwrap();
666 let ws_url = client.build_ws_url("ws://localhost:8001").unwrap();
667
668 let decoded = urlencoding::decode(ws_url.split("options=").nth(1).unwrap()).unwrap();
669 let count = decoded.matches("derivative_ticker").count();
670 assert_eq!(count, 1, "derivative_ticker should appear exactly once");
671 }
672}