1use std::{error::Error, path::Path};
17
18use csv::StringRecord;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 data::{
22 DEPTH10_LEN, FundingRateUpdate, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick,
23 TradeTick,
24 },
25 enums::{OrderSide, RecordFlag},
26 identifiers::InstrumentId,
27 types::{Quantity, fixed::FIXED_PRECISION},
28};
29
30use crate::{
31 common::parse::{parse_instrument_id, parse_timestamp},
32 csv::{
33 create_book_order, create_csv_reader, infer_precision, parse_delta_record,
34 parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
35 record::{
36 TardisBookUpdateRecord, TardisDerivativeTickerRecord, TardisOrderBookSnapshot5Record,
37 TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
38 },
39 },
40};
41
42fn update_precision_if_needed(current: &mut u8, value: f64, explicit: Option<u8>) -> bool {
43 if explicit.is_some() {
44 return false;
45 }
46
47 let inferred = infer_precision(value).min(FIXED_PRECISION);
48 if inferred > *current {
49 *current = inferred;
50 true
51 } else {
52 false
53 }
54}
55
56fn update_deltas_precision(
57 deltas: &mut [OrderBookDelta],
58 price_precision: Option<u8>,
59 size_precision: Option<u8>,
60 current_price_precision: u8,
61 current_size_precision: u8,
62) {
63 for delta in deltas {
64 if price_precision.is_none() {
65 delta.order.price.precision = current_price_precision;
66 }
67
68 if size_precision.is_none() {
69 delta.order.size.precision = current_size_precision;
70 }
71 }
72}
73
74fn update_quotes_precision(
75 quotes: &mut [QuoteTick],
76 price_precision: Option<u8>,
77 size_precision: Option<u8>,
78 current_price_precision: u8,
79 current_size_precision: u8,
80) {
81 for quote in quotes {
82 if price_precision.is_none() {
83 quote.bid_price.precision = current_price_precision;
84 quote.ask_price.precision = current_price_precision;
85 }
86
87 if size_precision.is_none() {
88 quote.bid_size.precision = current_size_precision;
89 quote.ask_size.precision = current_size_precision;
90 }
91 }
92}
93
94fn update_trades_precision(
95 trades: &mut [TradeTick],
96 price_precision: Option<u8>,
97 size_precision: Option<u8>,
98 current_price_precision: u8,
99 current_size_precision: u8,
100) {
101 for trade in trades {
102 if price_precision.is_none() {
103 trade.price.precision = current_price_precision;
104 }
105
106 if size_precision.is_none() {
107 trade.size.precision = current_size_precision;
108 }
109 }
110}
111
112pub fn load_deltas<P: AsRef<Path>>(
120 filepath: P,
121 price_precision: Option<u8>,
122 size_precision: Option<u8>,
123 instrument_id: Option<InstrumentId>,
124 limit: Option<usize>,
125) -> Result<Vec<OrderBookDelta>, Box<dyn Error>> {
126 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
128 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(estimated_capacity);
129
130 let mut current_price_precision = price_precision.unwrap_or(0);
131 let mut current_size_precision = size_precision.unwrap_or(0);
132 let mut last_ts_event = UnixNanos::default();
133 let mut last_is_snapshot = false;
134
135 let mut reader = create_csv_reader(filepath)?;
136 let mut record = StringRecord::new();
137
138 while reader.read_record(&mut record)? {
139 if let Some(limit) = limit
140 && deltas.len() >= limit
141 {
142 break;
143 }
144
145 let data: TardisBookUpdateRecord = record.deserialize(None)?;
146
147 update_precision_if_needed(&mut current_price_precision, data.price, price_precision);
148 update_precision_if_needed(&mut current_size_precision, data.amount, size_precision);
149
150 if data.is_snapshot && !last_is_snapshot {
152 let clear_instrument_id =
153 instrument_id.unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
154 let ts_event = parse_timestamp(data.timestamp);
155 let ts_init = parse_timestamp(data.local_timestamp);
156
157 if last_ts_event != ts_event
158 && let Some(last_delta) = deltas.last_mut()
159 {
160 last_delta.flags = RecordFlag::F_LAST.value();
161 }
162 last_ts_event = ts_event;
163
164 let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
165 deltas.push(clear_delta);
166
167 if let Some(limit) = limit
168 && deltas.len() >= limit
169 {
170 break;
171 }
172 }
173 last_is_snapshot = data.is_snapshot;
174
175 let delta = match parse_delta_record(
176 &data,
177 current_price_precision,
178 current_size_precision,
179 instrument_id,
180 ) {
181 Ok(d) => d,
182 Err(e) => {
183 log::warn!("Skipping invalid delta record: {e}");
184 continue;
185 }
186 };
187
188 let ts_event = delta.ts_event;
189 if last_ts_event != ts_event
190 && let Some(last_delta) = deltas.last_mut()
191 {
192 last_delta.flags = RecordFlag::F_LAST.value();
193 }
194
195 last_ts_event = ts_event;
196
197 deltas.push(delta);
198 }
199
200 if let Some(last_delta) = deltas.last_mut() {
202 last_delta.flags = RecordFlag::F_LAST.value();
203 }
204
205 update_deltas_precision(
208 &mut deltas,
209 price_precision,
210 size_precision,
211 current_price_precision,
212 current_size_precision,
213 );
214
215 Ok(deltas)
216}
217
218pub fn load_depth10_from_snapshot5<P: AsRef<Path>>(
230 filepath: P,
231 price_precision: Option<u8>,
232 size_precision: Option<u8>,
233 instrument_id: Option<InstrumentId>,
234 limit: Option<usize>,
235) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
236 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
238 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
239
240 let mut current_price_precision = price_precision.unwrap_or(0);
241 let mut current_size_precision = size_precision.unwrap_or(0);
242
243 let mut reader = create_csv_reader(filepath)?;
244 let mut record = StringRecord::new();
245
246 while reader.read_record(&mut record)? {
247 let data: TardisOrderBookSnapshot5Record = record.deserialize(None)?;
248
249 let mut precision_updated = false;
251
252 if price_precision.is_none()
253 && let Some(bid_price) = data.bids_0_price
254 {
255 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
256 if inferred_price_precision > current_price_precision {
257 current_price_precision = inferred_price_precision;
258 precision_updated = true;
259 }
260 }
261
262 if size_precision.is_none()
263 && let Some(bid_amount) = data.bids_0_amount
264 {
265 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
266 if inferred_size_precision > current_size_precision {
267 current_size_precision = inferred_size_precision;
268 precision_updated = true;
269 }
270 }
271
272 if precision_updated {
274 for depth in &mut depths {
275 for i in 0..DEPTH10_LEN {
276 if price_precision.is_none() {
277 depth.bids[i].price.precision = current_price_precision;
278 depth.asks[i].price.precision = current_price_precision;
279 }
280
281 if size_precision.is_none() {
282 depth.bids[i].size.precision = current_size_precision;
283 depth.asks[i].size.precision = current_size_precision;
284 }
285 }
286 }
287 }
288
289 let instrument_id = match &instrument_id {
290 Some(id) => *id,
291 None => parse_instrument_id(&data.exchange, data.symbol),
292 };
293 let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
295 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
297 let ts_init = parse_timestamp(data.local_timestamp);
298
299 let mut bids = [NULL_ORDER; DEPTH10_LEN];
301 let mut asks = [NULL_ORDER; DEPTH10_LEN];
302 let mut bid_counts = [0u32; DEPTH10_LEN];
303 let mut ask_counts = [0u32; DEPTH10_LEN];
304
305 for i in 0..=4 {
306 let (bid_order, bid_count) = create_book_order(
308 OrderSide::Buy,
309 match i {
310 0 => data.bids_0_price,
311 1 => data.bids_1_price,
312 2 => data.bids_2_price,
313 3 => data.bids_3_price,
314 4 => data.bids_4_price,
315 _ => unreachable!("i is constrained to 0..=4 by loop"),
316 },
317 match i {
318 0 => data.bids_0_amount,
319 1 => data.bids_1_amount,
320 2 => data.bids_2_amount,
321 3 => data.bids_3_amount,
322 4 => data.bids_4_amount,
323 _ => unreachable!("i is constrained to 0..=4 by loop"),
324 },
325 current_price_precision,
326 current_size_precision,
327 );
328 bids[i] = bid_order;
329 bid_counts[i] = bid_count;
330
331 let (ask_order, ask_count) = create_book_order(
333 OrderSide::Sell,
334 match i {
335 0 => data.asks_0_price,
336 1 => data.asks_1_price,
337 2 => data.asks_2_price,
338 3 => data.asks_3_price,
339 4 => data.asks_4_price,
340 _ => None, },
342 match i {
343 0 => data.asks_0_amount,
344 1 => data.asks_1_amount,
345 2 => data.asks_2_amount,
346 3 => data.asks_3_amount,
347 4 => data.asks_4_amount,
348 _ => None, },
350 current_price_precision,
351 current_size_precision,
352 );
353 asks[i] = ask_order;
354 ask_counts[i] = ask_count;
355 }
356
357 let depth = OrderBookDepth10::new(
358 instrument_id,
359 bids,
360 asks,
361 bid_counts,
362 ask_counts,
363 flags,
364 sequence,
365 ts_event,
366 ts_init,
367 );
368
369 depths.push(depth);
370
371 if let Some(limit) = limit
372 && depths.len() >= limit
373 {
374 break;
375 }
376 }
377
378 Ok(depths)
379}
380
381pub fn load_depth10_from_snapshot25<P: AsRef<Path>>(
389 filepath: P,
390 price_precision: Option<u8>,
391 size_precision: Option<u8>,
392 instrument_id: Option<InstrumentId>,
393 limit: Option<usize>,
394) -> Result<Vec<OrderBookDepth10>, Box<dyn Error>> {
395 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
397 let mut depths: Vec<OrderBookDepth10> = Vec::with_capacity(estimated_capacity);
398
399 let mut current_price_precision = price_precision.unwrap_or(0);
400 let mut current_size_precision = size_precision.unwrap_or(0);
401 let mut reader = create_csv_reader(filepath)?;
402 let mut record = StringRecord::new();
403
404 while reader.read_record(&mut record)? {
405 let data: TardisOrderBookSnapshot25Record = record.deserialize(None)?;
406
407 let mut precision_updated = false;
409
410 if price_precision.is_none()
411 && let Some(bid_price) = data.bids_0_price
412 {
413 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
414 if inferred_price_precision > current_price_precision {
415 current_price_precision = inferred_price_precision;
416 precision_updated = true;
417 }
418 }
419
420 if size_precision.is_none()
421 && let Some(bid_amount) = data.bids_0_amount
422 {
423 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
424 if inferred_size_precision > current_size_precision {
425 current_size_precision = inferred_size_precision;
426 precision_updated = true;
427 }
428 }
429
430 if precision_updated {
432 for depth in &mut depths {
433 for i in 0..DEPTH10_LEN {
434 if price_precision.is_none() {
435 depth.bids[i].price.precision = current_price_precision;
436 depth.asks[i].price.precision = current_price_precision;
437 }
438
439 if size_precision.is_none() {
440 depth.bids[i].size.precision = current_size_precision;
441 depth.asks[i].size.precision = current_size_precision;
442 }
443 }
444 }
445 }
446
447 let instrument_id = match &instrument_id {
448 Some(id) => *id,
449 None => parse_instrument_id(&data.exchange, data.symbol),
450 };
451 let flags = RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value();
453 let sequence = 0; let ts_event = parse_timestamp(data.timestamp);
455 let ts_init = parse_timestamp(data.local_timestamp);
456
457 let mut bids = [NULL_ORDER; DEPTH10_LEN];
459 let mut asks = [NULL_ORDER; DEPTH10_LEN];
460 let mut bid_counts = [0u32; DEPTH10_LEN];
461 let mut ask_counts = [0u32; DEPTH10_LEN];
462
463 for i in 0..DEPTH10_LEN {
465 let (bid_order, bid_count) = create_book_order(
467 OrderSide::Buy,
468 match i {
469 0 => data.bids_0_price,
470 1 => data.bids_1_price,
471 2 => data.bids_2_price,
472 3 => data.bids_3_price,
473 4 => data.bids_4_price,
474 5 => data.bids_5_price,
475 6 => data.bids_6_price,
476 7 => data.bids_7_price,
477 8 => data.bids_8_price,
478 9 => data.bids_9_price,
479 _ => unreachable!("i is constrained to 0..10 by loop"),
480 },
481 match i {
482 0 => data.bids_0_amount,
483 1 => data.bids_1_amount,
484 2 => data.bids_2_amount,
485 3 => data.bids_3_amount,
486 4 => data.bids_4_amount,
487 5 => data.bids_5_amount,
488 6 => data.bids_6_amount,
489 7 => data.bids_7_amount,
490 8 => data.bids_8_amount,
491 9 => data.bids_9_amount,
492 _ => unreachable!("i is constrained to 0..10 by loop"),
493 },
494 current_price_precision,
495 current_size_precision,
496 );
497 bids[i] = bid_order;
498 bid_counts[i] = bid_count;
499
500 let (ask_order, ask_count) = create_book_order(
502 OrderSide::Sell,
503 match i {
504 0 => data.asks_0_price,
505 1 => data.asks_1_price,
506 2 => data.asks_2_price,
507 3 => data.asks_3_price,
508 4 => data.asks_4_price,
509 5 => data.asks_5_price,
510 6 => data.asks_6_price,
511 7 => data.asks_7_price,
512 8 => data.asks_8_price,
513 9 => data.asks_9_price,
514 _ => unreachable!("i is constrained to 0..10 by loop"),
515 },
516 match i {
517 0 => data.asks_0_amount,
518 1 => data.asks_1_amount,
519 2 => data.asks_2_amount,
520 3 => data.asks_3_amount,
521 4 => data.asks_4_amount,
522 5 => data.asks_5_amount,
523 6 => data.asks_6_amount,
524 7 => data.asks_7_amount,
525 8 => data.asks_8_amount,
526 9 => data.asks_9_amount,
527 _ => unreachable!("i is constrained to 0..10 by loop"),
528 },
529 current_price_precision,
530 current_size_precision,
531 );
532 asks[i] = ask_order;
533 ask_counts[i] = ask_count;
534 }
535
536 let depth = OrderBookDepth10::new(
537 instrument_id,
538 bids,
539 asks,
540 bid_counts,
541 ask_counts,
542 flags,
543 sequence,
544 ts_event,
545 ts_init,
546 );
547
548 depths.push(depth);
549
550 if let Some(limit) = limit
551 && depths.len() >= limit
552 {
553 break;
554 }
555 }
556
557 Ok(depths)
558}
559
560pub fn load_quotes<P: AsRef<Path>>(
568 filepath: P,
569 price_precision: Option<u8>,
570 size_precision: Option<u8>,
571 instrument_id: Option<InstrumentId>,
572 limit: Option<usize>,
573) -> Result<Vec<QuoteTick>, Box<dyn Error>> {
574 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
576 let mut quotes: Vec<QuoteTick> = Vec::with_capacity(estimated_capacity);
577
578 let mut current_price_precision = price_precision.unwrap_or(0);
579 let mut current_size_precision = size_precision.unwrap_or(0);
580 let mut reader = create_csv_reader(filepath)?;
581 let mut record = StringRecord::new();
582
583 while reader.read_record(&mut record)? {
584 let data: TardisQuoteRecord = record.deserialize(None)?;
585
586 if price_precision.is_none()
587 && let Some(bid_price) = data.bid_price
588 {
589 let inferred_price_precision = infer_precision(bid_price).min(FIXED_PRECISION);
590 if inferred_price_precision > current_price_precision {
591 current_price_precision = inferred_price_precision;
592 }
593 }
594
595 if size_precision.is_none()
596 && let Some(bid_amount) = data.bid_amount
597 {
598 let inferred_size_precision = infer_precision(bid_amount).min(FIXED_PRECISION);
599 if inferred_size_precision > current_size_precision {
600 current_size_precision = inferred_size_precision;
601 }
602 }
603
604 let quote = parse_quote_record(
605 &data,
606 current_price_precision,
607 current_size_precision,
608 instrument_id,
609 );
610
611 quotes.push(quote);
612
613 if let Some(limit) = limit
614 && quotes.len() >= limit
615 {
616 break;
617 }
618 }
619
620 update_quotes_precision(
623 &mut quotes,
624 price_precision,
625 size_precision,
626 current_price_precision,
627 current_size_precision,
628 );
629
630 Ok(quotes)
631}
632
633pub fn load_trades<P: AsRef<Path>>(
641 filepath: P,
642 price_precision: Option<u8>,
643 size_precision: Option<u8>,
644 instrument_id: Option<InstrumentId>,
645 limit: Option<usize>,
646) -> Result<Vec<TradeTick>, Box<dyn Error>> {
647 let estimated_capacity = limit.unwrap_or(1_000_000).min(10_000_000);
649 let mut trades: Vec<TradeTick> = Vec::with_capacity(estimated_capacity);
650
651 let mut current_price_precision = price_precision.unwrap_or(0);
652 let mut current_size_precision = size_precision.unwrap_or(0);
653 let mut reader = create_csv_reader(filepath)?;
654 let mut record = StringRecord::new();
655
656 while reader.read_record(&mut record)? {
657 let data: TardisTradeRecord = record.deserialize(None)?;
658
659 if price_precision.is_none() {
660 let inferred_price_precision = infer_precision(data.price).min(FIXED_PRECISION);
661 if inferred_price_precision > current_price_precision {
662 current_price_precision = inferred_price_precision;
663 }
664 }
665
666 if size_precision.is_none() {
667 let inferred_size_precision = infer_precision(data.amount).min(FIXED_PRECISION);
668 if inferred_size_precision > current_size_precision {
669 current_size_precision = inferred_size_precision;
670 }
671 }
672
673 let size = Quantity::new_checked(data.amount, current_size_precision)?;
674
675 if size.is_positive() {
676 let trade = parse_trade_record(&data, size, current_price_precision, instrument_id);
677
678 trades.push(trade);
679
680 if let Some(limit) = limit
681 && trades.len() >= limit
682 {
683 break;
684 }
685 } else {
686 log::warn!("Skipping zero-sized trade: {data:?}");
687 }
688 }
689
690 update_trades_precision(
693 &mut trades,
694 price_precision,
695 size_precision,
696 current_price_precision,
697 current_size_precision,
698 );
699
700 Ok(trades)
701}
702
703pub fn load_funding_rates<P: AsRef<Path>>(
713 filepath: P,
714 instrument_id: Option<InstrumentId>,
715 limit: Option<usize>,
716) -> Result<Vec<FundingRateUpdate>, Box<dyn Error>> {
717 let estimated_capacity = limit.unwrap_or(100_000).min(1_000_000);
719 let mut funding_rates: Vec<FundingRateUpdate> = Vec::with_capacity(estimated_capacity);
720
721 let mut reader = create_csv_reader(filepath)?;
722 let mut record = StringRecord::new();
723
724 while reader.read_record(&mut record)? {
725 let data: TardisDerivativeTickerRecord = record.deserialize(None)?;
726
727 if let Some(funding_rate) = parse_derivative_ticker_record(&data, instrument_id) {
729 funding_rates.push(funding_rate);
730
731 if let Some(limit) = limit
732 && funding_rates.len() >= limit
733 {
734 break;
735 }
736 }
737 }
738
739 Ok(funding_rates)
740}
741
742#[cfg(test)]
743mod tests {
744 use std::{fs, fs::File, sync::Arc};
745
746 use nautilus_core::paths::get_test_data_path as get_test_data_root;
747 use nautilus_model::{
748 enums::{AggressorSide, BookAction},
749 identifiers::{InstrumentId, TradeId},
750 types::Price,
751 };
752 use nautilus_serialization::arrow::{ArrowSchemaProvider, EncodeToRecordBatch};
753 use nautilus_testkit::common::{
754 get_tardis_binance_snapshot5_path, get_tardis_binance_snapshot25_path,
755 get_tardis_bitmex_trades_path, get_tardis_deribit_book_l2_path,
756 get_tardis_huobi_quotes_path,
757 };
758 use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
759 use rstest::*;
760
761 use super::*;
762 use crate::common::{parse::parse_price, testing::get_test_data_path};
763
764 #[rstest]
765 #[case(0.0, 0)]
766 #[case(42.0, 0)]
767 #[case(0.1, 1)]
768 #[case(0.25, 2)]
769 #[case(123.0001, 4)]
770 #[case(-42.987654321, 9)]
771 #[case(1.234_567_890_123, 12)]
772 fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
773 assert_eq!(infer_precision(input), expected);
774 }
775
776 #[rstest]
777 pub fn test_dynamic_precision_inference() {
778 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
779binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
780binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
781binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
782binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
783binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
784
785 let temp_file = std::env::temp_dir().join("test_dynamic_precision.csv");
786 std::fs::write(&temp_file, csv_data).unwrap();
787
788 let deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
789
790 assert_eq!(deltas.len(), 6);
792
793 for (i, delta) in deltas.iter().skip(1).enumerate() {
795 assert_eq!(
796 delta.order.price.precision, 4,
797 "Price precision should be 4 for delta {i}",
798 );
799 assert_eq!(
800 delta.order.size.precision, 1,
801 "Size precision should be 1 for delta {i}",
802 );
803 }
804
805 assert_eq!(deltas[0].action, BookAction::Clear);
808
809 assert_eq!(deltas[1].order.price, parse_price(50000.0, 4));
810 assert_eq!(deltas[1].order.size, Quantity::new(1.0, 1));
811
812 assert_eq!(deltas[2].order.price, parse_price(49999.5, 4));
813 assert_eq!(deltas[2].order.size, Quantity::new(2.0, 1));
814
815 assert_eq!(deltas[3].order.price, parse_price(50000.12, 4));
816 assert_eq!(deltas[3].order.size, Quantity::new(1.5, 1));
817
818 assert_eq!(deltas[4].order.price, parse_price(49999.123, 4));
819 assert_eq!(deltas[4].order.size, Quantity::new(3.0, 1));
820
821 assert_eq!(deltas[5].order.price, parse_price(50000.1234, 4));
822 assert_eq!(deltas[5].order.size, Quantity::new(0.5, 1));
823
824 assert_eq!(
825 deltas[1].order.price.precision,
826 deltas[5].order.price.precision
827 );
828 assert_eq!(
829 deltas[1].order.size.precision,
830 deltas[3].order.size.precision
831 );
832
833 std::fs::remove_file(&temp_file).ok();
834 }
835
836 #[rstest]
837 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_deltas(
840 #[case] price_precision: Option<u8>,
841 #[case] size_precision: Option<u8>,
842 ) {
843 let filepath = get_tardis_deribit_book_l2_path();
844 let deltas =
845 load_deltas(filepath, price_precision, size_precision, None, Some(100)).unwrap();
846
847 assert_eq!(deltas.len(), 16);
849
850 assert_eq!(deltas[0].action, BookAction::Clear);
852
853 assert_eq!(
855 deltas[1].instrument_id,
856 InstrumentId::from("BTC-PERPETUAL.DERIBIT")
857 );
858 assert_eq!(deltas[1].action, BookAction::Add);
859 assert_eq!(deltas[1].order.side, OrderSide::Sell);
860 assert_eq!(deltas[1].order.price, Price::from("6421.5"));
861 assert_eq!(deltas[1].order.size, Quantity::from("18640"));
862 assert_eq!(deltas[1].flags, 0);
863 assert_eq!(deltas[1].sequence, 0);
864 assert_eq!(deltas[1].ts_event, 1585699200245000000);
865 assert_eq!(deltas[1].ts_init, 1585699200355684000);
866 }
867
868 #[rstest]
869 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot5(
872 #[case] price_precision: Option<u8>,
873 #[case] size_precision: Option<u8>,
874 ) {
875 let filepath = get_tardis_binance_snapshot5_path();
876 let depths =
877 load_depth10_from_snapshot5(filepath, price_precision, size_precision, None, Some(100))
878 .unwrap();
879
880 assert_eq!(depths.len(), 10);
881 assert_eq!(
882 depths[0].instrument_id,
883 InstrumentId::from("BTCUSDT.BINANCE")
884 );
885 assert_eq!(depths[0].bids.len(), 10);
886 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
887 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
888 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
889 assert_eq!(depths[0].bids[0].order_id, 0);
890 assert_eq!(depths[0].asks.len(), 10);
891 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
892 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
893 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
894 assert_eq!(depths[0].asks[0].order_id, 0);
895 assert_eq!(depths[0].bid_counts[0], 1);
896 assert_eq!(depths[0].ask_counts[0], 1);
897 assert_eq!(
899 depths[0].flags,
900 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
901 );
902 assert_eq!(depths[0].ts_event, 1598918403696000000);
903 assert_eq!(depths[0].ts_init, 1598918403810979000);
904 assert_eq!(depths[0].sequence, 0);
905 }
906
907 #[rstest]
908 #[case(Some(2), Some(3))] #[case(None, None)] pub fn test_read_depth10s_from_snapshot25(
911 #[case] price_precision: Option<u8>,
912 #[case] size_precision: Option<u8>,
913 ) {
914 let filepath = get_tardis_binance_snapshot25_path();
915 let depths = load_depth10_from_snapshot25(
916 filepath,
917 price_precision,
918 size_precision,
919 None,
920 Some(100),
921 )
922 .unwrap();
923
924 assert_eq!(depths.len(), 10);
925 assert_eq!(
926 depths[0].instrument_id,
927 InstrumentId::from("BTCUSDT.BINANCE")
928 );
929 assert_eq!(depths[0].bids.len(), 10);
930 assert_eq!(depths[0].bids[0].price, Price::from("11657.07"));
931 assert_eq!(depths[0].bids[0].size, Quantity::from("10.896"));
932 assert_eq!(depths[0].bids[0].side, OrderSide::Buy);
933 assert_eq!(depths[0].bids[0].order_id, 0);
934 assert_eq!(depths[0].asks.len(), 10);
935 assert_eq!(depths[0].asks[0].price, Price::from("11657.08"));
936 assert_eq!(depths[0].asks[0].size, Quantity::from("1.714"));
937 assert_eq!(depths[0].asks[0].side, OrderSide::Sell);
938 assert_eq!(depths[0].asks[0].order_id, 0);
939 assert_eq!(depths[0].bid_counts[0], 1);
940 assert_eq!(depths[0].ask_counts[0], 1);
941 assert_eq!(
943 depths[0].flags,
944 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
945 );
946 assert_eq!(depths[0].ts_event, 1598918403696000000);
947 assert_eq!(depths[0].ts_init, 1598918403810979000);
948 assert_eq!(depths[0].sequence, 0);
949 }
950
951 #[rstest]
952 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_quotes(
955 #[case] price_precision: Option<u8>,
956 #[case] size_precision: Option<u8>,
957 ) {
958 let filepath = get_tardis_huobi_quotes_path();
959 let quotes =
960 load_quotes(filepath, price_precision, size_precision, None, Some(100)).unwrap();
961
962 assert_eq!(quotes.len(), 10);
963 assert_eq!(
964 quotes[0].instrument_id,
965 InstrumentId::from("BTC-USD.HUOBI_DELIVERY")
966 );
967 assert_eq!(quotes[0].bid_price, Price::from("8629.2"));
968 assert_eq!(quotes[0].bid_size, Quantity::from("806"));
969 assert_eq!(quotes[0].ask_price, Price::from("8629.3"));
970 assert_eq!(quotes[0].ask_size, Quantity::from("5494"));
971 assert_eq!(quotes[0].ts_event, 1588291201099000000);
972 assert_eq!(quotes[0].ts_init, 1588291201234268000);
973 }
974
975 #[rstest]
976 #[case(Some(1), Some(0))] #[case(None, None)] pub fn test_read_trades(
979 #[case] price_precision: Option<u8>,
980 #[case] size_precision: Option<u8>,
981 ) {
982 let filepath = get_tardis_bitmex_trades_path();
983 let trades =
984 load_trades(filepath, price_precision, size_precision, None, Some(100)).unwrap();
985
986 assert_eq!(trades.len(), 10);
987 assert_eq!(trades[0].instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
988 assert_eq!(trades[0].price, Price::from("8531.5"));
989 assert_eq!(trades[0].size, Quantity::from("2152"));
990 assert_eq!(trades[0].aggressor_side, AggressorSide::Seller);
991 assert_eq!(
992 trades[0].trade_id,
993 TradeId::new("ccc3c1fa-212c-e8b0-1706-9b9c4f3d5ecf")
994 );
995 assert_eq!(trades[0].ts_event, 1583020803145000000);
996 assert_eq!(trades[0].ts_init, 1583020803307160000);
997 }
998
999 #[rstest]
1000 pub fn test_load_trades_derives_id_when_csv_id_empty() {
1001 let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1004binance,BTCUSDT,1640995200000000,1640995200100000,,buy,50000.0,1.0
1005binance,BTCUSDT,1640995200000000,1640995200100000,,buy,50000.0,1.0
1006binance,BTCUSDT,1640995200000000,1640995200100000,,buy,50001.0,1.0";
1007
1008 let temp_file = std::env::temp_dir().join("test_load_trades_empty_id.csv");
1009 std::fs::write(&temp_file, csv_data).unwrap();
1010
1011 let trades = load_trades(&temp_file, Some(2), Some(1), None, None).unwrap();
1012 assert_eq!(trades.len(), 3);
1013
1014 assert_eq!(trades[0].trade_id, trades[1].trade_id);
1015 assert_eq!(trades[0].trade_id.as_str().len(), 16);
1016 assert_ne!(trades[0].trade_id, trades[2].trade_id);
1017
1018 std::fs::remove_file(&temp_file).ok();
1019 }
1020
1021 #[rstest]
1022 pub fn test_load_trades_with_zero_sized_trade() {
1023 let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
1025binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
1026binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
1027binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
1028binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";
1029
1030 let temp_file = std::env::temp_dir().join("test_load_trades_zero_size.csv");
1031 std::fs::write(&temp_file, csv_data).unwrap();
1032
1033 let trades = load_trades(
1034 &temp_file,
1035 Some(4),
1036 Some(1),
1037 None,
1038 None, )
1040 .unwrap();
1041
1042 assert_eq!(trades.len(), 3);
1044
1045 assert_eq!(trades[0].size, Quantity::from("1.0"));
1047 assert_eq!(trades[1].size, Quantity::from("1.5"));
1048 assert_eq!(trades[2].size, Quantity::from("3.0"));
1049
1050 assert_eq!(trades[0].trade_id, TradeId::new("trade1"));
1052 assert_eq!(trades[1].trade_id, TradeId::new("trade3"));
1053 assert_eq!(trades[2].trade_id, TradeId::new("trade4"));
1054
1055 std::fs::remove_file(&temp_file).ok();
1056 }
1057
1058 #[rstest]
1059 pub fn test_load_trades_from_local_file() {
1060 let filepath = get_test_data_path("csv/trades_1.csv");
1061 let trades = load_trades(filepath, Some(1), Some(0), None, None).unwrap();
1062 assert_eq!(trades.len(), 2);
1063 assert_eq!(trades[0].price, Price::from("8531.5"));
1064 assert_eq!(trades[1].size, Quantity::from("1000"));
1065 }
1066
1067 #[rstest]
1068 pub fn test_load_deltas_from_local_file() {
1069 let filepath = get_test_data_path("csv/deltas_1.csv");
1070 let deltas = load_deltas(filepath, Some(1), Some(0), None, None).unwrap();
1071
1072 assert_eq!(deltas.len(), 3);
1074 assert_eq!(deltas[0].action, BookAction::Clear);
1075 assert_eq!(deltas[1].order.price, Price::from("6421.5"));
1076 assert_eq!(deltas[2].order.size, Quantity::from("10000"));
1077 }
1078
1079 #[rstest]
1080 fn test_load_depth10_from_snapshot5_comprehensive() {
1081 let filepath = get_tardis_binance_snapshot5_path();
1082 let depths = load_depth10_from_snapshot5(&filepath, None, None, None, Some(100)).unwrap();
1083
1084 assert_eq!(depths.len(), 10);
1085
1086 let first = &depths[0];
1087 assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1088 assert_eq!(first.bids.len(), 10);
1089 assert_eq!(first.asks.len(), 10);
1090
1091 assert_eq!(first.bids[0].price, Price::from("11657.07"));
1093 assert_eq!(first.bids[0].size, Quantity::from("10.896"));
1094 assert_eq!(first.bids[0].side, OrderSide::Buy);
1095
1096 assert_eq!(first.bids[1].price, Price::from("11656.97"));
1097 assert_eq!(first.bids[1].size, Quantity::from("0.2"));
1098 assert_eq!(first.bids[1].side, OrderSide::Buy);
1099
1100 assert_eq!(first.bids[2].price, Price::from("11655.78"));
1101 assert_eq!(first.bids[2].size, Quantity::from("0.2"));
1102 assert_eq!(first.bids[2].side, OrderSide::Buy);
1103
1104 assert_eq!(first.bids[3].price, Price::from("11655.77"));
1105 assert_eq!(first.bids[3].size, Quantity::from("0.98"));
1106 assert_eq!(first.bids[3].side, OrderSide::Buy);
1107
1108 assert_eq!(first.bids[4].price, Price::from("11655.68"));
1109 assert_eq!(first.bids[4].size, Quantity::from("0.111"));
1110 assert_eq!(first.bids[4].side, OrderSide::Buy);
1111
1112 for i in 5..10 {
1114 assert_eq!(first.bids[i].price.raw, 0);
1115 assert_eq!(first.bids[i].size.raw, 0);
1116 assert_eq!(first.bids[i].side, OrderSide::NoOrderSide);
1117 }
1118
1119 assert_eq!(first.asks[0].price, Price::from("11657.08"));
1121 assert_eq!(first.asks[0].size, Quantity::from("1.714"));
1122 assert_eq!(first.asks[0].side, OrderSide::Sell);
1123
1124 assert_eq!(first.asks[1].price, Price::from("11657.54"));
1125 assert_eq!(first.asks[1].size, Quantity::from("5.4"));
1126 assert_eq!(first.asks[1].side, OrderSide::Sell);
1127
1128 assert_eq!(first.asks[2].price, Price::from("11657.56"));
1129 assert_eq!(first.asks[2].size, Quantity::from("0.238"));
1130 assert_eq!(first.asks[2].side, OrderSide::Sell);
1131
1132 assert_eq!(first.asks[3].price, Price::from("11657.61"));
1133 assert_eq!(first.asks[3].size, Quantity::from("0.077"));
1134 assert_eq!(first.asks[3].side, OrderSide::Sell);
1135
1136 assert_eq!(first.asks[4].price, Price::from("11657.92"));
1137 assert_eq!(first.asks[4].size, Quantity::from("0.918"));
1138 assert_eq!(first.asks[4].side, OrderSide::Sell);
1139
1140 for i in 5..10 {
1142 assert_eq!(first.asks[i].price.raw, 0);
1143 assert_eq!(first.asks[i].size.raw, 0);
1144 assert_eq!(first.asks[i].side, OrderSide::NoOrderSide);
1145 }
1146
1147 for i in 1..5 {
1149 assert!(
1150 first.bids[i].price < first.bids[i - 1].price,
1151 "Bid price at level {} should be less than level {}",
1152 i,
1153 i - 1
1154 );
1155 }
1156
1157 for i in 1..5 {
1159 assert!(
1160 first.asks[i].price > first.asks[i - 1].price,
1161 "Ask price at level {} should be greater than level {}",
1162 i,
1163 i - 1
1164 );
1165 }
1166
1167 assert!(
1169 first.asks[0].price > first.bids[0].price,
1170 "Best ask should be greater than best bid"
1171 );
1172
1173 for i in 0..5 {
1175 assert_eq!(first.bid_counts[i], 1);
1176 assert_eq!(first.ask_counts[i], 1);
1177 }
1178
1179 for i in 5..10 {
1180 assert_eq!(first.bid_counts[i], 0);
1181 assert_eq!(first.ask_counts[i], 0);
1182 }
1183
1184 assert_eq!(
1186 first.flags,
1187 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1188 );
1189 assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1190 assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1191 assert_eq!(first.sequence, 0);
1192 }
1193
1194 #[rstest]
1195 fn test_load_depth10_from_snapshot25_comprehensive() {
1196 let filepath = get_tardis_binance_snapshot25_path();
1197 let depths = load_depth10_from_snapshot25(&filepath, None, None, None, Some(100)).unwrap();
1198
1199 assert_eq!(depths.len(), 10);
1200
1201 let first = &depths[0];
1202 assert_eq!(first.instrument_id.to_string(), "BTCUSDT.BINANCE");
1203 assert_eq!(first.bids.len(), 10);
1204 assert_eq!(first.asks.len(), 10);
1205
1206 let expected_bids = vec![
1208 ("11657.07", "10.896"),
1209 ("11656.97", "0.2"),
1210 ("11655.78", "0.2"),
1211 ("11655.77", "0.98"),
1212 ("11655.68", "0.111"),
1213 ("11655.66", "0.077"),
1214 ("11655.57", "0.34"),
1215 ("11655.48", "0.4"),
1216 ("11655.26", "1.185"),
1217 ("11654.86", "0.195"),
1218 ];
1219
1220 for (i, (price, size)) in expected_bids.iter().enumerate() {
1221 assert_eq!(first.bids[i].price, Price::from(*price));
1222 assert_eq!(first.bids[i].size, Quantity::from(*size));
1223 assert_eq!(first.bids[i].side, OrderSide::Buy);
1224 }
1225
1226 let expected_asks = vec![
1228 ("11657.08", "1.714"),
1229 ("11657.54", "5.4"),
1230 ("11657.56", "0.238"),
1231 ("11657.61", "0.077"),
1232 ("11657.92", "0.918"),
1233 ("11658.09", "1.015"),
1234 ("11658.12", "0.665"),
1235 ("11658.19", "0.583"),
1236 ("11658.28", "0.255"),
1237 ("11658.29", "0.656"),
1238 ];
1239
1240 for (i, (price, size)) in expected_asks.iter().enumerate() {
1241 assert_eq!(first.asks[i].price, Price::from(*price));
1242 assert_eq!(first.asks[i].size, Quantity::from(*size));
1243 assert_eq!(first.asks[i].side, OrderSide::Sell);
1244 }
1245
1246 for i in 1..10 {
1248 assert!(
1249 first.bids[i].price < first.bids[i - 1].price,
1250 "Bid price at level {} ({}) should be less than level {} ({})",
1251 i,
1252 first.bids[i].price,
1253 i - 1,
1254 first.bids[i - 1].price
1255 );
1256 }
1257
1258 for i in 1..10 {
1260 assert!(
1261 first.asks[i].price > first.asks[i - 1].price,
1262 "Ask price at level {} ({}) should be greater than level {} ({})",
1263 i,
1264 first.asks[i].price,
1265 i - 1,
1266 first.asks[i - 1].price
1267 );
1268 }
1269
1270 assert!(
1272 first.asks[0].price > first.bids[0].price,
1273 "Best ask ({}) should be greater than best bid ({})",
1274 first.asks[0].price,
1275 first.bids[0].price
1276 );
1277
1278 for i in 0..10 {
1280 assert_eq!(first.bid_counts[i], 1);
1281 assert_eq!(first.ask_counts[i], 1);
1282 }
1283
1284 assert_eq!(
1286 first.flags,
1287 RecordFlag::F_SNAPSHOT.value() | RecordFlag::F_LAST.value()
1288 );
1289 assert_eq!(first.ts_event.as_u64(), 1598918403696000000);
1290 assert_eq!(first.ts_init.as_u64(), 1598918403810979000);
1291 assert_eq!(first.sequence, 0);
1292 }
1293
1294 #[rstest]
1295 fn test_snapshot_csv_field_order_interleaved() {
1296 let csv_data = "exchange,symbol,timestamp,local_timestamp,\
1300asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,\
1301asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,\
1302asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,\
1303asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,\
1304asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1305binance-futures,BTCUSDT,1000000,2000000,\
1306100.5,1.0,100.4,2.0,\
1307100.6,1.1,100.3,2.1,\
1308100.7,1.2,100.2,2.2,\
1309100.8,1.3,100.1,2.3,\
1310100.9,1.4,100.0,2.4";
1311
1312 let temp_file = std::env::temp_dir().join("test_interleaved_snapshot5.csv");
1313 std::fs::write(&temp_file, csv_data).unwrap();
1314
1315 let depths = load_depth10_from_snapshot5(&temp_file, None, None, None, Some(1)).unwrap();
1316 assert_eq!(depths.len(), 1);
1317
1318 let depth = &depths[0];
1319
1320 assert_eq!(depth.bids[0].price, Price::from("100.4"));
1322 assert_eq!(depth.bids[1].price, Price::from("100.3"));
1323 assert_eq!(depth.bids[2].price, Price::from("100.2"));
1324 assert_eq!(depth.bids[3].price, Price::from("100.1"));
1325 assert_eq!(depth.bids[4].price, Price::from("100.0"));
1326
1327 assert_eq!(depth.asks[0].price, Price::from("100.5"));
1329 assert_eq!(depth.asks[1].price, Price::from("100.6"));
1330 assert_eq!(depth.asks[2].price, Price::from("100.7"));
1331 assert_eq!(depth.asks[3].price, Price::from("100.8"));
1332 assert_eq!(depth.asks[4].price, Price::from("100.9"));
1333
1334 assert_eq!(depth.bids[0].size, Quantity::from("2.0"));
1336 assert_eq!(depth.asks[0].size, Quantity::from("1.0"));
1337
1338 std::fs::remove_file(temp_file).unwrap();
1339 }
1340
1341 #[rstest]
1342 fn test_load_deltas_limit_includes_clear_deltas() {
1343 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1346binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1347binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
1348binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
1349binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
1350binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5
1351binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50003.0,2.0
1352binance-futures,BTCUSDT,1640995205000000,1640995205100000,false,bid,49997.0,0.5";
1353
1354 let temp_file = std::env::temp_dir().join("test_load_deltas_limit.csv");
1355 std::fs::write(&temp_file, csv_data).unwrap();
1356
1357 let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(5)).unwrap();
1359
1360 assert_eq!(deltas.len(), 5);
1362 assert_eq!(deltas[0].action, BookAction::Clear);
1363 assert_eq!(deltas[1].action, BookAction::Add);
1364 assert_eq!(deltas[2].action, BookAction::Add);
1365 assert_eq!(deltas[3].action, BookAction::Update);
1366 assert_eq!(deltas[4].action, BookAction::Update);
1367
1368 assert_eq!(deltas[3].order.price, parse_price(49999.0, 1));
1370
1371 std::fs::remove_file(&temp_file).ok();
1372 }
1373
1374 #[rstest]
1375 fn test_load_deltas_limit_stops_at_clear() {
1376 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1378binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
1379binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
1380
1381 let temp_file = std::env::temp_dir().join("test_load_deltas_limit_stops_at_clear.csv");
1382 std::fs::write(&temp_file, csv_data).unwrap();
1383
1384 let deltas = load_deltas(&temp_file, Some(1), Some(1), None, Some(1)).unwrap();
1386
1387 assert_eq!(deltas.len(), 1);
1388 assert_eq!(deltas[0].action, BookAction::Clear);
1389
1390 std::fs::remove_file(&temp_file).ok();
1391 }
1392
1393 #[rstest]
1394 fn test_load_deltas_limit_with_mid_day_snapshot() {
1395 let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
1398 let deltas = load_deltas(filepath, Some(1), Some(1), None, Some(5)).unwrap();
1399
1400 assert_eq!(deltas.len(), 5);
1403 assert_eq!(deltas[0].action, BookAction::Clear);
1404 }
1405
1406 #[rstest]
1409 #[ignore = "one-time dataset curation, not for routine CI"]
1410 fn test_curate_deribit_deltas() {
1411 let csv_path = get_test_data_root()
1412 .join("large")
1413 .join("tardis_deribit_incremental_book_L2_2020-04-01_BTC-PERPETUAL.csv.gz");
1414
1415 let instrument_id = InstrumentId::from("BTC-PERPETUAL.DERIBIT");
1416 let parquet_path = "/tmp/tardis_BTC-PERPETUAL.DERIBIT_2020-04-01_deltas.parquet";
1417
1418 println!("Loading deltas from {}", csv_path.display());
1419 let deltas = load_deltas(&csv_path, None, None, Some(instrument_id), None).unwrap();
1420 let count = deltas.len();
1421 println!("Loaded {count} deltas");
1422
1423 let sample = deltas
1424 .iter()
1425 .find(|d| d.order.price.precision > 0)
1426 .expect("Should have at least one non-CLEAR delta");
1427 let price_precision = sample.order.price.precision;
1428 let size_precision = sample.order.size.precision;
1429 println!("Precision: price={price_precision}, size={size_precision}");
1430
1431 let metadata =
1433 OrderBookDelta::get_metadata(&instrument_id, price_precision, size_precision);
1434 let schema = OrderBookDelta::get_schema(Some(metadata.clone()));
1435
1436 println!("Writing Parquet to {parquet_path}");
1437 let file = File::create(parquet_path).unwrap();
1438 let zstd_level = parquet::basic::ZstdLevel::try_new(3).unwrap();
1439 let props = WriterProperties::builder()
1440 .set_compression(parquet::basic::Compression::ZSTD(zstd_level))
1441 .set_max_row_group_row_count(Some(1_000_000))
1442 .build();
1443 let mut writer = ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();
1444
1445 let chunk_size = 1_000_000;
1446 for (i, chunk) in deltas.chunks(chunk_size).enumerate() {
1447 println!(" Encoding chunk {} ({} records)...", i + 1, chunk.len());
1448 let batch = OrderBookDelta::encode_batch(&metadata, chunk).unwrap();
1449 writer.write(&batch).unwrap();
1450 }
1451 writer.close().unwrap();
1452
1453 let file_size = fs::metadata(parquet_path).unwrap().len();
1454 println!("\n=== CURATION COMPLETE ===");
1455 println!("Records: {count}");
1456 println!("Price precision: {price_precision}");
1457 println!("Size precision: {size_precision}");
1458 println!(
1459 "File size: {} bytes ({:.1} MB)",
1460 file_size,
1461 file_size as f64 / 1_048_576.0
1462 );
1463 println!("Output: {parquet_path}");
1464 println!("\nNext steps:");
1465 println!(" sha256sum {parquet_path}");
1466 }
1467}