// nautilus_tardis/replay.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    fs,
18    path::{Path, PathBuf},
19};
20
21use ahash::{AHashMap, AHashSet};
22use anyhow::Context;
23use arrow::record_batch::RecordBatch;
24use chrono::{DateTime, Duration, NaiveDate};
25use futures_util::{StreamExt, pin_mut};
26use nautilus_core::{UnixNanos, datetime::unix_nanos_to_iso8601, string::formatting::Separable};
27use nautilus_model::{
28    data::{
29        Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
30        TradeTick,
31    },
32    identifiers::InstrumentId,
33};
34use nautilus_serialization::arrow::{
35    bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
36    book_depth10_to_arrow_record_batch_bytes, quotes_to_arrow_record_batch_bytes,
37    trades_to_arrow_record_batch_bytes,
38};
39use parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties};
40
41use crate::{
42    config::{BookSnapshotOutput, ParquetCompression, TardisReplayConfig},
43    http::TardisHttpClient,
44    machine::TardisMachineClient,
45};
46
47struct DateCursor {
48    /// Cursor date UTC.
49    date_utc: NaiveDate,
50    /// Cursor end timestamp UNIX nanoseconds.
51    end_ns: UnixNanos,
52}
53
54impl DateCursor {
55    /// Creates a new [`DateCursor`] instance.
56    fn new(current_ns: UnixNanos) -> Self {
57        let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
58        let date_utc = current_utc.date_naive();
59
60        // Calculate end of the current UTC day
61        let end_utc =
62            date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
63        let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);
64
65        Self { date_utc, end_ns }
66    }
67}
68
/// Runs the Tardis Machine replay from a JSON configuration file.
///
/// Streams normalized market data from the Tardis Machine websocket, buffers it
/// per instrument (or bar type) and per UTC day, and writes each completed day
/// out as a Parquet file under the resolved output path.
///
/// # Errors
///
/// Returns an error if reading or parsing the config file fails,
/// or if any downstream replay operation fails.
///
/// # Panics
///
/// Panics if unable to determine the output path (current directory fallback fails).
pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
    log::info!("Starting replay");
    log::info!("Config filepath: {}", config_filepath.display());

    // Load and parse the replay configuration
    let config_data = fs::read_to_string(config_filepath)
        .with_context(|| format!("Failed to read config file: {}", config_filepath.display()))?;
    let config: TardisReplayConfig = serde_json::from_str(&config_data)
        .context("failed to parse config JSON into TardisReplayConfig")?;

    // Resolve output path: explicit config value, then $NAUTILUS_PATH/catalog/data,
    // then the current working directory as a last resort
    let path = config
        .output_path
        .as_deref()
        .map(Path::new)
        .map(Path::to_path_buf)
        .or_else(|| {
            std::env::var("NAUTILUS_PATH")
                .ok()
                .map(|env_path| PathBuf::from(env_path).join("catalog").join("data"))
        })
        .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));

    log::info!("Output path: {}", path.display());

    let normalize_symbols = config.normalize_symbols.unwrap_or(true);
    log::info!("normalize_symbols={normalize_symbols}");

    let book_snapshot_output = config
        .book_snapshot_output
        .clone()
        .unwrap_or(BookSnapshotOutput::Deltas);
    log::info!("book_snapshot_output={book_snapshot_output:?}");

    let compression = config
        .compression
        .clone()
        .unwrap_or(ParquetCompression::Zstd);
    log::info!("compression={compression:?}");
    let compression = compression.as_parquet_compression();

    let http_client = TardisHttpClient::new(
        None,
        None,
        None,
        normalize_symbols,
        config.proxy_url.clone(),
    )?;
    let mut machine_client = TardisMachineClient::new(
        config.tardis_ws_url.as_deref(),
        normalize_symbols,
        book_snapshot_output,
    )?;

    // Fetch instrument definitions for every exchange referenced by the options
    let exchanges: AHashSet<_> = config.options.iter().map(|opt| opt.exchange).collect();
    let (instrument_map, _instruments) = http_client
        .bootstrap_instruments(&exchanges)
        .await
        .context("failed to bootstrap instruments")?;

    for (_, info) in &instrument_map {
        machine_client.add_instrument_info((**info).clone());
    }

    log::info!("Starting tardis-machine stream");
    let stream = machine_client.replay(config.options).await?;
    pin_mut!(stream);

    // Initialize date cursors (one per instrument/bar-type, tracking the current UTC day)
    let mut deltas_cursors: AHashMap<InstrumentId, DateCursor> = AHashMap::new();
    let mut depths_cursors: AHashMap<InstrumentId, DateCursor> = AHashMap::new();
    let mut quotes_cursors: AHashMap<InstrumentId, DateCursor> = AHashMap::new();
    let mut trades_cursors: AHashMap<InstrumentId, DateCursor> = AHashMap::new();
    let mut bars_cursors: AHashMap<BarType, DateCursor> = AHashMap::new();

    // Initialize date collection maps (per-day buffers flushed by the handlers)
    let mut deltas_map: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
    let mut depths_map: AHashMap<InstrumentId, Vec<OrderBookDepth10>> = AHashMap::new();
    let mut quotes_map: AHashMap<InstrumentId, Vec<QuoteTick>> = AHashMap::new();
    let mut trades_map: AHashMap<InstrumentId, Vec<TradeTick>> = AHashMap::new();
    let mut bars_map: AHashMap<BarType, Vec<Bar>> = AHashMap::new();

    let mut msg_count = 0;

    while let Some(result) = stream.next().await {
        match result {
            Ok(msg) => {
                // Dispatch each message to its per-type handler, which buffers it
                // and flushes a Parquet file whenever a UTC day boundary is crossed
                match msg {
                    Data::Deltas(msg) => {
                        handle_deltas_msg(
                            &msg,
                            &mut deltas_map,
                            &mut deltas_cursors,
                            &path,
                            compression,
                        );
                    }
                    Data::Depth10(msg) => {
                        handle_depth10_msg(
                            *msg,
                            &mut depths_map,
                            &mut depths_cursors,
                            &path,
                            compression,
                        );
                    }
                    Data::Quote(msg) => {
                        handle_quote_msg(
                            msg,
                            &mut quotes_map,
                            &mut quotes_cursors,
                            &path,
                            compression,
                        );
                    }
                    Data::Trade(msg) => {
                        handle_trade_msg(
                            msg,
                            &mut trades_map,
                            &mut trades_cursors,
                            &path,
                            compression,
                        );
                    }
                    Data::Bar(msg) => {
                        handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path, compression);
                    }
                    Data::Delta(delta) => {
                        log::warn!(
                            "Skipping individual delta message for {} (use Deltas batch instead)",
                            delta.instrument_id
                        );
                    }
                    Data::MarkPriceUpdate(_)
                    | Data::IndexPriceUpdate(_)
                    | Data::InstrumentStatus(_)
                    | Data::InstrumentClose(_)
                    | Data::Custom(_) => {
                        log::debug!(
                            "Skipping unsupported data type for instrument {}",
                            msg.instrument_id()
                        );
                    }
                }

                msg_count += 1;
                if msg_count % 100_000 == 0 {
                    log::debug!("Processed {} messages", msg_count.separate_with_commas());
                }
            }
            Err(e) => {
                // Stop on the first stream error; buffered data is still flushed below
                log::error!("Stream error: {e:?}");
                break;
            }
        }
    }

    // Iterate through every remaining type and instrument sequentially,
    // flushing the final (partial) day for each buffer

    for (instrument_id, deltas) in &deltas_map {
        let cursor = deltas_cursors.get(instrument_id).expect("Expected cursor");
        batch_and_write_deltas(deltas, instrument_id, cursor.date_utc, &path, compression);
    }

    for (instrument_id, depths) in &depths_map {
        let cursor = depths_cursors.get(instrument_id).expect("Expected cursor");
        batch_and_write_depths(depths, instrument_id, cursor.date_utc, &path, compression);
    }

    for (instrument_id, quotes) in &quotes_map {
        let cursor = quotes_cursors.get(instrument_id).expect("Expected cursor");
        batch_and_write_quotes(quotes, instrument_id, cursor.date_utc, &path, compression);
    }

    for (instrument_id, trades) in &trades_map {
        let cursor = trades_cursors.get(instrument_id).expect("Expected cursor");
        batch_and_write_trades(trades, instrument_id, cursor.date_utc, &path, compression);
    }

    for (bar_type, bars) in &bars_map {
        let cursor = bars_cursors.get(bar_type).expect("Expected cursor");
        batch_and_write_bars(bars, bar_type, cursor.date_utc, &path, compression);
    }

    log::info!(
        "Replay completed after {} messages",
        msg_count.separate_with_commas()
    );
    Ok(())
}
268
269fn handle_deltas_msg(
270    deltas: &OrderBookDeltas_API,
271    map: &mut AHashMap<InstrumentId, Vec<OrderBookDelta>>,
272    cursors: &mut AHashMap<InstrumentId, DateCursor>,
273    path: &Path,
274    compression: Compression,
275) {
276    let cursor = cursors
277        .entry(deltas.instrument_id)
278        .or_insert_with(|| DateCursor::new(deltas.ts_init));
279
280    if deltas.ts_init > cursor.end_ns {
281        if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
282            batch_and_write_deltas(
283                &deltas_vec,
284                &deltas.instrument_id,
285                cursor.date_utc,
286                path,
287                compression,
288            );
289        }
290        // Update cursor
291        *cursor = DateCursor::new(deltas.ts_init);
292    }
293
294    map.entry(deltas.instrument_id)
295        .or_insert_with(|| Vec::with_capacity(100_000))
296        .extend(&*deltas.deltas);
297}
298
299fn handle_depth10_msg(
300    depth10: OrderBookDepth10,
301    map: &mut AHashMap<InstrumentId, Vec<OrderBookDepth10>>,
302    cursors: &mut AHashMap<InstrumentId, DateCursor>,
303    path: &Path,
304    compression: Compression,
305) {
306    let cursor = cursors
307        .entry(depth10.instrument_id)
308        .or_insert_with(|| DateCursor::new(depth10.ts_init));
309
310    if depth10.ts_init > cursor.end_ns {
311        if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
312            batch_and_write_depths(
313                &depths_vec,
314                &depth10.instrument_id,
315                cursor.date_utc,
316                path,
317                compression,
318            );
319        }
320        // Update cursor
321        *cursor = DateCursor::new(depth10.ts_init);
322    }
323
324    map.entry(depth10.instrument_id)
325        .or_insert_with(|| Vec::with_capacity(100_000))
326        .push(depth10);
327}
328
329fn handle_quote_msg(
330    quote: QuoteTick,
331    map: &mut AHashMap<InstrumentId, Vec<QuoteTick>>,
332    cursors: &mut AHashMap<InstrumentId, DateCursor>,
333    path: &Path,
334    compression: Compression,
335) {
336    let cursor = cursors
337        .entry(quote.instrument_id)
338        .or_insert_with(|| DateCursor::new(quote.ts_init));
339
340    if quote.ts_init > cursor.end_ns {
341        if let Some(quotes_vec) = map.remove(&quote.instrument_id) {
342            batch_and_write_quotes(
343                &quotes_vec,
344                &quote.instrument_id,
345                cursor.date_utc,
346                path,
347                compression,
348            );
349        }
350        // Update cursor
351        *cursor = DateCursor::new(quote.ts_init);
352    }
353
354    map.entry(quote.instrument_id)
355        .or_insert_with(|| Vec::with_capacity(100_000))
356        .push(quote);
357}
358
359fn handle_trade_msg(
360    trade: TradeTick,
361    map: &mut AHashMap<InstrumentId, Vec<TradeTick>>,
362    cursors: &mut AHashMap<InstrumentId, DateCursor>,
363    path: &Path,
364    compression: Compression,
365) {
366    let cursor = cursors
367        .entry(trade.instrument_id)
368        .or_insert_with(|| DateCursor::new(trade.ts_init));
369
370    if trade.ts_init > cursor.end_ns {
371        if let Some(trades_vec) = map.remove(&trade.instrument_id) {
372            batch_and_write_trades(
373                &trades_vec,
374                &trade.instrument_id,
375                cursor.date_utc,
376                path,
377                compression,
378            );
379        }
380        // Update cursor
381        *cursor = DateCursor::new(trade.ts_init);
382    }
383
384    map.entry(trade.instrument_id)
385        .or_insert_with(|| Vec::with_capacity(100_000))
386        .push(trade);
387}
388
389fn handle_bar_msg(
390    bar: Bar,
391    map: &mut AHashMap<BarType, Vec<Bar>>,
392    cursors: &mut AHashMap<BarType, DateCursor>,
393    path: &Path,
394    compression: Compression,
395) {
396    let cursor = cursors
397        .entry(bar.bar_type)
398        .or_insert_with(|| DateCursor::new(bar.ts_init));
399
400    if bar.ts_init > cursor.end_ns {
401        if let Some(bars_vec) = map.remove(&bar.bar_type) {
402            batch_and_write_bars(&bars_vec, &bar.bar_type, cursor.date_utc, path, compression);
403        }
404        // Update cursor
405        *cursor = DateCursor::new(bar.ts_init);
406    }
407
408    map.entry(bar.bar_type)
409        .or_insert_with(|| Vec::with_capacity(100_000))
410        .push(bar);
411}
412
413fn batch_and_write_deltas(
414    deltas: &[OrderBookDelta],
415    instrument_id: &InstrumentId,
416    date: NaiveDate,
417    path: &Path,
418    compression: Compression,
419) {
420    match book_deltas_to_arrow_record_batch_bytes(deltas) {
421        Ok(batch) => write_batch(
422            &batch,
423            "order_book_deltas",
424            instrument_id,
425            date,
426            path,
427            compression,
428        ),
429        Err(e) => {
430            log::error!("Error converting OrderBookDeltas to Arrow: {e:?}");
431        }
432    }
433}
434
435fn batch_and_write_depths(
436    depths: &[OrderBookDepth10],
437    instrument_id: &InstrumentId,
438    date: NaiveDate,
439    path: &Path,
440    compression: Compression,
441) {
442    match book_depth10_to_arrow_record_batch_bytes(depths) {
443        Ok(batch) => write_batch(
444            &batch,
445            "order_book_depths",
446            instrument_id,
447            date,
448            path,
449            compression,
450        ),
451        Err(e) => {
452            log::error!("Error converting OrderBookDepth10 to Arrow: {e:?}");
453        }
454    }
455}
456
457fn batch_and_write_quotes(
458    quotes: &[QuoteTick],
459    instrument_id: &InstrumentId,
460    date: NaiveDate,
461    path: &Path,
462    compression: Compression,
463) {
464    match quotes_to_arrow_record_batch_bytes(quotes) {
465        Ok(batch) => write_batch(&batch, "quote_tick", instrument_id, date, path, compression),
466        Err(e) => {
467            log::error!("Error converting QuoteTick to Arrow: {e:?}");
468        }
469    }
470}
471
472fn batch_and_write_trades(
473    trades: &[TradeTick],
474    instrument_id: &InstrumentId,
475    date: NaiveDate,
476    path: &Path,
477    compression: Compression,
478) {
479    match trades_to_arrow_record_batch_bytes(trades) {
480        Ok(batch) => write_batch(&batch, "trade_tick", instrument_id, date, path, compression),
481        Err(e) => {
482            log::error!("Error converting TradeTick to Arrow: {e:?}");
483        }
484    }
485}
486
487fn batch_and_write_bars(
488    bars: &[Bar],
489    bar_type: &BarType,
490    date: NaiveDate,
491    path: &Path,
492    compression: Compression,
493) {
494    let batch = match bars_to_arrow_record_batch_bytes(bars) {
495        Ok(batch) => batch,
496        Err(e) => {
497            log::error!("Error converting Bar to Arrow: {e:?}");
498            return;
499        }
500    };
501
502    let filepath = path.join(parquet_filepath_bars(bar_type, date));
503    if let Err(e) = write_parquet_local(&batch, &filepath, compression) {
504        log::error!("Error writing {}: {e}", filepath.display());
505    } else {
506        log::info!("File written: {}", filepath.display());
507    }
508}
509
510/// Asserts that the given date is on or after the UNIX epoch (1970-01-01).
511///
512/// # Panics
513///
514/// Panics if the date is before 1970-01-01, as pre-epoch dates cannot be
515/// reliably represented as UnixNanos without overflow issues.
516fn assert_post_epoch(date: NaiveDate) {
517    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("UNIX epoch must exist");
518    assert!(
519        date >= epoch,
520        "Tardis replay filenames require dates on or after 1970-01-01; received {date}"
521    );
522}
523
/// Converts an ISO 8601 timestamp to a filesystem-safe format.
///
/// Colons and dots are rewritten as hyphens so the timestamp is safe for use
/// in filenames across different filesystems.
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp
        .chars()
        .map(|c| if matches!(c, ':' | '.') { '-' } else { c })
        .collect()
}
531
532/// Converts timestamps to a filename using ISO 8601 format.
533///
534/// This function converts two Unix nanosecond timestamps to a filename that uses
535/// ISO 8601 format with filesystem-safe characters, matching the catalog convention.
536fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
537    let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
538    let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));
539
540    format!("{datetime_1}_{datetime_2}.parquet")
541}
542
543fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
544    assert_post_epoch(date);
545
546    let instrument_id_str = instrument_id.to_string().replace('/', "");
547
548    let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
549    let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
550
551    let start_nanos = start_utc
552        .timestamp_nanos_opt()
553        .expect("valid nanosecond timestamp");
554    let end_nanos = (end_utc.and_utc())
555        .timestamp_nanos_opt()
556        .expect("valid nanosecond timestamp");
557
558    let filename = timestamps_to_filename(
559        UnixNanos::from(start_nanos as u64),
560        UnixNanos::from(end_nanos as u64),
561    );
562
563    PathBuf::new()
564        .join(typename)
565        .join(instrument_id_str)
566        .join(filename)
567}
568
569fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
570    assert_post_epoch(date);
571
572    let bar_type_str = bar_type.to_string().replace('/', "");
573
574    // Calculate start and end timestamps for the day (UTC)
575    let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
576    let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
577
578    let start_nanos = start_utc
579        .timestamp_nanos_opt()
580        .expect("valid nanosecond timestamp");
581    let end_nanos = (end_utc.and_utc())
582        .timestamp_nanos_opt()
583        .expect("valid nanosecond timestamp");
584
585    let filename = timestamps_to_filename(
586        UnixNanos::from(start_nanos as u64),
587        UnixNanos::from(end_nanos as u64),
588    );
589
590    PathBuf::new().join("bar").join(bar_type_str).join(filename)
591}
592
593fn write_batch(
594    batch: &RecordBatch,
595    typename: &str,
596    instrument_id: &InstrumentId,
597    date: NaiveDate,
598    path: &Path,
599    compression: Compression,
600) {
601    let filepath = path.join(parquet_filepath(typename, instrument_id, date));
602    if let Err(e) = write_parquet_local(batch, &filepath, compression) {
603        log::error!("Error writing {}: {e}", filepath.display());
604    } else {
605        log::info!("File written: {}", filepath.display());
606    }
607}
608
609fn write_parquet_local(
610    batch: &RecordBatch,
611    file_path: &Path,
612    compression: Compression,
613) -> anyhow::Result<()> {
614    if let Some(parent) = file_path.parent() {
615        std::fs::create_dir_all(parent)?;
616    }
617
618    let file = std::fs::File::create(file_path)?;
619    let props = WriterProperties::builder()
620        .set_compression(compression)
621        .build();
622
623    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
624    writer.write(batch)?;
625    writer.close()?;
626    Ok(())
627}
628
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    // Verifies that `DateCursor::new` resolves any intra-day timestamp to the
    // containing UTC date, with `end_ns` pinned to 23:59:59.999999999 of that day.
    #[rstest]
    #[case(
    // Start of day: 2024-01-01 00:00:00 UTC
    Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    #[case(
    // Midday: 2024-01-01 12:00:00 UTC
    Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    #[case(
    // End of day: 2024-01-01 23:59:59.999999999 UTC
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999,
    NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    #[case(
    // Start of new day: 2024-01-02 00:00:00 UTC
    Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
    NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
    Utc.with_ymd_and_hms(2024, 1, 2, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
)]
    fn test_date_cursor(
        #[case] timestamp: u64,
        #[case] expected_date: NaiveDate,
        #[case] expected_end_ns: u64,
    ) {
        let unix_nanos = UnixNanos::from(timestamp);
        let cursor = DateCursor::new(unix_nanos);

        assert_eq!(cursor.date_utc, expected_date);
        assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
    }
}