1use std::{io::Read, path::Path};
17
18use csv::{Reader, StringRecord};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 data::{DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
22 enums::{OrderSide, RecordFlag},
23 identifiers::InstrumentId,
24 types::Quantity,
25};
26#[cfg(feature = "python")]
27use nautilus_model::{
28 data::{Data, OrderBookDeltas, OrderBookDeltas_API},
29 python::data::data_to_pycapsule,
30};
31#[cfg(feature = "python")]
32use pyo3::{Py, PyAny, Python};
33
34use crate::{
35 common::parse::{parse_instrument_id, parse_timestamp},
36 csv::{
37 create_book_order, create_csv_reader, infer_precision, parse_delta_record,
38 parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
39 record::{
40 TardisBookUpdateRecord, TardisOrderBookSnapshot5Record,
41 TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
42 },
43 },
44};
45
/// Streaming iterator that parses a Tardis incremental book-update CSV into
/// chunks of [`OrderBookDelta`].
struct DeltaStreamIterator {
    /// CSV reader over the input file.
    reader: Reader<Box<dyn std::io::Read>>,
    /// Scratch record reused for every row to avoid per-row allocation.
    record: StringRecord,
    /// Deltas accumulated for the chunk currently being built.
    buffer: Vec<OrderBookDelta>,
    /// Maximum number of deltas per yielded chunk.
    chunk_size: usize,
    /// Instrument override; when `None` the ID is parsed from each record.
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    /// `ts_event` of the most recently pushed delta; used to set `F_LAST` on
    /// event boundaries.
    last_ts_event: UnixNanos,
    /// Whether the previous record was part of a snapshot (suppresses
    /// emitting repeated book-clear deltas within one snapshot sequence).
    last_is_snapshot: bool,
    /// Optional cap on the total number of deltas emitted across all chunks.
    limit: Option<usize>,
    /// Running count of deltas emitted so far, compared against `limit`.
    deltas_emitted: usize,

    /// Record consumed but not yet parsed because the chunk filled up (or the
    /// limit was hit) right after emitting a book-clear delta; re-processed on
    /// the next `next()` call.
    pending_record: Option<TardisBookUpdateRecord>,
}
67
68impl DeltaStreamIterator {
69 fn new<P: AsRef<Path>>(
75 filepath: P,
76 chunk_size: usize,
77 price_precision: Option<u8>,
78 size_precision: Option<u8>,
79 instrument_id: Option<InstrumentId>,
80 limit: Option<usize>,
81 ) -> anyhow::Result<Self> {
82 let (final_price_precision, final_size_precision) =
83 if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
84 (price_prec, size_prec)
86 } else {
87 let mut reader = create_csv_reader(&filepath)?;
89 let mut record = StringRecord::new();
90 let (detected_price, detected_size) =
91 Self::detect_precision_from_sample(&mut reader, &mut record, 10_000);
92 (
93 price_precision.unwrap_or(detected_price),
94 size_precision.unwrap_or(detected_size),
95 )
96 };
97
98 let reader = create_csv_reader(filepath)?;
99
100 Ok(Self {
101 reader,
102 record: StringRecord::new(),
103 buffer: Vec::with_capacity(chunk_size),
104 chunk_size,
105 instrument_id,
106 price_precision: final_price_precision,
107 size_precision: final_size_precision,
108 last_ts_event: UnixNanos::default(),
109 last_is_snapshot: false,
110 limit,
111 deltas_emitted: 0,
112 pending_record: None,
113 })
114 }
115
116 fn detect_precision_from_sample(
117 reader: &mut Reader<Box<dyn std::io::Read>>,
118 record: &mut StringRecord,
119 sample_size: usize,
120 ) -> (u8, u8) {
121 let mut max_price_precision = 0u8;
122 let mut max_size_precision = 0u8;
123 let mut records_scanned = 0;
124
125 while records_scanned < sample_size {
126 match reader.read_record(record) {
127 Ok(true) => {
128 if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
129 max_price_precision = max_price_precision.max(infer_precision(data.price));
130 max_size_precision = max_size_precision.max(infer_precision(data.amount));
131 records_scanned += 1;
132 }
133 }
134 Ok(false) => break, Err(_) => records_scanned += 1, }
137 }
138
139 (max_price_precision, max_size_precision)
140 }
141}
142
impl Iterator for DeltaStreamIterator {
    type Item = anyhow::Result<Vec<OrderBookDelta>>;

    /// Yields the next chunk of deltas, or `None` once the file (or the
    /// configured `limit`) is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        // Stop immediately once the emission cap has been reached.
        if let Some(limit) = self.limit
            && self.deltas_emitted >= limit
        {
            return None;
        }

        self.buffer.clear();

        loop {
            if self.buffer.len() >= self.chunk_size {
                break;
            }

            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
            {
                break;
            }

            // Prefer a record carried over from the previous call (stashed when
            // a chunk filled up right after a book-clear delta was emitted).
            let data = if let Some(pending) = self.pending_record.take() {
                pending
            } else {
                match self.reader.read_record(&mut self.record) {
                    Ok(true) => match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => data,
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    },
                    Ok(false) => {
                        // EOF: yield any partial chunk, marking its final delta F_LAST.
                        if self.buffer.is_empty() {
                            return None;
                        }

                        if let Some(last_delta) = self.buffer.last_mut() {
                            last_delta.flags = RecordFlag::F_LAST.value();
                        }
                        return Some(Ok(self.buffer.clone()));
                    }
                    Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
                }
            };

            // First record of a snapshot sequence: emit a synthetic book-clear
            // delta before the snapshot's levels are applied.
            if data.is_snapshot && !self.last_is_snapshot {
                let clear_instrument_id = self
                    .instrument_id
                    .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
                let ts_event = parse_timestamp(data.timestamp);
                let ts_init = parse_timestamp(data.local_timestamp);

                // A new ts_event closes the previous event window.
                if self.last_ts_event != ts_event
                    && let Some(last_delta) = self.buffer.last_mut()
                {
                    last_delta.flags = RecordFlag::F_LAST.value();
                }
                self.last_ts_event = ts_event;

                let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
                self.buffer.push(clear_delta);
                self.deltas_emitted += 1;

                // If the clear delta filled the chunk (or hit the limit), stash
                // this record so its own delta is parsed on the next call.
                if self.buffer.len() >= self.chunk_size
                    || self.limit.is_some_and(|l| self.deltas_emitted >= l)
                {
                    self.last_is_snapshot = data.is_snapshot;
                    self.pending_record = Some(data);
                    break;
                }
            }
            self.last_is_snapshot = data.is_snapshot;

            let delta = match parse_delta_record(
                &data,
                self.price_precision,
                self.size_precision,
                self.instrument_id,
            ) {
                Ok(d) => d,
                Err(e) => {
                    // Malformed rows are logged and skipped rather than aborting the stream.
                    log::warn!("Skipping invalid delta record: {e}");
                    continue;
                }
            };

