// nautilus_tardis/csv/load.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{error::Error, path::Path};
17
18use csv::StringRecord;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    data::{
22        DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
23        TradeTick,
24    },
25    enums::{OrderSide, RecordFlag},
26    identifiers::InstrumentId,
27    types::{Quantity, fixed::FIXED_PRECISION},
28};
29
30use crate::{
31    common::parse::{parse_instrument_id, parse_timestamp},
32    csv::{
33        create_book_order, create_csv_reader, infer_precision, parse_delta_record,
34        parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
35        record::{
36            TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
37            TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
38        },
39    },
40};
41
42fn update_precision_if_needed(current: &mut u8, value: f64, explicit: Option<u8>) -> bool {
43    if explicit.is_some() {
44        return false;
45    }
46
47    let inferred = infer_precision(value).min(FIXED_PRECISION);
48    if inferred > *current {
49        *current = inferred;
50        true
51    } else {
52        false
53    }
54}
55
56fn update_deltas_precision(
57    deltas: &mut [OrderBookDelta],
58    price_precision: Option<u8>,
59    size_precision: Option<u8>,
60    current_price_precision: u8,
61    current_size_precision: u8,
62) {
63    for delta in deltas {
64        if price_precision.is_none() {
65            delta.order.price.precision = current_price_precision;
66        }
67
68        if size_precision.is_none() {
69            delta.order.size.precision = current_size_precision;
70        }
71    }
72}
73
74fn update_quotes_precision(
75    quotes: &mut [QuoteTick],
76    price_precision: Option<u8>,
77    size_precision: Option<u8>,
78    current_price_precision: u8,
79    current_size_precision: u8,
80) {
81    for quote in quotes {
82        if price_precision.is_none() {
83            quote.bid_price.precision = current_price_precision;
84            quote.ask_price.precision = current_price_precision;
85        }
86
87        if size_precision.is_none() {
88            quote.bid_size.precision = current_size_precision;
89            quote.ask_size.precision = current_size_precision;
90        }
91    }
92}
93
94fn update_trades_precision(
95    trades: &mut [TradeTick],
96    price_precision: Option<u8>,
97    size_precision: Option<u8>,
98    current_price_precision: u8,
99    current_size_precision: u8,
100) {
101    for trade in trades {
102        if price_precision.is_none() {
103            trade.price.precision = current_price_precision;
104        }
105
106        if size_precision.is_none() {
107            trade.size.precision = current_size_precision;
108        }
109    }
110}
111
/// Loads [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
/// automatically applying `GZip` decompression for files ending in ".gz".
///
/// When `price_precision` / `size_precision` are `None`, precisions are inferred
/// from the records and all deltas are normalized at the end to the maximum
/// precision discovered. A CLEAR delta is inserted at each snapshot boundary to
/// reset book state, and the `F_LAST` flag is set on the last delta of every
/// `ts_event` group (including the final delta overall).
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
pub fn load_deltas<P: AsRef<Path>>(
    filepath: P,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
    // Estimate capacity for Vec pre-allocation
    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);

    // Start at the explicit precisions, or 0 when inferring from the data
    let mut current_price_precision = price_precision.unwrap_or(0);
    let mut current_size_precision = size_precision.unwrap_or(0);
    let mut last_ts_event = UnixNanos::default();
    let mut last_is_snapshot = false;

    let mut reader = create_csv_reader(filepath)?;
    let mut record = StringRecord::new();

    while reader.read_record(&mut record)? {
        if let Some(limit) = limit
            && deltas.len() >= limit
        {
            break;
        }

        let data: TardisBookUpdateRecord = record.deserialize(None)?;

        // Track the maximum inferred precisions (no-op when explicitly set)
        update_precision_if_needed(&mut current_price_precision, data.price, price_precision);
        update_precision_if_needed(&mut current_size_precision, data.amount, size_precision);

        // Insert CLEAR on snapshot boundary to reset order book state
        if data.is_snapshot && !last_is_snapshot {
            let clear_instrument_id =
                instrument_id.unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
            let ts_event = parse_timestamp(data.timestamp);
            let ts_init = parse_timestamp(data.local_timestamp);

            // Close out the previous ts_event group before pushing the CLEAR
            if last_ts_event != ts_event
                && let Some(last_delta) = deltas.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            last_ts_event = ts_event;

            let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
            deltas.push(clear_delta);

            // The CLEAR delta itself may hit the requested limit
            if let Some(limit) = limit
                && deltas.len() >= limit
            {
                break;
            }
        }
        last_is_snapshot = data.is_snapshot;

        let delta = match parse_delta_record(
            &data,
            current_price_precision,
            current_size_precision,
            instrument_id,
        ) {
            Ok(d) => d,
            Err(e) => {
                // Best-effort load: malformed records are logged and skipped
                log::warn!("Skipping invalid delta record: {e}");
                continue;
            }
        };

        // Mark the previous delta as last in its ts_event group
        let ts_event = delta.ts_event;
        if last_ts_event != ts_event
            && let Some(last_delta) = deltas.last_mut()
        {
            last_delta.flags = RecordFlag::F_LAST.value();
        }

        last_ts_event = ts_event;

        deltas.push(delta);
    }

    // Set F_LAST flag for final delta
    if let Some(last_delta) = deltas.last_mut() {
        last_delta.flags = RecordFlag::F_LAST.value();
    }

    // Update all deltas to use the final (maximum) precision discovered
    // This is done once at the end instead of on every precision change (O(n) vs O(n²))
    update_deltas_precision(
        &mut deltas,
        price_precision,
        size_precision,
        current_price_precision,
        current_size_precision,
    );

    Ok(deltas)
}
217
218/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
219/// automatically applying `GZip` decompression for files ending in ".gz".
220/// Load order book depth-10 snapshots (5-level) from a CSV or gzipped CSV file.
221///
222/// # Errors
223///
224/// Returns an error if the file cannot be opened, read, or parsed as CSV.
225///
226/// # Panics
227///
228/// Panics if a record level cannot be parsed to depth-10.
229pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
230    filepath: P,
231    price_precision: Option<u8>,
232    size_precision: Option<u8>,
233    instrument_id: Option<InstrumentId>,
234    limit: Option<usize>,
235) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
236    // Estimate capacity for Vec pre-allocation
237    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
238    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
239
240    let mut current_price_precision = price_precision.unwrap_or(0);
241    let mut current_size_precision = size_precision.unwrap_or(0);
242
243    let mut reader = create_csv_reader(filepath)?;
244    let mut record = StringRecord::new();
245
246    while reader.read_record(&mut record)? {
247        let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
248
249        // Update precisions dynamically if not explicitly set
250        let mut precision_updated = false;
251
252        if price_precision.is_none()
253            && let Some(bid_price) = data.bids_0_price
254        {
255            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
256            if inferred_price_precision > current_price_precision {
257                current_price_precision = inferred_price_precision;
258                precision_updated = true;
259            }
260        }
261
262        if size_precision.is_none()
263            && let Some(bid_amount) = data.bids_0_amount
264        {
265            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
266            if inferred_size_precision > current_size_precision {
267                current_size_precision = inferred_size_precision;
268                precision_updated = true;
269            }
270        }
271
272        // If precision increased, update all previous depths
273        if precision_updated {
274            for depth in &mut depths {
275                for i in 0..DEPTH10_LEN {
276                    if price_precision.is_none() {
277                        depth.bids[i].price.precision = current_price_precision;
278                        depth.asks[i].price.precision = current_price_precision;
279                    }
280
281                    if size_precision.is_none() {
282                        depth.bids[i].size.precision = current_size_precision;
283                        depth.asks[i].size.precision = current_size_precision;
284                    }
285                }
286            }
287        }
288
289        let instrument_id = match &instrument_id {
290            Some(id) => *id,
291            None => parse_instrument_id(&data.exchange, data.symbol),
292        };
293        // Mark as both snapshot and last (consistent with streaming implementation)
294        let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
295        let sequence = 0; // Sequence not available
296        let ts_event = parse_timestamp(data.timestamp);
297        let ts_init = parse_timestamp(data.local_timestamp);
298
299        // Initialize empty arrays
300        let mut bids = [NULL_ORDER; DEPTH10_LEN];
301        let mut asks = [NULL_ORDER; DEPTH10_LEN];
302        let mut bid_counts = [0u32; DEPTH10_LEN];
303        let mut ask_counts = [0u32; DEPTH10_LEN];
304
305        for i in 0..=4 {
306            // Create bids
307            let (bid_order, bid_count) = create_book_order(
308                OrderSide::Buy,
309                match i {
310                    0 => data.bids_0_price,
311                    1 => data.bids_1_price,
312                    2 => data.bids_2_price,
313                    3 => data.bids_3_price,
314                    4 => data.bids_4_price,
315                    _ => unreachable!("i is constrained to 0..=4 by loop"),
316                },
317                match i {
318                    0 => data.bids_0_amount,
319                    1 => data.bids_1_amount,
320                    2 => data.bids_2_amount,
321                    3 => data.bids_3_amount,
322                    4 => data.bids_4_amount,
323                    _ => unreachable!("i is constrained to 0..=4 by loop"),
324                },
325                current_price_precision,
326                current_size_precision,
327            );
328            bids[i] = bid_order;
329            bid_counts[i] = bid_count;
330
331            // Create asks
332            let (ask_order, ask_count) = create_book_order(
333                OrderSide::Sell,
334                match i {
335                    0 => data.asks_0_price,
336                    1 => data.asks_1_price,
337                    2 => data.asks_2_price,
338                    3 => data.asks_3_price,
339                    4 => data.asks_4_price,
340                    _ => None, // Unreachable, but for safety
341                },
342                match i {
343                    0 => data.asks_0_amount,
344                    1 => data.asks_1_amount,
345                    2 => data.asks_2_amount,
346                    3 => data.asks_3_amount,
347                    4 => data.asks_4_amount,
348                    _ => None, // Unreachable, but for safety
349                },
350                current_price_precision,
351                current_size_precision,
352            );
353            asks[i] = ask_order;
354            ask_counts[i] = ask_count;
355        }
356
357        let depth = OrderBookDepth10::new(
358            instrument_id,
359            bids,
360            asks,
361            bid_counts,
362            ask_counts,
363            flags,
364            sequence,
365            ts_event,
366            ts_init,
367        );
368
369        depths.push(depth);
370
371        if let Some(limit) = limit
372            && depths.len() >= limit
373        {
374            break;
375        }
376    }
377
378    Ok(depths)
379}
380
381/// Loads [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
382/// automatically applying `GZip` decompression for files ending in ".gz".
383/// Load order book depth-10 snapshots (25-level) from a CSV or gzipped CSV file.
384///
385/// # Errors
386///
387/// Returns an error if the file cannot be opened, read, or parsed as CSV.
388pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
389    filepath: P,
390    price_precision: Option<u8>,
391    size_precision: Option<u8>,
392    instrument_id: Option<InstrumentId>,
393    limit: Option<usize>,
394) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
395    // Estimate capacity for Vec pre-allocation
396    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
397    let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
398
399    let mut current_price_precision = price_precision.unwrap_or(0);
400    let mut current_size_precision = size_precision.unwrap_or(0);
401    let mut reader = create_csv_reader(filepath)?;
402    let mut record = StringRecord::new();
403
404    while reader.read_record(&mut record)? {
405        let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
406
407        // Update precisions dynamically if not explicitly set
408        let mut precision_updated = false;
409
410        if price_precision.is_none()
411            && let Some(bid_price) = data.bids_0_price
412        {
413            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
414            if inferred_price_precision > current_price_precision {
415                current_price_precision = inferred_price_precision;
416                precision_updated = true;
417            }
418        }
419
420        if size_precision.is_none()
421            && let Some(bid_amount) = data.bids_0_amount
422        {
423            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
424            if inferred_size_precision > current_size_precision {
425                current_size_precision = inferred_size_precision;
426                precision_updated = true;
427            }
428        }
429
430        // If precision increased, update all previous depths
431        if precision_updated {
432            for depth in &mut depths {
433                for i in 0..DEPTH10_LEN {
434                    if price_precision.is_none() {
435                        depth.bids[i].price.precision = current_price_precision;
436                        depth.asks[i].price.precision = current_price_precision;
437                    }
438
439                    if size_precision.is_none() {
440                        depth.bids[i].size.precision = current_size_precision;
441                        depth.asks[i].size.precision = current_size_precision;
442                    }
443                }
444            }
445        }
446
447        let instrument_id = match &instrument_id {
448            Some(id) => *id,
449            None => parse_instrument_id(&data.exchange, data.symbol),
450        };
451        // Mark as both snapshot and last (consistent with streaming implementation)
452        let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
453        let sequence = 0; // Sequence not available
454        let ts_event = parse_timestamp(data.timestamp);
455        let ts_init = parse_timestamp(data.local_timestamp);
456
457        // Initialize empty arrays for the first 10 levels only
458        let mut bids = [NULL_ORDER; DEPTH10_LEN];
459        let mut asks = [NULL_ORDER; DEPTH10_LEN];
460        let mut bid_counts = [0u32; DEPTH10_LEN];
461        let mut ask_counts = [0u32; DEPTH10_LEN];
462
463        // Fill only the first 10 levels from the 25-level record
464        for i in 0..DEPTH10_LEN {
465            // Create bids
466            let (bid_order, bid_count) = create_book_order(
467                OrderSide::Buy,
468                match i {
469                    0 => data.bids_0_price,
470                    1 => data.bids_1_price,
471                    2 => data.bids_2_price,
472                    3 => data.bids_3_price,
473                    4 => data.bids_4_price,
474                    5 => data.bids_5_price,
475                    6 => data.bids_6_price,
476                    7 => data.bids_7_price,
477                    8 => data.bids_8_price,
478                    9 => data.bids_9_price,
479                    _ => unreachable!("i is constrained to 0..10 by loop"),
480                },
481                match i {
482                    0 => data.bids_0_amount,
483                    1 => data.bids_1_amount,
484                    2 => data.bids_2_amount,
485                    3 => data.bids_3_amount,
486                    4 => data.bids_4_amount,
487                    5 => data.bids_5_amount,
488                    6 => data.bids_6_amount,
489                    7 => data.bids_7_amount,
490                    8 => data.bids_8_amount,
491                    9 => data.bids_9_amount,
492                    _ => unreachable!("i is constrained to 0..10 by loop"),
493                },
494                current_price_precision,
495                current_size_precision,
496            );
497            bids[i] = bid_order;
498            bid_counts[i] = bid_count;
499
500            // Create asks
501            let (ask_order, ask_count) = create_book_order(
502                OrderSide::Sell,
503                match i {
504                    0 => data.asks_0_price,
505                    1 => data.asks_1_price,
506                    2 => data.asks_2_price,
507                    3 => data.asks_3_price,
508                    4 => data.asks_4_price,
509                    5 => data.asks_5_price,
510                    6 => data.asks_6_price,
511                    7 => data.asks_7_price,
512                    8 => data.asks_8_price,
513                    9 => data.asks_9_price,
514                    _ => unreachable!("i is constrained to 0..10 by loop"),
515                },
516                match i {
517                    0 => data.asks_0_amount,
518                    1 => data.asks_1_amount,
519                    2 => data.asks_2_amount,
520                    3 => data.asks_3_amount,
521                    4 => data.asks_4_amount,
522                    5 => data.asks_5_amount,
523                    6 => data.asks_6_amount,
524                    7 => data.asks_7_amount,
525                    8 => data.asks_8_amount,
526                    9 => data.asks_9_amount,
527                    _ => unreachable!("i is constrained to 0..10 by loop"),
528                },
529                current_price_precision,
530                current_size_precision,
531            );
532            asks[i] = ask_order;
533            ask_counts[i] = ask_count;
534        }
535
536        let depth = OrderBookDepth10::new(
537            instrument_id,
538            bids,
539            asks,
540            bid_counts,
541            ask_counts,
542            flags,
543            sequence,
544            ts_event,
545            ts_init,
546        );
547
548        depths.push(depth);
549
550        if let Some(limit) = limit
551            && depths.len() >= limit
552        {
553            break;
554        }
555    }
556
557    Ok(depths)
558}
559
560/// Loads [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
561/// automatically applying `GZip` decompression for files ending in ".gz".
562/// Load quote ticks from a CSV or gzipped CSV file.
563///
564/// # Errors
565///
566/// Returns an error if the file cannot be opened, read, or parsed as CSV.
567pub fn load_quotes<P: AsRef<Path>>(
568    filepath: P,
569    price_precision: Option<u8>,
570    size_precision: Option<u8>,
571    instrument_id: Option<InstrumentId>,
572    limit: Option<usize>,
573) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
574    // Estimate capacity for Vec pre-allocation
575    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
576    let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
577
578    let mut current_price_precision = price_precision.unwrap_or(0);
579    let mut current_size_precision = size_precision.unwrap_or(0);
580    let mut reader = create_csv_reader(filepath)?;
581    let mut record = StringRecord::new();
582
583    while reader.read_record(&mut record)? {
584        let data: TardisQuoteRecord = record.deserialize(None)?;
585
586        if price_precision.is_none()
587            && let Some(bid_price) = data.bid_price
588        {
589            let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
590            if inferred_price_precision > current_price_precision {
591                current_price_precision = inferred_price_precision;
592            }
593        }
594
595        if size_precision.is_none()
596            && let Some(bid_amount) = data.bid_amount
597        {
598            let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
599            if inferred_size_precision > current_size_precision {
600                current_size_precision = inferred_size_precision;
601            }
602        }
603
604        let quote = parse_quote_record(
605            &data,
606            current_price_precision,
607            current_size_precision,
608            instrument_id,
609        );
610
611        quotes.push(quote);
612
613        if let Some(limit) = limit
614            && quotes.len() >= limit
615        {
616            break;
617        }
618    }
619
620    // Update all quotes to use the final (maximum) precision discovered
621    // This is done once at the end instead of on every precision change (O(n) vs O(n²))
622    update_quotes_precision(
623        &mut quotes,
624        price_precision,
625        size_precision,
626        current_price_precision,
627        current_size_precision,
628    );
629
630    Ok(quotes)
631}
632
633/// Loads [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
634/// automatically applying `GZip` decompression for files ending in ".gz".
635/// Load trade ticks from a CSV or gzipped CSV file.
636///
637/// # Errors
638///
639/// Returns an error if the file cannot be opened, read, or parsed as CSV.
640pub fn load_trades<P: AsRef<Path>>(
641    filepath: P,
642    price_precision: Option<u8>,
643    size_precision: Option<u8>,
644    instrument_id: Option<InstrumentId>,
645    limit: Option<usize>,
646) -> Result<Vec<TradeTick>, Box<dyn Error>> {
647    // Estimate capacity for Vec pre-allocation
648    let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
649    let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
650
651    let mut current_price_precision = price_precision.unwrap_or(0);
652    let mut current_size_precision = size_precision.unwrap_or(0);
653    let mut reader = create_csv_reader(filepath)?;
654    let mut record = StringRecord::new();
655
656    while reader.read_record(&mut record)? {
657        let data: TardisTradeRecord = record.deserialize(None)?;
658
659        if price_precision.is_none() {
660            let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
661            if inferred_price_precision > current_price_precision {
662                current_price_precision = inferred_price_precision;
663            }
664        }
665
666        if size_precision.is_none() {
667            let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
668            if inferred_size_precision > current_size_precision {
669                current_size_precision = inferred_size_precision;
670            }
671        }
672
673        let size = Quantity::new_checked(data.amount, current_size_precision)?;
674
675        if size.is_positive() {
676            let trade = parse_trade_record(&data, size, current_price_precision, instrument_id);
677
678            trades.push(trade);
679
680            if let Some(limit) = limit
681                && trades.len() >= limit
682            {
683                break;
684            }
685        } else {
686            log::warn!("Skipping zero-sized trade: {data:?}");
687        }
688    }
689
690    // Update all trades to use the final (maximum) precision discovered
691    // This is done once at the end instead of on every precision change (O(n) vs O(n²))
692    update_trades_precision(
693        &mut trades,
694        price_precision,
695        size_precision,
696        current_price_precision,
697        current_size_precision,
698    );
699
700    Ok(trades)
701}
702
703/// Loads [`FundingRateUpdate`]s from a Tardis format derivative ticker CSV at the given `filepath`,
704/// automatically applying `GZip` decompression for files ending in ".gz".
705///
706/// This function parses the `funding_rate`, `predicted_funding_rate`, and `funding_timestamp`
707/// fields from derivative ticker data to create funding rate updates.
708///
709/// # Errors
710///
711/// Returns an error if the file cannot be opened, read, or parsed as CSV.
712pub fn load_funding_rates<P: AsRef<Path>>(
713    filepath: P,
714    instrument_id: Option<InstrumentId>,
715    limit: Option<usize>,
716) -> Result<Vec<FundingRateUpdate>, Box<dyn Error>> {
717    // Estimate capacity for Vec pre-allocation
718    let estimated_capacity = limit.unwrap_or(100_000).min(1_000_000);
719    let mut funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(estimated_capacity);
720
721    let mut reader = create_csv_reader(filepath)?;
722    let mut record = StringRecord::new();
723
724    while reader.read_record(&mut record)? {
725        let data: TardisDerivativeTickerRecord = record.deserialize(None)?;
726
727        // Parse to funding rate update (returns None if no funding data)
728        if let Some(funding_rate) = parse_derivative_ticker_record(&data, instrument_id) {
729            funding_rates.push(funding_rate);
730
731            if let Some(limit) = limit
732                && funding_rates.len() >= limit
733            {
734                break;
735            }
736        }
737    }
738
739    Ok(funding_rates)
740}
741
742#[cfg(test)]
743mod tests {
744    use std::{fs, fs::File, sync::Arc};
745
746    use nautilus_core::paths::get_test_data_path as get_test_data_root;
747    use nautilus_model::{
748        enums::{AggressorSide, BookAction},
749        identifiers::{InstrumentId, TradeId},
750        types::Price,
751    };
752    use nautilus_serialization::arrow::{ArrowSchemaProvider, EncodeToRecordBatch};
753    use nautilus_testkit::common::{
754        get_tardis_binance_snapshot5_path, get_tardis_binance_snapshot25_path,
755        get_tardis_bitmex_trades_path, get_tardis_deribit_book_l2_path,
756        get_tardis_huobi_quotes_path,
757    };
758    use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
759    use rstest::*;
760
761    use super::*;
762    use crate::common::{parse::parse_price, testing::get_test_data_path};
763
    /// Checks precision inference across zero, integers, fractions, negatives,
    /// and high-precision values.
    #[rstest]
    #[case(0.0, 0)]
    #[case(42.0, 0)]
    #[case(0.1, 1)]
    #[case(0.25, 2)]
    #[case(123.0001, 4)]
    #[case(-42.987654321,       9)]
    #[case(1.234_567_890_123, 12)]
    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
        assert_eq!(infer_precision(input), expected);
    }
