1use std::{
17 fs,
18 path::{Path, PathBuf},
19};
20
21use ahash::{AHashMap, AHashSet};
22use anyhow::Context;
23use arrow::record_batch::RecordBatch;
24use chrono::{DateTime, Duration, NaiveDate};
25use futures_util::{StreamExt, pin_mut};
26use nautilus_core::{UnixNanos, datetime::unix_nanos_to_iso8601, string::formatting::Separable};
27use nautilus_model::{
28 data::{
29 Bar, BarType, Data, OrderBookDelta, OrderBookDeltas_API, OrderBookDepth10, QuoteTick,
30 TradeTick,
31 },
32 identifiers::InstrumentId,
33};
34use nautilus_serialization::arrow::{
35 bars_to_arrow_record_batch_bytes, book_deltas_to_arrow_record_batch_bytes,
36 book_depth10_to_arrow_record_batch_bytes, quotes_to_arrow_record_batch_bytes,
37 trades_to_arrow_record_batch_bytes,
38};
39use parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties};
40
41use crate::{
42 config::{BookSnapshotOutput, ParquetCompression, TardisReplayConfig},
43 http::TardisHttpClient,
44 machine::TardisMachineClient,
45};
46
/// Tracks the UTC date currently being accumulated for a single stream key,
/// along with the last nanosecond of that date (used to detect rollovers).
struct DateCursor {
    // The UTC date for which data is currently being buffered.
    date_utc: NaiveDate,
    // Inclusive end of `date_utc` in UNIX nanoseconds (23:59:59.999999999 UTC).
    end_ns: UnixNanos,
}
53
54impl DateCursor {
55 fn new(current_ns: UnixNanos) -> Self {
57 let current_utc = DateTime::from_timestamp_nanos(current_ns.as_i64());
58 let date_utc = current_utc.date_naive();
59
60 let end_utc =
62 date_utc.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
63 let end_ns = UnixNanos::from(end_utc.and_utc().timestamp_nanos_opt().unwrap() as u64);
64
65 Self { date_utc, end_ns }
66 }
67}
68
69pub async fn run_tardis_machine_replay_from_config(config_filepath: &Path) -> anyhow::Result<()> {
80 log::info!("Starting replay");
81 log::info!("Config filepath: {}", config_filepath.display());
82
83 let config_data = fs::read_to_string(config_filepath)
85 .with_context(|| format!("Failed to read config file: {}", config_filepath.display()))?;
86 let config: TardisReplayConfig = serde_json::from_str(&config_data)
87 .context("failed to parse config JSON into TardisReplayConfig")?;
88
89 let path = config
90 .output_path
91 .as_deref()
92 .map(Path::new)
93 .map(Path::to_path_buf)
94 .or_else(|| {
95 std::env::var("NAUTILUS_PATH")
96 .ok()
97 .map(|env_path| PathBuf::from(env_path).join("catalog").join("data"))
98 })
99 .unwrap_or_else(|| std::env::current_dir().expect("Failed to get current directory"));
100
101 log::info!("Output path: {}", path.display());
102
103 let normalize_symbols = config.normalize_symbols.unwrap_or(true);
104 log::info!("normalize_symbols={normalize_symbols}");
105
106 let book_snapshot_output = config
107 .book_snapshot_output
108 .clone()
109 .unwrap_or(BookSnapshotOutput::Deltas);
110 log::info!("book_snapshot_output={book_snapshot_output:?}");
111
112 let compression = config
113 .compression
114 .clone()
115 .unwrap_or(ParquetCompression::Zstd);
116 log::info!("compression={compression:?}");
117 let compression = compression.as_parquet_compression();
118
119 let http_client = TardisHttpClient::new(
120 None,
121 None,
122 None,
123 normalize_symbols,
124 config.proxy_url.clone(),
125 )?;
126 let mut machine_client = TardisMachineClient::new(
127 config.tardis_ws_url.as_deref(),
128 normalize_symbols,
129 book_snapshot_output,
130 )?;
131
132 let exchanges: AHashSet<_> = config.options.iter().map(|opt| opt.exchange).collect();
133 let (instrument_map, _instruments) = http_client
134 .bootstrap_instruments(&exchanges)
135 .await
136 .context("failed to bootstrap instruments")?;
137
138 for (_, info) in &instrument_map {
139 machine_client.add_instrument_info((**info).clone());
140 }
141
142 log::info!("Starting tardis-machine stream");
143 let stream = machine_client.replay(config.options).await?;
144 pin_mut!(stream);
145
146 let mut deltas_cursors: AHashMap<InstrumentId, DateCursor> = AHashMap::new();
148 let mut depths_cursors: AHashMap<InstrumentId, DateCursor> = AHashMap::new();
149 let mut quotes_cursors: AHashMap<InstrumentId, DateCursor> = AHashMap::new();
150 let mut trades_cursors: AHashMap<InstrumentId, DateCursor> = AHashMap::new();
151 let mut bars_cursors: AHashMap<BarType, DateCursor> = AHashMap::new();
152
153 let mut deltas_map: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
155 let mut depths_map: AHashMap<InstrumentId, Vec<OrderBookDepth10>> = AHashMap::new();
156 let mut quotes_map: AHashMap<InstrumentId, Vec<QuoteTick>> = AHashMap::new();
157 let mut trades_map: AHashMap<InstrumentId, Vec<TradeTick>> = AHashMap::new();
158 let mut bars_map: AHashMap<BarType, Vec<Bar>> = AHashMap::new();
159
160 let mut msg_count = 0;
161
162 while let Some(result) = stream.next().await {
163 match result {
164 Ok(msg) => {
165 match msg {
166 Data::Deltas(msg) => {
167 handle_deltas_msg(
168 &msg,
169 &mut deltas_map,
170 &mut deltas_cursors,
171 &path,
172 compression,
173 );
174 }
175 Data::Depth10(msg) => {
176 handle_depth10_msg(
177 *msg,
178 &mut depths_map,
179 &mut depths_cursors,
180 &path,
181 compression,
182 );
183 }
184 Data::Quote(msg) => {
185 handle_quote_msg(
186 msg,
187 &mut quotes_map,
188 &mut quotes_cursors,
189 &path,
190 compression,
191 );
192 }
193 Data::Trade(msg) => {
194 handle_trade_msg(
195 msg,
196 &mut trades_map,
197 &mut trades_cursors,
198 &path,
199 compression,
200 );
201 }
202 Data::Bar(msg) => {
203 handle_bar_msg(msg, &mut bars_map, &mut bars_cursors, &path, compression);
204 }
205 Data::Delta(delta) => {
206 log::warn!(
207 "Skipping individual delta message for {} (use Deltas batch instead)",
208 delta.instrument_id
209 );
210 }
211 Data::MarkPriceUpdate(_)
212 | Data::IndexPriceUpdate(_)
213 | Data::InstrumentStatus(_)
214 | Data::InstrumentClose(_)
215 | Data::Custom(_) => {
216 log::debug!(
217 "Skipping unsupported data type for instrument {}",
218 msg.instrument_id()
219 );
220 }
221 }
222
223 msg_count += 1;
224 if msg_count % 100_000 == 0 {
225 log::debug!("Processed {} messages", msg_count.separate_with_commas());
226 }
227 }
228 Err(e) => {
229 log::error!("Stream error: {e:?}");
230 break;
231 }
232 }
233 }
234
235 for (instrument_id, deltas) in &deltas_map {
238 let cursor = deltas_cursors.get(instrument_id).expect("Expected cursor");
239 batch_and_write_deltas(deltas, instrument_id, cursor.date_utc, &path, compression);
240 }
241
242 for (instrument_id, depths) in &depths_map {
243 let cursor = depths_cursors.get(instrument_id).expect("Expected cursor");
244 batch_and_write_depths(depths, instrument_id, cursor.date_utc, &path, compression);
245 }
246
247 for (instrument_id, quotes) in "es_map {
248 let cursor = quotes_cursors.get(instrument_id).expect("Expected cursor");
249 batch_and_write_quotes(quotes, instrument_id, cursor.date_utc, &path, compression);
250 }
251
252 for (instrument_id, trades) in &trades_map {
253 let cursor = trades_cursors.get(instrument_id).expect("Expected cursor");
254 batch_and_write_trades(trades, instrument_id, cursor.date_utc, &path, compression);
255 }
256
257 for (bar_type, bars) in &bars_map {
258 let cursor = bars_cursors.get(bar_type).expect("Expected cursor");
259 batch_and_write_bars(bars, bar_type, cursor.date_utc, &path, compression);
260 }
261
262 log::info!(
263 "Replay completed after {} messages",
264 msg_count.separate_with_commas()
265 );
266 Ok(())
267}
268
269fn handle_deltas_msg(
270 deltas: &OrderBookDeltas_API,
271 map: &mut AHashMap<InstrumentId, Vec<OrderBookDelta>>,
272 cursors: &mut AHashMap<InstrumentId, DateCursor>,
273 path: &Path,
274 compression: Compression,
275) {
276 let cursor = cursors
277 .entry(deltas.instrument_id)
278 .or_insert_with(|| DateCursor::new(deltas.ts_init));
279
280 if deltas.ts_init > cursor.end_ns {
281 if let Some(deltas_vec) = map.remove(&deltas.instrument_id) {
282 batch_and_write_deltas(
283 &deltas_vec,
284 &deltas.instrument_id,
285 cursor.date_utc,
286 path,
287 compression,
288 );
289 }
290 *cursor = DateCursor::new(deltas.ts_init);
292 }
293
294 map.entry(deltas.instrument_id)
295 .or_insert_with(|| Vec::with_capacity(100_000))
296 .extend(&*deltas.deltas);
297}
298
299fn handle_depth10_msg(
300 depth10: OrderBookDepth10,
301 map: &mut AHashMap<InstrumentId, Vec<OrderBookDepth10>>,
302 cursors: &mut AHashMap<InstrumentId, DateCursor>,
303 path: &Path,
304 compression: Compression,
305) {
306 let cursor = cursors
307 .entry(depth10.instrument_id)
308 .or_insert_with(|| DateCursor::new(depth10.ts_init));
309
310 if depth10.ts_init > cursor.end_ns {
311 if let Some(depths_vec) = map.remove(&depth10.instrument_id) {
312 batch_and_write_depths(
313 &depths_vec,
314 &depth10.instrument_id,
315 cursor.date_utc,
316 path,
317 compression,
318 );
319 }
320 *cursor = DateCursor::new(depth10.ts_init);
322 }
323
324 map.entry(depth10.instrument_id)
325 .or_insert_with(|| Vec::with_capacity(100_000))
326 .push(depth10);
327}
328
329fn handle_quote_msg(
330 quote: QuoteTick,
331 map: &mut AHashMap<InstrumentId, Vec<QuoteTick>>,
332 cursors: &mut AHashMap<InstrumentId, DateCursor>,
333 path: &Path,
334 compression: Compression,
335) {
336 let cursor = cursors
337 .entry(quote.instrument_id)
338 .or_insert_with(|| DateCursor::new(quote.ts_init));
339
340 if quote.ts_init > cursor.end_ns {
341 if let Some(quotes_vec) = map.remove("e.instrument_id) {
342 batch_and_write_quotes(
343 "es_vec,
344 "e.instrument_id,
345 cursor.date_utc,
346 path,
347 compression,
348 );
349 }
350 *cursor = DateCursor::new(quote.ts_init);
352 }
353
354 map.entry(quote.instrument_id)
355 .or_insert_with(|| Vec::with_capacity(100_000))
356 .push(quote);
357}
358
359fn handle_trade_msg(
360 trade: TradeTick,
361 map: &mut AHashMap<InstrumentId, Vec<TradeTick>>,
362 cursors: &mut AHashMap<InstrumentId, DateCursor>,
363 path: &Path,
364 compression: Compression,
365) {
366 let cursor = cursors
367 .entry(trade.instrument_id)
368 .or_insert_with(|| DateCursor::new(trade.ts_init));
369
370 if trade.ts_init > cursor.end_ns {
371 if let Some(trades_vec) = map.remove(&trade.instrument_id) {
372 batch_and_write_trades(
373 &trades_vec,
374 &trade.instrument_id,
375 cursor.date_utc,
376 path,
377 compression,
378 );
379 }
380 *cursor = DateCursor::new(trade.ts_init);
382 }
383
384 map.entry(trade.instrument_id)
385 .or_insert_with(|| Vec::with_capacity(100_000))
386 .push(trade);
387}
388
389fn handle_bar_msg(
390 bar: Bar,
391 map: &mut AHashMap<BarType, Vec<Bar>>,
392 cursors: &mut AHashMap<BarType, DateCursor>,
393 path: &Path,
394 compression: Compression,
395) {
396 let cursor = cursors
397 .entry(bar.bar_type)
398 .or_insert_with(|| DateCursor::new(bar.ts_init));
399
400 if bar.ts_init > cursor.end_ns {
401 if let Some(bars_vec) = map.remove(&bar.bar_type) {
402 batch_and_write_bars(&bars_vec, &bar.bar_type, cursor.date_utc, path, compression);
403 }
404 *cursor = DateCursor::new(bar.ts_init);
406 }
407
408 map.entry(bar.bar_type)
409 .or_insert_with(|| Vec::with_capacity(100_000))
410 .push(bar);
411}
412
413fn batch_and_write_deltas(
414 deltas: &[OrderBookDelta],
415 instrument_id: &InstrumentId,
416 date: NaiveDate,
417 path: &Path,
418 compression: Compression,
419) {
420 match book_deltas_to_arrow_record_batch_bytes(deltas) {
421 Ok(batch) => write_batch(
422 &batch,
423 "order_book_deltas",
424 instrument_id,
425 date,
426 path,
427 compression,
428 ),
429 Err(e) => {
430 log::error!("Error converting OrderBookDeltas to Arrow: {e:?}");
431 }
432 }
433}
434
435fn batch_and_write_depths(
436 depths: &[OrderBookDepth10],
437 instrument_id: &InstrumentId,
438 date: NaiveDate,
439 path: &Path,
440 compression: Compression,
441) {
442 match book_depth10_to_arrow_record_batch_bytes(depths) {
443 Ok(batch) => write_batch(
444 &batch,
445 "order_book_depths",
446 instrument_id,
447 date,
448 path,
449 compression,
450 ),
451 Err(e) => {
452 log::error!("Error converting OrderBookDepth10 to Arrow: {e:?}");
453 }
454 }
455}
456
457fn batch_and_write_quotes(
458 quotes: &[QuoteTick],
459 instrument_id: &InstrumentId,
460 date: NaiveDate,
461 path: &Path,
462 compression: Compression,
463) {
464 match quotes_to_arrow_record_batch_bytes(quotes) {
465 Ok(batch) => write_batch(&batch, "quote_tick", instrument_id, date, path, compression),
466 Err(e) => {
467 log::error!("Error converting QuoteTick to Arrow: {e:?}");
468 }
469 }
470}
471
472fn batch_and_write_trades(
473 trades: &[TradeTick],
474 instrument_id: &InstrumentId,
475 date: NaiveDate,
476 path: &Path,
477 compression: Compression,
478) {
479 match trades_to_arrow_record_batch_bytes(trades) {
480 Ok(batch) => write_batch(&batch, "trade_tick", instrument_id, date, path, compression),
481 Err(e) => {
482 log::error!("Error converting TradeTick to Arrow: {e:?}");
483 }
484 }
485}
486
487fn batch_and_write_bars(
488 bars: &[Bar],
489 bar_type: &BarType,
490 date: NaiveDate,
491 path: &Path,
492 compression: Compression,
493) {
494 let batch = match bars_to_arrow_record_batch_bytes(bars) {
495 Ok(batch) => batch,
496 Err(e) => {
497 log::error!("Error converting Bar to Arrow: {e:?}");
498 return;
499 }
500 };
501
502 let filepath = path.join(parquet_filepath_bars(bar_type, date));
503 if let Err(e) = write_parquet_local(&batch, &filepath, compression) {
504 log::error!("Error writing {}: {e}", filepath.display());
505 } else {
506 log::info!("File written: {}", filepath.display());
507 }
508}
509
510fn assert_post_epoch(date: NaiveDate) {
517 let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).expect("UNIX epoch must exist");
518 assert!(
519 date >= epoch,
520 "Tardis replay filenames require dates on or after 1970-01-01; received {date}"
521 );
522}
523
/// Converts an ISO 8601 timestamp into a filesystem-safe form by replacing
/// ':' and '.' with '-'.
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp
        .chars()
        .map(|c| if matches!(c, ':' | '.') { '-' } else { c })
        .collect()
}
531
532fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
537 let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
538 let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));
539
540 format!("{datetime_1}_{datetime_2}.parquet")
541}
542
543fn parquet_filepath(typename: &str, instrument_id: &InstrumentId, date: NaiveDate) -> PathBuf {
544 assert_post_epoch(date);
545
546 let instrument_id_str = instrument_id.to_string().replace('/', "");
547
548 let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
549 let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
550
551 let start_nanos = start_utc
552 .timestamp_nanos_opt()
553 .expect("valid nanosecond timestamp");
554 let end_nanos = (end_utc.and_utc())
555 .timestamp_nanos_opt()
556 .expect("valid nanosecond timestamp");
557
558 let filename = timestamps_to_filename(
559 UnixNanos::from(start_nanos as u64),
560 UnixNanos::from(end_nanos as u64),
561 );
562
563 PathBuf::new()
564 .join(typename)
565 .join(instrument_id_str)
566 .join(filename)
567}
568
569fn parquet_filepath_bars(bar_type: &BarType, date: NaiveDate) -> PathBuf {
570 assert_post_epoch(date);
571
572 let bar_type_str = bar_type.to_string().replace('/', "");
573
574 let start_utc = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
576 let end_utc = date.and_hms_opt(23, 59, 59).unwrap() + Duration::nanoseconds(999_999_999);
577
578 let start_nanos = start_utc
579 .timestamp_nanos_opt()
580 .expect("valid nanosecond timestamp");
581 let end_nanos = (end_utc.and_utc())
582 .timestamp_nanos_opt()
583 .expect("valid nanosecond timestamp");
584
585 let filename = timestamps_to_filename(
586 UnixNanos::from(start_nanos as u64),
587 UnixNanos::from(end_nanos as u64),
588 );
589
590 PathBuf::new().join("bar").join(bar_type_str).join(filename)
591}
592
593fn write_batch(
594 batch: &RecordBatch,
595 typename: &str,
596 instrument_id: &InstrumentId,
597 date: NaiveDate,
598 path: &Path,
599 compression: Compression,
600) {
601 let filepath = path.join(parquet_filepath(typename, instrument_id, date));
602 if let Err(e) = write_parquet_local(batch, &filepath, compression) {
603 log::error!("Error writing {}: {e}", filepath.display());
604 } else {
605 log::info!("File written: {}", filepath.display());
606 }
607}
608
609fn write_parquet_local(
610 batch: &RecordBatch,
611 file_path: &Path,
612 compression: Compression,
613) -> anyhow::Result<()> {
614 if let Some(parent) = file_path.parent() {
615 std::fs::create_dir_all(parent)?;
616 }
617
618 let file = std::fs::File::create(file_path)?;
619 let props = WriterProperties::builder()
620 .set_compression(compression)
621 .build();
622
623 let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
624 writer.write(batch)?;
625 writer.close()?;
626 Ok(())
627}
628
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};
    use rstest::rstest;

    use super::*;

    // Verifies that `DateCursor::new` derives the correct UTC date and the
    // inclusive end-of-day timestamp (23:59:59.999999999 UTC) for timestamps
    // at the start, middle, and final nanosecond of a day, plus the first
    // instant of the following day (which must roll to the next date).
    #[rstest]
    // Start of day.
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    // Middle of day.
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    // Final nanosecond of the day — still the same date.
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999,
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    // First instant of the next day — cursor must roll over.
    #[case(
        Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap().timestamp_nanos_opt().unwrap() as u64,
        NaiveDate::from_ymd_opt(2024, 1, 2).unwrap(),
        Utc.with_ymd_and_hms(2024, 1, 2, 23, 59, 59).unwrap().timestamp_nanos_opt().unwrap() as u64 + 999_999_999
    )]
    fn test_date_cursor(
        #[case] timestamp: u64,
        #[case] expected_date: NaiveDate,
        #[case] expected_end_ns: u64,
    ) {
        let unix_nanos = UnixNanos::from(timestamp);
        let cursor = DateCursor::new(unix_nanos);

        assert_eq!(cursor.date_utc, expected_date);
        assert_eq!(cursor.end_ns, UnixNanos::from(expected_end_ns));
    }
}
672}