Skip to main content

nautilus_tardis/
config.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use parquet::basic::{Compression, ZstdLevel};
17use serde::{Deserialize, Serialize};
18
19use super::machine::types::{ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions};
20
21/// Determines the output format for Tardis `book_snapshot_*` messages.
22#[derive(Debug, Clone, Default, Serialize, Deserialize)]
23#[serde(rename_all = "snake_case")]
24pub enum BookSnapshotOutput {
25    /// Convert book snapshots to `OrderBookDeltas` and write to `order_book_deltas/`.
26    #[default]
27    Deltas,
28    /// Convert book snapshots to `OrderBookDepth10` and write to `order_book_depths/`.
29    Depth10,
30}
31
32/// Determines the compression codec for Parquet files written by Tardis replay.
33#[derive(Debug, Clone, Default, Serialize, Deserialize)]
34#[serde(rename_all = "snake_case")]
35pub enum ParquetCompression {
36    /// Use Zstandard compression with level 3.
37    #[default]
38    Zstd,
39    /// Use Snappy compression.
40    Snappy,
41    /// Write uncompressed Parquet files.
42    Uncompressed,
43}
44
45impl ParquetCompression {
46    /// Converts the replay config compression value to a Parquet compression value.
47    ///
48    /// # Panics
49    ///
50    /// Panics if the hard-coded Zstandard level 3 is rejected by the Parquet crate.
51    #[must_use]
52    pub fn as_parquet_compression(&self) -> Compression {
53        match self {
54            Self::Zstd => {
55                let level = ZstdLevel::try_new(3).expect("zstd level 3 is valid");
56                Compression::ZSTD(level)
57            }
58            Self::Snappy => Compression::SNAPPY,
59            Self::Uncompressed => Compression::UNCOMPRESSED,
60        }
61    }
62}
63
64/// Provides a configuration for a Tardis Machine -> Nautilus data -> Parquet replay run.
65#[derive(Debug, Clone, Serialize, Deserialize, bon::Builder)]
66#[serde(deny_unknown_fields)]
67pub struct TardisReplayConfig {
68    /// The Tardis Machine websocket url.
69    pub tardis_ws_url: Option<String>,
70    /// If symbols should be normalized with Nautilus conventions.
71    pub normalize_symbols: Option<bool>,
72    /// The output directory for writing Nautilus format Parquet files.
73    pub output_path: Option<String>,
74    /// The Tardis Machine replay options.
75    #[builder(default)]
76    pub options: Vec<ReplayNormalizedRequestOptions>,
77    /// Optional proxy URL for the Tardis HTTP API client.
78    /// The Tardis Machine WebSocket transport does not yet support proxying.
79    pub proxy_url: Option<String>,
80    /// The output format for `book_snapshot_*` messages.
81    ///
82    /// - `deltas`: Convert to `OrderBookDeltas` and write to `order_book_deltas/` (default).
83    /// - `depth10`: Convert to `OrderBookDepth10` and write to `order_book_depths/`.
84    pub book_snapshot_output: Option<BookSnapshotOutput>,
85    /// The compression codec for written data files.
86    ///
87    /// - `zstd`: Use Zstandard compression level 3 (default).
88    /// - `snappy`: Use Snappy compression.
89    /// - `uncompressed`: Write uncompressed Parquet files.
90    pub compression: Option<ParquetCompression>,
91}
92
93/// Configuration for the Tardis data client.
94#[derive(Clone, Debug, bon::Builder)]
95#[cfg_attr(
96    feature = "python",
97    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.tardis", from_py_object)
98)]
99#[cfg_attr(
100    feature = "python",
101    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.tardis")
102)]
103pub struct TardisDataClientConfig {
104    /// Tardis API key for HTTP instrument fetching.
105    /// Falls back to `TARDIS_API_KEY` env var if not set.
106    pub api_key: Option<String>,
107    /// Tardis Machine Server WebSocket URL.
108    /// Falls back to `TARDIS_MACHINE_WS_URL` env var if not set.
109    pub tardis_ws_url: Option<String>,
110    /// Optional proxy URL for the Tardis HTTP API client.
111    /// The Tardis Machine WebSocket transport does not yet support proxying.
112    pub proxy_url: Option<String>,
113    /// Whether to normalize symbols to Nautilus conventions.
114    #[builder(default = true)]
115    pub normalize_symbols: bool,
116    /// Output format for `book_snapshot_*` messages.
117    #[builder(default)]
118    pub book_snapshot_output: BookSnapshotOutput,
119    /// Replay options defining exchanges, symbols, date ranges, and data types.
120    /// When non-empty the client connects to `ws-replay-normalized`.
121    #[builder(default)]
122    pub options: Vec<ReplayNormalizedRequestOptions>,
123    /// Live stream options defining exchanges, symbols, and data types.
124    /// When non-empty (and `options` is empty) the client connects to
125    /// `ws-stream-normalized` with automatic reconnection.
126    #[builder(default)]
127    pub stream_options: Vec<StreamNormalizedRequestOptions>,
128}
129
130impl Default for TardisDataClientConfig {
131    fn default() -> Self {
132        Self::builder().build()
133    }
134}
135
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// The default client config leaves optional fields unset, normalizes symbols,
    /// selects deltas output, and carries no replay or stream options.
    #[rstest]
    fn test_default_config_values() {
        let TardisDataClientConfig {
            api_key,
            tardis_ws_url,
            proxy_url,
            normalize_symbols,
            book_snapshot_output,
            options,
            stream_options,
        } = TardisDataClientConfig::default();

        assert!(api_key.is_none());
        assert!(tardis_ws_url.is_none());
        assert!(proxy_url.is_none());
        assert!(normalize_symbols);
        assert!(matches!(book_snapshot_output, BookSnapshotOutput::Deltas));
        assert!(options.is_empty());
        assert!(stream_options.is_empty());
    }

    /// `BookSnapshotOutput::default()` is the `Deltas` variant.
    #[rstest]
    fn test_book_snapshot_output_default_is_deltas() {
        let default_output = BookSnapshotOutput::default();
        assert!(matches!(default_output, BookSnapshotOutput::Deltas));
    }

    /// `Deltas` serializes to `"deltas"` and deserializes back to the same variant.
    #[rstest]
    fn test_book_snapshot_output_serde_roundtrip_deltas() {
        let encoded = serde_json::to_string(&BookSnapshotOutput::Deltas).unwrap();
        assert_eq!(encoded, "\"deltas\"");

        let decoded = serde_json::from_str::<BookSnapshotOutput>(&encoded).unwrap();
        assert!(matches!(decoded, BookSnapshotOutput::Deltas));
    }

    /// `Depth10` serializes to `"depth10"` and deserializes back to the same variant.
    #[rstest]
    fn test_book_snapshot_output_serde_roundtrip_depth10() {
        let encoded = serde_json::to_string(&BookSnapshotOutput::Depth10).unwrap();
        assert_eq!(encoded, "\"depth10\"");

        let decoded = serde_json::from_str::<BookSnapshotOutput>(&encoded).unwrap();
        assert!(matches!(decoded, BookSnapshotOutput::Depth10));
    }

    /// The default codec is `Zstd`, which maps onto the Parquet ZSTD compression.
    #[rstest]
    fn test_parquet_compression_default_is_zstd() {
        let default_codec = ParquetCompression::default();
        assert!(matches!(default_codec, ParquetCompression::Zstd));

        let parquet_codec = ParquetCompression::default().as_parquet_compression();
        assert!(matches!(parquet_codec, Compression::ZSTD(_)));
    }

    /// Every codec serializes to its snake_case name and survives a JSON roundtrip.
    #[rstest]
    fn test_parquet_compression_serde_roundtrip() {
        for (variant, wire) in [
            (ParquetCompression::Zstd, "\"zstd\""),
            (ParquetCompression::Snappy, "\"snappy\""),
            (ParquetCompression::Uncompressed, "\"uncompressed\""),
        ] {
            let encoded = serde_json::to_string(&variant).unwrap();
            assert_eq!(encoded, wire);

            // Compare via the Parquet value, which is what the replay writer consumes.
            let decoded = serde_json::from_str::<ParquetCompression>(&encoded).unwrap();
            assert_eq!(
                variant.as_parquet_compression(),
                decoded.as_parquet_compression()
            );
        }
    }

    /// A fully populated replay config document deserializes its enum-valued fields.
    #[rstest]
    fn test_replay_config_deserializes_compression() {
        let json = r#"{
            "tardis_ws_url": null,
            "normalize_symbols": true,
            "output_path": null,
            "options": [],
            "proxy_url": null,
            "book_snapshot_output": "depth10",
            "compression": "zstd"
        }"#;

        let parsed = serde_json::from_str::<TardisReplayConfig>(json).unwrap();
        let TardisReplayConfig {
            book_snapshot_output,
            compression,
            ..
        } = parsed;

        assert!(matches!(
            book_snapshot_output,
            Some(BookSnapshotOutput::Depth10)
        ));
        assert!(matches!(compression, Some(ParquetCompression::Zstd)));
    }
}