Skip to main content

nautilus_tardis/csv/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16pub mod load;
17mod record;
18pub mod stream;
19
20use std::{
21    ffi::OsStr,
22    fs::File,
23    io::{BufReader, Read, Seek, SeekFrom},
24    path::Path,
25    time::Duration,
26};
27
28use csv::{Reader, ReaderBuilder};
29use flate2::read::GzDecoder;
30pub use load::{
31    load_deltas, load_depth10_from_snapshot5, load_depth10_from_snapshot25, load_funding_rates,
32    load_quotes, load_trades,
33};
34use nautilus_model::{
35    data::{BookOrder, FundingRateUpdate, NULL_ORDER, OrderBookDelta, QuoteTick, TradeTick},
36    enums::{BookAction, OrderSide},
37    identifiers::{InstrumentId, TradeId},
38    types::Quantity,
39};
40use rust_decimal::Decimal;
41pub use stream::{
42    stream_deltas, stream_depth10_from_snapshot5, stream_depth10_from_snapshot25,
43    stream_funding_rates, stream_quotes, stream_trades,
44};
45
46use super::csv::record::{
47    TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisQuoteRecord, TardisTradeRecord,
48};
49use crate::common::parse::{
50    derive_trade_id, parse_aggressor_side, parse_book_action, parse_instrument_id,
51    parse_order_side, parse_price, parse_timestamp,
52};
53
54fn infer_precision(value: f64) -> u8 {
55    let mut buf = ryu::Buffer::new(); // Stack allocation
56    let s = buf.format(value);
57
58    match s.rsplit_once('.') {
59        Some((_, frac)) if frac != "0" => frac.len() as u8,
60        _ => 0,
61    }
62}
63
64fn create_csv_reader<P: AsRef<Path>>(
65    filepath: P,
66) -> anyhow::Result<Reader<Box<dyn std::io::Read>>> {
67    const MAX_RETRIES: u8 = 3;
68    const DELAY_MS: u64 = 100;
69    const BUFFER_SIZE: usize = 8 * 1024 * 1024; // 8MB buffer for large files
70
71    fn open_file_with_retry<P: AsRef<Path>>(
72        path: P,
73        max_retries: u8,
74        delay_ms: u64,
75    ) -> anyhow::Result<File> {
76        let path_ref = path.as_ref();
77        for attempt in 1..=max_retries {
78            match File::open(path_ref) {
79                Ok(file) => return Ok(file),
80                Err(e) => {
81                    if attempt == max_retries {
82                        anyhow::bail!(
83                            "Failed to open file '{}' after {max_retries} attempts: {e}",
84                            path_ref.display()
85                        );
86                    }
87                    log::warn!(
88                        "Attempt {attempt}/{max_retries} failed to open file '{}': {e}. Retrying after {delay_ms}ms...",
89                        path_ref.display()
90                    );
91                    std::thread::sleep(Duration::from_millis(delay_ms));
92                }
93            }
94        }
95        unreachable!("Loop should return either Ok or Err");
96    }
97
98    let filepath_ref = filepath.as_ref();
99    let mut file = open_file_with_retry(filepath_ref, MAX_RETRIES, DELAY_MS)?;
100
101    let is_gzipped = filepath_ref
102        .extension()
103        .and_then(OsStr::to_str)
104        .is_some_and(|ext| ext.eq_ignore_ascii_case("gz"));
105
106    if !is_gzipped {
107        let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
108        return Ok(ReaderBuilder::new()
109            .has_headers(true)
110            .buffer_capacity(1024 * 1024) // 1MB CSV buffer
111            .from_reader(Box::new(buf_reader)));
112    }
113
114    let file_size = file.metadata()?.len();
115    if file_size < 2 {
116        anyhow::bail!("File too small to be a valid gzip file");
117    }
118
119    let mut header_buf = [0u8; 2];
120    for attempt in 1..=MAX_RETRIES {
121        match file.read_exact(&mut header_buf) {
122            Ok(()) => break,
123            Err(e) => {
124                if attempt == MAX_RETRIES {
125                    anyhow::bail!(
126                        "Failed to read gzip header from '{}' after {MAX_RETRIES} attempts: {e}",
127                        filepath_ref.display()
128                    );
129                }
130                log::warn!(
131                    "Attempt {attempt}/{MAX_RETRIES} failed to read header from '{}': {e}. Retrying after {DELAY_MS}ms...",
132                    filepath_ref.display()
133                );
134                std::thread::sleep(Duration::from_millis(DELAY_MS));
135            }
136        }
137    }
138
139    if header_buf[0] != 0x1f || header_buf[1] != 0x8b {
140        anyhow::bail!(
141            "File '{}' has .gz extension but invalid gzip header",
142            filepath_ref.display()
143        );
144    }
145
146    for attempt in 1..=MAX_RETRIES {
147        match file.seek(SeekFrom::Start(0)) {
148            Ok(_) => break,
149            Err(e) => {
150                if attempt == MAX_RETRIES {
151                    anyhow::bail!(
152                        "Failed to reset file position for '{}' after {MAX_RETRIES} attempts: {e}",
153                        filepath_ref.display()
154                    );
155                }
156                log::warn!(
157                    "Attempt {attempt}/{MAX_RETRIES} failed to seek in '{}': {e}. Retrying after {DELAY_MS}ms...",
158                    filepath_ref.display()
159                );
160                std::thread::sleep(Duration::from_millis(DELAY_MS));
161            }
162        }
163    }
164
165    let buf_reader = BufReader::with_capacity(BUFFER_SIZE, file);
166    let decoder = GzDecoder::new(buf_reader);
167
168    Ok(ReaderBuilder::new()
169        .has_headers(true)
170        .buffer_capacity(1024 * 1024) // 1MB CSV buffer
171        .from_reader(Box::new(decoder)))
172}
173
174fn create_book_order(
175    side: OrderSide,
176    price: Option<f64>,
177    amount: Option<f64>,
178    price_precision: u8,
179    size_precision: u8,
180) -> (BookOrder, u32) {
181    match price {
182        Some(price) => (
183            BookOrder::new(
184                side,
185                parse_price(price, price_precision),
186                Quantity::new(amount.unwrap_or(0.0), size_precision),
187                0,
188            ),
189            1, // Count set to 1 if order exists
190        ),
191        None => (NULL_ORDER, 0), // NULL_ORDER if price is None
192    }
193}
194
195fn parse_delta_record(
196    data: &TardisBookUpdateRecord,
197    price_precision: u8,
198    size_precision: u8,
199    instrument_id: Option<InstrumentId>,
200) -> anyhow::Result<OrderBookDelta> {
201    let instrument_id = match instrument_id {
202        Some(id) => id,
203        None => parse_instrument_id(&data.exchange, data.symbol),
204    };
205
206    let side = parse_order_side(&data.side);
207    let price = parse_price(data.price, price_precision);
208    let size = Quantity::new(data.amount, size_precision);
209    let order_id = 0; // Not applicable for L2 data
210    let order = BookOrder::new(side, price, size, order_id);
211
212    let action = parse_book_action(data.is_snapshot, size.as_f64());
213    let flags = 0; // Will be set later if needed
214    let sequence = 0; // Sequence not available
215    let ts_event = parse_timestamp(data.timestamp);
216    let ts_init = parse_timestamp(data.local_timestamp);
217
218    anyhow::ensure!(
219        !(action != BookAction::Delete && size.is_zero()),
220        "Invalid delta: action {action} when size zero, check size_precision ({size_precision}) vs data; {data:?}"
221    );
222
223    Ok(OrderBookDelta::new(
224        instrument_id,
225        action,
226        order,
227        flags,
228        sequence,
229        ts_event,
230        ts_init,
231    ))
232}
233
234fn parse_quote_record(
235    data: &TardisQuoteRecord,
236    price_precision: u8,
237    size_precision: u8,
238    instrument_id: Option<InstrumentId>,
239) -> QuoteTick {
240    let instrument_id = match instrument_id {
241        Some(id) => id,
242        None => parse_instrument_id(&data.exchange, data.symbol),
243    };
244
245    let bid_price = parse_price(data.bid_price.unwrap_or(0.0), price_precision);
246    let ask_price = parse_price(data.ask_price.unwrap_or(0.0), price_precision);
247    let bid_size = Quantity::new(data.bid_amount.unwrap_or(0.0), size_precision);
248    let ask_size = Quantity::new(data.ask_amount.unwrap_or(0.0), size_precision);
249    let ts_event = parse_timestamp(data.timestamp);
250    let ts_init = parse_timestamp(data.local_timestamp);
251
252    QuoteTick::new(
253        instrument_id,
254        bid_price,
255        ask_price,
256        bid_size,
257        ask_size,
258        ts_event,
259        ts_init,
260    )
261}
262
263fn parse_trade_record(
264    data: &TardisTradeRecord,
265    size: Quantity,
266    price_precision: u8,
267    instrument_id: Option<InstrumentId>,
268) -> TradeTick {
269    let instrument_id = match instrument_id {
270        Some(id) => id,
271        None => parse_instrument_id(&data.exchange, data.symbol),
272    };
273
274    let price = parse_price(data.price, price_precision);
275    let aggressor_side = parse_aggressor_side(&data.side);
276    let ts_event = parse_timestamp(data.timestamp);
277    let ts_init = parse_timestamp(data.local_timestamp);
278    let trade_id = if data.id.is_empty() {
279        derive_trade_id(
280            data.symbol,
281            ts_event.as_u64(),
282            data.price,
283            data.amount,
284            &data.side,
285        )
286    } else {
287        TradeId::new(&data.id)
288    };
289
290    TradeTick::new(
291        instrument_id,
292        price,
293        size,
294        aggressor_side,
295        trade_id,
296        ts_event,
297        ts_init,
298    )
299}
300
301fn parse_derivative_ticker_record(
302    data: &TardisDerivativeTickerRecord,
303    instrument_id: Option<InstrumentId>,
304) -> Option<FundingRateUpdate> {
305    // Only create funding rate update if we have funding rate data
306    let funding_rate = data.funding_rate?;
307
308    let instrument_id = match instrument_id {
309        Some(id) => id,
310        None => parse_instrument_id(&data.exchange, data.symbol),
311    };
312
313    let rate = Decimal::try_from(funding_rate).ok()?;
314    let next_funding_ns = if data.predicted_funding_rate.is_some() {
315        data.funding_timestamp.map(parse_timestamp)
316    } else {
317        None
318    };
319    let ts_event = parse_timestamp(data.timestamp);
320    let ts_init = parse_timestamp(data.local_timestamp);
321
322    Some(FundingRateUpdate::new(
323        instrument_id,
324        rate,
325        None,
326        next_funding_ns,
327        ts_event,
328        ts_init,
329    ))
330}