            // A new ts_event means the previous delta completed its event.
            if self.last_ts_event != delta.ts_event
                && let Some(last_delta) = self.buffer.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }

            self.last_ts_event = delta.ts_event;

            self.buffer.push(delta);
            self.deltas_emitted += 1;
        }

        if self.buffer.is_empty() {
            None
        } else {
            // When the limit ends the stream mid-event, still mark the final
            // delta F_LAST so consumers see a closed event.
            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
                && let Some(last_delta) = self.buffer.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            Some(Ok(self.buffer.clone()))
        }
    }
}
261
262pub fn stream_deltas<P: AsRef<Path>>(
277 filepath: P,
278 chunk_size: usize,
279 price_precision: Option<u8>,
280 size_precision: Option<u8>,
281 instrument_id: Option<InstrumentId>,
282 limit: Option<usize>,
283) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDelta>>>> {
284 DeltaStreamIterator::new(
285 filepath,
286 chunk_size,
287 price_precision,
288 size_precision,
289 instrument_id,
290 limit,
291 )
292}
293
#[cfg(feature = "python")]
/// Streaming iterator that groups deltas by `ts_event` into batches and
/// yields them to Python as PyCapsule objects.
struct BatchedDeltasStreamIterator {
    /// CSV reader over the input file.
    reader: Reader<Box<dyn std::io::Read>>,
    /// Scratch record reused per row.
    record: StringRecord,
    /// Capsules accumulated for the chunk currently being built.
    buffer: Vec<Py<PyAny>>,
    /// Deltas for the event (`ts_event`) currently being assembled.
    current_batch: Vec<OrderBookDelta>,
    /// Completed event batches awaiting conversion to capsules.
    pending_batches: Vec<Vec<OrderBookDelta>>,
    /// Maximum number of batches per yielded chunk.
    chunk_size: usize,
    /// Instrument ID applied to every batch (resolved at construction).
    instrument_id: InstrumentId,
    price_precision: u8,
    size_precision: u8,
    /// `ts_event` of the most recently read record (batch-boundary detection).
    last_ts_event: UnixNanos,
    /// Whether the previous record was part of a snapshot.
    last_is_snapshot: bool,
    /// Optional cap on the total number of deltas emitted.
    limit: Option<usize>,
    deltas_emitted: usize,
}
315
#[cfg(feature = "python")]
impl BatchedDeltasStreamIterator {
    /// Creates a new batched stream over `filepath`.
    ///
    /// The first CSV row is read up-front to resolve the instrument ID when
    /// one is not supplied; precisions not supplied are inferred by sampling.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let mut reader = create_csv_reader(&filepath)?;
        let mut record = StringRecord::new();

        // Peek the first record to derive the instrument ID if needed.
        let first_record = if reader.read_record(&mut record)? {
            record.deserialize::<TardisBookUpdateRecord>(None)?
        } else {
            anyhow::bail!("CSV file is empty");
        };

        let final_instrument_id = instrument_id
            .unwrap_or_else(|| parse_instrument_id(&first_record.exchange, first_record.symbol));

        // Sampling continues from the second row (the first was consumed above).
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                (price_prec, size_prec)
            } else {
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000);
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        // Re-open the file so iteration starts from the first row again.
        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            current_batch: Vec::new(),
            pending_batches: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id: final_instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            last_ts_event: UnixNanos::default(),
            last_is_snapshot: false,
            limit,
            deltas_emitted: 0,
        })
    }

    /// Scans up to `sample_size` parseable rows and returns the maximum
    /// (price, size) decimal precision observed.
    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> (u8, u8) {
        let mut max_price_precision = 0u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    // Rows that fail to deserialize are skipped without counting.
                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                // EOF stops the scan; unreadable rows are counted so it terminates.
                Ok(false) => break, Err(_) => records_scanned += 1, }
        }

        (max_price_precision, max_size_precision)
    }

    /// Reads records until up to `chunk_size` event-complete batches are
    /// collected into `pending_batches` (or EOF / `limit` is reached).
    ///
    /// Returns `None` when no batches were produced, `Some(Ok(()))` when at
    /// least one batch is pending, and `Some(Err(..))` on a read/parse failure.
    fn fill_pending_batches(&mut self) -> Option<anyhow::Result<()>> {
        self.pending_batches.clear();
        let mut batches_created = 0;

        while batches_created < self.chunk_size {
            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
            {
                break;
            }

            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let data = match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => data,
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    };

                    let ts_event = parse_timestamp(data.timestamp);
                    let ts_init = parse_timestamp(data.local_timestamp);

                    // Parse the delta first: malformed rows are logged and
                    // skipped before any batch bookkeeping happens.
                    let delta = match parse_delta_record(
                        &data,
                        self.price_precision,
                        self.size_precision,
                        Some(self.instrument_id),
                    ) {
                        Ok(d) => d,
                        Err(e) => {
                            log::warn!("Skipping invalid delta record: {e}");
                            continue;
                        }
                    };

                    // A new ts_event closes the current batch: flag its final
                    // delta F_LAST and move it into the pending queue.
                    if self.last_ts_event != ts_event && !self.current_batch.is_empty() {
                        if let Some(last_delta) = self.current_batch.last_mut() {
                            last_delta.flags = RecordFlag::F_LAST.value();
                        }
                        self.pending_batches
                            .push(std::mem::take(&mut self.current_batch));
                        batches_created += 1;
                    }

                    self.last_ts_event = ts_event;

                    // First record of a snapshot sequence: prepend a synthetic
                    // book-clear delta to the new batch.
                    if data.is_snapshot && !self.last_is_snapshot {
                        let clear_delta =
                            OrderBookDelta::clear(self.instrument_id, 0, ts_event, ts_init);
                        self.current_batch.push(clear_delta);
                        self.deltas_emitted += 1;

                        // The clear delta alone may exhaust the limit.
                        if let Some(limit) = self.limit
                            && self.deltas_emitted >= limit
                        {
                            self.last_is_snapshot = data.is_snapshot;
                            break;
                        }
                    }
                    self.last_is_snapshot = data.is_snapshot;

                    self.current_batch.push(delta);
                    self.deltas_emitted += 1;

                    if let Some(limit) = self.limit
                        && self.deltas_emitted >= limit
                    {
                        break;
                    }
                }
                Ok(false) => {
                    break;
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        // Flush the trailing partial batch (EOF or limit) as a final batch.
        if !self.current_batch.is_empty() && batches_created < self.chunk_size {
            if let Some(last_delta) = self.current_batch.last_mut() {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            self.pending_batches
                .push(std::mem::take(&mut self.current_batch));
        }

        if self.pending_batches.is_empty() {
            None
        } else {
            Some(Ok(()))
        }
    }
}
501
#[cfg(feature = "python")]
impl Iterator for BatchedDeltasStreamIterator {
    type Item = anyhow::Result<Vec<Py<PyAny>>>;

    /// Yields the next chunk of PyCapsule batches, or `None` once the file
    /// (or the configured `limit`) is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        // Stop immediately once the emission cap has been reached.
        if self.limit.is_some_and(|l| self.deltas_emitted >= l) {
            return None;
        }

        self.buffer.clear();

        if let Some(Err(e)) = self.fill_pending_batches() {
            return Some(Err(e));
        }

        if self.pending_batches.is_empty() {
            return None;
        }

        // Convert every completed batch into a capsule inside one Python attach.
        Python::attach(|py| {
            for batch in self.pending_batches.drain(..) {
                let deltas = OrderBookDeltas_API::new(OrderBookDeltas::new(self.instrument_id, batch));
                self.buffer.push(data_to_pycapsule(py, Data::Deltas(deltas)));
            }
        });