775
    /// Verifies that inferred precisions are applied retroactively: every
    /// delta ends up with the maximum price/size precision seen anywhere in
    /// the file, and values parsed early still compare equal at the final
    /// precision.
    #[rstest]
    pub fn test_dynamic_precision_inference() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();

        // 5 data rows + 1 CLEAR delta at start (first row is snapshot)
        assert_eq!(deltas.len(), 6);

        // Skip the CLEAR delta at index 0
        for (i, delta) in deltas.iter().skip(1).enumerate() {
            assert_eq!(
                delta.order.price.precision, 4,
                "Price precision should be 4 for delta {i}",
            );
            assert_eq!(
                delta.order.size.precision, 1,
                "Size precision should be 1 for delta {i}",
            );
        }

        // Test exact values to ensure retroactive precision updates work correctly
        // Index 0 is CLEAR, data starts at index 1
        assert_eq!(deltas[0].action, BookAction::Clear);

        assert_eq!(deltas[1].order.price, parse_price(50000.0, 4));
        assert_eq!(deltas[1].order.size, Quantity::new(1.0, 1));

        assert_eq!(deltas[2].order.price, parse_price(49999.5, 4));
        assert_eq!(deltas[2].order.size, Quantity::new(2.0, 1));

        assert_eq!(deltas[3].order.price, parse_price(50000.12, 4));
        assert_eq!(deltas[3].order.size, Quantity::new(1.5, 1));

        assert_eq!(deltas[4].order.price, parse_price(49999.123, 4));
        assert_eq!(deltas[4].order.size, Quantity::new(3.0, 1));

        assert_eq!(deltas[5].order.price, parse_price(50000.1234, 4));
        assert_eq!(deltas[5].order.size, Quantity::new(0.5, 1));

        // Early and late deltas agree on the normalized precisions
        assert_eq!(
            deltas[1].order.price.precision,
            deltas[5].order.price.precision
        );
        assert_eq!(
            deltas[1].order.size.precision,
            deltas[3].order.size.precision
        );

        // Best-effort cleanup of the temp file
        std::fs::remove_file(&temp_file).ok();
    }
