1pub mod load;
17mod record;
18pub mod stream;
19
20use std::{
21 ffi::OsStr,
22 fs::File,
23 io::{BufReader, Read, Seek, SeekFrom},
24 path::Path,
25 time::Duration,
26};
27
28use csv::{Reader, ReaderBuilder};
29use flate2::read::GzDecoder;
30pub use load::{
31 load_deltas, load_depth10_from_snapshot5, load_depth10_from_snapshot25, load_funding_rates,
32 load_quotes, load_trades,
33};
34use nautilus_model::{
35 data::{BookOrder, FundingRateUpdate, NULL_ORDER, OrderBookDelta, QuoteTick, TradeTick},
36 enums::{BookAction, OrderSide},
37 identifiers::{InstrumentId, TradeId},
38 types::Quantity,
39};
40use rust_decimal::Decimal;
41pub use stream::{
42 stream_deltas, stream_depth10_from_snapshot5, stream_depth10_from_snapshot25,
43 stream_funding_rates, stream_quotes, stream_trades,
44};
45
46use super::csv::record::{
47 TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisQuoteRecord, TardisTradeRecord,
48};
49use crate::common::parse::{
50 derive_trade_id, parse_aggressor_side, parse_book_action, parse_instrument_id,
51 parse_order_side, parse_price, parse_timestamp,
52};
53
/// Infers the number of decimal places in the shortest round-trip decimal
/// rendering of `value`.
///
/// The digits are counted from the standard library's `Display` output for
/// `f64`, which is always plain positional notation. A formatter that switches
/// to exponent notation for small or large magnitudes (e.g. `8.12e-6`) would
/// make a naive "characters after the dot" count wrong, so the count is taken
/// from the positional form instead.
///
/// Whole numbers, `NaN` and infinities yield `0`. Counts that do not fit in a
/// `u8` saturate at `u8::MAX` rather than wrapping.
fn infer_precision(value: f64) -> u8 {
    /// Formatting sink that counts the digits written after the decimal point,
    /// so no intermediate string has to be allocated.
    struct FractionDigits {
        seen_point: bool,
        count: usize,
    }

    impl std::fmt::Write for FractionDigits {
        fn write_str(&mut self, s: &str) -> std::fmt::Result {
            for byte in s.bytes() {
                if self.seen_point {
                    if byte.is_ascii_digit() {
                        self.count += 1;
                    }
                } else if byte == b'.' {
                    self.seen_point = true;
                }
            }
            Ok(())
        }
    }

    let mut digits = FractionDigits {
        seen_point: false,
        count: 0,
    };

    // The sink itself never reports an error; treat a formatter failure as
    // "no fractional digits" instead of panicking.
    if std::fmt::write(&mut digits, format_args!("{value}")).is_err() {
        return 0;
    }

    u8::try_from(digits.count).unwrap_or(u8::MAX)
}
63
64fn create_csv_reader<P: AsRef<Path>>(
65 filepath: P,
66) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
67 const MAX_RETRIES: u8 = 3;
68 const DELAY_MS: u64 = 100;
69 const BUFFER_SIZE: usize = 8 * 1024 * 1024; fn open_file_with_retry<P: AsRef<Path>>(
72 path: P,
73 max_retries: u8,
74 delay_ms: u64,
75 ) -> anyhow::Result<File> {
76 let path_ref = path.as_ref();
77 for attempt in 1..=max_retries {
78 match File::open(path_ref) {
79 Ok(file) => return Ok(file),
80 Err(e) => {
81 if attempt == max_retries {
82 anyhow::bail!(
83 "Failed to open file '{}' after {max_retries} attempts: {e}",
84 path_ref.display()
85 );
86 }
87 log::warn!(
88 "Attempt {attempt}/{max_retries} failed to open file '{}': {e}. Retrying after {delay_ms}ms...",
89 path_ref.display()
90 );
91 std::thread::sleep(Duration::from_millis(delay_ms));
92 }
93 }
94 }
95 unreachable!("Loop should return either Ok or Err");
96 }
97
98 let filepath_ref = filepath.as_ref();
99 let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;
100
101 let is_gzipped = filepath_ref
102 .extension()
103 .and_then(OsStr::to_str)
104 .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));
105
106 if !is_gzipped {
107 let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
108 return Ok(ReaderBuilder::new()
109 .has_headers(true)
110 .buffer_capacity(1024 * 1024) .from_reader(Box::new(buf_reader)));
112 }
113
114 let file_size = file.metadata()?.len();
115 if file_size < 2 {
116 anyhow::bail!("File too small to be a valid gzip file");
117 }
118
119 let mut header_buf = [0u8; 2];
120 for attempt in 1..=MAX_RETRIES {
121 match file.read_exact(&mut header_buf) {
122 Ok(()) => break,
123 Err(e) => {
124 if attempt == MAX_RETRIES {
125 anyhow::bail!(
126 "Failed to read gzip header from '{}' after {MAX_RETRIES} attempts: {e}",
127 filepath_ref.display()
128 );
129 }
130 log::warn!(
131 "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{}': {e}. Retrying after {DELAY_MS}ms...",
132 filepath_ref.display()
133 );
134 std::thread::sleep(Duration::from_millis(DELAY_MS));
135 }
136 }
137 }
138
139 if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
140 anyhow::bail!(
141 "File '{}' has .gz extension but invalid gzip header",
142 filepath_ref.display()
143 );
144 }
145
146 for attempt in 1..=MAX_RETRIES {
147 match file.seek(SeekFrom::Start(0)) {
148 Ok(_) => break,
149 Err(e) => {
150 if attempt == MAX_RETRIES {
151 anyhow::bail!(
152 "Failed to reset file position for '{}' after {MAX_RETRIES} attempts: {e}",
153 filepath_ref.display()
154 );
155 }
156 log::warn!(
157 "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{}': {e}. Retrying after {DELAY_MS}ms...",
158 filepath_ref.display()
159 );
160 std::thread::sleep(Duration::from_millis(DELAY_MS));
161 }
162 }
163 }
164
165 let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
166 let decoder = GzDecoder::new(buf_reader);
167
168 Ok(ReaderBuilder::new()
169 .has_headers(true)
170 .buffer_capacity(1024 * 1024) .from_reader(Box::new(decoder)))
172}
173
174fn create_book_order(
175 side: OrderSide,
176 price: Option<f64>,
177 amount: Option<f64>,
178 price_precision: u8,
179 size_precision: u8,
180) -> (BookOrder, u32) {
181 match price {
182 Some(price) => (
183 BookOrder::new(
184 side,
185 parse_price(price, price_precision),
186 Quantity::new(amount.unwrap_or(0.0), size_precision),
187 0,
188 ),
189 1, ),
191 None => (NULL_ORDER, 0), }
193}
194
195fn parse_delta_record(
196 data: &TardisBookUpdateRecord,
197 price_precision: u8,
198 size_precision: u8,
199 instrument_id: Option<InstrumentId>,
200) -> anyhow::Result<OrderBookDelta> {
201 let instrument_id = match instrument_id {
202 Some(id) => id,
203 None => parse_instrument_id(&data.exchange, data.symbol),
204 };
205
206 let side = parse_order_side(&data.side);
207 let price = parse_price(data.price, price_precision);
208 let size = Quantity::new(data.amount, size_precision);
209 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
211
212 let action = parse_book_action(data.is_snapshot, size.as_f64());
213 let flags = 0; let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
216 let ts_init = parse_timestamp(data.local_timestamp);
217
218 anyhow::ensure!(
219 !(action != BookAction::Delete && size.is_zero()),
220 "Invalid delta: action {action} when size zero, check size_precision ({size_precision}) vs data; {data:?}"
221 );
222
223 Ok(OrderBookDelta::new(
224 instrument_id,
225 action,
226 order,
227 flags,
228 sequence,
229 ts_event,
230 ts_init,
231 ))
232}
233
234fn parse_quote_record(
235 data: &TardisQuoteRecord,
236 price_precision: u8,
237 size_precision: u8,
238 instrument_id: Option<InstrumentId>,
239) -> QuoteTick {
240 let instrument_id = match instrument_id {
241 Some(id) => id,
242 None => parse_instrument_id(&data.exchange, data.symbol),
243 };
244
245 let bid_price = parse_price(data.bid_price.unwrap_or(0.0), price_precision);
246 let ask_price = parse_price(data.ask_price.unwrap_or(0.0), price_precision);
247 let bid_size = Quantity::new(data.bid_amount.unwrap_or(0.0), size_precision);
248 let ask_size = Quantity::new(data.ask_amount.unwrap_or(0.0), size_precision);
249 let ts_event = parse_timestamp(data.timestamp);
250 let ts_init = parse_timestamp(data.local_timestamp);
251
252 QuoteTick::new(
253 instrument_id,
254 bid_price,
255 ask_price,
256 bid_size,
257 ask_size,
258 ts_event,
259 ts_init,
260 )
261}
262
263fn parse_trade_record(
264 data: &TardisTradeRecord,
265 size: Quantity,
266 price_precision: u8,
267 instrument_id: Option<InstrumentId>,
268) -> TradeTick {
269 let instrument_id = match instrument_id {
270 Some(id) => id,
271 None => parse_instrument_id(&data.exchange, data.symbol),
272 };
273
274 let price = parse_price(data.price, price_precision);
275 let aggressor_side = parse_aggressor_side(&data.side);
276 let ts_event = parse_timestamp(data.timestamp);
277 let ts_init = parse_timestamp(data.local_timestamp);
278 let trade_id = if data.id.is_empty() {
279 derive_trade_id(
280 data.symbol,
281 ts_event.as_u64(),
282 data.price,
283 data.amount,
284 &data.side,
285 )
286 } else {
287 TradeId::new(&data.id)
288 };
289
290 TradeTick::new(
291 instrument_id,
292 price,
293 size,
294 aggressor_side,
295 trade_id,
296 ts_event,
297 ts_init,
298 )
299}
300
301fn parse_derivative_ticker_record(
302 data: &TardisDerivativeTickerRecord,
303 instrument_id: Option<InstrumentId>,
304) -> Option<FundingRateUpdate> {
305 let funding_rate = data.funding_rate?;
307
308 let instrument_id = match instrument_id {
309 Some(id) => id,
310 None => parse_instrument_id(&data.exchange, data.symbol),
311 };
312
313 let rate = Decimal::try_from(funding_rate).ok()?;
314 let next_funding_ns = if data.predicted_funding_rate.is_some() {
315 data.funding_timestamp.map(parse_timestamp)
316 } else {
317 None
318 };
319 let ts_event = parse_timestamp(data.timestamp);
320 let ts_init = parse_timestamp(data.local_timestamp);
321
322 Some(FundingRateUpdate::new(
323 instrument_id,
324 rate,
325 None,
326 next_funding_ns,
327 ts_event,
328 ts_init,
329 ))
330}