        Some(Ok(std::mem::take(&mut self.buffer)))
    }
}
535
#[cfg(feature = "python")]
/// Streams Tardis book updates as batches of deltas wrapped in PyCapsules.
///
/// Each yielded `Vec` holds at most `chunk_size` capsules, one per `ts_event`
/// batch. Precisions are inferred from the file when not supplied; `limit`
/// caps the total number of deltas produced.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, is empty, or its first row
/// cannot be parsed.
pub fn stream_batched_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<Py<PyAny>>>>> {
    BatchedDeltasStreamIterator::new(
        filepath, chunk_size, price_precision, size_precision, instrument_id, limit,
    )
}
560
/// Streaming iterator that parses a Tardis quote CSV into chunks of
/// [`QuoteTick`].
struct QuoteStreamIterator {
    /// CSV reader over the input file.
    reader: Reader<Box<dyn Read>>,
    /// Scratch record reused per row.
    record: StringRecord,
    /// Quotes accumulated for the chunk currently being built.
    buffer: Vec<QuoteTick>,
    /// Maximum number of quotes per yielded chunk.
    chunk_size: usize,
    /// Instrument override; when `None` the ID is parsed from each record.
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    /// Optional cap on the total number of quotes emitted.
    limit: Option<usize>,
    records_processed: usize,
}
577
578impl QuoteStreamIterator {
579 pub fn new<P: AsRef<Path>>(
585 filepath: P,
586 chunk_size: usize,
587 price_precision: Option<u8>,
588 size_precision: Option<u8>,
589 instrument_id: Option<InstrumentId>,
590 limit: Option<usize>,
591 ) -> anyhow::Result<Self> {
592 let (final_price_precision, final_size_precision) =
593 if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
594 (price_prec, size_prec)
596 } else {
597 let mut reader = create_csv_reader(&filepath)?;
599 let mut record = StringRecord::new();
600 let (detected_price, detected_size) =
601 Self::detect_precision_from_sample(&mut reader, &mut record, 10_000);
602 (
603 price_precision.unwrap_or(detected_price),
604 size_precision.unwrap_or(detected_size),
605 )
606 };
607
608 let reader = create_csv_reader(filepath)?;
609
610 Ok(Self {
611 reader,
612 record: StringRecord::new(),
613 buffer: Vec::with_capacity(chunk_size),
614 chunk_size,
615 instrument_id,
616 price_precision: final_price_precision,
617 size_precision: final_size_precision,
618 limit,
619 records_processed: 0,
620 })
621 }
622
623 fn detect_precision_from_sample(
624 reader: &mut Reader<Box<dyn std::io::Read>>,
625 record: &mut StringRecord,
626 sample_size: usize,
627 ) -> (u8, u8) {
628 let mut max_price_precision = 2u8;
629 let mut max_size_precision = 0u8;
630 let mut records_scanned = 0;
631
632 while records_scanned < sample_size {
633 match reader.read_record(record) {
634 Ok(true) => {
635 if let Ok(data) = record.deserialize::<TardisQuoteRecord>(None) {
636 if let Some(bid_price_val) = data.bid_price {
637 max_price_precision =
638 max_price_precision.max(infer_precision(bid_price_val));
639 }
640
641 if let Some(ask_price_val) = data.ask_price {
642 max_price_precision =
643 max_price_precision.max(infer_precision(ask_price_val));
644 }
645
646 if let Some(bid_amount_val) = data.bid_amount {
647 max_size_precision =
648 max_size_precision.max(infer_precision(bid_amount_val));
649 }
650
651 if let Some(ask_amount_val) = data.ask_amount {
652 max_size_precision =
653 max_size_precision.max(infer_precision(ask_amount_val));
654 }
655 records_scanned += 1;
656 }
657 }
658 Ok(false) => break, Err(_) => records_scanned += 1, }
661 }
662
663 (max_price_precision, max_size_precision)
664 }
665}
666
667impl Iterator for QuoteStreamIterator {
668 type Item = anyhow::Result<Vec<QuoteTick>>;
669
670 fn next(&mut self) -> Option<Self::Item> {
671 if let Some(limit) = self.limit
672 && self.records_processed >= limit
673 {
674 return None;
675 }
676
677 self.buffer.clear();
678 let mut records_read = 0;
679
680 while records_read < self.chunk_size {
681 match self.reader.read_record(&mut self.record) {
682 Ok(true) => match self.record.deserialize::<TardisQuoteRecord>(None) {
683 Ok(data) => {
684 let quote = parse_quote_record(
685 &data,
686 self.price_precision,
687 self.size_precision,
688 self.instrument_id,
689 );
690
691 self.buffer.push(quote);
692 records_read += 1;
693 self.records_processed += 1;
694
695 if let Some(limit) = self.limit
696 && self.records_processed >= limit
697 {
698 break;
699 }
700 }
701 Err(e) => {
702 return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
703 }
704 },
705 Ok(false) => {
706 if self.buffer.is_empty() {
707 return None;
708 }
709 return Some(Ok(self.buffer.clone()));
710 }
711 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
712 }
713 }
714
715 if self.buffer.is_empty() {
716 None
717 } else {
718 Some(Ok(self.buffer.clone()))
719 }
720 }
721}
722
723pub fn stream_quotes<P: AsRef<Path>>(
738 filepath: P,
739 chunk_size: usize,
740 price_precision: Option<u8>,
741 size_precision: Option<u8>,
742 instrument_id: Option<InstrumentId>,
743 limit: Option<usize>,
744) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<QuoteTick>>>> {
745 QuoteStreamIterator::new(
746 filepath,
747 chunk_size,
748 price_precision,
749 size_precision,
750 instrument_id,
751 limit,
752 )
753}
754
/// Streaming iterator that parses a Tardis trade CSV into chunks of
/// [`TradeTick`].
struct TradeStreamIterator {
    /// CSV reader over the input file.
    reader: Reader<Box<dyn Read>>,
    /// Scratch record reused per row.
    record: StringRecord,
    /// Trades accumulated for the chunk currently being built.
    buffer: Vec<TradeTick>,
    /// Maximum number of trades per yielded chunk.
    chunk_size: usize,
    /// Instrument override; when `None` the ID is parsed from each record.
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    /// Optional cap on the total number of trades emitted.
    limit: Option<usize>,
    records_processed: usize,
}
771
772impl TradeStreamIterator {
773 pub fn new<P: AsRef<Path>>(
779 filepath: P,
780 chunk_size: usize,
781 price_precision: Option<u8>,
782 size_precision: Option<u8>,
783 instrument_id: Option<InstrumentId>,
784 limit: Option<usize>,
785 ) -> anyhow::Result<Self> {
786 let (final_price_precision, final_size_precision) =
787 if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
788 (price_prec, size_prec)
790 } else {
791 let mut reader = create_csv_reader(&filepath)?;
793 let mut record = StringRecord::new();
794 let (detected_price, detected_size) =
795 Self::detect_precision_from_sample(&mut reader, &mut record, 10_000);
796 (
797 price_precision.unwrap_or(detected_price),
798 size_precision.unwrap_or(detected_size),
799 )
800 };
801
802 let reader = create_csv_reader(filepath)?;
803
804 Ok(Self {
805 reader,
806 record: StringRecord::new(),
807 buffer: Vec::with_capacity(chunk_size),
808 chunk_size,
809 instrument_id,
810 price_precision: final_price_precision,
811 size_precision: final_size_precision,
812 limit,
813 records_processed: 0,
814 })
815 }
816
817 fn detect_precision_from_sample(
818 reader: &mut Reader<Box<dyn std::io::Read>>,
819 record: &mut StringRecord,
820 sample_size: usize,
821 ) -> (u8, u8) {
822 let mut max_price_precision = 2u8;
823 let mut max_size_precision = 0u8;
824 let mut records_scanned = 0;
825
826 while records_scanned < sample_size {
827 match reader.read_record(record) {
828 Ok(true) => {
829 if let Ok(data) = record.deserialize::<TardisTradeRecord>(None) {
830 max_price_precision = max_price_precision.max(infer_precision(data.price));
831 max_size_precision = max_size_precision.max(infer_precision(data.amount));
832 records_scanned += 1;
833 }
834 }
835 Ok(false) => break, Err(_) => records_scanned += 1, }
838 }
839
840 (max_price_precision, max_size_precision)
841 }
842}
843
844impl Iterator for TradeStreamIterator {
845 type Item = anyhow::Result<Vec<TradeTick>>;
846
847 fn next(&mut self) -> Option<Self::Item> {
848 if let Some(limit) = self.limit
849 && self.records_processed >= limit
850 {
851 return None;
852 }
853
854 self.buffer.clear();
855 let mut records_read = 0;
856
857 while records_read < self.chunk_size {
858 match self.reader.read_record(&mut self.record) {
859 Ok(true) => match self.record.deserialize::<TardisTradeRecord>(None) {
860 Ok(data) => {
861 let size = Quantity::new(data.amount, self.size_precision);
862
863 if size.is_positive() {
864 let trade = parse_trade_record(
865 &data,
866 size,
867 self.price_precision,
868 self.instrument_id,
869 );
870
871 self.buffer.push(trade);
872 records_read += 1;
873 self.records_processed += 1;
874
875 if let Some(limit) = self.limit
876 && self.records_processed >= limit
877 {
878 break;
879 }
880 } else {
881 log::warn!("Skipping zero-sized trade: {data:?}");
882 }
883 }
884 Err(e) => {
885 return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
886 }
887 },
888 Ok(false) => {
889 if self.buffer.is_empty() {
890 return None;
891 }
892 return Some(Ok(self.buffer.clone()));
893 }
894 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
895 }
896 }
897
898 if self.buffer.is_empty() {
899 None
900 } else {
901 Some(Ok(self.buffer.clone()))
902 }
903 }
904}
905
906pub fn stream_trades<P: AsRef<Path>>(
921 filepath: P,
922 chunk_size: usize,
923 price_precision: Option<u8>,
924 size_precision: Option<u8>,
925 instrument_id: Option<InstrumentId>,
926 limit: Option<usize>,
927) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<TradeTick>>>> {
928 TradeStreamIterator::new(
929 filepath,
930 chunk_size,
931 price_precision,
932 size_precision,
933 instrument_id,
934 limit,
935 )
936}
937
/// Streaming iterator that converts Tardis book snapshot CSVs (5 or 25
/// levels) into chunks of [`OrderBookDepth10`].
struct Depth10StreamIterator {
    /// CSV reader over the input file.
    reader: Reader<Box<dyn Read>>,
    /// Scratch record reused per row.
    record: StringRecord,
    /// Depths accumulated for the chunk currently being built.
    buffer: Vec<OrderBookDepth10>,
    /// Maximum number of depths per yielded chunk.
    chunk_size: usize,
    /// Source snapshot depth; validated at construction to be 5 or 25.
    levels: u8,
    /// Instrument override; when `None` the ID is parsed from each record.
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    /// Optional cap on the total number of depths emitted.
    limit: Option<usize>,
    records_processed: usize,
}
955
impl Depth10StreamIterator {
    /// Creates a new depth stream over `filepath`.
    ///
    /// `levels` selects the source snapshot format and must be 5 or 25.
    /// Precisions not supplied are inferred by sampling up to 10,000 rows,
    /// then the file is re-opened so iteration starts from the first row.
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        levels: u8,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        anyhow::ensure!(
            levels == 5 || levels == 25,
            "Invalid levels: {levels}. Must be 5 or 25."
        );