835
836    #[rstest]
837    #[case(Some(1), Some(0))] // Explicit precisions
838    #[case(None, None)] // Inferred precisions
839    pub fn test_read_deltas(
840        #[case] price_precision: Option<u8>,
841        #[case] size_precision: Option<u8>,
842    ) {
843        let filepath = get_tardis_deribit_book_l2_path();
844        let deltas =
845            load_deltas(filepath, price_precision, size_precision, None, Some(100)).unwrap();
846
847        // 15 data rows + 1 CLEAR delta at start (first row is snapshot)
848        assert_eq!(deltas.len(), 16);
849
850        // Index 0 is CLEAR delta
851        assert_eq!(deltas[0].action, BookAction::Clear);
852
853        // Index 1 is first data delta
854        assert_eq!(
855            deltas[1].instrument_id,
856            InstrumentId::from("BTC-PERPETUAL.DERIBIT")
857        );
858        assert_eq!(deltas[1].action, BookAction::Add);
859        assert_eq!(deltas[1].order.side, OrderSide::Sell);
860        assert_eq!(deltas[1].order.price, Price::from("6421.5"));
861        assert_eq!(deltas[1].order.size, Quantity::from("18640"));
862        assert_eq!(deltas[1].flags, 0);
863        assert_eq!(deltas[1].sequence, 0);
864        assert_eq!(deltas[1].ts_event, 1585699200245000000);
865        assert_eq!(deltas[1].ts_init, 1585699200355684000);
866    }
867
868    #[rstest]
869    #[case(Some(2), Some(3))] // Explicit precisions
870    #[case(None, None)] // Inferred precisions
871    pub fn test_read_depth10s_from_snapshot5(
872        #[case] price_precision: Option<u8>,
873        #[case] size_precision: Option<u8>,
874    ) {
875        let filepath = get_tardis_binance_snapshot5_path();
876        let depths =
877            load_depth10_from_snapshot5(filepath, price_precision, size_precision, None, Some(100))
878                .unwrap();
879
880        assert_eq!(depths.len(), 10);
881        assert_eq!(
882            depths[0].instrument_id,
883            InstrumentId::from("BTCUSDT.BINANCE")
884        );
885        assert_eq!(depths[0].bids.len(), 10);
886        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
887        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
888        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
889        assert_eq!(depths[0].bids[0].order_id, 0);
890        assert_eq!(depths[0].asks.len(), 10);
891        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
892        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
893        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
894        assert_eq!(depths[0].asks[0].order_id, 0);
895        assert_eq!(depths[0].bid_counts[0], 1);
896        assert_eq!(depths[0].ask_counts[0], 1);
897        // F_SNAPSHOT (32) | F_LAST (128) = 160
898        assert_eq!(
899            depths[0].flags,
900            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
901        );
902        assert_eq!(depths[0].ts_event, 1598918403696000000);
903        assert_eq!(depths[0].ts_init, 1598918403810979000);
904        assert_eq!(depths[0].sequence, 0);
905    }
906
907    #[rstest]
908    #[case(Some(2), Some(3))] // Explicit precisions
909    #[case(None, None)] // Inferred precisions
910    pub fn test_read_depth10s_from_snapshot25(
911        #[case] price_precision: Option<u8>,
912        #[case] size_precision: Option<u8>,
913    ) {
914        let filepath = get_tardis_binance_snapshot25_path();
915        let depths = load_depth10_from_snapshot25(
916            filepath,
917            price_precision,
918            size_precision,
919            None,
920            Some(100),
921        )
922        .unwrap();
923
924        assert_eq!(depths.len(), 10);
925        assert_eq!(
926            depths[0].instrument_id,
927            InstrumentId::from("BTCUSDT.BINANCE")
928        );
929        assert_eq!(depths[0].bids.len(), 10);
930        assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
931        assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
932        assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
933        assert_eq!(depths[0].bids[0].order_id, 0);
934        assert_eq!(depths[0].asks.len(), 10);
935        assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
936        assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
937        assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
938        assert_eq!(depths[0].asks[0].order_id, 0);
939        assert_eq!(depths[0].bid_counts[0], 1);
940        assert_eq!(depths[0].ask_counts[0], 1);
941        // F_SNAPSHOT (32) | F_LAST (128) = 160
942        assert_eq!(
943            depths[0].flags,
944            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
945        );
946        assert_eq!(depths[0].ts_event, 1598918403696000000);
947        assert_eq!(depths[0].ts_init, 1598918403810979000);
948        assert_eq!(depths[0].sequence, 0);
949    }
950
951    #[rstest]
952    #[case(Some(1), Some(0))] // Explicit precisions
953    #[case(None, None)] // Inferred precisions
954    pub fn test_read_quotes(
955        #[case] price_precision: Option<u8>,
956        #[case] size_precision: Option<u8>,
957    ) {
958        let filepath = get_tardis_huobi_quotes_path();
959        let quotes =
960            load_quotes(filepath, price_precision, size_precision, None, Some(100)).unwrap();
961
962        assert_eq!(quotes.len(), 10);
963        assert_eq!(
964            quotes[0].instrument_id,
965            InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
966        );
967        assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
968        assert_eq!(quotes[0].bid_size, Quantity::from("806"));
969        assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
970        assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
971        assert_eq!(quotes[0].ts_event, 1588291201099000000);
972        assert_eq!(quotes[0].ts_init, 1588291201234268000);
973    }
974
975    #[rstest]
976    #[case(Some(1), Some(0))] // Explicit precisions
977    #[case(None, None)] // Inferred precisions
978    pub fn test_read_trades(
979        #[case] price_precision: Option<u8>,
980        #[case] size_precision: Option<u8>,
981    ) {
982        let filepath = get_tardis_bitmex_trades_path();
983        let trades =
984            load_trades(filepath, price_precision, size_precision, None, Some(100)).unwrap();
985
986        assert_eq!(trades.len(), 10);
987        assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
988        assert_eq!(trades[0].price, Price::from("8531.5"));
989        assert_eq!(trades[0].size, Quantity::from("2152"));
990        assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
991        assert_eq!(
992            trades[0].trade_id,
993            TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
994        );
995        assert_eq!(trades[0].ts_event, 1583020803145000000);
996        assert_eq!(trades[0].ts_init, 1583020803307160000);
997    }
998
999    #[rstest]
1000    pub fn test_load_trades_derives_id_when_csv_id_empty() {
1001        // Two rows with empty `id` column must both hash deterministically
1002        // to the same TradeId, and a row with differing price must hash differently.
1003        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1004binance,BTCUSDT,1640995200000000,1640995200100000,,buy,50000.0,1.0
1005binance,BTCUSDT,1640995200000000,1640995200100000,,buy,50000.0,1.0
1006binance,BTCUSDT,1640995200000000,1640995200100000,,buy,50001.0,1.0";
1007
1008        let temp_file = std::env::temp_dir().join("test_load_trades_empty_id.csv");
1009        std::fs::write(&temp_file, csv_data).unwrap();
1010
1011        let trades = load_trades(&temp_file, Some(2), Some(1), None, None).unwrap();
1012        assert_eq!(trades.len(), 3);
1013
1014        assert_eq!(trades[0].trade_id, trades[1].trade_id);
1015        assert_eq!(trades[0].trade_id.as_str().len(), 16);
1016        assert_ne!(trades[0].trade_id, trades[2].trade_id);
1017
1018        std::fs::remove_file(&temp_file).ok();
1019    }
1020
1021    #[rstest]
1022    pub fn test_load_trades_with_zero_sized_trade() {
1023        // Create test CSV data with one zero-sized trade that should be skipped
1024        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1025binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1026binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1027binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1028binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1029
1030        let temp_file = std::env::temp_dir().join("test_load_trades_zero_size.csv");
1031        std::fs::write(&temp_file, csv_data).unwrap();
1032
1033        let trades = load_trades(
1034            &temp_file,
1035            Some(4),
1036            Some(1),
1037            None,
1038            None, // No limit, load all
1039        )
1040        .unwrap();
1041
1042        // Should have 3 trades (zero-sized trade skipped)
1043        assert_eq!(trades.len(), 3);
1044
1045        // Verify the correct trades were loaded (not the zero-sized one)
1046        assert_eq!(trades[0].size, Quantity::from("1.0"));
1047        assert_eq!(trades[1].size, Quantity::from("1.5"));
1048        assert_eq!(trades[2].size, Quantity::from("3.0"));
1049
1050        // Verify trade IDs to confirm correct trades were loaded
1051        assert_eq!(trades[0].trade_id, TradeId::new("trade1"));
1052        assert_eq!(trades[1].trade_id, TradeId::new("trade3"));
1053        assert_eq!(trades[2].trade_id, TradeId::new("trade4"));
1054
1055        std::fs::remove_file(&temp_file).ok();
1056    }
1057
1058    #[rstest]
1059    pub fn test_load_trades_from_local_file() {
1060        let filepath = get_test_data_path("csv/trades_1.csv");
1061        let trades = load_trades(filepath, Some(1), Some(0), None, None).unwrap();
1062        assert_eq!(trades.len(), 2);
1063        assert_eq!(trades[0].price, Price::from("8531.5"));
1064        assert_eq!(trades[1].size, Quantity::from("1000"));
1065    }
1066
1067    #[rstest]
1068    pub fn test_load_deltas_from_local_file() {
1069        let filepath = get_test_data_path("csv/deltas_1.csv");
1070        let deltas = load_deltas(filepath, Some(1), Some(0), None, None).unwrap();
1071
1072        // 2 data rows + 1 CLEAR delta at start (first row is snapshot)
1073        assert_eq!(deltas.len(), 3);
1074        assert_eq!(deltas[0].action, BookAction::Clear);
1075        assert_eq!(deltas[1].order.price, Price::from("6421.5"));
1076        assert_eq!(deltas[2].order.size, Quantity::from("10000"));
1077    }
1078
1079    #[rstest]
1080    fn test_load_depth10_from_snapshot5_comprehensive() {
1081        let filepath = get_tardis_binance_snapshot5_path();
1082        let depths = load_depth10_from_snapshot5(&filepath, None, None, None, Some(100)).unwrap();
1083
1084        assert_eq!(depths.len(), 10);
1085
1086        let first = &depths[0];
1087        assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1088        assert_eq!(first.bids.len(), 10);
1089        assert_eq!(first.asks.len(), 10);
1090
1091        // Check all bid levels (first 5 from data, rest empty)
1092        assert_eq!(first.bids[0].price, Price::from("11657.07"));
1093        assert_eq!(first.bids[0].size, Quantity::from("10.896"));
1094        assert_eq!(first.bids[0].side, OrderSide::Buy);
1095
1096        assert_eq!(first.bids[1].price, Price::from("11656.97"));
1097        assert_eq!(first.bids[1].size, Quantity::from("0.2"));
1098        assert_eq!(first.bids[1].side, OrderSide::Buy);
1099
1100        assert_eq!(first.bids[2].price, Price::from("11655.78"));
1101        assert_eq!(first.bids[2].size, Quantity::from("0.2"));
1102        assert_eq!(first.bids[2].side, OrderSide::Buy);
1103
1104        assert_eq!(first.bids[3].price, Price::from("11655.77"));
1105        assert_eq!(first.bids[3].size, Quantity::from("0.98"));
1106        assert_eq!(first.bids[3].side, OrderSide::Buy);
1107
1108        assert_eq!(first.bids[4].price, Price::from("11655.68"));
1109        assert_eq!(first.bids[4].size, Quantity::from("0.111"));
1110        assert_eq!(first.bids[4].side, OrderSide::Buy);
1111
1112        // Empty levels
1113        for i in 5..10 {
1114            assert_eq!(first.bids[i].price.raw, 0);
1115            assert_eq!(first.bids[i].size.raw, 0);
1116            assert_eq!(first.bids[i].side, OrderSide::NoOrderSide);
1117        }
1118
1119        // Check all ask levels (first 5 from data, rest empty)
1120        assert_eq!(first.asks[0].price, Price::from("11657.08"));
1121        assert_eq!(first.asks[0].size, Quantity::from("1.714"));
1122        assert_eq!(first.asks[0].side, OrderSide::Sell);
1123
1124        assert_eq!(first.asks[1].price, Price::from("11657.54"));
1125        assert_eq!(first.asks[1].size, Quantity::from("5.4"));
1126        assert_eq!(first.asks[1].side, OrderSide::Sell);
1127
1128        assert_eq!(first.asks[2].price, Price::from("11657.56"));
1129        assert_eq!(first.asks[2].size, Quantity::from("0.238"));
1130        assert_eq!(first.asks[2].side, OrderSide::Sell);
1131
1132        assert_eq!(first.asks[3].price, Price::from("11657.61"));
1133        assert_eq!(first.asks[3].size, Quantity::from("0.077"));
1134        assert_eq!(first.asks[3].side, OrderSide::Sell);
1135
1136        assert_eq!(first.asks[4].price, Price::from("11657.92"));
1137        assert_eq!(first.asks[4].size, Quantity::from("0.918"));
1138        assert_eq!(first.asks[4].side, OrderSide::Sell);
1139
1140        // Empty levels
1141        for i in 5..10 {
1142            assert_eq!(first.asks[i].price.raw, 0);
1143            assert_eq!(first.asks[i].size.raw, 0);
1144            assert_eq!(first.asks[i].side, OrderSide::NoOrderSide);
1145        }
1146
1147        // Logical checks: bid prices should decrease
1148        for i in 1..5 {
1149            assert!(
1150                first.bids[i].price < first.bids[i - 1].price,
1151                "Bid price at level {} should be less than level {}",
1152                i,
1153                i - 1
1154            );
1155        }
1156
1157        // Logical checks: ask prices should increase
1158        for i in 1..5 {
1159            assert!(
1160                first.asks[i].price > first.asks[i - 1].price,
1161                "Ask price at level {} should be greater than level {}",
1162                i,
1163                i - 1
1164            );
1165        }
1166
1167        // Logical check: spread should be positive
1168        assert!(
1169            first.asks[0].price > first.bids[0].price,
1170            "Best ask should be greater than best bid"
1171        );
1172
1173        // Check counts
1174        for i in 0..5 {
1175            assert_eq!(first.bid_counts[i], 1);
1176            assert_eq!(first.ask_counts[i], 1);
1177        }
1178
1179        for i in 5..10 {
1180            assert_eq!(first.bid_counts[i], 0);
1181            assert_eq!(first.ask_counts[i], 0);
1182        }
1183
1184        // Check metadata - F_SNAPSHOT (32) | F_LAST (128) = 160
1185        assert_eq!(
1186            first.flags,
1187            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1188        );
1189        assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1190        assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1191        assert_eq!(first.sequence, 0);
1192    }
1193
1194    #[rstest]
1195    fn test_load_depth10_from_snapshot25_comprehensive() {
1196        let filepath = get_tardis_binance_snapshot25_path();
1197        let depths = load_depth10_from_snapshot25(&filepath, None, None, None, Some(100)).unwrap();
1198
1199        assert_eq!(depths.len(), 10);
1200
1201        let first = &depths[0];
1202        assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1203        assert_eq!(first.bids.len(), 10);
1204        assert_eq!(first.asks.len(), 10);
1205
1206        // Check all 10 bid levels from snapshot25
1207        let expected_bids = vec![
1208            ("11657.07", "10.896"),
1209            ("11656.97", "0.2"),
1210            ("11655.78", "0.2"),
1211            ("11655.77", "0.98"),
1212            ("11655.68", "0.111"),
1213            ("11655.66", "0.077"),
1214            ("11655.57", "0.34"),
1215            ("11655.48", "0.4"),
1216            ("11655.26", "1.185"),
1217            ("11654.86", "0.195"),
1218        ];
1219
1220        for (i, (price, size)) in expected_bids.iter().enumerate() {
1221            assert_eq!(first.bids[i].price, Price::from(*price));
1222            assert_eq!(first.bids[i].size, Quantity::from(*size));
1223            assert_eq!(first.bids[i].side, OrderSide::Buy);
1224        }
1225
1226        // Check all 10 ask levels from snapshot25
1227        let expected_asks = vec![
1228            ("11657.08", "1.714"),
1229            ("11657.54", "5.4"),
1230            ("11657.56", "0.238"),
1231            ("11657.61", "0.077"),
1232            ("11657.92", "0.918"),
1233            ("11658.09", "1.015"),
1234            ("11658.12", "0.665"),
1235            ("11658.19", "0.583"),
1236            ("11658.28", "0.255"),
1237            ("11658.29", "0.656"),
1238        ];
1239
1240        for (i, (price, size)) in expected_asks.iter().enumerate() {
1241            assert_eq!(first.asks[i].price, Price::from(*price));
1242            assert_eq!(first.asks[i].size, Quantity::from(*size));
1243            assert_eq!(first.asks[i].side, OrderSide::Sell);
1244        }
1245
1246        // Logical checks: bid prices should strictly decrease
1247        for i in 1..10 {
1248            assert!(
1249                first.bids[i].price < first.bids[i - 1].price,
1250                "Bid price at level {} ({}) should be less than level {} ({})",
1251                i,
1252                first.bids[i].price,
1253                i - 1,
1254                first.bids[i - 1].price
1255            );
1256        }
1257
1258        // Logical checks: ask prices should strictly increase
1259        for i in 1..10 {
1260            assert!(
1261                first.asks[i].price > first.asks[i - 1].price,
1262                "Ask price at level {} ({}) should be greater than level {} ({})",
1263                i,
1264                first.asks[i].price,
1265                i - 1,
1266                first.asks[i - 1].price
1267            );
1268        }
1269
1270        // Logical check: spread should be positive
1271        assert!(
1272            first.asks[0].price > first.bids[0].price,
1273            "Best ask ({}) should be greater than best bid ({})",
1274            first.asks[0].price,
1275            first.bids[0].price
1276        );
1277
1278        // Check counts (all should be 1 for snapshot data)
1279        for i in 0..10 {
1280            assert_eq!(first.bid_counts[i], 1);
1281            assert_eq!(first.ask_counts[i], 1);
1282        }
1283
1284        // Check metadata - F_SNAPSHOT (32) | F_LAST (128) = 160
1285        assert_eq!(
1286            first.flags,
1287            RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1288        );
1289        assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1290        assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1291        assert_eq!(first.sequence, 0);
1292    }
1293
1294    #[rstest]
1295    fn test_snapshot_csv_field_order_interleaved() {
1296        // This test verifies that the CSV structs correctly handle the interleaved
1297        // asks/bids field ordering from Tardis CSV files
1298
1299        let csv_data = "exchange,symbol,timestamp,local_timestamp,\
1300asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,\
1301asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,\
1302asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,\
1303asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,\
1304asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1305binance-futures,BTCUSDT,1000000,2000000,\
1306100.5,1.0,100.4,2.0,\
1307100.6,1.1,100.3,2.1,\
1308100.7,1.2,100.2,2.2,\
1309100.8,1.3,100.1,2.3,\
1310100.9,1.4,100.0,2.4";
1311
1312        let temp_file = std::env::temp_dir().join("test_interleaved_snapshot5.csv");
1313        std::fs::write(&temp_file, csv_data).unwrap();
1314
1315        let depths = load_depth10_from_snapshot5(&temp_file, None, None, None, Some(1)).unwrap();
1316        assert_eq!(depths.len(), 1);
1317
1318        let depth = &depths[0];
1319
1320        // Verify bids are correctly parsed (should be decreasing)
1321        assert_eq!(depth.bids[0].price, Price::from("100.4"));
1322        assert_eq!(depth.bids[1].price, Price::from("100.3"));
1323        assert_eq!(depth.bids[2].price, Price::from("100.2"));
1324        assert_eq!(depth.bids[3].price, Price::from("100.1"));
1325        assert_eq!(depth.bids[4].price, Price::from("100.0"));
1326
1327        // Verify asks are correctly parsed (should be increasing)
1328        assert_eq!(depth.asks[0].price, Price::from("100.5"));
1329        assert_eq!(depth.asks[1].price, Price::from("100.6"));
1330        assert_eq!(depth.asks[2].price, Price::from("100.7"));
1331        assert_eq!(depth.asks[3].price, Price::from("100.8"));
1332        assert_eq!(depth.asks[4].price, Price::from("100.9"));
1333
1334        // Verify sizes
1335        assert_eq!(depth.bids[0].size, Quantity::from("2.0"));
1336        assert_eq!(depth.asks[0].size, Quantity::from("1.0"));
1337
1338        std::fs::remove_file(temp_file).unwrap();
1339    }
1340
1341    #[rstest]
1342    fn test_load_deltas_limit_includes_clear_deltas() {
1343        // Test that limit counts total emitted deltas (including CLEARs)
1344        // When limit=5, we should get exactly 5 deltas: 1 CLEAR + 4 data deltas
1345        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1346binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1347binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
1348binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
1349binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
1350binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5
1351binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50003.0,2.0
1352binance-futures,BTCUSDT,1640995205000000,1640995205100000,false,bid,49997.0,0.5";
1353
1354        let temp_file = std::env::temp_dir().join("test_load_deltas_limit.csv");
1355        std::fs::write(&temp_file, csv_data).unwrap();
1356
1357        // Load with limit=5 (should emit exactly 5 deltas including CLEAR)
1358        let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(5)).unwrap();
1359
1360        // Should have exactly 5 deltas: 1 CLEAR + 4 data deltas
1361        assert_eq!(deltas.len(), 5);
1362        assert_eq!(deltas[0].action, BookAction::Clear);
1363        assert_eq!(deltas[1].action, BookAction::Add);
1364        assert_eq!(deltas[2].action, BookAction::Add);
1365        assert_eq!(deltas[3].action, BookAction::Update);
1366        assert_eq!(deltas[4].action, BookAction::Update);
1367
1368        // Verify the last delta is from the 4th CSV record (49999.0 bid)
1369        assert_eq!(deltas[3].order.price, parse_price(49999.0, 1));
1370
1371        std::fs::remove_file(&temp_file).ok();
1372    }
1373
1374    #[rstest]
1375    fn test_load_deltas_limit_stops_at_clear() {
1376        // Test that limit=1 with snapshot data returns only the CLEAR delta
1377        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1378binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1379binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
1380
1381        let temp_file = std::env::temp_dir().join("test_load_deltas_limit_stops_at_clear.csv");
1382        std::fs::write(&temp_file, csv_data).unwrap();
1383
1384        // Load with limit=1 should only get the CLEAR delta
1385        let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(1)).unwrap();
1386
1387        assert_eq!(deltas.len(), 1);
1388        assert_eq!(deltas[0].action, BookAction::Clear);
1389
1390        std::fs::remove_file(&temp_file).ok();
1391    }
1392
1393    #[rstest]
1394    fn test_load_deltas_limit_with_mid_day_snapshot() {
1395        // Test limit behavior when there's a mid-day snapshot
1396        // The limit counts total emitted deltas including CLEARs
1397        let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
1398        let deltas = load_deltas(filepath, Some(1), Some(1), None, Some(5)).unwrap();
1399
1400        // With limit=5, we get exactly 5 deltas
1401        // First snapshot inserts CLEAR, then we get 4 more data deltas
1402        assert_eq!(deltas.len(), 5);
1403        assert_eq!(deltas[0].action, BookAction::Clear);
1404    }
1405
    // Curates the large Tardis Deribit CSV.gz into NautilusTrader Parquet format.
    // Run manually: `cargo test -p nautilus-tardis test_curate_deribit_deltas -- --ignored --nocapture`
    #[rstest]
    #[ignore = "one-time dataset curation, not for routine CI"]
    fn test_curate_deribit_deltas() {
        // Source: full-day Deribit incremental L2 book dump (gzip CSV) kept
        // under the "large" test-data directory (not checked into the repo).
        let csv_path = get_test_data_root()
            .join("large")
            .join("tardis_deribit_incremental_book_L2_2020-04-01_BTC-PERPETUAL.csv.gz");

        let instrument_id = InstrumentId::from("BTC-PERPETUAL.DERIBIT");
        let parquet_path = "/tmp/tardis_BTC-PERPETUAL.DERIBIT_2020-04-01_deltas.parquet";

        println!("Loading deltas from {}", csv_path.display());
        // Precisions are passed as None so the loader infers them from the data.
        let deltas = load_deltas(&csv_path, None, None, Some(instrument_id), None).unwrap();
        let count = deltas.len();
        println!("Loaded {count} deltas");

        // Sample precisions from the first delta whose price precision is > 0;
        // per the expect message this skips any leading CLEAR delta
        // (presumably CLEARs carry a zero-precision placeholder price — verify
        // against the loader if this assumption matters).
        let sample = deltas
            .iter()
            .find(|d| d.order.price.precision > 0)
            .expect("Should have at least one non-CLEAR delta");
        let price_precision = sample.order.price.precision;
        let size_precision = sample.order.size.precision;
        println!("Precision: price={price_precision}, size={size_precision}");

        // Write in chunks to avoid stack overflow on large batches
        let metadata =
            OrderBookDelta::get_metadata(&instrument_id, price_precision, size_precision);
        let schema = OrderBookDelta::get_schema(Some(metadata.clone()));

        println!("Writing Parquet to {parquet_path}");
        let file = File::create(parquet_path).unwrap();
        // ZSTD level 3: reasonable speed/ratio trade-off for a one-off curation.
        let zstd_level = parquet::basic::ZstdLevel::try_new(3).unwrap();
        let props = WriterProperties::builder()
            .set_compression(parquet::basic::Compression::ZSTD(zstd_level))
            .set_max_row_group_row_count(Some(1_000_000))
            .build();
        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();

        // Encode and append one Arrow record batch per 1M-delta chunk.
        let chunk_size = 1_000_000;
        for (i, chunk) in deltas.chunks(chunk_size).enumerate() {
            println!("  Encoding chunk {} ({} records)...", i + 1, chunk.len());
            let batch = OrderBookDelta::encode_batch(&metadata, chunk).unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        // Summarize the output so the curator can record size and checksum.
        let file_size = fs::metadata(parquet_path).unwrap().len();
        println!("\n=== CURATION COMPLETE ===");
        println!("Records: {count}");
        println!("Price precision: {price_precision}");
        println!("Size precision: {size_precision}");
        println!(
            "File size: {} bytes ({:.1} MB)",
            file_size,
            file_size as f64 / 1_048_576.0
        );
        println!("Output: {parquet_path}");
        println!("\nNext steps:");
        println!("  sha256sum {parquet_path}");
    }
1467}