Skip to main content

nautilus_tardis/python/
csv.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{fmt::Debug, path::PathBuf};
17
18use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20    data::{FundingRateUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
21    identifiers::InstrumentId,
22};
23use pyo3::prelude::*;
24
25use crate::csv::{
26    load::{
27        load_deltas, load_depth10_from_snapshot5, load_depth10_from_snapshot25, load_funding_rates,
28        load_quotes, load_trades,
29    },
30    stream::{
31        stream_batched_deltas, stream_deltas, stream_depth10_from_snapshot5,
32        stream_depth10_from_snapshot25, stream_funding_rates, stream_quotes, stream_trades,
33    },
34};
35
/// Generates a Python-visible iterator class that wraps a boxed stream of data chunks.
///
/// Macro arguments:
/// - `$struct_name`: identifier of the generated struct.
/// - `$data_type`: element type of every yielded chunk (each item is a `Vec<$data_type>`).
/// - `$type_name`: label printed by the generated `Debug` implementation.
macro_rules! impl_tardis_stream_iterator {
    ($struct_name:ident, $data_type:ty, $type_name:expr) => {
        // `unsendable`: the boxed iterator below carries no `Send` bound.
        #[pyclass(unsendable)]
        #[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.tardis")]
        pub struct $struct_name {
            stream: Box<dyn Iterator<Item = anyhow::Result<Vec<$data_type>>>>,
        }

        impl Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // The stream is an opaque trait object, so only a placeholder is printed.
                write!(f, "{} {{ stream: ... }}", $type_name)
            }
        }

        #[pymethods]
        #[pyo3_stub_gen::derive::gen_stub_pymethods]
        impl $struct_name {
            // The object is its own iterator.
            const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
                slf
            }

            // `Ok(Some(chunk))` for the next chunk, `Ok(None)` once the stream is exhausted,
            // and a converted Python error when the stream yields an `Err`.
            fn __next__(&mut self) -> PyResult<Option<Vec<$data_type>>> {
                self.stream.next().transpose().map_err(to_pyvalue_err)
            }
        }
    };
}
67
68/// # Errors
69///
70/// Returns a Python error if loading or parsing the CSV file fails.
71#[pyfunction(name = "load_tardis_deltas")]
72#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
73#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
74pub fn py_load_tardis_deltas(
75    filepath: PathBuf,
76    price_precision: Option<u8>,
77    size_precision: Option<u8>,
78    instrument_id: Option<InstrumentId>,
79    limit: Option<usize>,
80) -> PyResult<Vec<OrderBookDelta>> {
81    load_deltas(
82        filepath,
83        price_precision,
84        size_precision,
85        instrument_id,
86        limit,
87    )
88    .map_err(to_pyvalue_err)
89}
90
91/// # Errors
92///
93/// Returns a Python error if loading or parsing the CSV file fails.
94#[pyfunction(name = "load_tardis_depth10_from_snapshot5")]
95#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
96#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
97pub fn py_load_tardis_depth10_from_snapshot5(
98    filepath: PathBuf,
99    price_precision: Option<u8>,
100    size_precision: Option<u8>,
101    instrument_id: Option<InstrumentId>,
102    limit: Option<usize>,
103) -> PyResult<Vec<OrderBookDepth10>> {
104    load_depth10_from_snapshot5(
105        filepath,
106        price_precision,
107        size_precision,
108        instrument_id,
109        limit,
110    )
111    .map_err(to_pyvalue_err)
112}
113
114/// # Errors
115///
116/// Returns a Python error if loading or parsing the CSV file fails.
117#[pyfunction(name = "load_tardis_depth10_from_snapshot25")]
118#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
119#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
120pub fn py_load_tardis_depth10_from_snapshot25(
121    filepath: PathBuf,
122    price_precision: Option<u8>,
123    size_precision: Option<u8>,
124    instrument_id: Option<InstrumentId>,
125    limit: Option<usize>,
126) -> PyResult<Vec<OrderBookDepth10>> {
127    load_depth10_from_snapshot25(
128        filepath,
129        price_precision,
130        size_precision,
131        instrument_id,
132        limit,
133    )
134    .map_err(to_pyvalue_err)
135}
136
137/// # Errors
138///
139/// Returns a Python error if loading or parsing the CSV file fails.
140#[pyfunction(name = "load_tardis_quotes")]
141#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
142#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
143pub fn py_load_tardis_quotes(
144    filepath: PathBuf,
145    price_precision: Option<u8>,
146    size_precision: Option<u8>,
147    instrument_id: Option<InstrumentId>,
148    limit: Option<usize>,
149) -> PyResult<Vec<QuoteTick>> {
150    load_quotes(
151        filepath,
152        price_precision,
153        size_precision,
154        instrument_id,
155        limit,
156    )
157    .map_err(to_pyvalue_err)
158}
159
160/// # Errors
161///
162/// Returns a Python error if loading or parsing the CSV file fails.
163#[pyfunction(name = "load_tardis_trades")]
164#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
165#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
166pub fn py_load_tardis_trades(
167    filepath: PathBuf,
168    price_precision: Option<u8>,
169    size_precision: Option<u8>,
170    instrument_id: Option<InstrumentId>,
171    limit: Option<usize>,
172) -> PyResult<Vec<TradeTick>> {
173    load_trades(
174        filepath,
175        price_precision,
176        size_precision,
177        instrument_id,
178        limit,
179    )
180    .map_err(to_pyvalue_err)
181}
182
183/// # Errors
184///
185/// Returns a Python error if loading or parsing the CSV file fails.
186#[pyfunction(name = "load_tardis_funding_rates")]
187#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
188#[pyo3(signature = (filepath, instrument_id=None, limit=None))]
189pub fn py_load_tardis_funding_rates(
190    filepath: PathBuf,
191    instrument_id: Option<InstrumentId>,
192    limit: Option<usize>,
193) -> PyResult<Vec<FundingRateUpdate>> {
194    load_funding_rates(filepath, instrument_id, limit).map_err(to_pyvalue_err)
195}
196
197impl_tardis_stream_iterator!(
198    TardisDeltaStreamIterator,
199    OrderBookDelta,
200    "TardisDeltasStreamIterator"
201);
202
203/// Streams order book deltas from a Tardis CSV file.
204///
205/// # Errors
206///
207/// Returns a Python error if loading or parsing the CSV file fails.
208#[pyfunction(name = "stream_tardis_deltas")]
209#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
210#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
211pub fn py_stream_tardis_deltas(
212    filepath: PathBuf,
213    chunk_size: usize,
214    price_precision: Option<u8>,
215    size_precision: Option<u8>,
216    instrument_id: Option<InstrumentId>,
217    limit: Option<usize>,
218) -> PyResult<TardisDeltaStreamIterator> {
219    let stream = stream_deltas(
220        filepath,
221        chunk_size,
222        price_precision,
223        size_precision,
224        instrument_id,
225        limit,
226    )
227    .map_err(to_pyvalue_err)?;
228
229    Ok(TardisDeltaStreamIterator {
230        stream: Box::new(stream),
231    })
232}
233
234#[pyclass(unsendable)]
235#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.tardis")]
236pub struct TardisBatchedDeltasStreamIterator {
237    stream: Box<dyn Iterator<Item = anyhow::Result<Vec<Py<PyAny>>>>>,
238}
239
240impl Debug for TardisBatchedDeltasStreamIterator {
241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242        write!(f, "TardisBatchedDeltasStreamIterator {{ stream: ... }}")
243    }
244}
245
246#[pymethods]
247#[pyo3_stub_gen::derive::gen_stub_pymethods]
248impl TardisBatchedDeltasStreamIterator {
249    const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
250        slf
251    }
252
253    fn __next__(&mut self) -> PyResult<Option<Vec<Py<PyAny>>>> {
254        match self.stream.next() {
255            Some(Ok(batch)) => Ok(Some(batch)),
256            Some(Err(e)) => Err(to_pyvalue_err(e)),
257            None => Ok(None),
258        }
259    }
260}
261
262/// Streams batched order book deltas from a Tardis CSV file.
263///
264/// # Errors
265///
266/// Returns a Python error if loading or parsing the CSV file fails.
267#[pyfunction(name = "stream_tardis_batched_deltas")]
268#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
269#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
270pub fn py_stream_tardis_batched_deltas(
271    filepath: PathBuf,
272    chunk_size: usize,
273    price_precision: Option<u8>,
274    size_precision: Option<u8>,
275    instrument_id: Option<InstrumentId>,
276    limit: Option<usize>,
277) -> PyResult<TardisBatchedDeltasStreamIterator> {
278    let stream = stream_batched_deltas(
279        filepath,
280        chunk_size,
281        price_precision,
282        size_precision,
283        instrument_id,
284        limit,
285    )
286    .map_err(to_pyvalue_err)?;
287
288    Ok(TardisBatchedDeltasStreamIterator {
289        stream: Box::new(stream),
290    })
291}
292
293impl_tardis_stream_iterator!(
294    TardisQuoteStreamIterator,
295    QuoteTick,
296    "TardisQuoteStreamIterator"
297);
298
299/// Streams quote ticks from a Tardis CSV file.
300///
301/// # Errors
302///
303/// Returns a Python error if loading or parsing the CSV file fails.
304#[pyfunction(name = "stream_tardis_quotes")]
305#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
306#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
307pub fn py_stream_tardis_quotes(
308    filepath: PathBuf,
309    chunk_size: usize,
310    price_precision: Option<u8>,
311    size_precision: Option<u8>,
312    instrument_id: Option<InstrumentId>,
313    limit: Option<usize>,
314) -> PyResult<TardisQuoteStreamIterator> {
315    let stream = stream_quotes(
316        filepath,
317        chunk_size,
318        price_precision,
319        size_precision,
320        instrument_id,
321        limit,
322    )
323    .map_err(to_pyvalue_err)?;
324
325    Ok(TardisQuoteStreamIterator {
326        stream: Box::new(stream),
327    })
328}
329
330impl_tardis_stream_iterator!(
331    TardisTradeStreamIterator,
332    TradeTick,
333    "TardisTradeStreamIterator"
334);
335
336/// Streams trade ticks from a Tardis CSV file.
337///
338/// # Errors
339///
340/// Returns a Python error if loading or parsing the CSV file fails.
341#[pyfunction(name = "stream_tardis_trades")]
342#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
343#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
344pub fn py_stream_tardis_trades(
345    filepath: PathBuf,
346    chunk_size: usize,
347    price_precision: Option<u8>,
348    size_precision: Option<u8>,
349    instrument_id: Option<InstrumentId>,
350    limit: Option<usize>,
351) -> PyResult<TardisTradeStreamIterator> {
352    let stream = stream_trades(
353        filepath,
354        chunk_size,
355        price_precision,
356        size_precision,
357        instrument_id,
358        limit,
359    )
360    .map_err(to_pyvalue_err)?;
361
362    Ok(TardisTradeStreamIterator {
363        stream: Box::new(stream),
364    })
365}
366
367impl_tardis_stream_iterator!(
368    TardisDepth10StreamIterator,
369    OrderBookDepth10,
370    "TardisDepth10StreamIterator"
371);
372
373/// Streams order book depth10 from a Tardis snapshot5 CSV file.
374///
375/// # Errors
376///
377/// Returns a Python error if loading or parsing the CSV file fails.
378#[pyfunction(name = "stream_tardis_depth10_from_snapshot5")]
379#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
380#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
381pub fn py_stream_tardis_depth10_from_snapshot5(
382    filepath: PathBuf,
383    chunk_size: usize,
384    price_precision: Option<u8>,
385    size_precision: Option<u8>,
386    instrument_id: Option<InstrumentId>,
387    limit: Option<usize>,
388) -> PyResult<TardisDepth10StreamIterator> {
389    let stream = stream_depth10_from_snapshot5(
390        filepath,
391        chunk_size,
392        price_precision,
393        size_precision,
394        instrument_id,
395        limit,
396    )
397    .map_err(to_pyvalue_err)?;
398
399    Ok(TardisDepth10StreamIterator {
400        stream: Box::new(stream),
401    })
402}
403
404/// Streams order book depth10 from a Tardis snapshot25 CSV file.
405///
406/// # Errors
407///
408/// Returns a Python error if loading or parsing the CSV file fails.
409#[pyfunction(name = "stream_tardis_depth10_from_snapshot25")]
410#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
411#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
412pub fn py_stream_tardis_depth10_from_snapshot25(
413    filepath: PathBuf,
414    chunk_size: usize,
415    price_precision: Option<u8>,
416    size_precision: Option<u8>,
417    instrument_id: Option<InstrumentId>,
418    limit: Option<usize>,
419) -> PyResult<TardisDepth10StreamIterator> {
420    let stream = stream_depth10_from_snapshot25(
421        filepath,
422        chunk_size,
423        price_precision,
424        size_precision,
425        instrument_id,
426        limit,
427    )
428    .map_err(to_pyvalue_err)?;
429
430    Ok(TardisDepth10StreamIterator {
431        stream: Box::new(stream),
432    })
433}
434
435impl_tardis_stream_iterator!(
436    TardisFundingRateStreamIterator,
437    FundingRateUpdate,
438    "TardisFundingRateStreamIterator"
439);
440
441/// Streams funding rate updates from a Tardis derivative ticker CSV file.
442///
443/// # Errors
444///
445/// Returns a Python error if loading or parsing the CSV file fails.
446#[pyfunction(name = "stream_tardis_funding_rates")]
447#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
448#[pyo3(signature = (filepath, chunk_size=100_000, instrument_id=None, limit=None))]
449pub fn py_stream_tardis_funding_rates(
450    filepath: PathBuf,
451    chunk_size: usize,
452    instrument_id: Option<InstrumentId>,
453    limit: Option<usize>,
454) -> PyResult<TardisFundingRateStreamIterator> {
455    let stream =
456        stream_funding_rates(filepath, chunk_size, instrument_id, limit).map_err(to_pyvalue_err)?;
457
458    Ok(TardisFundingRateStreamIterator {
459        stream: Box::new(stream),
460    })
461}