        // Skip the sampling pass entirely when both precisions are supplied.
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                (price_prec, size_prec)
            } else {
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000);
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            levels,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    /// Converts a 5-level snapshot row into an [`OrderBookDepth10`]; levels
    /// 5-9 remain `NULL_ORDER` with zero counts.
    fn process_snapshot5(&self, data: &TardisOrderBookSnapshot5Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Map the record's flat per-level columns onto the top five levels.
        for i in 0..5 {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        // Snapshots carry the F_SNAPSHOT flag and a zero sequence number.
        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0;
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

    /// Converts a 25-level snapshot row into an [`OrderBookDepth10`], taking
    /// only the top ten levels per side.
    fn process_snapshot25(&self, data: &TardisOrderBookSnapshot25Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Map the record's flat per-level columns onto levels 0-9.
        for i in 0..DEPTH10_LEN {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                5 => (data.bids_5_price, data.bids_5_amount),
                6 => (data.bids_6_price, data.bids_6_amount),
                7 => (data.bids_7_price, data.bids_7_amount),
                8 => (data.bids_8_price, data.bids_8_amount),
                9 => (data.bids_9_price, data.bids_9_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                5 => (data.asks_5_price, data.asks_5_amount),
                6 => (data.asks_6_price, data.asks_6_amount),
                7 => (data.asks_7_price, data.asks_7_amount),
                8 => (data.asks_8_price, data.asks_8_amount),
                9 => (data.asks_9_price, data.asks_9_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        // Snapshots carry the F_SNAPSHOT flag and a zero sequence number.
        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0;
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

    /// Scans up to `sample_size` parseable rows, returning the maximum
    /// (price, size) decimal precision observed from the top-of-book columns;
    /// price precision has a floor of 2.
    ///
    /// Tries to deserialize each row as a 5-level record first, then falls
    /// back to the 25-level layout.
    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> (u8, u8) {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisOrderBookSnapshot5Record>(None) {
                        // Top-of-book fields are optional; only present values contribute.
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }

                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }

                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }

                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    } else if let Ok(data) =
                        record.deserialize::<TardisOrderBookSnapshot25Record>(None)
                    {
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }

                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }

                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }

                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    }
                }
                // EOF stops the scan; unreadable rows are counted so it terminates.
                Ok(false) => break, Err(_) => records_scanned += 1, }
        }

        (max_price_precision, max_size_precision)
    }
}
1223
1224impl Iterator for Depth10StreamIterator {
1225 type Item = anyhow::Result<Vec<OrderBookDepth10>>;
1226
1227 fn next(&mut self) -> Option<Self::Item> {
1228 if let Some(limit) = self.limit
1229 && self.records_processed >= limit
1230 {
1231 return None;
1232 }
1233
1234 if !self.buffer.is_empty() {
1235 let chunk = self.buffer.split_off(0);
1236 return Some(Ok(chunk));
1237 }
1238
1239 self.buffer.clear();
1240 let mut records_read = 0;
1241
1242 while records_read < self.chunk_size {
1243 match self.reader.read_record(&mut self.record) {
1244 Ok(true) => {
1245 let result = match self.levels {
1246 5 => self
1247 .record
1248 .deserialize::<TardisOrderBookSnapshot5Record>(None)
1249 .map(|data| self.process_snapshot5(&data)),
1250 25 => self
1251 .record
1252 .deserialize::<TardisOrderBookSnapshot25Record>(None)
1253 .map(|data| self.process_snapshot25(&data)),
1254 _ => return Some(Err(anyhow::anyhow!("Invalid levels: {}", self.levels))),
1255 };
1256
1257 match result {
1258 Ok(depth) => {
1259 self.buffer.push(depth);
1260 records_read += 1;
1261 self.records_processed += 1;
1262
1263 if let Some(limit) = self.limit
1264 && self.records_processed >= limit
1265 {
1266 break;
1267 }
1268 }
1269 Err(e) => {
1270 return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
1271 }
1272 }
1273 }
1274 Ok(false) => {
1275 if self.buffer.is_empty() {
1276 return None;
1277 }
1278 let chunk = self.buffer.split_off(0);
1279 return Some(Ok(chunk));
1280 }
1281 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
1282 }
1283 }
1284
1285 if self.buffer.is_empty() {
1286 None
1287 } else {
1288 let chunk = self.buffer.split_off(0);
1289 Some(Ok(chunk))
1290 }
1291 }
1292}
1293
1294pub fn stream_depth10_from_snapshot5<P: AsRef<Path>>(
1309 filepath: P,
1310 chunk_size: usize,
1311 price_precision: Option<u8>,
1312 size_precision: Option<u8>,
1313 instrument_id: Option<InstrumentId>,
1314 limit: Option<usize>,
1315) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
1316 Depth10StreamIterator::new(
1317 filepath,
1318 chunk_size,
1319 5,
1320 price_precision,
1321 size_precision,
1322 instrument_id,
1323 limit,
1324 )
1325}
1326
/// Streams [`OrderBookDepth10`] chunks parsed from a Tardis order book
/// snapshot CSV file containing 25 levels per side.
///
/// Each iterator item is a `Vec` of up to `chunk_size` depths (the final
/// chunk may be smaller). `limit` caps the total number of records emitted.
/// `instrument_id` is forwarded to the underlying iterator — presumably it
/// overrides the instrument ID parsed from each record; confirm against
/// `Depth10StreamIterator::new`. When `price_precision`/`size_precision`
/// are `None`, precisions are presumably inferred from the file contents
/// (mirroring the delta stream) — confirm in the iterator implementation.
///
/// # Errors
///
/// Returns an error if the underlying iterator cannot be constructed (for
/// example, if `filepath` cannot be opened as CSV).
pub fn stream_depth10_from_snapshot25<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
    Depth10StreamIterator::new(
        filepath,
        chunk_size,
        25, // levels: this file format carries 25 levels per side
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}
1359
1360use nautilus_model::data::FundingRateUpdate;
1365
1366use crate::csv::record::TardisDerivativeTickerRecord;
1367
/// Streaming iterator that parses a Tardis derivative ticker CSV file and
/// yields [`FundingRateUpdate`]s in chunks.
struct FundingRateStreamIterator {
    /// CSV reader over the input file.
    reader: Reader<Box<dyn Read>>,
    /// Scratch record reused for every CSV row to avoid per-row allocation.
    record: StringRecord,
    /// Accumulates parsed updates until a chunk is emitted.
    buffer: Vec<FundingRateUpdate>,
    /// Maximum number of updates per emitted chunk.
    chunk_size: usize,
    /// Optional instrument ID forwarded to `parse_derivative_ticker_record`.
    instrument_id: Option<InstrumentId>,
    /// Optional cap on the total number of CSV records processed.
    limit: Option<usize>,
    /// Count of CSV records processed so far (compared against `limit`).
    records_processed: usize,
}
1378
impl FundingRateStreamIterator {
    /// Creates a new funding rate stream over the CSV file at `filepath`.
    ///
    /// # Errors
    ///
    /// Returns an error if the CSV reader cannot be created for `filepath`.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            // Pre-size the buffer so filling a whole chunk never reallocates.
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            limit,
            records_processed: 0,
        })
    }
}
1404
1405impl Iterator for FundingRateStreamIterator {
1406 type Item = anyhow::Result<Vec<FundingRateUpdate>>;
1407
1408 fn next(&mut self) -> Option<Self::Item> {
1409 if let Some(limit) = self.limit
1410 && self.records_processed >= limit
1411 {
1412 return None;
1413 }
1414
1415 if !self.buffer.is_empty() {
1416 let chunk = self.buffer.split_off(0);
1417 return Some(Ok(chunk));
1418 }
1419
1420 self.buffer.clear();
1421 let mut records_read = 0;
1422
1423 while records_read < self.chunk_size {
1424 match self.reader.read_record(&mut self.record) {
1425 Ok(true) => {
1426 let result = self
1427 .record
1428 .deserialize::<TardisDerivativeTickerRecord>(None)
1429 .map_err(anyhow::Error::from)
1430 .map(|data| parse_derivative_ticker_record(&data, self.instrument_id));
1431
1432 match result {
1433 Ok(Some(funding_rate)) => {
1434 self.buffer.push(funding_rate);
1435 records_read += 1;
1436 self.records_processed += 1;
1437
1438 if let Some(limit) = self.limit
1439 && self.records_processed >= limit
1440 {
1441 break;
1442 }
1443 }
1444 Ok(None) => {
1445 self.records_processed += 1;
1447 }
1448 Err(e) => {
1449 return Some(Err(anyhow::anyhow!(
1450 "Failed to parse funding rate record: {e}"
1451 )));
1452 }
1453 }
1454 }
1455 Ok(false) => {
1456 if self.buffer.is_empty() {
1457 return None;
1458 }
1459 let chunk = self.buffer.split_off(0);
1460 return Some(Ok(chunk));
1461 }
1462 Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
1463 }
1464 }
1465
1466 if self.buffer.is_empty() {
1467 None
1468 } else {
1469 let chunk = self.buffer.split_off(0);
1470 Some(Ok(chunk))
1471 }
1472 }
1473}
1474
/// Streams [`FundingRateUpdate`]s parsed from a Tardis derivative ticker CSV
/// file, yielding them in chunks of at most `chunk_size`.
///
/// `instrument_id`, when provided, is forwarded to the record parser, and
/// `limit` caps the total number of CSV records processed.
///
/// # Errors
///
/// Returns an error if the CSV reader cannot be created for `filepath`.
pub fn stream_funding_rates<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<FundingRateUpdate>>>> {
    FundingRateStreamIterator::new(filepath, chunk_size, instrument_id, limit)
}
1492
1493#[cfg(test)]
1494mod tests {
1495 use nautilus_model::{
1496 enums::{AggressorSide, BookAction},
1497 identifiers::TradeId,
1498 types::Price,
1499 };
1500 use rstest::*;
1501
1502 use super::*;
1503 use crate::{
1504 common::{parse::parse_price, testing::get_test_data_path},
1505 csv::load::load_deltas,
1506 };
1507
1508 #[rstest]
1509 #[case(0.0, 0)]
1510 #[case(42.0, 0)]
1511 #[case(0.1, 1)]
1512 #[case(0.25, 2)]
1513 #[case(123.0001, 4)]
1514 #[case(-42.987654321, 9)]
1515 #[case(1.234_567_890_123, 12)]
1516 fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
1517 assert_eq!(infer_precision(input), expected);
1518 }
1519
1520 #[rstest]
1521 pub fn test_stream_deltas_chunked() {
1522 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
1523binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
1524binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
1525binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
1526binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
1527binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";
1528
1529 let temp_file = std::env::temp_dir().join("test_stream_deltas.csv");
1530 std::fs::write(&temp_file, csv_data).unwrap();
1531
1532 let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
1533 let chunks: Vec<_> = stream.collect();
1534
1535 assert_eq!(chunks.len(), 3);
1537
1538 let chunk1 = chunks[0].as_ref().unwrap();
1539 assert_eq!(chunk1.len(), 2);
1540 assert_eq!(chunk1[0].action, BookAction::Clear); assert_eq!(chunk1[1].order.price.precision, 4); let chunk2 = chunks[1].as_ref().unwrap();
1544 assert_eq!(chunk2.len(), 2);
1545 assert_eq!(chunk2[0].order.price.precision, 4);
1546 assert_eq!(chunk2[1].order.price.precision, 4);
1547
1548 let chunk3 = chunks[2].as_ref().unwrap();
1549 assert_eq!(chunk3.len(), 2);
1550 assert_eq!(chunk3[0].order.price.precision, 4);
1551 assert_eq!(chunk3[1].order.price.precision, 4);
1552
1553 let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1554 assert_eq!(total_deltas, 6);
1555
1556 std::fs::remove_file(&temp_file).ok();
1557 }
1558
    /// Batched streaming emits a synthetic CLEAR delta first; a record
    /// limit of 1 truncates the output to just that CLEAR.
    #[cfg(feature = "python")]
    #[rstest]
    pub fn test_stream_batched_deltas_clear_and_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_batched_deltas.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // With limit=1, only the synthetic CLEAR delta is emitted.
        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 10, Some(4), Some(1), None, Some(1))
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();
        assert_eq!(iterator.pending_batches.len(), 1);
        assert_eq!(iterator.pending_batches[0].len(), 1);
        assert_eq!(iterator.pending_batches[0][0].action, BookAction::Clear);

        // Without a limit, all 5 rows plus the CLEAR are batched (6 deltas).
        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 10, Some(4), Some(1), None, None).unwrap();
        iterator.fill_pending_batches().transpose().unwrap();
        assert_eq!(iterator.pending_batches.len(), 5);
        assert_eq!(iterator.pending_batches[0].len(), 2);
        assert_eq!(iterator.pending_batches[0][0].action, BookAction::Clear);
        assert_ne!(iterator.pending_batches[0][1].action, BookAction::Clear);
        let total_deltas: usize = iterator
            .pending_batches
            .iter()
            .map(|batch| batch.len())
            .sum();
        assert_eq!(total_deltas, 6);

        std::fs::remove_file(&temp_file).ok();
    }
