1use std::{fmt::Debug, path::PathBuf};
17
18use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20 data::{FundingRateUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
21 identifiers::InstrumentId,
22};
23use pyo3::prelude::*;
24
25use crate::csv::{
26 load::{
27 load_deltas, load_depth10_from_snapshot5, load_depth10_from_snapshot25, load_funding_rates,
28 load_quotes, load_trades,
29 },
30 stream::{
31 stream_batched_deltas, stream_deltas, stream_depth10_from_snapshot5,
32 stream_depth10_from_snapshot25, stream_funding_rates, stream_quotes, stream_trades,
33 },
34};
35
/// Generates a Python-exposed iterator type that yields chunks of `$data_type`.
///
/// Macro arguments:
/// - `$struct_name`: identifier of the generated `#[pyclass(unsendable)]` struct.
/// - `$data_type`: element type of each `Vec` yielded by `__next__`.
/// - `$type_name`: value printed as the type label by the generated `Debug` impl.
macro_rules! impl_tardis_stream_iterator {
    ($struct_name:ident, $data_type:ty, $type_name:expr) => {
        #[pyclass(unsendable)]
        #[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.tardis")]
        pub struct $struct_name {
            // Boxed trait object so any concrete chunk iterator fits the same field type.
            stream: Box<dyn Iterator<Item = anyhow::Result<Vec<$data_type>>>>,
        }

        impl Debug for $struct_name {
            // The boxed iterator is opaque, so only a placeholder is printed for `stream`.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "{label} {{ stream: ... }}", label = $type_name)
            }
        }

        #[pymethods]
        #[pyo3_stub_gen::derive::gen_stub_pymethods]
        impl $struct_name {
            // Python iterator protocol: the object is its own iterator.
            const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
                slf
            }

            // Yields the next chunk; `Ok(None)` signals exhaustion, and a stream
            // error is converted with `to_pyvalue_err`.
            fn __next__(&mut self) -> PyResult<Option<Vec<$data_type>>> {
                self.stream.next().transpose().map_err(to_pyvalue_err)
            }
        }
    };
}
67
68#[pyfunction(name = "load_tardis_deltas")]
72#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
73#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
74pub fn py_load_tardis_deltas(
75 filepath: PathBuf,
76 price_precision: Option<u8>,
77 size_precision: Option<u8>,
78 instrument_id: Option<InstrumentId>,
79 limit: Option<usize>,
80) -> PyResult<Vec<OrderBookDelta>> {
81 load_deltas(
82 filepath,
83 price_precision,
84 size_precision,
85 instrument_id,
86 limit,
87 )
88 .map_err(to_pyvalue_err)
89}
90
91#[pyfunction(name = "load_tardis_depth10_from_snapshot5")]
95#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
96#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
97pub fn py_load_tardis_depth10_from_snapshot5(
98 filepath: PathBuf,
99 price_precision: Option<u8>,
100 size_precision: Option<u8>,
101 instrument_id: Option<InstrumentId>,
102 limit: Option<usize>,
103) -> PyResult<Vec<OrderBookDepth10>> {
104 load_depth10_from_snapshot5(
105 filepath,
106 price_precision,
107 size_precision,
108 instrument_id,
109 limit,
110 )
111 .map_err(to_pyvalue_err)
112}
113
114#[pyfunction(name = "load_tardis_depth10_from_snapshot25")]
118#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
119#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
120pub fn py_load_tardis_depth10_from_snapshot25(
121 filepath: PathBuf,
122 price_precision: Option<u8>,
123 size_precision: Option<u8>,
124 instrument_id: Option<InstrumentId>,
125 limit: Option<usize>,
126) -> PyResult<Vec<OrderBookDepth10>> {
127 load_depth10_from_snapshot25(
128 filepath,
129 price_precision,
130 size_precision,
131 instrument_id,
132 limit,
133 )
134 .map_err(to_pyvalue_err)
135}
136
137#[pyfunction(name = "load_tardis_quotes")]
141#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
142#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
143pub fn py_load_tardis_quotes(
144 filepath: PathBuf,
145 price_precision: Option<u8>,
146 size_precision: Option<u8>,
147 instrument_id: Option<InstrumentId>,
148 limit: Option<usize>,
149) -> PyResult<Vec<QuoteTick>> {
150 load_quotes(
151 filepath,
152 price_precision,
153 size_precision,
154 instrument_id,
155 limit,
156 )
157 .map_err(to_pyvalue_err)
158}
159
160#[pyfunction(name = "load_tardis_trades")]
164#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
165#[pyo3(signature = (filepath, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
166pub fn py_load_tardis_trades(
167 filepath: PathBuf,
168 price_precision: Option<u8>,
169 size_precision: Option<u8>,
170 instrument_id: Option<InstrumentId>,
171 limit: Option<usize>,
172) -> PyResult<Vec<TradeTick>> {
173 load_trades(
174 filepath,
175 price_precision,
176 size_precision,
177 instrument_id,
178 limit,
179 )
180 .map_err(to_pyvalue_err)
181}
182
183#[pyfunction(name = "load_tardis_funding_rates")]
187#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
188#[pyo3(signature = (filepath, instrument_id=None, limit=None))]
189pub fn py_load_tardis_funding_rates(
190 filepath: PathBuf,
191 instrument_id: Option<InstrumentId>,
192 limit: Option<usize>,
193) -> PyResult<Vec<FundingRateUpdate>> {
194 load_funding_rates(filepath, instrument_id, limit).map_err(to_pyvalue_err)
195}
196
197impl_tardis_stream_iterator!(
198 TardisDeltaStreamIterator,
199 OrderBookDelta,
200 "TardisDeltasStreamIterator"
201);
202
203#[pyfunction(name = "stream_tardis_deltas")]
209#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
210#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
211pub fn py_stream_tardis_deltas(
212 filepath: PathBuf,
213 chunk_size: usize,
214 price_precision: Option<u8>,
215 size_precision: Option<u8>,
216 instrument_id: Option<InstrumentId>,
217 limit: Option<usize>,
218) -> PyResult<TardisDeltaStreamIterator> {
219 let stream = stream_deltas(
220 filepath,
221 chunk_size,
222 price_precision,
223 size_precision,
224 instrument_id,
225 limit,
226 )
227 .map_err(to_pyvalue_err)?;
228
229 Ok(TardisDeltaStreamIterator {
230 stream: Box::new(stream),
231 })
232}
233
234#[pyclass(unsendable)]
235#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.tardis")]
236pub struct TardisBatchedDeltasStreamIterator {
237 stream: Box<dyn Iterator<Item = anyhow::Result<Vec<Py<PyAny>>>>>,
238}
239
240impl Debug for TardisBatchedDeltasStreamIterator {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 write!(f, "TardisBatchedDeltasStreamIterator {{ stream: ... }}")
243 }
244}
245
246#[pymethods]
247#[pyo3_stub_gen::derive::gen_stub_pymethods]
248impl TardisBatchedDeltasStreamIterator {
249 const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
250 slf
251 }
252
253 fn __next__(&mut self) -> PyResult<Option<Vec<Py<PyAny>>>> {
254 match self.stream.next() {
255 Some(Ok(batch)) => Ok(Some(batch)),
256 Some(Err(e)) => Err(to_pyvalue_err(e)),
257 None => Ok(None),
258 }
259 }
260}
261
262#[pyfunction(name = "stream_tardis_batched_deltas")]
268#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
269#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
270pub fn py_stream_tardis_batched_deltas(
271 filepath: PathBuf,
272 chunk_size: usize,
273 price_precision: Option<u8>,
274 size_precision: Option<u8>,
275 instrument_id: Option<InstrumentId>,
276 limit: Option<usize>,
277) -> PyResult<TardisBatchedDeltasStreamIterator> {
278 let stream = stream_batched_deltas(
279 filepath,
280 chunk_size,
281 price_precision,
282 size_precision,
283 instrument_id,
284 limit,
285 )
286 .map_err(to_pyvalue_err)?;
287
288 Ok(TardisBatchedDeltasStreamIterator {
289 stream: Box::new(stream),
290 })
291}
292
293impl_tardis_stream_iterator!(
294 TardisQuoteStreamIterator,
295 QuoteTick,
296 "TardisQuoteStreamIterator"
297);
298
299#[pyfunction(name = "stream_tardis_quotes")]
305#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
306#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
307pub fn py_stream_tardis_quotes(
308 filepath: PathBuf,
309 chunk_size: usize,
310 price_precision: Option<u8>,
311 size_precision: Option<u8>,
312 instrument_id: Option<InstrumentId>,
313 limit: Option<usize>,
314) -> PyResult<TardisQuoteStreamIterator> {
315 let stream = stream_quotes(
316 filepath,
317 chunk_size,
318 price_precision,
319 size_precision,
320 instrument_id,
321 limit,
322 )
323 .map_err(to_pyvalue_err)?;
324
325 Ok(TardisQuoteStreamIterator {
326 stream: Box::new(stream),
327 })
328}
329
330impl_tardis_stream_iterator!(
331 TardisTradeStreamIterator,
332 TradeTick,
333 "TardisTradeStreamIterator"
334);
335
336#[pyfunction(name = "stream_tardis_trades")]
342#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
343#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
344pub fn py_stream_tardis_trades(
345 filepath: PathBuf,
346 chunk_size: usize,
347 price_precision: Option<u8>,
348 size_precision: Option<u8>,
349 instrument_id: Option<InstrumentId>,
350 limit: Option<usize>,
351) -> PyResult<TardisTradeStreamIterator> {
352 let stream = stream_trades(
353 filepath,
354 chunk_size,
355 price_precision,
356 size_precision,
357 instrument_id,
358 limit,
359 )
360 .map_err(to_pyvalue_err)?;
361
362 Ok(TardisTradeStreamIterator {
363 stream: Box::new(stream),
364 })
365}
366
367impl_tardis_stream_iterator!(
368 TardisDepth10StreamIterator,
369 OrderBookDepth10,
370 "TardisDepth10StreamIterator"
371);
372
373#[pyfunction(name = "stream_tardis_depth10_from_snapshot5")]
379#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
380#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
381pub fn py_stream_tardis_depth10_from_snapshot5(
382 filepath: PathBuf,
383 chunk_size: usize,
384 price_precision: Option<u8>,
385 size_precision: Option<u8>,
386 instrument_id: Option<InstrumentId>,
387 limit: Option<usize>,
388) -> PyResult<TardisDepth10StreamIterator> {
389 let stream = stream_depth10_from_snapshot5(
390 filepath,
391 chunk_size,
392 price_precision,
393 size_precision,
394 instrument_id,
395 limit,
396 )
397 .map_err(to_pyvalue_err)?;
398
399 Ok(TardisDepth10StreamIterator {
400 stream: Box::new(stream),
401 })
402}
403
404#[pyfunction(name = "stream_tardis_depth10_from_snapshot25")]
410#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
411#[pyo3(signature = (filepath, chunk_size=100_000, price_precision=None, size_precision=None, instrument_id=None, limit=None))]
412pub fn py_stream_tardis_depth10_from_snapshot25(
413 filepath: PathBuf,
414 chunk_size: usize,
415 price_precision: Option<u8>,
416 size_precision: Option<u8>,
417 instrument_id: Option<InstrumentId>,
418 limit: Option<usize>,
419) -> PyResult<TardisDepth10StreamIterator> {
420 let stream = stream_depth10_from_snapshot25(
421 filepath,
422 chunk_size,
423 price_precision,
424 size_precision,
425 instrument_id,
426 limit,
427 )
428 .map_err(to_pyvalue_err)?;
429
430 Ok(TardisDepth10StreamIterator {
431 stream: Box::new(stream),
432 })
433}
434
435impl_tardis_stream_iterator!(
436 TardisFundingRateStreamIterator,
437 FundingRateUpdate,
438 "TardisFundingRateStreamIterator"
439);
440
441#[pyfunction(name = "stream_tardis_funding_rates")]
447#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.tardis")]
448#[pyo3(signature = (filepath, chunk_size=100_000, instrument_id=None, limit=None))]
449pub fn py_stream_tardis_funding_rates(
450 filepath: PathBuf,
451 chunk_size: usize,
452 instrument_id: Option<InstrumentId>,
453 limit: Option<usize>,
454) -> PyResult<TardisFundingRateStreamIterator> {
455 let stream =
456 stream_funding_rates(filepath, chunk_size, instrument_id, limit).map_err(to_pyvalue_err)?;
457
458 Ok(TardisFundingRateStreamIterator {
459 stream: Box::new(stream),
460 })
461}