Skip to main content

nautilus_tardis/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Tardis data client for streaming replay or live data into the engine.
17
18use std::{
19    sync::{
20        Arc,
21        atomic::{AtomicBool, Ordering},
22    },
23    time::Duration,
24};
25
26use ahash::{AHashMap, AHashSet};
27use futures_util::{SinkExt, StreamExt};
28use nautilus_common::{
29    clients::DataClient,
30    live::{runner::get_data_event_sender, runtime::get_runtime},
31    messages::{
32        DataEvent,
33        data::{
34            subscribe::{SubscribeFundingRates, SubscribeIndexPrices, SubscribeMarkPrices},
35            unsubscribe::{UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeMarkPrices},
36        },
37    },
38};
39use nautilus_core::string::urlencoding;
40use nautilus_model::{
41    data::Data,
42    identifiers::{ClientId, Venue},
43};
44use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
45use tokio_tungstenite::{connect_async, tungstenite};
46use tokio_util::sync::CancellationToken;
47
48use crate::{
49    common::{
50        consts::{
51            WS_HEARTBEAT_INTERVAL_SECS, WS_INITIAL_RECONNECT_DELAY_SECS,
52            WS_MAX_RECONNECT_DELAY_SECS,
53        },
54        enums::TardisDataType,
55        urls::resolve_ws_base_url,
56    },
57    config::{BookSnapshotOutput, TardisDataClientConfig},
58    http::TardisHttpClient,
59    machine::{
60        cache::DerivativeTickerCache,
61        client::determine_instrument_info,
62        message::WsMessage,
63        parse::{
64            parse_derivative_ticker_index_price, parse_derivative_ticker_mark_price,
65            parse_tardis_ws_message, parse_tardis_ws_message_funding_rate,
66        },
67        types::{TardisInstrumentKey, TardisInstrumentMiniInfo},
68    },
69};
70
71/// Tardis data client for streaming replay or live data into the platform.
72#[derive(Debug)]
73pub struct TardisDataClient {
74    client_id: ClientId,
75    config: TardisDataClientConfig,
76    is_connected: Arc<AtomicBool>,
77    cancellation_token: CancellationToken,
78    tasks: Vec<JoinHandle<()>>,
79    data_sender: UnboundedSender<DataEvent>,
80}
81
82impl TardisDataClient {
83    /// Creates a new [`TardisDataClient`] instance.
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if the data event sender is not initialized.
88    pub fn new(client_id: ClientId, config: TardisDataClientConfig) -> anyhow::Result<Self> {
89        let data_sender = get_data_event_sender();
90
91        Ok(Self {
92            client_id,
93            config,
94            is_connected: Arc::new(AtomicBool::new(false)),
95            cancellation_token: CancellationToken::new(),
96            tasks: Vec::new(),
97            data_sender,
98        })
99    }
100
101    /// Returns `true` if the client is configured for live streaming mode.
102    fn is_stream_mode(&self) -> bool {
103        self.config.options.is_empty() && !self.config.stream_options.is_empty()
104    }
105
106    /// Builds the WebSocket URL for connecting to the Tardis Machine Server.
107    ///
108    /// Ensures `derivative_ticker` is included in the data types for each
109    /// option set so that mark price, index price, and funding rate events
110    /// are available without requiring manual configuration.
111    fn build_ws_url(&self, base_url: &str) -> anyhow::Result<String> {
112        let deriv = TardisDataType::DerivativeTicker.as_tardis_str();
113
114        if self.is_stream_mode() {
115            let mut options = self.config.stream_options.clone();
116            for opt in &mut options {
117                if !opt.data_types.iter().any(|dt| dt == deriv) {
118                    opt.data_types.push(deriv.to_string());
119                }
120            }
121            let options_json = serde_json::to_string(&options)?;
122            Ok(format!(
123                "{base_url}/ws-stream-normalized?options={}",
124                urlencoding::encode(&options_json)
125            ))
126        } else {
127            let mut options = self.config.options.clone();
128            for opt in &mut options {
129                if !opt.data_types.iter().any(|dt| dt == deriv) {
130                    opt.data_types.push(deriv.to_string());
131                }
132            }
133            let options_json = serde_json::to_string(&options)?;
134            Ok(format!(
135                "{base_url}/ws-replay-normalized?options={}",
136                urlencoding::encode(&options_json)
137            ))
138        }
139    }
140
141    /// Spawns the WebSocket message processing loop using an already-connected
142    /// stream. The initial handshake happens in `connect()` so callers get an
143    /// error if the first connection fails. In stream mode the spawned task
144    /// handles subsequent reconnections automatically.
145    fn spawn_ws_task(
146        &mut self,
147        ws_stream: tokio_tungstenite::WebSocketStream<
148            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
149        >,
150        url: String,
151        instrument_map: AHashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>>,
152        book_snapshot_output: BookSnapshotOutput,
153        is_stream_mode: bool,
154    ) {
155        let sender = self.data_sender.clone();
156        let cancel = self.cancellation_token.clone();
157        let connected = self.is_connected.clone();
158
159        let handle = get_runtime().spawn(async move {
160            let mut reconnect_delay = Duration::from_secs(WS_INITIAL_RECONNECT_DELAY_SECS);
161            let instrument_map = instrument_map;
162
163            // Process the initial (already-connected) stream
164            let should_reconnect = Self::run_ws_session(
165                ws_stream,
166                &cancel,
167                &sender,
168                &instrument_map,
169                &book_snapshot_output,
170            )
171            .await;
172
173            if !should_reconnect || !is_stream_mode || cancel.is_cancelled() {
174                connected.store(false, Ordering::Release);
175                return;
176            }
177
178            // Mark disconnected while reconnecting so health checks see the outage
179            connected.store(false, Ordering::Release);
180
181            // Reconnection loop (stream mode only)
182            loop {
183                log::warn!(
184                    "Stream disconnected, reconnecting in {}s",
185                    reconnect_delay.as_secs()
186                );
187
188                tokio::select! {
189                    () = tokio::time::sleep(reconnect_delay) => {}
190                    () = cancel.cancelled() => break,
191                }
192
193                reconnect_delay = std::cmp::min(
194                    reconnect_delay * 2,
195                    Duration::from_secs(WS_MAX_RECONNECT_DELAY_SECS),
196                );
197
198                // Reconnect WS first (critical path), then refresh instruments
199                let ws_result = tokio::select! {
200                    result = connect_async(&url) => Some(result),
201                    () = cancel.cancelled() => None,
202                };
203
204                let Some(ws_result) = ws_result else {
205                    break;
206                };
207
208                match ws_result {
209                    Ok((ws_stream, _)) => {
210                        log::info!("Reconnected to Tardis Machine");
211                        connected.store(true, Ordering::Release);
212                        reconnect_delay = Duration::from_secs(WS_INITIAL_RECONNECT_DELAY_SECS);
213
214                        let should_reconnect = Self::run_ws_session(
215                            ws_stream,
216                            &cancel,
217                            &sender,
218                            &instrument_map,
219                            &book_snapshot_output,
220                        )
221                        .await;
222
223                        if !should_reconnect || cancel.is_cancelled() {
224                            break;
225                        }
226
227                        connected.store(false, Ordering::Release);
228                    }
229                    Err(e) => {
230                        if cancel.is_cancelled() {
231                            break;
232                        }
233
234                        log::warn!(
235                            "Failed to reconnect to Tardis Machine: {e}, retrying in {}s",
236                            reconnect_delay.as_secs()
237                        );
238                    }
239                }
240            }
241
242            connected.store(false, Ordering::Release);
243        });
244
245        self.tasks.push(handle);
246    }
247
248    /// Runs a single WebSocket session: starts heartbeat, processes messages,
249    /// and returns whether the caller should attempt reconnection.
250    async fn run_ws_session(
251        ws_stream: tokio_tungstenite::WebSocketStream<
252            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
253        >,
254        cancel: &CancellationToken,
255        sender: &UnboundedSender<DataEvent>,
256        instrument_map: &AHashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>>,
257        book_snapshot_output: &BookSnapshotOutput,
258    ) -> bool {
259        let (mut writer, mut reader) = ws_stream.split();
260
261        let heartbeat_token = cancel.child_token();
262        let heartbeat_signal = heartbeat_token.clone();
263
264        get_runtime().spawn(async move {
265            let mut interval =
266                tokio::time::interval(Duration::from_secs(WS_HEARTBEAT_INTERVAL_SECS));
267            loop {
268                tokio::select! {
269                    _ = interval.tick() => {
270                        log::trace!("Sending PING");
271
272                        if let Err(e) = writer.send(tungstenite::Message::Ping(vec![].into())).await {
273                            log::debug!("Heartbeat send failed: {e}");
274                            break;
275                        }
276                    }
277                    () = heartbeat_signal.cancelled() => break,
278                }
279            }
280        });
281
282        let should_reconnect = Self::run_ws_loop(
283            &mut reader,
284            cancel,
285            sender,
286            instrument_map,
287            book_snapshot_output,
288        )
289        .await;
290
291        heartbeat_token.cancel();
292        should_reconnect
293    }
294
295    /// Extracts and sends all data events from a `DerivativeTicker` message:
296    /// funding rate, mark price, and index price. Only emits events when values
297    /// change from the previous update. Returns `false` if the channel is broken
298    /// and the caller should exit the loop.
299    fn send_derivative_ticker_events(
300        ws_msg: &WsMessage,
301        info: &Arc<TardisInstrumentMiniInfo>,
302        sender: &UnboundedSender<DataEvent>,
303        cache: &mut DerivativeTickerCache,
304    ) -> bool {
305        if let Some(funding) = parse_tardis_ws_message_funding_rate(ws_msg.clone(), info)
306            && cache.should_emit_funding_rate(&funding)
307            && sender.send(DataEvent::FundingRate(funding)).is_err()
308        {
309            return false;
310        }
311
312        if let WsMessage::DerivativeTicker(msg) = ws_msg {
313            if let Ok(Some(mark_price)) =
314                parse_derivative_ticker_mark_price(msg, info.instrument_id, info.price_precision)
315                && cache.should_emit_mark_price(&mark_price)
316                && sender
317                    .send(DataEvent::Data(Data::MarkPriceUpdate(mark_price)))
318                    .is_err()
319            {
320                return false;
321            }
322
323            if let Ok(Some(index_price)) =
324                parse_derivative_ticker_index_price(msg, info.instrument_id, info.price_precision)
325                && cache.should_emit_index_price(&index_price)
326                && sender
327                    .send(DataEvent::Data(Data::IndexPriceUpdate(index_price)))
328                    .is_err()
329            {
330                return false;
331            }
332        }
333
334        true
335    }
336
337    /// Processes WebSocket messages until the stream ends, an error occurs, or
338    /// the cancellation token fires. Returns `true` if the caller should attempt
339    /// reconnection (stream mode only).
340    async fn run_ws_loop(
341        reader: &mut futures_util::stream::SplitStream<
342            tokio_tungstenite::WebSocketStream<
343                tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
344            >,
345        >,
346        cancel: &CancellationToken,
347        sender: &UnboundedSender<DataEvent>,
348        instrument_map: &AHashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>>,
349        book_snapshot_output: &BookSnapshotOutput,
350    ) -> bool {
351        let mut ticker_cache = DerivativeTickerCache::default();
352
353        loop {
354            let msg = tokio::select! {
355                msg = reader.next() => msg,
356                () = cancel.cancelled() => {
357                    log::debug!("Stream task cancelled");
358                    return false;
359                }
360            };
361
362            match msg {
363                Some(Ok(tungstenite::Message::Text(text))) => {
364                    match serde_json::from_str::<WsMessage>(&text) {
365                        Ok(ws_msg) => {
366                            if matches!(ws_msg, WsMessage::Disconnect(_)) {
367                                log::debug!("Received disconnect message");
368                                continue;
369                            }
370
371                            let info = determine_instrument_info(&ws_msg, instrument_map);
372
373                            if let Some(info) = info {
374                                if matches!(ws_msg, WsMessage::DerivativeTicker(_)) {
375                                    if !Self::send_derivative_ticker_events(
376                                        &ws_msg,
377                                        &info,
378                                        sender,
379                                        &mut ticker_cache,
380                                    ) {
381                                        return false;
382                                    }
383                                } else {
384                                    let event = parse_tardis_ws_message(
385                                        ws_msg,
386                                        &info,
387                                        book_snapshot_output,
388                                    )
389                                    .map(DataEvent::Data);
390
391                                    if let Some(event) = event
392                                        && let Err(e) = sender.send(event)
393                                    {
394                                        log::error!("Failed to send data event: {e}");
395                                        return false;
396                                    }
397                                }
398                            }
399                        }
400                        Err(e) => {
401                            log::error!("Failed to deserialize message: {e}");
402                        }
403                    }
404                }
405                Some(Ok(tungstenite::Message::Close(frame))) => {
406                    if let Some(frame) = frame {
407                        log::info!("WebSocket closed: {} {}", frame.code, frame.reason);
408                    } else {
409                        log::info!("WebSocket closed");
410                    }
411                    return true;
412                }
413                Some(Ok(_)) => {}
414                Some(Err(e)) => {
415                    log::error!("WebSocket error: {e}");
416                    return true;
417                }
418                None => {
419                    log::info!("Stream ended");
420                    return true;
421                }
422            }
423        }
424    }
425}
426
427#[async_trait::async_trait(?Send)]
428impl DataClient for TardisDataClient {
429    fn client_id(&self) -> ClientId {
430        self.client_id
431    }
432
433    fn venue(&self) -> Option<Venue> {
434        None // Tardis is multi-venue
435    }
436
437    fn start(&mut self) -> anyhow::Result<()> {
438        log::info!("Starting {}", self.client_id);
439        Ok(())
440    }
441
442    fn stop(&mut self) -> anyhow::Result<()> {
443        log::info!("Stopping {}", self.client_id);
444        self.cancellation_token.cancel();
445
446        for handle in self.tasks.drain(..) {
447            handle.abort();
448        }
449        self.is_connected.store(false, Ordering::Release);
450        Ok(())
451    }
452
453    fn reset(&mut self) -> anyhow::Result<()> {
454        self.cancellation_token.cancel();
455
456        for handle in self.tasks.drain(..) {
457            handle.abort();
458        }
459        self.cancellation_token = CancellationToken::new();
460        self.is_connected.store(false, Ordering::Release);
461        Ok(())
462    }
463
464    fn dispose(&mut self) -> anyhow::Result<()> {
465        self.stop()
466    }
467
468    fn is_connected(&self) -> bool {
469        self.is_connected.load(Ordering::Acquire)
470    }
471
472    fn is_disconnected(&self) -> bool {
473        !self.is_connected()
474    }
475
476    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
477        log::info!("Subscribed mark prices for {}", cmd.instrument_id);
478        Ok(())
479    }
480
481    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
482        log::info!("Subscribed index prices for {}", cmd.instrument_id);
483        Ok(())
484    }
485
486    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
487        log::info!("Subscribed funding rates for {}", cmd.instrument_id);
488        Ok(())
489    }
490
491    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
492        log::info!("Unsubscribed mark prices for {}", cmd.instrument_id);
493        Ok(())
494    }
495
496    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
497        log::info!("Unsubscribed index prices for {}", cmd.instrument_id);
498        Ok(())
499    }
500
501    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
502        log::info!("Unsubscribed funding rates for {}", cmd.instrument_id);
503        Ok(())
504    }
505
506    async fn connect(&mut self) -> anyhow::Result<()> {
507        if self.is_connected() {
508            return Ok(());
509        }
510
511        if self.config.options.is_empty() && self.config.stream_options.is_empty() {
512            anyhow::bail!("Either replay `options` or `stream_options` must be provided");
513        }
514
515        let is_stream_mode = self.is_stream_mode();
516        let book_snapshot_output = self.config.book_snapshot_output.clone();
517
518        let http_client = TardisHttpClient::new(
519            self.config.api_key.as_deref(),
520            None,
521            None,
522            self.config.normalize_symbols,
523            self.config.proxy_url.clone(),
524        )?;
525
526        let exchanges: AHashSet<_> = if is_stream_mode {
527            self.config
528                .stream_options
529                .iter()
530                .map(|opt| opt.exchange)
531                .collect()
532        } else {
533            self.config.options.iter().map(|opt| opt.exchange).collect()
534        };
535
536        let base_url = resolve_ws_base_url(self.config.tardis_ws_url.as_deref())?;
537        let (instrument_map, instruments) = http_client
538            .bootstrap_instruments(&exchanges)
539            .await
540            .map_err(|e| anyhow::anyhow!("Failed to bootstrap instruments: {e}"))?;
541
542        for instrument in instruments {
543            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
544                log::error!("Failed to send instrument event: {e}");
545            }
546        }
547
548        let url = self.build_ws_url(&base_url)?;
549
550        let mode_label = if is_stream_mode { "stream" } else { "replay" };
551        log::info!("Connecting to Tardis Machine {mode_label}");
552        log::debug!("URL: {url}");
553
554        self.cancellation_token = CancellationToken::new();
555
556        let (ws_stream, _) = connect_async(&url)
557            .await
558            .map_err(|e| anyhow::anyhow!("Failed to connect to Tardis Machine: {e}"))?;
559
560        log::info!("Connected to Tardis Machine");
561
562        self.spawn_ws_task(
563            ws_stream,
564            url,
565            instrument_map,
566            book_snapshot_output,
567            is_stream_mode,
568        );
569        self.is_connected.store(true, Ordering::Release);
570
571        log::info!("Connected: {}", self.client_id);
572        Ok(())
573    }
574
575    async fn disconnect(&mut self) -> anyhow::Result<()> {
576        self.cancellation_token.cancel();
577        self.cancellation_token = CancellationToken::new();
578
579        let handles: Vec<_> = self.tasks.drain(..).collect();
580        if !handles.is_empty() {
581            for handle in handles {
582                if let Err(e) = handle.await {
583                    log::error!("Error joining task: {e}");
584                }
585            }
586            log::info!("Disconnected: {}", self.client_id);
587        }
588
589        self.is_connected.store(false, Ordering::Release);
590
591        Ok(())
592    }
593}
594
595#[cfg(test)]
596mod tests {
597    use chrono::NaiveDate;
598    use nautilus_common::live::runner::set_data_event_sender;
599    use nautilus_model::identifiers::ClientId;
600    use rstest::rstest;
601
602    use super::*;
603    use crate::{
604        common::enums::TardisExchange, config::TardisDataClientConfig,
605        machine::types::ReplayNormalizedRequestOptions,
606    };
607
608    fn setup_test_env() {
609        use std::cell::OnceCell;
610
611        thread_local! {
612            static INIT: OnceCell<()> = const { OnceCell::new() };
613        }
614
615        INIT.with(|cell| {
616            cell.get_or_init(|| {
617                let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
618                set_data_event_sender(sender);
619            });
620        });
621    }
622
623    #[rstest]
624    fn test_build_ws_url_injects_derivative_ticker() {
625        setup_test_env();
626
627        let config = TardisDataClientConfig {
628            options: vec![ReplayNormalizedRequestOptions {
629                exchange: TardisExchange::BinanceFutures,
630                symbols: Some(vec!["BTCUSDT".to_string()]),
631                from: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
632                to: NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
633                data_types: vec!["trade".to_string()],
634                with_disconnect_messages: Some(false),
635            }],
636            ..Default::default()
637        };
638
639        let client = TardisDataClient::new(ClientId::new("TARDIS"), config).unwrap();
640        let url = client.build_ws_url("ws://localhost:8001").unwrap();
641
642        assert!(
643            url.contains("derivative_ticker"),
644            "URL should contain derivative_ticker but was: {url}"
645        );
646        assert!(url.contains("trade"), "URL should still contain trade");
647    }
648
649    #[rstest]
650    fn test_build_ws_url_does_not_duplicate_derivative_ticker() {
651        setup_test_env();
652
653        let config = TardisDataClientConfig {
654            options: vec![ReplayNormalizedRequestOptions {
655                exchange: TardisExchange::BinanceFutures,
656                symbols: Some(vec!["BTCUSDT".to_string()]),
657                from: NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
658                to: NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
659                data_types: vec!["trade".to_string(), "derivative_ticker".to_string()],
660                with_disconnect_messages: Some(false),
661            }],
662            ..Default::default()
663        };
664
665        let client = TardisDataClient::new(ClientId::new("TARDIS"), config).unwrap();
666        let ws_url = client.build_ws_url("ws://localhost:8001").unwrap();
667
668        let decoded = urlencoding::decode(ws_url.split("options=").nth(1).unwrap()).unwrap();
669        let count = decoded.matches("derivative_ticker").count();
670        assert_eq!(count, 1, "derivative_ticker should appear exactly once");
671    }
672}