1598
    /// A snapshot appearing mid-stream (after regular updates) inserts a
    /// second synthetic CLEAR delta; neither CLEAR carries F_LAST.
    #[cfg(feature = "python")]
    #[rstest]
    pub fn test_stream_batched_deltas_with_mid_snapshot_inserts_clear() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,bid,50100.0,3.0
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,ask,50101.0,4.0
binance-futures,BTCUSDT,1640995301000000,1640995301100000,false,bid,50099.0,1.0";

        let temp_file = std::env::temp_dir().join("test_stream_batched_mid_snapshot.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, None)
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();
        let clear_count = all_deltas
            .iter()
            .filter(|d| d.action == BookAction::Clear)
            .count();

        assert_eq!(
            clear_count, 2,
            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
        );

        // Each CLEAR sits at the start of its snapshot section.
        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[5].action, BookAction::Clear);

        assert_eq!(
            all_deltas[0].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 0 should not have F_LAST flag"
        );
        assert_eq!(
            all_deltas[5].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 5 should not have F_LAST flag"
        );

        std::fs::remove_file(&temp_file).ok();
    }
1655
    /// The synthetic CLEAR delta counts toward `limit`: limit=4 yields the
    /// CLEAR plus only the first three CSV rows.
    #[cfg(feature = "python")]
    #[rstest]
    pub fn test_stream_batched_deltas_limit_includes_clear() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5
