nautilus_tardis/csv/stream.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{io::Read, path::Path};

use csv::{Reader, StringRecord};
use nautilus_core::UnixNanos;
use nautilus_model::{
    data::{DEPTH10_LEN, NULL_ORDER, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
    enums::{OrderSide, RecordFlag},
    identifiers::InstrumentId,
    types::Quantity,
};
#[cfg(feature = "python")]
use nautilus_model::{
    data::{Data, OrderBookDeltas, OrderBookDeltas_API},
    python::data::data_to_pycapsule,
};
#[cfg(feature = "python")]
use pyo3::{Py, PyAny, Python};

use crate::{
    common::parse::{parse_instrument_id, parse_timestamp},
    csv::{
        create_book_order, create_csv_reader, infer_precision, parse_delta_record,
        parse_derivative_ticker_record, parse_quote_record, parse_trade_record,
        record::{
            TardisBookUpdateRecord, TardisOrderBookSnapshot5Record,
            TardisOrderBookSnapshot25Record, TardisQuoteRecord, TardisTradeRecord,
        },
    },
};

////////////////////////////////////////////////////////////////////////////////
// OrderBookDelta Streaming
////////////////////////////////////////////////////////////////////////////////

/// An iterator for streaming [`OrderBookDelta`]s from a Tardis CSV file in chunks.
struct DeltaStreamIterator {
    reader: Reader<Box<dyn std::io::Read>>,
    record: StringRecord,
    buffer: Vec<OrderBookDelta>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    last_ts_event: UnixNanos,
    last_is_snapshot: bool,
    limit: Option<usize>,
    deltas_emitted: usize,

    /// Pending record to process in next iteration (when CLEAR filled the chunk).
    pending_record: Option<TardisBookUpdateRecord>,
}

impl DeltaStreamIterator {
    /// Creates a new [`DeltaStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000);
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            last_ts_event: UnixNanos::default(),
            last_is_snapshot: false,
            limit,
            deltas_emitted: 0,
            pending_record: None,
        })
    }

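    /// Infers the maximum price and size precision by scanning up to `sample_size` records.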
    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> (u8, u8) {
        let mut max_price_precision = 0u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        (max_price_precision, max_size_precision)
    }
}

impl Iterator for DeltaStreamIterator {
    type Item = anyhow::Result<Vec<OrderBookDelta>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.deltas_emitted >= limit
        {
            return None;
        }

        self.buffer.clear();

        loop {
            if self.buffer.len() >= self.chunk_size {
                break;
            }

            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
            {
                break;
            }

            // Use pending record from previous iteration, or read new
            let data = if let Some(pending) = self.pending_record.take() {
                pending
            } else {
                match self.reader.read_record(&mut self.record) {
                    Ok(true) => match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => data,
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    },
                    Ok(false) => {
                        if self.buffer.is_empty() {
                            return None;
                        }

                        if let Some(last_delta) = self.buffer.last_mut() {
                            last_delta.flags = RecordFlag::F_LAST.value();
                        }
                        return Some(Ok(self.buffer.clone()));
                    }
                    Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
                }
            };

            // Insert CLEAR on snapshot boundary to reset order book state
            if data.is_snapshot && !self.last_is_snapshot {
                let clear_instrument_id = self
                    .instrument_id
                    .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));
                let ts_event = parse_timestamp(data.timestamp);
                let ts_init = parse_timestamp(data.local_timestamp);

                if self.last_ts_event != ts_event
                    && let Some(last_delta) = self.buffer.last_mut()
                {
                    last_delta.flags = RecordFlag::F_LAST.value();
                }
                self.last_ts_event = ts_event;

                let clear_delta = OrderBookDelta::clear(clear_instrument_id, 0, ts_event, ts_init);
                self.buffer.push(clear_delta);
                self.deltas_emitted += 1;

                // Defer real delta to next chunk if constraints reached
                if self.buffer.len() >= self.chunk_size
                    || self.limit.is_some_and(|l| self.deltas_emitted >= l)
                {
                    self.last_is_snapshot = data.is_snapshot;
                    self.pending_record = Some(data);
                    break;
                }
            }
            self.last_is_snapshot = data.is_snapshot;

            let delta = match parse_delta_record(
                &data,
                self.price_precision,
                self.size_precision,
                self.instrument_id,
            ) {
                Ok(d) => d,
                Err(e) => {
                    log::warn!("Skipping invalid delta record: {e}");
                    continue;
                }
            };

            if self.last_ts_event != delta.ts_event
                && let Some(last_delta) = self.buffer.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }

            self.last_ts_event = delta.ts_event;

            self.buffer.push(delta);
            self.deltas_emitted += 1;
        }

        if self.buffer.is_empty() {
            None
        } else {
            // Only set F_LAST when limit reached (stream ending), not on chunk
            // boundary where more same-timestamp deltas may follow
            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
                && let Some(last_delta) = self.buffer.last_mut()
            {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            Some(Ok(self.buffer.clone()))
        }
    }
}

/// Streams [`OrderBookDelta`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When using streaming with precision inference (not providing explicit precisions),
/// the inferred precision may differ from bulk loading the entire file. This is because
/// precision inference works within chunk boundaries, and different chunks may contain
/// values with different precision requirements. For deterministic precision behavior,
/// provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
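///
/// # Example
///
/// A minimal sketch of chunked consumption. The import path and file name are
/// illustrative and may differ in your crate layout:
///
/// ```no_run
/// use nautilus_tardis::csv::stream_deltas;
///
/// let stream = stream_deltas(
///     "incremental_book_L2.csv", // illustrative path
///     100_000,                   // chunk size in deltas
///     Some(1),                   // explicit price precision for determinism
///     Some(0),                   // explicit size precision
///     None,                      // instrument ID parsed from the CSV
///     None,                      // no overall record limit
/// )
/// .expect("failed to open CSV");
///
/// for chunk in stream {
///     let deltas = chunk.expect("failed to parse chunk");
///     println!("processed {} deltas", deltas.len());
/// }
/// ```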
pub fn stream_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDelta>>>> {
    DeltaStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Vec<Py<PyAny>> (OrderBookDeltas as PyCapsule) Streaming
////////////////////////////////////////////////////////////////////////////////

#[cfg(feature = "python")]
/// An iterator for streaming batches of [`OrderBookDelta`]s from a Tardis CSV file
/// as Python capsules, in chunks.
struct BatchedDeltasStreamIterator {
    reader: Reader<Box<dyn std::io::Read>>,
    record: StringRecord,
    buffer: Vec<Py<PyAny>>,
    current_batch: Vec<OrderBookDelta>,
    pending_batches: Vec<Vec<OrderBookDelta>>,
    chunk_size: usize,
    instrument_id: InstrumentId,
    price_precision: u8,
    size_precision: u8,
    last_ts_event: UnixNanos,
    last_is_snapshot: bool,
    limit: Option<usize>,
    deltas_emitted: usize,
}

#[cfg(feature = "python")]
impl BatchedDeltasStreamIterator {
    /// Creates a new [`BatchedDeltasStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let mut reader = create_csv_reader(&filepath)?;
        let mut record = StringRecord::new();

        let first_record = if reader.read_record(&mut record)? {
            record.deserialize::<TardisBookUpdateRecord>(None)?
        } else {
            anyhow::bail!("CSV file is empty");
        };

        let final_instrument_id = instrument_id
            .unwrap_or_else(|| parse_instrument_id(&first_record.exchange, first_record.symbol));

        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect from a sample of subsequent
                // records (the first record was already consumed above)
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000);
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            current_batch: Vec::new(),
            pending_batches: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id: final_instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            last_ts_event: UnixNanos::default(),
            last_is_snapshot: false,
            limit,
            deltas_emitted: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> (u8, u8) {
        let mut max_price_precision = 0u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisBookUpdateRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        (max_price_precision, max_size_precision)
    }

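    /// Reads records and groups them into per-`ts_event` batches until `chunk_size`
    /// batches are pending, the configured limit is reached, or the file ends.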
    fn fill_pending_batches(&mut self) -> Option<anyhow::Result<()>> {
        self.pending_batches.clear();
        let mut batches_created = 0;

        while batches_created < self.chunk_size {
            if let Some(limit) = self.limit
                && self.deltas_emitted >= limit
            {
                break;
            }

            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let data = match self.record.deserialize::<TardisBookUpdateRecord>(None) {
                        Ok(data) => data,
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    };

                    let ts_event = parse_timestamp(data.timestamp);
                    let ts_init = parse_timestamp(data.local_timestamp);

                    // Parse before any state changes so invalid records
                    // don't corrupt batch boundaries or snapshot tracking
                    let delta = match parse_delta_record(
                        &data,
                        self.price_precision,
                        self.size_precision,
                        Some(self.instrument_id),
                    ) {
                        Ok(d) => d,
                        Err(e) => {
                            log::warn!("Skipping invalid delta record: {e}");
                            continue;
                        }
                    };

                    if self.last_ts_event != ts_event && !self.current_batch.is_empty() {
                        // Set F_LAST on the last delta of the completed batch
                        if let Some(last_delta) = self.current_batch.last_mut() {
                            last_delta.flags = RecordFlag::F_LAST.value();
                        }
                        self.pending_batches
                            .push(std::mem::take(&mut self.current_batch));
                        batches_created += 1;
                    }

                    self.last_ts_event = ts_event;

                    // Insert CLEAR on snapshot boundary to reset order book state
                    if data.is_snapshot && !self.last_is_snapshot {
                        let clear_delta =
                            OrderBookDelta::clear(self.instrument_id, 0, ts_event, ts_init);
                        self.current_batch.push(clear_delta);
                        self.deltas_emitted += 1;

                        if let Some(limit) = self.limit
                            && self.deltas_emitted >= limit
                        {
                            self.last_is_snapshot = data.is_snapshot;
                            break;
                        }
                    }
                    self.last_is_snapshot = data.is_snapshot;

                    self.current_batch.push(delta);
                    self.deltas_emitted += 1;

                    if let Some(limit) = self.limit
                        && self.deltas_emitted >= limit
                    {
                        break;
                    }
                }
                Ok(false) => {
                    // End of file
                    break;
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if !self.current_batch.is_empty() && batches_created < self.chunk_size {
            // Ensure the last delta of the last batch has F_LAST set
            if let Some(last_delta) = self.current_batch.last_mut() {
                last_delta.flags = RecordFlag::F_LAST.value();
            }
            self.pending_batches
                .push(std::mem::take(&mut self.current_batch));
        }

        if self.pending_batches.is_empty() {
            None
        } else {
            Some(Ok(()))
        }
    }
}

#[cfg(feature = "python")]
impl Iterator for BatchedDeltasStreamIterator {
    type Item = anyhow::Result<Vec<Py<PyAny>>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.deltas_emitted >= limit
        {
            return None;
        }

        self.buffer.clear();

        if let Some(Err(e)) = self.fill_pending_batches() {
            return Some(Err(e));
        }

        if self.pending_batches.is_empty() {
            None
        } else {
            // Create all capsules in a single GIL acquisition
            Python::attach(|py| {
                for batch in self.pending_batches.drain(..) {
                    let deltas = OrderBookDeltas::new(self.instrument_id, batch);
                    let deltas = OrderBookDeltas_API::new(deltas);
                    let capsule = data_to_pycapsule(py, Data::Deltas(deltas));
                    self.buffer.push(capsule);
                }
            });
            Some(Ok(std::mem::take(&mut self.buffer)))
        }
    }
}

#[cfg(feature = "python")]
/// Streams chunks of `PyCapsule` objects from a Tardis format CSV at the given `filepath`,
/// each capsule wrapping an [`OrderBookDeltas`] batch grouped by `ts_event`.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
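///
/// # Example
///
/// A minimal sketch (requires the `python` feature; the import path and file
/// name are illustrative):
///
/// ```no_run
/// use nautilus_tardis::csv::stream_batched_deltas;
///
/// let stream = stream_batched_deltas("incremental_book_L2.csv", 1_000, Some(1), Some(0), None, None)
///     .expect("failed to open CSV");
///
/// for chunk in stream {
///     // Each element is a `PyCapsule` wrapping an `OrderBookDeltas` batch,
///     // ready to hand across the FFI boundary to Python
///     let capsules = chunk.expect("failed to parse chunk");
///     println!("yielded {} batches", capsules.len());
/// }
/// ```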
pub fn stream_batched_deltas<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<Py<PyAny>>>>> {
    BatchedDeltasStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Quote Streaming
////////////////////////////////////////////////////////////////////////////////

/// An iterator for streaming [`QuoteTick`]s from a Tardis CSV file in chunks.
struct QuoteStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<QuoteTick>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl QuoteStreamIterator {
    /// Creates a new [`QuoteStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000);
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> (u8, u8) {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisQuoteRecord>(None) {
                        if let Some(bid_price_val) = data.bid_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price_val));
                        }

                        if let Some(ask_price_val) = data.ask_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price_val));
                        }

                        if let Some(bid_amount_val) = data.bid_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount_val));
                        }

                        if let Some(ask_amount_val) = data.ask_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount_val));
                        }
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        (max_price_precision, max_size_precision)
    }
}

impl Iterator for QuoteStreamIterator {
    type Item = anyhow::Result<Vec<QuoteTick>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => match self.record.deserialize::<TardisQuoteRecord>(None) {
                    Ok(data) => {
                        let quote = parse_quote_record(
                            &data,
                            self.price_precision,
                            self.size_precision,
                            self.instrument_id,
                        );

                        self.buffer.push(quote);
                        records_read += 1;
                        self.records_processed += 1;

                        if let Some(limit) = self.limit
                            && self.records_processed >= limit
                        {
                            break;
                        }
                    }
                    Err(e) => {
                        return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                    }
                },
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    return Some(Ok(self.buffer.clone()));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            Some(Ok(self.buffer.clone()))
        }
    }
}

/// Streams [`QuoteTick`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When using streaming with precision inference (not providing explicit precisions),
/// the inferred precision may differ from bulk loading the entire file. This is because
/// precision inference works within chunk boundaries, and different chunks may contain
/// values with different precision requirements. For deterministic precision behavior,
/// provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
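///
/// # Example
///
/// A minimal sketch (the import path and file name are illustrative):
///
/// ```no_run
/// use nautilus_tardis::csv::stream_quotes;
///
/// let stream = stream_quotes("quotes.csv", 100_000, Some(1), Some(0), None, None)
///     .expect("failed to open CSV");
///
/// for chunk in stream {
///     for quote in chunk.expect("failed to parse chunk") {
///         // Each `QuoteTick` carries top-of-book bid/ask prices and sizes
///         println!("{quote:?}");
///     }
/// }
/// ```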
pub fn stream_quotes<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<QuoteTick>>>> {
    QuoteStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Trade Streaming
////////////////////////////////////////////////////////////////////////////////

/// An iterator for streaming [`TradeTick`]s from a Tardis CSV file in chunks.
struct TradeStreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<TradeTick>,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl TradeStreamIterator {
    /// Creates a new [`TradeStreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read.
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000);
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> (u8, u8) {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    if let Ok(data) = record.deserialize::<TardisTradeRecord>(None) {
                        max_price_precision = max_price_precision.max(infer_precision(data.price));
                        max_size_precision = max_size_precision.max(infer_precision(data.amount));
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        (max_price_precision, max_size_precision)
    }
}

impl Iterator for TradeStreamIterator {
    type Item = anyhow::Result<Vec<TradeTick>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => match self.record.deserialize::<TardisTradeRecord>(None) {
                    Ok(data) => {
                        let size = Quantity::new(data.amount, self.size_precision);

                        if size.is_positive() {
                            let trade = parse_trade_record(
                                &data,
                                size,
                                self.price_precision,
                                self.instrument_id,
                            );

                            self.buffer.push(trade);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        } else {
                            log::warn!("Skipping zero-sized trade: {data:?}");
                        }
                    }
                    Err(e) => {
                        return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                    }
                },
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    return Some(Ok(self.buffer.clone()));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            Some(Ok(self.buffer.clone()))
        }
    }
}

/// Streams [`TradeTick`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When using streaming with precision inference (not providing explicit precisions),
/// the inferred precision may differ from bulk loading the entire file. This is because
/// precision inference works within chunk boundaries, and different chunks may contain
/// values with different precision requirements. For deterministic precision behavior,
/// provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
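///
/// # Example
///
/// A minimal sketch that counts trades under a limit (the import path and file
/// name are illustrative):
///
/// ```no_run
/// use nautilus_tardis::csv::stream_trades;
///
/// let stream = stream_trades("trades.csv", 100_000, None, None, None, Some(1_000_000))
///     .expect("failed to open CSV");
///
/// let total: usize = stream
///     .map(|chunk| chunk.expect("failed to parse chunk").len())
///     .sum();
/// println!("streamed {total} trades");
/// ```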
pub fn stream_trades<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<TradeTick>>>> {
    TradeStreamIterator::new(
        filepath,
        chunk_size,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

////////////////////////////////////////////////////////////////////////////////
// Depth10 Streaming
////////////////////////////////////////////////////////////////////////////////

/// An iterator for streaming [`OrderBookDepth10`]s from a Tardis CSV file in chunks.
struct Depth10StreamIterator {
    reader: Reader<Box<dyn Read>>,
    record: StringRecord,
    buffer: Vec<OrderBookDepth10>,
    chunk_size: usize,
    levels: u8,
    instrument_id: Option<InstrumentId>,
    price_precision: u8,
    size_precision: u8,
    limit: Option<usize>,
    records_processed: usize,
}

impl Depth10StreamIterator {
    /// Creates a new [`Depth10StreamIterator`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read, or if `levels` is not 5 or 25.
    pub fn new<P: AsRef<Path>>(
        filepath: P,
        chunk_size: usize,
        levels: u8,
        price_precision: Option<u8>,
        size_precision: Option<u8>,
        instrument_id: Option<InstrumentId>,
        limit: Option<usize>,
    ) -> anyhow::Result<Self> {
        anyhow::ensure!(
            levels == 5 || levels == 25,
            "Invalid levels: {levels}. Must be 5 or 25."
        );

        let (final_price_precision, final_size_precision) =
            if let (Some(price_prec), Some(size_prec)) = (price_precision, size_precision) {
                // Both precisions provided, use them directly
                (price_prec, size_prec)
            } else {
                // One or both precisions missing, detect only the missing ones
                let mut reader = create_csv_reader(&filepath)?;
                let mut record = StringRecord::new();
                let (detected_price, detected_size) =
                    Self::detect_precision_from_sample(&mut reader, &mut record, 10_000);
                (
                    price_precision.unwrap_or(detected_price),
                    size_precision.unwrap_or(detected_size),
                )
            };

        let reader = create_csv_reader(filepath)?;

        Ok(Self {
            reader,
            record: StringRecord::new(),
            buffer: Vec::with_capacity(chunk_size),
            chunk_size,
            levels,
            instrument_id,
            price_precision: final_price_precision,
            size_precision: final_size_precision,
            limit,
            records_processed: 0,
        })
    }

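    /// Builds an [`OrderBookDepth10`] from a 5-level snapshot, leaving levels 5-9 as [`NULL_ORDER`].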
    fn process_snapshot5(&self, data: &TardisOrderBookSnapshot5Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Process first 5 levels from snapshot5 data
        for i in 0..5 {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0;
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

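    /// Builds an [`OrderBookDepth10`] from the top 10 levels of a 25-level snapshot.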
    fn process_snapshot25(&self, data: &TardisOrderBookSnapshot25Record) -> OrderBookDepth10 {
        let instrument_id = self
            .instrument_id
            .unwrap_or_else(|| parse_instrument_id(&data.exchange, data.symbol));

        let mut bids = [NULL_ORDER; DEPTH10_LEN];
        let mut asks = [NULL_ORDER; DEPTH10_LEN];
        let mut bid_counts = [0_u32; DEPTH10_LEN];
        let mut ask_counts = [0_u32; DEPTH10_LEN];

        // Process first 10 levels from snapshot25 data
        for i in 0..DEPTH10_LEN {
            let (bid_price, bid_amount) = match i {
                0 => (data.bids_0_price, data.bids_0_amount),
                1 => (data.bids_1_price, data.bids_1_amount),
                2 => (data.bids_2_price, data.bids_2_amount),
                3 => (data.bids_3_price, data.bids_3_amount),
                4 => (data.bids_4_price, data.bids_4_amount),
                5 => (data.bids_5_price, data.bids_5_amount),
                6 => (data.bids_6_price, data.bids_6_amount),
                7 => (data.bids_7_price, data.bids_7_amount),
                8 => (data.bids_8_price, data.bids_8_amount),
                9 => (data.bids_9_price, data.bids_9_amount),
                _ => unreachable!(),
            };

            let (ask_price, ask_amount) = match i {
                0 => (data.asks_0_price, data.asks_0_amount),
                1 => (data.asks_1_price, data.asks_1_amount),
                2 => (data.asks_2_price, data.asks_2_amount),
                3 => (data.asks_3_price, data.asks_3_amount),
                4 => (data.asks_4_price, data.asks_4_amount),
                5 => (data.asks_5_price, data.asks_5_amount),
                6 => (data.asks_6_price, data.asks_6_amount),
                7 => (data.asks_7_price, data.asks_7_amount),
                8 => (data.asks_8_price, data.asks_8_amount),
                9 => (data.asks_9_price, data.asks_9_amount),
                _ => unreachable!(),
            };

            let (bid_order, bid_count) = create_book_order(
                OrderSide::Buy,
                bid_price,
                bid_amount,
                self.price_precision,
                self.size_precision,
            );
            bids[i] = bid_order;
            bid_counts[i] = bid_count;

            let (ask_order, ask_count) = create_book_order(
                OrderSide::Sell,
                ask_price,
                ask_amount,
                self.price_precision,
                self.size_precision,
            );
            asks[i] = ask_order;
            ask_counts[i] = ask_count;
        }

        let flags = RecordFlag::F_SNAPSHOT.value();
        let sequence = 0;
        let ts_event = parse_timestamp(data.timestamp);
        let ts_init = parse_timestamp(data.local_timestamp);

        OrderBookDepth10::new(
            instrument_id,
            bids,
            asks,
            bid_counts,
            ask_counts,
            flags,
            sequence,
            ts_event,
            ts_init,
        )
    }

    fn detect_precision_from_sample(
        reader: &mut Reader<Box<dyn std::io::Read>>,
        record: &mut StringRecord,
        sample_size: usize,
    ) -> (u8, u8) {
        let mut max_price_precision = 2u8;
        let mut max_size_precision = 0u8;
        let mut records_scanned = 0;

        while records_scanned < sample_size {
            match reader.read_record(record) {
                Ok(true) => {
                    // Try to deserialize as snapshot5 record first
                    if let Ok(data) = record.deserialize::<TardisOrderBookSnapshot5Record>(None) {
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }

                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }

                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }

                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    } else if let Ok(data) =
                        record.deserialize::<TardisOrderBookSnapshot25Record>(None)
                    {
                        if let Some(bid_price) = data.bids_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(bid_price));
                        }

                        if let Some(ask_price) = data.asks_0_price {
                            max_price_precision =
                                max_price_precision.max(infer_precision(ask_price));
                        }

                        if let Some(bid_amount) = data.bids_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(bid_amount));
                        }

                        if let Some(ask_amount) = data.asks_0_amount {
                            max_size_precision =
                                max_size_precision.max(infer_precision(ask_amount));
                        }
                        records_scanned += 1;
                    }
                }
                Ok(false) => break,             // End of file
                Err(_) => records_scanned += 1, // Skip malformed records
            }
        }

        (max_price_precision, max_size_precision)
    }
}

impl Iterator for Depth10StreamIterator {
    type Item = anyhow::Result<Vec<OrderBookDepth10>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        if !self.buffer.is_empty() {
            let chunk = self.buffer.split_off(0);
            return Some(Ok(chunk));
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let result = match self.levels {
                        5 => self
                            .record
                            .deserialize::<TardisOrderBookSnapshot5Record>(None)
                            .map(|data| self.process_snapshot5(&data)),
                        25 => self
                            .record
                            .deserialize::<TardisOrderBookSnapshot25Record>(None)
                            .map(|data| self.process_snapshot25(&data)),
                        _ => return Some(Err(anyhow::anyhow!("Invalid levels: {}", self.levels))),
                    };

                    match result {
                        Ok(depth) => {
                            self.buffer.push(depth);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        }
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!("Failed to deserialize record: {e}")));
                        }
                    }
                }
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    let chunk = self.buffer.split_off(0);
                    return Some(Ok(chunk));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            let chunk = self.buffer.split_off(0);
            Some(Ok(chunk))
        }
    }
}

/// Streams [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When using streaming with precision inference (not providing explicit precisions),
/// the inferred precision may differ from bulk loading the entire file. This is because
/// precision inference works within chunk boundaries, and different chunks may contain
/// values with different precision requirements. For deterministic precision behavior,
/// provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
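///
/// # Example
///
/// A minimal sketch (the import path and file name are illustrative):
///
/// ```no_run
/// use nautilus_tardis::csv::stream_depth10_from_snapshot5;
///
/// let stream = stream_depth10_from_snapshot5("book_snapshot_5.csv", 10_000, Some(1), Some(0), None, None)
///     .expect("failed to open CSV");
///
/// for chunk in stream {
///     // Levels 5-9 of each depth are NULL_ORDER, since the source has five levels
///     let depths = chunk.expect("failed to parse chunk");
///     println!("processed {} depth updates", depths.len());
/// }
/// ```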
pub fn stream_depth10_from_snapshot5<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    price_precision: Option<u8>,
    size_precision: Option<u8>,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
    Depth10StreamIterator::new(
        filepath,
        chunk_size,
        5,
        price_precision,
        size_precision,
        instrument_id,
        limit,
    )
}

/// Streams [`OrderBookDepth10`]s from a Tardis format CSV at the given `filepath`,
/// yielding chunks of the specified size.
///
/// # Precision Inference Warning
///
/// When using streaming with precision inference (not providing explicit precisions),
/// the inferred precision may differ from bulk loading the entire file. This is because
/// precision inference works within chunk boundaries, and different chunks may contain
/// values with different precision requirements. For deterministic precision behavior,
/// provide explicit `price_precision` and `size_precision` parameters.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
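///
/// # Example
///
/// A minimal sketch; only the first ten of the twenty-five source levels are
/// retained per update (the import path and file name are illustrative):
///
/// ```no_run
/// use nautilus_tardis::csv::stream_depth10_from_snapshot25;
///
/// let n: usize = stream_depth10_from_snapshot25("book_snapshot_25.csv", 10_000, None, None, None, None)
///     .expect("failed to open CSV")
///     .map(|chunk| chunk.expect("failed to parse chunk").len())
///     .sum();
/// println!("streamed {n} depth updates");
/// ```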
1341pub fn stream_depth10_from_snapshot25<P: AsRef<Path>>(
1342    filepath: P,
1343    chunk_size: usize,
1344    price_precision: Option<u8>,
1345    size_precision: Option<u8>,
1346    instrument_id: Option<InstrumentId>,
1347    limit: Option<usize>,
1348) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<OrderBookDepth10>>>> {
1349    Depth10StreamIterator::new(
1350        filepath,
1351        chunk_size,
1352        25,
1353        price_precision,
1354        size_precision,
1355        instrument_id,
1356        limit,
1357    )
1358}
1359
1360////////////////////////////////////////////////////////////////////////////////
1361// FundingRateUpdate Streaming
1362////////////////////////////////////////////////////////////////////////////////
1363
1364use nautilus_model::data::FundingRateUpdate;
1365
1366use crate::csv::record::TardisDerivativeTickerRecord;
1367
1368/// An iterator for streaming [`FundingRateUpdate`]s from a Tardis CSV file in chunks.
1369struct FundingRateStreamIterator {
1370    reader: Reader<Box<dyn Read>>,
1371    record: StringRecord,
1372    buffer: Vec<FundingRateUpdate>,
1373    chunk_size: usize,
1374    instrument_id: Option<InstrumentId>,
1375    limit: Option<usize>,
1376    records_processed: usize,
1377}
1378
1379impl FundingRateStreamIterator {
1380    /// Creates a new [`FundingRateStreamIterator`].
1381    ///
1382    /// # Errors
1383    ///
1384    /// Returns an error if the file cannot be opened or read.
1385    fn new<P: AsRef<Path>>(
1386        filepath: P,
1387        chunk_size: usize,
1388        instrument_id: Option<InstrumentId>,
1389        limit: Option<usize>,
1390    ) -> anyhow::Result<Self> {
1391        let reader = create_csv_reader(filepath)?;
1392
1393        Ok(Self {
1394            reader,
1395            record: StringRecord::new(),
1396            buffer: Vec::with_capacity(chunk_size),
1397            chunk_size,
1398            instrument_id,
1399            limit,
1400            records_processed: 0,
1401        })
1402    }
1403}
1404
impl Iterator for FundingRateStreamIterator {
    type Item = anyhow::Result<Vec<FundingRateUpdate>>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(limit) = self.limit
            && self.records_processed >= limit
        {
            return None;
        }

        if !self.buffer.is_empty() {
            let chunk = self.buffer.split_off(0);
            return Some(Ok(chunk));
        }

        self.buffer.clear();
        let mut records_read = 0;

        while records_read < self.chunk_size {
            match self.reader.read_record(&mut self.record) {
                Ok(true) => {
                    let result = self
                        .record
                        .deserialize::<TardisDerivativeTickerRecord>(None)
                        .map_err(anyhow::Error::from)
                        .map(|data| parse_derivative_ticker_record(&data, self.instrument_id));

                    match result {
                        Ok(Some(funding_rate)) => {
                            self.buffer.push(funding_rate);
                            records_read += 1;
                            self.records_processed += 1;

                            if let Some(limit) = self.limit
                                && self.records_processed >= limit
                            {
                                break;
                            }
                        }
                        Ok(None) => {
                            // Record has no funding data: skip it (it still
                            // counts toward `limit`)
                            self.records_processed += 1;
                        }
                        Err(e) => {
                            return Some(Err(anyhow::anyhow!(
                                "Failed to parse funding rate record: {e}"
                            )));
                        }
                    }
                }
                Ok(false) => {
                    if self.buffer.is_empty() {
                        return None;
                    }
                    let chunk = self.buffer.split_off(0);
                    return Some(Ok(chunk));
                }
                Err(e) => return Some(Err(anyhow::anyhow!("Failed to read record: {e}"))),
            }
        }

        if self.buffer.is_empty() {
            None
        } else {
            let chunk = self.buffer.split_off(0);
            Some(Ok(chunk))
        }
    }
}

/// Streams [`FundingRateUpdate`]s from a Tardis derivative ticker CSV file,
/// yielding chunks of the specified size.
///
/// This function parses the `funding_rate`, `predicted_funding_rate`, and `funding_timestamp`
/// fields from derivative ticker data to create funding rate updates.
///
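/// # Examples
///
/// A minimal sketch (the file path below is a placeholder, and the import path
/// assumes this function is re-exported from the crate's `csv` module):
///
/// ```no_run
/// use nautilus_tardis::csv::stream_funding_rates;
///
/// # fn main() -> anyhow::Result<()> {
/// let stream = stream_funding_rates("derivative_ticker.csv", 10_000, None, None)?;
/// for chunk in stream {
///     let updates = chunk?;
///     println!("parsed {} funding rate updates", updates.len());
/// }
/// # Ok(())
/// # }
/// ```
///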
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or parsed as CSV.
pub fn stream_funding_rates<P: AsRef<Path>>(
    filepath: P,
    chunk_size: usize,
    instrument_id: Option<InstrumentId>,
    limit: Option<usize>,
) -> anyhow::Result<impl Iterator<Item = anyhow::Result<Vec<FundingRateUpdate>>>> {
    FundingRateStreamIterator::new(filepath, chunk_size, instrument_id, limit)
}

#[cfg(test)]
mod tests {
    use nautilus_model::{
        enums::{AggressorSide, BookAction},
        identifiers::TradeId,
        types::Price,
    };
    use rstest::*;

    use super::*;
    use crate::{
        common::{parse::parse_price, testing::get_test_data_path},
        csv::load::load_deltas,
    };

    #[rstest]
    #[case(0.0, 0)]
    #[case(42.0, 0)]
    #[case(0.1, 1)]
    #[case(0.25, 2)]
    #[case(123.0001, 4)]
    #[case(-42.987654321, 9)]
    #[case(1.234_567_890_123, 12)]
    fn test_infer_precision(#[case] input: f64, #[case] expected: u8) {
        assert_eq!(infer_precision(input), expected);
    }

    #[rstest]
    fn test_stream_deltas_chunked() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
binance-futures,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_deltas.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        // 5 data rows + 1 CLEAR = 6 deltas, in chunks of 2
        assert_eq!(chunks.len(), 3);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        assert_eq!(chunk1[0].action, BookAction::Clear); // CLEAR first
        assert_eq!(chunk1[1].order.price.precision, 4); // First data delta

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].order.price.precision, 4);
        assert_eq!(chunk2[1].order.price.precision, 4);

        let chunk3 = chunks[2].as_ref().unwrap();
        assert_eq!(chunk3.len(), 2);
        assert_eq!(chunk3[0].order.price.precision, 4);
        assert_eq!(chunk3[1].order.price.precision, 4);

        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_deltas, 6);

        std::fs::remove_file(&temp_file).ok();
    }

    #[cfg(feature = "python")]
    #[rstest]
    fn test_stream_batched_deltas_clear_and_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,false,ask,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_batched_deltas.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // limit=1 should return only the synthetic CLEAR delta
        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 10, Some(4), Some(1), None, Some(1))
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();
        assert_eq!(iterator.pending_batches.len(), 1);
        assert_eq!(iterator.pending_batches[0].len(), 1);
        assert_eq!(iterator.pending_batches[0][0].action, BookAction::Clear);

        // No limit should return all batches (first batch starts with CLEAR)
        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 10, Some(4), Some(1), None, None).unwrap();
        iterator.fill_pending_batches().transpose().unwrap();
        assert_eq!(iterator.pending_batches.len(), 5);
        assert_eq!(iterator.pending_batches[0].len(), 2);
        assert_eq!(iterator.pending_batches[0][0].action, BookAction::Clear);
        assert_ne!(iterator.pending_batches[0][1].action, BookAction::Clear);
        let total_deltas: usize = iterator
            .pending_batches
            .iter()
            .map(|batch| batch.len())
            .sum();
        assert_eq!(total_deltas, 6);

        std::fs::remove_file(&temp_file).ok();
    }

    #[cfg(feature = "python")]
    #[rstest]
    fn test_stream_batched_deltas_with_mid_snapshot_inserts_clear() {
        // CSV with:
        // - Initial snapshot (is_snapshot=true) at start
        // - Some deltas (is_snapshot=false)
        // - Mid-day snapshot (is_snapshot=true) - should trigger CLEAR
        // - Back to deltas (is_snapshot=false)
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,bid,50100.0,3.0
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,ask,50101.0,4.0
binance-futures,BTCUSDT,1640995301000000,1640995301100000,false,bid,50099.0,1.0";

        let temp_file = std::env::temp_dir().join("test_stream_batched_mid_snapshot.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, None)
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();
        let clear_count = all_deltas
            .iter()
            .filter(|d| d.action == BookAction::Clear)
            .count();

        // Should have 2 CLEAR deltas: initial snapshot + mid-day snapshot
        assert_eq!(
            clear_count, 2,
            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
        );

        // Verify CLEAR positions:
        // 0=CLEAR, 1=Add, 2=Add, 3=Update, 4=Update, 5=CLEAR, 6=Add, 7=Add, 8=Update
        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[5].action, BookAction::Clear);

        // CLEAR deltas should NOT have F_LAST when followed by same-timestamp deltas
        assert_eq!(
            all_deltas[0].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 0 should not have F_LAST flag"
        );
        assert_eq!(
            all_deltas[5].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 5 should not have F_LAST flag"
        );

        std::fs::remove_file(&temp_file).ok();
    }

    #[cfg(feature = "python")]
    #[rstest]
    fn test_stream_batched_deltas_limit_includes_clear() {
        // Test that limit counts total emitted deltas (including CLEARs)
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5
binance,BTCUSDT,1640995204000000,1640995204100000,false,ask,50003.0,1.0";

        let temp_file = std::env::temp_dir().join("test_stream_batched_limit_includes_clear.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, Some(4))
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();

        // limit=4 should get exactly 4 deltas: 1 CLEAR + 3 data deltas
        assert_eq!(all_deltas.len(), 4);
        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[1].action, BookAction::Add);
        assert_eq!(all_deltas[2].action, BookAction::Update);
        assert_eq!(all_deltas[3].action, BookAction::Update);

        std::fs::remove_file(&temp_file).ok();
    }

    #[cfg(feature = "python")]
    #[rstest]
    fn test_stream_batched_deltas_limit_sets_f_last() {
        // Test that F_LAST is set on the final delta when limit is reached
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,0.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,1.5";

        let temp_file = std::env::temp_dir().join("test_stream_batched_limit_f_last.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // limit=3 should get 3 deltas with F_LAST on the last one
        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, Some(3))
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        let all_deltas: Vec<_> = iterator.pending_batches.iter().flatten().collect();

        assert_eq!(all_deltas.len(), 3);
        assert_eq!(
            all_deltas[2].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value(),
            "Final delta should have F_LAST flag when limit is reached"
        );

        std::fs::remove_file(&temp_file).ok();
    }

    #[cfg(feature = "python")]
    #[rstest]
    fn test_stream_batched_deltas_snapshot_batch_flags() {
        // Test that CLEAR is first in batch and only the last delta has F_LAST
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_batched_snapshot_batch_flags.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let mut iterator =
            BatchedDeltasStreamIterator::new(&temp_file, 100, Some(1), Some(1), None, None)
                .unwrap();
        iterator.fill_pending_batches().transpose().unwrap();

        assert_eq!(iterator.pending_batches.len(), 2);
        let first_batch = &iterator.pending_batches[0];

        // First batch contains CLEAR + 2 snapshot deltas
        assert_eq!(first_batch.len(), 3);
        assert_eq!(first_batch[0].action, BookAction::Clear);
        assert_eq!(first_batch[0].flags & RecordFlag::F_LAST.value(), 0);
        assert_eq!(first_batch[1].flags & RecordFlag::F_LAST.value(), 0);
        assert_eq!(
            first_batch[2].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value()
        );

        // Second batch should have F_LAST set (end of file)
        assert_eq!(iterator.pending_batches[1].len(), 1);
        assert_eq!(
            iterator.pending_batches[1][0].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value()
        );

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_quotes_chunked() {
        let csv_data =
            "exchange,symbol,timestamp,local_timestamp,ask_amount,ask_price,bid_price,bid_amount
binance,BTCUSDT,1640995200000000,1640995200100000,1.0,50000.0,49999.0,1.5
binance,BTCUSDT,1640995201000000,1640995201100000,2.0,50000.5,49999.5,2.5
binance,BTCUSDT,1640995202000000,1640995202100000,1.5,50000.12,49999.12,1.8
binance,BTCUSDT,1640995203000000,1640995203100000,3.0,50000.123,49999.123,3.2
binance,BTCUSDT,1640995204000000,1640995204100000,0.5,50000.1234,49999.1234,0.8";

        let temp_file = std::env::temp_dir().join("test_stream_quotes.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 3);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        assert_eq!(chunk1[0].bid_price.precision, 4);
        assert_eq!(chunk1[1].bid_price.precision, 4);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].bid_price.precision, 4);
        assert_eq!(chunk2[1].bid_price.precision, 4);

        let chunk3 = chunks[2].as_ref().unwrap();
        assert_eq!(chunk3.len(), 1);
        assert_eq!(chunk3[0].bid_price.precision, 4);

        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_quotes, 5);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_trades_chunked() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_trades.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        assert_eq!(chunks.len(), 2);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 3);
        assert_eq!(chunk1[0].price.precision, 4);
        assert_eq!(chunk1[1].price.precision, 4);
        assert_eq!(chunk1[2].price.precision, 4);

        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 2);
        assert_eq!(chunk2[0].price.precision, 4);
        assert_eq!(chunk2[1].price.precision, 4);

        assert_eq!(chunk1[0].aggressor_side, AggressorSide::Buyer);
        assert_eq!(chunk1[1].aggressor_side, AggressorSide::Seller);

        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_trades, 5);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_trades_with_zero_sized_trade() {
        // Test CSV data with one zero-sized trade that should be skipped
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,0.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0";

        let temp_file = std::env::temp_dir().join("test_stream_trades_zero_size.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_trades(&temp_file, 3, Some(4), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 1 chunk with 3 valid trades (zero-sized trade skipped)
        assert_eq!(chunks.len(), 1);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 3);

        // Verify the trades are the correct ones (not the zero-sized one)
        assert_eq!(chunk1[0].size, Quantity::from("1.0"));
        assert_eq!(chunk1[1].size, Quantity::from("1.5"));
        assert_eq!(chunk1[2].size, Quantity::from("3.0"));

        // Verify trade IDs to confirm correct trades were loaded
        assert_eq!(chunk1[0].trade_id, TradeId::new("trade1"));
        assert_eq!(chunk1[1].trade_id, TradeId::new("trade3"));
        assert_eq!(chunk1[2].trade_id, TradeId::new("trade4"));

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_depth10_from_snapshot5_chunked() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,asks[0].price,asks[0].amount,bids[0].price,bids[0].amount,asks[1].price,asks[1].amount,bids[1].price,bids[1].amount,asks[2].price,asks[2].amount,bids[2].price,bids[2].amount,asks[3].price,asks[3].amount,bids[3].price,bids[3].amount,asks[4].price,asks[4].amount,bids[4].price,bids[4].amount
binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,49999.0,1.5,50002.0,2.0,49998.0,2.5,50003.0,3.0,49997.0,3.5,50004.0,4.0,49996.0,4.5,50005.0,5.0,49995.0,5.5
binance,BTCUSDT,1640995201000000,1640995201100000,50001.5,1.1,49999.5,1.6,50002.5,2.1,49998.5,2.6,50003.5,3.1,49997.5,3.6,50004.5,4.1,49996.5,4.6,50005.5,5.1,49995.5,5.6
binance,BTCUSDT,1640995202000000,1640995202100000,50001.12,1.12,49999.12,1.62,50002.12,2.12,49998.12,2.62,50003.12,3.12,49997.12,3.62,50004.12,4.12,49996.12,4.62,50005.12,5.12,49995.12,5.62";

        // Write to temporary file
        let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot5.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // Stream with chunk size of 2
        let stream = stream_depth10_from_snapshot5(&temp_file, 2, None, None, None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 2 chunks: [2 items, 1 item]
        assert_eq!(chunks.len(), 2);

        // First chunk: 2 depth snapshots
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);

        // Second chunk: 1 depth snapshot
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        // Verify depth structure
        let first_depth = &chunk1[0];
        assert_eq!(first_depth.bids.len(), 10); // Should have 10 levels
        assert_eq!(first_depth.asks.len(), 10);

        // Verify some specific prices
        assert_eq!(first_depth.bids[0].price, parse_price(49999.0, 1));
        assert_eq!(first_depth.asks[0].price, parse_price(50001.0, 1));

        // Verify total count
        let total_depths: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_depths, 3);

        // Clean up
        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_depth10_from_snapshot25_chunked() {
        // Create minimal snapshot25 CSV data (headers for all 25 levels; only the first 5 populated)
        let mut header_parts = vec!["exchange", "symbol", "timestamp", "local_timestamp"];

        // Add bid and ask levels (we'll only populate first few for testing)
        let mut bid_headers = Vec::new();
        let mut ask_headers = Vec::new();

        for i in 0..25 {
            bid_headers.push(format!("bids[{i}].price"));
            bid_headers.push(format!("bids[{i}].amount"));
        }

        for i in 0..25 {
            ask_headers.push(format!("asks[{i}].price"));
            ask_headers.push(format!("asks[{i}].amount"));
        }

        for header in &bid_headers {
            header_parts.push(header);
        }

        for header in &ask_headers {
            header_parts.push(header);
        }

        let header = header_parts.join(",");

        // Create a row with data for first 5 levels (rest will be empty)
        let mut row1_parts = vec![
            "binance".to_string(),
            "BTCUSDT".to_string(),
            "1640995200000000".to_string(),
            "1640995200100000".to_string(),
        ];

        // Add bid levels (first 5 with data, rest empty)
        for i in 0..25 {
            if i < 5 {
                let bid_price = f64::from(i).mul_add(-0.01, 49999.0);
                let bid_amount = 1.0 + f64::from(i);
                row1_parts.push(bid_price.to_string());
                row1_parts.push(bid_amount.to_string());
            } else {
                row1_parts.push(String::new());
                row1_parts.push(String::new());
            }
        }

        // Add ask levels (first 5 with data, rest empty)
        for i in 0..25 {
            if i < 5 {
                let ask_price = f64::from(i).mul_add(0.01, 50000.0);
                let ask_amount = 1.0 + f64::from(i);
                row1_parts.push(ask_price.to_string());
                row1_parts.push(ask_amount.to_string());
            } else {
                row1_parts.push(String::new());
                row1_parts.push(String::new());
            }
        }

        let csv_data = format!("{}\n{}", header, row1_parts.join(","));

        // Write to temporary file
        let temp_file = std::env::temp_dir().join("test_stream_depth10_snapshot25.csv");
        std::fs::write(&temp_file, &csv_data).unwrap();

        // Stream with chunk size of 1
        let stream = stream_depth10_from_snapshot25(&temp_file, 1, None, None, None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 1 chunk with 1 item
        assert_eq!(chunks.len(), 1);

        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 1);

        // Verify depth structure
        let depth = &chunk1[0];
        assert_eq!(depth.bids.len(), 10); // Should have 10 levels
        assert_eq!(depth.asks.len(), 10);

        // Verify the first level has data (exact values depend on precision inference)
        let actual_bid_price = depth.bids[0].price;
        let actual_ask_price = depth.asks[0].price;
        assert!(actual_bid_price.as_f64() > 0.0);
        assert!(actual_ask_price.as_f64() > 0.0);

        // Clean up
        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_error_handling() {
        // Test with non-existent file
        let non_existent = std::path::Path::new("does_not_exist.csv");

        let result = stream_deltas(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_quotes(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_trades(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_depth10_from_snapshot5(non_existent, 10, None, None, None, None);
        assert!(result.is_err());

        let result = stream_depth10_from_snapshot25(non_existent, 10, None, None, None, None);
        assert!(result.is_err());
    }

    #[rstest]
    fn test_stream_empty_file() {
        // Test with empty CSV file
        let temp_file = std::env::temp_dir().join("test_empty.csv");
        std::fs::write(&temp_file, "").unwrap();

        let stream = stream_deltas(&temp_file, 10, None, None, None, None).unwrap();
        assert_eq!(stream.count(), 0);

        // Clean up
        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_precision_consistency() {
        // Test that streaming produces the same records as bulk loading (precision inference aside)
2041        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
2042binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50000.0,1.0
2043binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.5,2.0
2044binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50000.12,1.5
2045binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49999.123,3.0";
2046
2047        let temp_file = std::env::temp_dir().join("test_precision_consistency.csv");
2048        std::fs::write(&temp_file, csv_data).unwrap();
2049
2050        // Load all at once
2051        let bulk_deltas = load_deltas(&temp_file, None, None, None, None).unwrap();
2052
2053        // Stream in chunks and collect
2054        let stream = stream_deltas(&temp_file, 2, None, None, None, None).unwrap();
2055        let streamed_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();
2056
2057        // Should have same number of deltas
2058        assert_eq!(bulk_deltas.len(), streamed_deltas.len());
2059
2060        // Compare key properties (precision inference will be different due to chunking)
2061        for (bulk, streamed) in bulk_deltas.iter().zip(streamed_deltas.iter()) {
2062            assert_eq!(bulk.instrument_id, streamed.instrument_id);
2063            assert_eq!(bulk.action, streamed.action);
2064            assert_eq!(bulk.order.side, streamed.order.side);
2065            assert_eq!(bulk.ts_event, streamed.ts_event);
2066            assert_eq!(bulk.ts_init, streamed.ts_init);
2067            // Note: precision may differ between bulk and streaming due to chunk boundaries
2068        }
2069
2070        // Clean up
2071        std::fs::remove_file(&temp_file).ok();
2072    }
2073
    #[rstest]
    fn test_stream_trades_from_local_file() {
        let filepath = get_test_data_path("csv/trades_1.csv");
        let mut stream = stream_trades(filepath, 1, Some(1), Some(0), None, None).unwrap();

        let chunk1 = stream.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 1);
        assert_eq!(chunk1[0].price, Price::from("8531.5"));

        let chunk2 = stream.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 1);
        assert_eq!(chunk2[0].size, Quantity::from("1000"));

        assert!(stream.next().is_none());
    }

    #[rstest]
    fn test_stream_deltas_from_local_file() {
        let filepath = get_test_data_path("csv/deltas_1.csv");
        let mut stream = stream_deltas(filepath, 1, Some(1), Some(0), None, None).unwrap();

        // With chunk_size=1, each delta gets its own chunk
        // First chunk: CLEAR
        let chunk1 = stream.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 1);
        assert_eq!(chunk1[0].action, BookAction::Clear);

        // Second chunk: first data delta
        let chunk2 = stream.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 1);
        assert_eq!(chunk2[0].order.price, Price::from("6421.5"));

        // Third chunk: second data delta
        let chunk3 = stream.next().unwrap().unwrap();
        assert_eq!(chunk3.len(), 1);
        assert_eq!(chunk3[0].order.size, Quantity::from("10000"));

        assert!(stream.next().is_none());
    }

    #[rstest]
    fn test_stream_deltas_with_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,false,ask,50001.0,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,false,bid,49999.0,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,false,ask,50002.0,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,false,bid,49998.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_deltas_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // Test with limit of 3 records
        let stream = stream_deltas(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 2 chunks: [2 items, 1 item] = 3 total (limited)
        assert_eq!(chunks.len(), 2);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        // Total should be exactly 3 records due to limit
        let total_deltas: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_deltas, 3);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_quotes_with_limit() {
        let csv_data =
            "exchange,symbol,timestamp,local_timestamp,ask_price,ask_amount,bid_price,bid_amount
binance,BTCUSDT,1640995200000000,1640995200100000,50001.0,1.0,50000.0,1.5
binance,BTCUSDT,1640995201000000,1640995201100000,50002.0,2.0,49999.0,2.5
binance,BTCUSDT,1640995202000000,1640995202100000,50003.0,1.5,49998.0,3.0
binance,BTCUSDT,1640995203000000,1640995203100000,50004.0,3.0,49997.0,3.5";

        let temp_file = std::env::temp_dir().join("test_stream_quotes_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // Test with limit of 2 records
        let stream = stream_quotes(&temp_file, 2, Some(4), Some(1), None, Some(2)).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 1 chunk with 2 items (limited)
        assert_eq!(chunks.len(), 1);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);

        // Verify we get exactly 2 records
        let total_quotes: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_quotes, 2);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_trades_with_limit() {
        let csv_data = "exchange,symbol,timestamp,local_timestamp,id,side,price,amount
binance,BTCUSDT,1640995200000000,1640995200100000,trade1,buy,50000.0,1.0
binance,BTCUSDT,1640995201000000,1640995201100000,trade2,sell,49999.5,2.0
binance,BTCUSDT,1640995202000000,1640995202100000,trade3,buy,50000.12,1.5
binance,BTCUSDT,1640995203000000,1640995203100000,trade4,sell,49999.123,3.0
binance,BTCUSDT,1640995204000000,1640995204100000,trade5,buy,50000.1234,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_trades_limit.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // Test with limit of 3 records
        let stream = stream_trades(&temp_file, 2, Some(4), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 2 chunks: [2 items, 1 item] = 3 total (limited)
        assert_eq!(chunks.len(), 2);
        let chunk1 = chunks[0].as_ref().unwrap();
        assert_eq!(chunk1.len(), 2);
        let chunk2 = chunks[1].as_ref().unwrap();
        assert_eq!(chunk2.len(), 1);

        // Verify we get exactly 3 records
        let total_trades: usize = chunks.iter().map(|c| c.as_ref().unwrap().len()).sum();
        assert_eq!(total_trades, 3);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_depth10_invalid_levels_error_at_construction() {
        let temp_file = std::env::temp_dir().join("test_depth10_invalid_levels.csv");
        std::fs::write(&temp_file, "exchange,symbol,timestamp,local_timestamp\n").unwrap();

        let result = Depth10StreamIterator::new(&temp_file, 10, 10, None, None, None, None);
        assert!(result.is_err());
        let err_msg = result.err().unwrap().to_string();
        assert!(
            err_msg.contains("Invalid levels"),
            "Error should mention 'Invalid levels': {err_msg}"
        );

        let result = Depth10StreamIterator::new(&temp_file, 10, 3, None, None, None, None);
        assert!(result.is_err());

        let result = Depth10StreamIterator::new(&temp_file, 10, 5, None, None, None, None);
        assert!(result.is_ok());

        let result = Depth10StreamIterator::new(&temp_file, 10, 25, None, None, None, None);
        assert!(result.is_ok());

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_deltas_with_mid_snapshot_inserts_clear() {
        // CSV with:
        // - Initial snapshot (is_snapshot=true) at start
        // - Some deltas (is_snapshot=false)
        // - Mid-day snapshot (is_snapshot=true) - should trigger CLEAR
        // - Back to deltas (is_snapshot=false)
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,bid,50100.0,3.0
binance-futures,BTCUSDT,1640995300000000,1640995300100000,true,ask,50101.0,4.0
binance-futures,BTCUSDT,1640995301000000,1640995301100000,false,bid,50099.0,1.0";

        let temp_file = std::env::temp_dir().join("test_stream_deltas_mid_snapshot.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, None).unwrap();
        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();

        let clear_count = all_deltas
            .iter()
            .filter(|d| d.action == BookAction::Clear)
            .count();

        // Should have 2 CLEAR deltas: initial snapshot + mid-day snapshot
        assert_eq!(
            clear_count, 2,
            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
        );

        // Verify CLEAR positions:
        // 0=CLEAR, 1=Add, 2=Add, 3=Update, 4=Update, 5=CLEAR, 6=Add, 7=Add, 8=Update
        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[5].action, BookAction::Clear);

        // CLEAR deltas should NOT have F_LAST when followed by same-timestamp deltas
        assert_eq!(
            all_deltas[0].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 0 should not have F_LAST flag"
        );
        assert_eq!(
            all_deltas[5].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 5 should not have F_LAST flag"
        );

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_load_deltas_with_mid_snapshot_inserts_clear() {
        let filepath = get_test_data_path("csv/deltas_with_snapshot.csv");
        let deltas = load_deltas(&filepath, Some(1), Some(1), None, None).unwrap();

        let clear_count = deltas
            .iter()
            .filter(|d| d.action == BookAction::Clear)
            .count();

        // Should have 2 CLEAR deltas: initial snapshot + mid-day snapshot
        assert_eq!(
            clear_count, 2,
            "Expected 2 CLEAR deltas (initial + mid-day snapshot), found {clear_count}"
        );

        assert_eq!(deltas[0].action, BookAction::Clear);

        let second_clear_idx = deltas
            .iter()
            .enumerate()
            .filter(|(_, d)| d.action == BookAction::Clear)
            .nth(1)
            .map(|(i, _)| i)
            .expect("Should have second CLEAR");

        // 0=CLEAR, 1=Add, 2=Add, 3=Update, 4=Update, 5=Delete, 6=CLEAR
        assert_eq!(
            second_clear_idx, 6,
            "Second CLEAR should be at index 6, found {second_clear_idx}"
        );

        // CLEAR deltas should NOT have F_LAST when followed by same-timestamp deltas
        assert_eq!(
            deltas[0].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 0 should not have F_LAST flag"
        );
        assert_eq!(
            deltas[6].flags & RecordFlag::F_LAST.value(),
            0,
            "CLEAR at index 6 should not have F_LAST flag"
        );
    }

    #[rstest]
    fn test_stream_deltas_chunk_size_respects_clear() {
        // Test that chunk_size applies to total emitted deltas (including CLEARs)
        // With chunk_size=1, a snapshot boundary should emit CLEAR in one chunk
        // and the real delta in the next chunk
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";

        let temp_file = std::env::temp_dir().join("test_stream_chunk_size_clear.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // chunk_size=1 should produce separate chunks for CLEAR and real deltas
        let stream = stream_deltas(&temp_file, 1, Some(1), Some(1), None, None).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 3 chunks: [CLEAR], [data], [data]
        assert_eq!(chunks.len(), 3, "Expected 3 chunks with chunk_size=1");
        assert_eq!(chunks[0].as_ref().unwrap().len(), 1);
        assert_eq!(chunks[1].as_ref().unwrap().len(), 1);
        assert_eq!(chunks[2].as_ref().unwrap().len(), 1);

        // First chunk should be CLEAR
        assert_eq!(chunks[0].as_ref().unwrap()[0].action, BookAction::Clear);
        // Second and third chunks should be data deltas
        assert_eq!(chunks[1].as_ref().unwrap()[0].action, BookAction::Add);
        assert_eq!(chunks[2].as_ref().unwrap()[0].action, BookAction::Add);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_deltas_limit_stops_at_clear() {
        // Test that limit=1 with snapshot data returns only the CLEAR delta
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0";

        let temp_file = std::env::temp_dir().join("test_stream_limit_stops_at_clear.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // limit=1 should only get the CLEAR delta
        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(1)).unwrap();
        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();

        assert_eq!(all_deltas.len(), 1);
        assert_eq!(all_deltas[0].action, BookAction::Clear);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_deltas_limit_includes_clear() {
        // Test that limit counts total emitted deltas (including CLEARs)
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_limit_includes_clear.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // limit=4 should get exactly 4 deltas: 1 CLEAR + 3 data deltas
        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(4)).unwrap();
        let all_deltas: Vec<_> = stream.flat_map(|chunk| chunk.unwrap()).collect();

        assert_eq!(all_deltas.len(), 4);
        assert_eq!(all_deltas[0].action, BookAction::Clear);
        assert_eq!(all_deltas[1].action, BookAction::Add);
        assert_eq!(all_deltas[2].action, BookAction::Add);
        assert_eq!(all_deltas[3].action, BookAction::Update);

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_deltas_limit_sets_f_last() {
        // Test that F_LAST is set on the final delta when limit is reached
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,true,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995201000000,1640995201100000,false,bid,49999.0,0.5
binance-futures,BTCUSDT,1640995202000000,1640995202100000,false,ask,50002.0,1.5
binance-futures,BTCUSDT,1640995203000000,1640995203100000,false,bid,49998.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_limit_f_last.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // limit=3 should get 3 deltas with F_LAST on the last one
        let stream = stream_deltas(&temp_file, 100, Some(1), Some(1), None, Some(3)).unwrap();
        let chunks: Vec<_> = stream.collect();

        // Should have 1 chunk with 3 deltas
        assert_eq!(chunks.len(), 1);
        let deltas = chunks[0].as_ref().unwrap();
        assert_eq!(deltas.len(), 3);

        // Final delta should have F_LAST flag
        assert_eq!(
            deltas[2].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value(),
            "Final delta should have F_LAST flag when limit is reached"
        );

        std::fs::remove_file(&temp_file).ok();
    }

    #[rstest]
    fn test_stream_deltas_chunk_boundary_no_f_last() {
        // Test that F_LAST is NOT set when only chunk_size boundary is hit (more data follows)
        let csv_data = "exchange,symbol,timestamp,local_timestamp,is_snapshot,side,price,amount
binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,50000.0,1.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,ask,50001.0,2.0
binance-futures,BTCUSDT,1640995200000000,1640995200100000,false,bid,49999.0,0.5";

        let temp_file = std::env::temp_dir().join("test_stream_chunk_no_f_last.csv");
        std::fs::write(&temp_file, csv_data).unwrap();

        // chunk_size=2, no limit - first chunk should NOT have F_LAST (more data follows)
        let mut stream = stream_deltas(&temp_file, 2, Some(1), Some(1), None, None).unwrap();

        let chunk1 = stream.next().unwrap().unwrap();
        assert_eq!(chunk1.len(), 2);

        // First chunk's last delta should NOT have F_LAST (more data follows with same timestamp)
        assert_eq!(
            chunk1[1].flags & RecordFlag::F_LAST.value(),
            0,
            "Mid-stream chunk should not have F_LAST flag"
        );

        // Second chunk exists and has F_LAST (end of file)
        let chunk2 = stream.next().unwrap().unwrap();
        assert_eq!(chunk2.len(), 1);
        assert_eq!(
            chunk2[0].flags & RecordFlag::F_LAST.value(),
            RecordFlag::F_LAST.value(),
            "Final chunk at EOF should have F_LAST flag"
        );

        std::fs::remove_file(&temp_file).ok();
    }
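
    // A hedged addition (not in the original source): the funding rate path
    // above has no test coverage, so this mirrors `test_stream_error_handling`
    // and `test_stream_empty_file` for `stream_funding_rates`.
    #[rstest]
    fn test_stream_funding_rates_error_and_empty() {
        // Non-existent file should fail at construction
        let non_existent = std::path::Path::new("does_not_exist.csv");
        assert!(stream_funding_rates(non_existent, 10, None, None).is_err());

        // Empty file should yield no chunks
        let temp_file = std::env::temp_dir().join("test_stream_funding_rates_empty.csv");
        std::fs::write(&temp_file, "").unwrap();
        let stream = stream_funding_rates(&temp_file, 10, None, None).unwrap();
        assert_eq!(stream.count(), 0);

        std::fs::remove_file(&temp_file).ok();
    }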
}