binance,BTCUSDT,1640995204000000,1640995204100000,false,ask,50003.0,1.0";

        let temp_file = std::env::temp_dir().join("test_stream_batched_limit_includes_clear.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, Some(4))
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();

        assert_eq!(all_deltas.len(), 4);
        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[1].action, BookAction::Add);
        assert_eq!(all_deltas[2].action, BookAction::Update);
        assert_eq!(all_deltas[3].action, BookAction::Update);

        std::fs::remove_file(&temp_file).ok();
    }
1686
    /// When the record limit truncates the stream, the final emitted delta
    /// is still flagged F_LAST so the batch is closed cleanly.
    #[cfg(feature = "python")]
    #[rstest]
    pub fn test_stream_batched_deltas_limit_sets_f_last() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,0.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,1.5";

        let temp_file = std::env::temp_dir().join("test_stream_batched_limit_f_last.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, Some(3))
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();

        assert_eq!(all_deltas.len(), 3);
        assert_eq!(
            all_deltas[2].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value(),
            "Final delta should have F_LAST flag when limit is reached"
        );

        std::fs::remove_file(&temp_file).ok();
    }
1717
    /// Within a snapshot batch only the final delta carries F_LAST; the
    /// CLEAR and intermediate snapshot rows do not.
    #[cfg(feature = "python")]
    #[rstest]
    pub fn test_stream_batched_deltas_snapshot_batch_flags() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_batched_snapshot_batch_flags.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, None)
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        assert_eq!(iterator.pending_batches.len(), 2);
        let first_batch = &iterator.pending_batches[0];

        // Snapshot batch: CLEAR + two snapshot rows, F_LAST only on the last.
        assert_eq!(first_batch.len(), 3);
        assert_eq!(first_batch[0].action, BookAction::Clear);
        assert_eq!(first_batch[0].flags & RecordFlag::F_LAST.value(), 0);
        assert_eq!(first_batch[1].flags & RecordFlag::F_LAST.value(), 0);
        assert_eq!(
            first_batch[2].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value()
        );

        // The lone non-snapshot row forms its own batch, flagged F_LAST.
        assert_eq!(iterator.pending_batches[1].len(), 1);
        assert_eq!(
            iterator.pending_batches[1][0].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value()
        );

        std::fs::remove_file(&temp_file).ok();
    }
1757
    /// Quote streaming honors chunk_size: 5 rows with chunk_size=2 yields
    /// chunks of 2, 2, and 1, all at the fixed price precision.
    #[rstest]
    pub fn test_stream_quotes_chunked() {
        let csv_data =
            "exchange,symbol,timestamp,local_timestamp,ask_amount,ask_price,bid_price,bid_amount
binance,BTCUSDT,1640995200000000,1640995200100000,1.0,50000.0,49999.0,1.5
binance,BTCUSDT,1640995201000000,1640995201100000,2.0,50000.5,49999.5,2.5
binance,BTCUSDT,1640995202000000,1640995202100000,1.5,50000.12,49999.12,1.8
binance,BTCUSDT,1640995203000000,1640995203100000,3.0,50000.123,49999.123,3.2
binance,BTCUSDT,1640995204000000,1640995204100000,0.5,50000.1234,49999.1234,0.8";

        let temp_file = std::env::temp_dir().join("test_stream_quotes.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 3);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        assert_eq!(chunk1[0].bid_price.precision, 4);
        assert_eq!(chunk1[1].bid_price.precision, 4);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].bid_price.precision, 4);
        assert_eq!(chunk2[1].bid_price.precision, 4);

        let chunk3 = chunks[2].as_ref().unwrap();
        assert_eq!(chunk3.len(), 1);
        assert_eq!(chunk3[0].bid_price.precision, 4);

        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_quotes, 5);

        std::fs::remove_file(&temp_file).ok();
    }
1795
    /// Trade streaming honors chunk_size (5 rows, chunk_size=3 -> 3 + 2)
    /// and maps "buy"/"sell" to Buyer/Seller aggressor sides.
    #[rstest]
    pub fn test_stream_trades_chunked() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_trades.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 3);
        assert_eq!(chunk1[0].price.precision, 4);
        assert_eq!(chunk1[1].price.precision, 4);
        assert_eq!(chunk1[2].price.precision, 4);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].price.precision, 4);
        assert_eq!(chunk2[1].price.precision, 4);

        assert_eq!(chunk1[0].aggressor_side, AggressorSide::Buyer);
        assert_eq!(chunk1[1].aggressor_side, AggressorSide::Seller);

        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_trades, 5);

        std::fs::remove_file(&temp_file).ok();
    }
1832
    /// Zero-sized trades are dropped during parsing: the 0.0-amount row
    /// (trade2) is absent, leaving 3 trades in a single chunk.
    #[rstest]
    pub fn test_stream_trades_with_zero_sized_trade() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";

        let temp_file = std::env::temp_dir().join("test_stream_trades_zero_size.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 1);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 3);

        assert_eq!(chunk1[0].size, Quantity::from("1.0"));
        assert_eq!(chunk1[1].size, Quantity::from("1.5"));
        assert_eq!(chunk1[2].size, Quantity::from("3.0"));

        // trade2 (zero size) is skipped entirely.
        assert_eq!(chunk1[0].trade_id, TradeId::new("trade1"));
        assert_eq!(chunk1[1].trade_id, TradeId::new("trade3"));
        assert_eq!(chunk1[2].trade_id, TradeId::new("trade4"));

        std::fs::remove_file(&temp_file).ok();
    }
1866
1867 #[rstest]
1868 pub fn test_stream_depth10_from_snapshot5_chunked() {
1869 let csv_data = "exchange,symbol,timestamp,local_timestamp,asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
1870binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,49999.0,1.5,50002.0,2.0,49998.0,2.5,50003.0,3.0,49997.0,3.5,50004.0,4.0,49996.0,4.5,50005.0,5.0,49995.0,5.5
1871binance,BTCUSDT,1640995201000000,1640995201100000,50001.5,1.1,49999.5,1.6,50002.5,2.1,49998.5,2.6,50003.5,3.1,49997.5,3.6,50004.5,4.1,49996.5,4.6,50005.5,5.1,49995.5,5.6
1872binance,BTCUSDT,1640995202000000,1640995202100000,50001.12,1.12,49999.12,1.62,50002.12,2.12,49998.12,2.62,50003.12,3.12,49997.12,3.62,50004.12,4.12,49996.12,4.62,50005.12,5.12,49995.12,5.62";
1873
1874 let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot5.csv");
1876 std::fs::write(&temp_file, csv_data).unwrap();
1877
1878 let stream = stream_depth10_from_snapshot5(&temp_file, 2, None, None, None, None).unwrap();
1880 let chunks: Vec<_> = stream.collect();
1881
1882 assert_eq!(chunks.len(), 2);
1884
1885 let chunk1 = chunks[0].as_ref().unwrap();
1887 assert_eq!(chunk1.len(), 2);
1888
1889 let chunk2 = chunks[1].as_ref().unwrap();
1891 assert_eq!(chunk2.len(), 1);
1892
1893 let first_depth = &chunk1[0];
1895 assert_eq!(first_depth.bids.len(), 10); assert_eq!(first_depth.asks.len(), 10);
1897
1898 assert_eq!(first_depth.bids[0].price, parse_price(49999.0, 1));
1900 assert_eq!(first_depth.asks[0].price, parse_price(50001.0, 1));
1901
1902 let total_depths: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
1904 assert_eq!(total_depths, 3);
1905
1906 std::fs::remove_file(&temp_file).ok();
1908 }
1909
1910 #[rstest]
1911 pub fn test_stream_depth10_from_snapshot25_chunked() {
1912 let mut header_parts = vec!["exchange", "symbol", "timestamp", "local_timestamp"];
1914
1915 let mut bid_headers = Vec::new();
1917 let mut ask_headers = Vec::new();
1918
1919 for i in 0..25 {
1920 bid_headers.push(format!("bids[{i}].price"));
1921 bid_headers.push(format!("bids[{i}].amount"));
1922 }
1923
1924 for i in 0..25 {
1925 ask_headers.push(format!("asks[{i}].price"));
1926 ask_headers.push(format!("asks[{i}].amount"));
1927 }
1928
1929 for header in &bid_headers {
1930 header_parts.push(header);
1931 }
1932
1933 for header in &ask_headers {
1934 header_parts.push(header);
1935 }
1936
1937 let header = header_parts.join(",");
1938
1939 let mut row1_parts = vec![
1941 "binance".to_string(),
1942 "BTCUSDT".to_string(),
1943 "1640995200000000".to_string(),
1944 "1640995200100000".to_string(),
1945 ];
1946
1947 for i in 0..25 {
1949 if i < 5 {
1950 let bid_price = f64::from(i).mul_add(-0.01, 49999.0);
1951 let bid_amount = 1.0 + f64::from(i);
1952 row1_parts.push(bid_price.to_string());
1953 row1_parts.push(bid_amount.to_string());
1954 } else {
1955 row1_parts.push(String::new());
1956 row1_parts.push(String::new());
1957 }
1958 }
1959
1960 for i in 0..25 {
1962 if i < 5 {
1963 let ask_price = f64::from(i).mul_add(0.01, 50000.0);
1964 let ask_amount = 1.0 + f64::from(i);
1965 row1_parts.push(ask_price.to_string());
1966 row1_parts.push(ask_amount.to_string());
1967 } else {
1968 row1_parts.push(String::new());
1969 row1_parts.push(String::new());
1970 }
1971 }
1972
1973 let csv_data = format!("{}\n{}", header, row1_parts.join(","));
1974
1975 let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot25.csv");
1977 std::fs::write(&temp_file, &csv_data).unwrap();
1978
1979 let stream = stream_depth10_from_snapshot25(&temp_file, 1, None, None, None, None).unwrap();
1981 let chunks: Vec<_> = stream.collect();
1982
1983 assert_eq!(chunks.len(), 1);
1985
1986 let chunk1 = chunks[0].as_ref().unwrap();
1987 assert_eq!(chunk1.len(), 1);
1988
1989 let depth = &chunk1[0];
1991 assert_eq!(depth.bids.len(), 10); assert_eq!(depth.asks.len(), 10);
1993
1994 let actual_bid_price = depth.bids[0].price;
1996 let actual_ask_price = depth.asks[0].price;
1997 assert!(actual_bid_price.as_f64() > 0.0);
1998 assert!(actual_ask_price.as_f64() > 0.0);
1999
2000 std::fs::remove_file(&temp_file).ok();
2002 }
2003
2004 #[rstest]
2005 pub fn test_stream_error_handling() {
2006 let non_existent = std::path::Path::new("does_not_exist.csv");
2008
2009 let result = stream_deltas(non_existent, 10, None, None, None, None);
2010 assert!(result.is_err());
2011
2012 let result = stream_quotes(non_existent, 10, None, None, None, None);
2013 assert!(result.is_err());
2014
2015 let result = stream_trades(non_existent, 10, None, None, None, None);
2016 assert!(result.is_err());
2017
2018 let result = stream_depth10_from_snapshot5(non_existent, 10, None, None, None, None);
2019 assert!(result.is_err());
2020
2021 let result = stream_depth10_from_snapshot25(non_existent, 10, None, None, None, None);
2022 assert!(result.is_err());
2023 }
2024
2025 #[rstest]
2026 pub fn test_stream_empty_file() {
2027 let temp_file = std::env::temp_dir().join("test_empty.csv");
2029 std::fs::write(&temp_file, "").unwrap();
2030
2031 let stream = stream_deltas(&temp_file, 10, None, None, None, None).unwrap();
2032 assert_eq!(stream.count(), 0);
2033
2034 std::fs::remove_file(&temp_file).ok();
2036 }
2037
    /// Bulk loading and chunked streaming of the same file must produce
    /// identical delta sequences (same IDs, actions, sides, timestamps).
    #[rstest]
    pub fn test_stream_precision_consistency() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0";

        let temp_file = std::env::temp_dir().join("test_precision_consistency.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let bulk_deltas = load_deltas(&temp_file, None, None, None, None).unwrap();

        let stream = stream_deltas(&temp_file, 2, None, None, None, None).unwrap();
        let streamed_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();

        assert_eq!(bulk_deltas.len(), streamed_deltas.len());

        // Field-by-field comparison of the two parse paths.
        for (bulk, streamed) in bulk_deltas.iter().zip(streamed_deltas.iter()) {
            assert_eq!(bulk.instrument_id, streamed.instrument_id);
            assert_eq!(bulk.action, streamed.action);
            assert_eq!(bulk.order.side, streamed.order.side);
            assert_eq!(bulk.ts_event, streamed.ts_event);
            assert_eq!(bulk.ts_init, streamed.ts_init);
        }

        std::fs::remove_file(&temp_file).ok();
    }
2073
2074 #[rstest]
2075 pub fn test_stream_trades_from_local_file() {
2076 let filepath = get_test_data_path("csv/trades_1.csv");
2077 let mut stream = stream_trades(filepath, 1, Some(1), Some(0), None, None).unwrap();
2078
2079 let chunk1 = stream.next().unwrap().unwrap();
2080 assert_eq!(chunk1.len(), 1);
2081 assert_eq!(chunk1[0].price, Price::from("8531.5"));
2082
2083 let chunk2 = stream.next().unwrap().unwrap();
2084 assert_eq!(chunk2.len(), 1);
2085 assert_eq!(chunk2[0].size, Quantity::from("1000"));
2086
2087 assert!(stream.next().is_none());
2088 }
2089
2090 #[rstest]
2091 pub fn test_stream_deltas_from_local_file() {
2092 let filepath = get_test_data_path("csv/deltas_1.csv");
2093 let mut stream = stream_deltas(filepath, 1, Some(1), Some(0), None, None).unwrap();
2094
2095 let chunk1 = stream.next().unwrap().unwrap();
2098 assert_eq!(chunk1.len(), 1);
2099 assert_eq!(chunk1[0].action, BookAction::Clear);
2100
2101 let chunk2 = stream.next().unwrap().unwrap();
2103 assert_eq!(chunk2.len(), 1);
2104 assert_eq!(chunk2[0].order.price, Price::from("6421.5"));
2105
2106 let chunk3 = stream.next().unwrap().unwrap();
2108 assert_eq!(chunk3.len(), 1);
2109 assert_eq!(chunk3[0].order.size, Quantity::from("10000"));
2110
2111 assert!(stream.next().is_none());
2112 }
2113
    /// A limit of 3 caps delta streaming at 3 deltas, split 2 + 1 across
    /// chunk-size-2 chunks.
    #[rstest]
    pub fn test_stream_deltas_with_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,false,bid,49998.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_deltas_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_deltas, 3);

        std::fs::remove_file(&temp_file).ok();
    }
2143
    /// A limit of 2 caps quote streaming at a single 2-quote chunk.
    #[rstest]
    pub fn test_stream_quotes_with_limit() {
        let csv_data =
            "exchange,symbol,timestamp,local_timestamp,ask_price,ask_amount,bid_price,bid_amount
binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,50000.0,1.5
binance,BTCUSDT,1640995201000000,1640995201100000,50002.0,2.0,49999.0,2.5
binance,BTCUSDT,1640995202000000,1640995202100000,50003.0,1.5,49998.0,3.0
binance,BTCUSDT,1640995203000000,1640995203100000,50004.0,3.0,49997.0,3.5";

        let temp_file = std::env::temp_dir().join("test_stream_quotes_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, Some(2)).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 1);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);

        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_quotes, 2);

        std::fs::remove_file(&temp_file).ok();
    }
2171
    /// A limit of 3 caps trade streaming at 3 trades (chunks of 2 + 1).
    #[rstest]
    pub fn test_stream_trades_with_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_trades_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_trades, 3);

        std::fs::remove_file(&temp_file).ok();
    }
2201
    /// `Depth10StreamIterator::new` rejects any level count other than 5
    /// or 25 at construction time, with an "Invalid levels" message.
    #[rstest]
    pub fn test_depth10_invalid_levels_error_at_construction() {
        let temp_file = std::env::temp_dir().join("test_depth10_invalid_levels.csv");
        std::fs::write(&temp_file, "exchange,symbol,timestamp,local_timestamp\n").unwrap();

        let result = Depth10StreamIterator::new(&temp_file, 10, 10, None, None, None, None);
        assert!(result.is_err());
        let err_msg = result.err().unwrap().to_string();
        assert!(
            err_msg.contains("Invalid levels"),
            "Error should mention 'Invalid levels': {err_msg}"
        );

        let result = Depth10StreamIterator::new(&temp_file, 10, 3, None, None, None, None);
        assert!(result.is_err());

        // Only the two supported Tardis snapshot depths are accepted.
        let result = Depth10StreamIterator::new(&temp_file, 10, 5, None, None, None, None);
        assert!(result.is_ok());

        let result = Depth10StreamIterator::new(&temp_file, 10, 25, None, None, None, None);
        assert!(result.is_ok());

        std::fs::remove_file(&temp_file).ok();
    }
2226
    /// Chunked delta streaming also synthesizes a second CLEAR when a
    /// snapshot appears mid-stream; neither CLEAR carries F_LAST.
    #[rstest]
    pub fn test_stream_deltas_with_mid_snapshot_inserts_clear() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,bid,50100.0,3.0
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,ask,50101.0,4.0
binance-futures,BTCUSDT,1640995301000000,1640995301100000,false,bid,50099.0,1.0";

        let temp_file = std::env::temp_dir().join("test_stream_deltas_mid_snapshot.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, None).unwrap();
        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();

        let clear_count = all_deltas
            .iter()
            .filter(|d| d.action == BookAction::Clear)
            .count();

        assert_eq!(
            clear_count, 2,
            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
        );

        // Each CLEAR sits at the start of its snapshot section.
        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[5].action, BookAction::Clear);

        assert_eq!(
            all_deltas[0].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 0 should not have F_LAST flag"
        );
        assert_eq!(
            all_deltas[5].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 5 should not have F_LAST flag"
        );

        std::fs::remove_file(&temp_file).ok();
    }
2279
    /// Bulk loading (`load_deltas`) also synthesizes CLEARs for both the
    /// initial and the mid-stream snapshot, without F_LAST on either.
    #[rstest]
    pub fn test_load_deltas_with_mid_snapshot_inserts_clear() {
        let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
        let deltas = load_deltas(&filepath, Some(1), Some(1), None, None).unwrap();

        let clear_count = deltas
            .iter()
            .filter(|d| d.action == BookAction::Clear)
            .count();

        assert_eq!(
            clear_count, 2,
            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
        );

        assert_eq!(deltas[0].action, BookAction::Clear);

        // Locate the second CLEAR by position rather than hard-coding a scan.
        let second_clear_idx = deltas
            .iter()
            .enumerate()
            .filter(|(_, d)| d.action == BookAction::Clear)
            .nth(1)
            .map(|(i, _)| i)
            .expect("Should have second CLEAR");

        assert_eq!(
            second_clear_idx, 6,
            "Second CLEAR should be at index 6, found {second_clear_idx}"
        );

        assert_eq!(
            deltas[0].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 0 should not have F_LAST flag"
        );
        assert_eq!(
            deltas[6].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 6 should not have F_LAST flag"
        );
    }
2324
2325 #[rstest]
2326 fn test_stream_deltas_chunk_size_respects_clear() {
2327 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2331binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2332binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
2333
2334 let temp_file = std::env::temp_dir().join("test_stream_chunk_size_clear.csv");
2335 std::fs::write(&temp_file, csv_data).unwrap();
2336
2337 let stream = stream_deltas(&temp_file, 1, Some(1), Some(1), None, None).unwrap();
2339 let chunks: Vec<_> = stream.collect();
2340
2341 assert_eq!(chunks.len(), 3, "Expected 3 chunks with chunk_size=1");
2343 assert_eq!(chunks[0].as_ref().unwrap().len(), 1);
2344 assert_eq!(chunks[1].as_ref().unwrap().len(), 1);
2345 assert_eq!(chunks[2].as_ref().unwrap().len(), 1);
2346
2347 assert_eq!(chunks[0].as_ref().unwrap()[0].action, BookAction::Clear);
2349 assert_eq!(chunks[1].as_ref().unwrap()[0].action, BookAction::Add);
2351 assert_eq!(chunks[2].as_ref().unwrap()[0].action, BookAction::Add);
2352
2353 std::fs::remove_file(&temp_file).ok();
2354 }
2355
2356 #[rstest]
2357 fn test_stream_deltas_limit_stops_at_clear() {
2358 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2360binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2361binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";
2362
2363 let temp_file = std::env::temp_dir().join("test_stream_limit_stops_at_clear.csv");
2364 std::fs::write(&temp_file, csv_data).unwrap();
2365
2366 let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(1)).unwrap();
2368 let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
2369
2370 assert_eq!(all_deltas.len(), 1);
2371 assert_eq!(all_deltas[0].action, BookAction::Clear);
2372
2373 std::fs::remove_file(&temp_file).ok();
2374 }
2375
2376 #[rstest]
2377 fn test_stream_deltas_limit_includes_clear() {
2378 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2380binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2381binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
2382binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
2383binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
2384binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";
2385
2386 let temp_file = std::env::temp_dir().join("test_stream_limit_includes_clear.csv");
2387 std::fs::write(&temp_file, csv_data).unwrap();
2388
2389 let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(4)).unwrap();
2391 let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
2392
2393 assert_eq!(all_deltas.len(), 4);
2394 assert_eq!(all_deltas[0].action, BookAction::Clear);
2395 assert_eq!(all_deltas[1].action, BookAction::Add);
2396 assert_eq!(all_deltas[2].action, BookAction::Add);
2397 assert_eq!(all_deltas[3].action, BookAction::Update);
2398
2399 std::fs::remove_file(&temp_file).ok();
2400 }
2401
2402 #[rstest]
2403 fn test_stream_deltas_limit_sets_f_last() {
2404 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2406binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
2407binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
2408binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
2409binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
2410binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";
2411
2412 let temp_file = std::env::temp_dir().join("test_stream_limit_f_last.csv");
2413 std::fs::write(&temp_file, csv_data).unwrap();
2414
2415 let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(3)).unwrap();
2417 let chunks: Vec<_> = stream.collect();
2418
2419 assert_eq!(chunks.len(), 1);
2421 let deltas = chunks[0].as_ref().unwrap();
2422 assert_eq!(deltas.len(), 3);
2423
2424 assert_eq!(
2426 deltas[2].flags & RecordFlag::F_LAST.value(),
2427 RecordFlag::F_LAST.value(),
2428 "Final delta should have F_LAST flag when limit is reached"
2429 );
2430
2431 std::fs::remove_file(&temp_file).ok();
2432 }
2433
2434 #[rstest]
2435 fn test_stream_deltas_chunk_boundary_no_f_last() {
2436 let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2438binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
2439binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,ask,50001.0,2.0
2440binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,49999.0,0.5";
2441
2442 let temp_file = std::env::temp_dir().join("test_stream_chunk_no_f_last.csv");
2443 std::fs::write(&temp_file, csv_data).unwrap();
2444
2445 let mut stream = stream_deltas(&temp_file, 2, Some(1), Some(1), None, None).unwrap();
2447
2448 let chunk1 = stream.next().unwrap().unwrap();
2449 assert_eq!(chunk1.len(), 2);
2450
2451 assert_eq!(
2453 chunk1[1].flags & RecordFlag::F_LAST.value(),
2454 0,
2455 "Mid-stream chunk should not have F_LAST flag"
2456 );
2457
2458 let chunk2 = stream.next().unwrap().unwrap();
2460 assert_eq!(chunk2.len(), 1);
2461 assert_eq!(
2462 chunk2[0].flags & RecordFlag::F_LAST.value(),
2463 RecordFlag::F_LAST.value(),
2464 "Final chunk at EOF should have F_LAST flag"
2465 );
2466
2467 std::fs::remove_file(&temp_file).ok();
2468 }
2469}