nautilus_persistence/python/
feather.rs1use std::{
19 cell::RefCell,
20 collections::{HashMap, HashSet},
21 rc::Rc,
22};
23
24use nautilus_common::{
25 live::get_runtime,
26 msgbus::typed_handler::ShareableMessageHandler,
27 python::{cache::PyCache, clock::PyClock},
28};
29use nautilus_core::UnixNanos;
30use nautilus_model::{
31 data::{
32 Bar, Data, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
33 OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose,
34 },
35 events::{
36 AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied,
37 OrderEmulated, OrderExpired, OrderFilled, OrderInitialized, OrderModifyRejected,
38 OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased, OrderSnapshot,
39 OrderSubmitted, OrderTriggered, OrderUpdated, PositionAdjusted, PositionChanged,
40 PositionClosed, PositionOpened, PositionSnapshot,
41 },
42 python::instruments::pyobject_to_instrument_any,
43 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
44};
45use object_store::ObjectStoreExt;
46use pyo3::{exceptions::PyIOError, prelude::*};
47
48use crate::{
49 backend::feather::{FeatherWriter, RotationConfig},
50 parquet::create_object_store_from_path,
51};
52
53#[pyclass(
58 name = "StreamingFeatherWriter",
59 module = "nautilus_trader.core.nautilus_pyo3.persistence",
60 unsendable
61)]
62#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.persistence")]
63pub struct PyStreamingFeatherWriter {
64 writer: Rc<RefCell<FeatherWriter>>,
65 handler: Option<ShareableMessageHandler>,
66}
67
68#[pymethods]
69#[pyo3_stub_gen::derive::gen_stub_pymethods]
70impl PyStreamingFeatherWriter {
71 #[new]
88 #[pyo3(signature = (
89 path,
90 cache,
91 clock,
92 fs_protocol=None,
93 fs_storage_options=None,
94 include_types=None,
95 rotation_mode=3,
96 max_file_size=1024*1024*1024,
97 rotation_interval_ns=None,
98 rotation_time_ns=None,
99 rotation_timezone="UTC",
100 flush_interval_ms=None,
101 replace=false
102 ))]
103 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
104 pub fn new(
105 path: String,
106 cache: PyCache,
107 clock: PyClock,
108 fs_protocol: Option<&str>,
109 fs_storage_options: Option<HashMap<String, String>>,
110 include_types: Option<Vec<String>>,
111 rotation_mode: u8,
112 max_file_size: u64,
113 rotation_interval_ns: Option<u64>,
114 rotation_time_ns: Option<u64>,
115 rotation_timezone: &str,
116 flush_interval_ms: Option<u64>,
117 replace: bool,
118 ) -> PyResult<Self> {
119 let full_path = if let Some(protocol) = fs_protocol {
122 if protocol != "file" && !path.contains("://") {
123 format!("{protocol}://{path}")
124 } else {
125 path.clone()
126 }
127 } else {
128 path.clone()
129 };
130
131 let storage_options = fs_storage_options
132 .map(|map| map.into_iter().collect::<ahash::AHashMap<String, String>>());
133
134 let (object_store, _base_path, _original_uri) =
135 create_object_store_from_path(&full_path, storage_options)
136 .map_err(|e| PyIOError::new_err(format!("Failed to create object store: {e}")))?;
137
138 if replace {
140 let runtime = get_runtime();
141 let store_ref = object_store.clone();
142 runtime
143 .block_on(async {
144 let prefix =
145 object_store::path::Path::from(path.trim_start_matches('/').to_string());
146 let mut stream = store_ref.list(Some(&prefix));
147 let mut to_delete = Vec::new();
148
149 while let Some(result) = futures::StreamExt::next(&mut stream).await {
150 if let Ok(meta) = result {
151 to_delete.push(meta.location);
152 }
153 }
154
155 for path in to_delete {
156 let _ = store_ref.delete(&path).await;
157 }
158 Ok::<(), anyhow::Error>(())
159 })
160 .map_err(|e| {
161 PyIOError::new_err(format!("Failed to replace existing files: {e}"))
162 })?;
163 }
164
165 let rotation_config = match rotation_mode {
168 0 => RotationConfig::Size {
169 max_size: max_file_size,
170 },
171 1 => {
172 let interval = rotation_interval_ns.unwrap_or(86_400_000_000_000); RotationConfig::Interval {
174 interval_ns: interval,
175 }
176 }
177 2 => {
178 let interval = rotation_interval_ns.unwrap_or(86_400_000_000_000); let tz = rotation_timezone.parse::<chrono_tz::Tz>().map_err(|e| {
180 PyIOError::new_err(format!("Failed to parse rotation_timezone: {e}"))
181 })?;
182 let time_ns = rotation_time_ns.unwrap_or(0);
183 RotationConfig::ScheduledDates {
184 interval_ns: interval,
185 rotation_time: UnixNanos::from(time_ns),
186 rotation_timezone: tz,
187 }
188 }
189 3 => RotationConfig::NoRotation,
190 _ => RotationConfig::NoRotation, };
192
193 let included_types =
195 include_types.map(|types| types.into_iter().collect::<HashSet<String>>());
196
197 let mut per_instrument_types = HashSet::new();
199 per_instrument_types.insert("bars".to_string());
200 per_instrument_types.insert("order_book_deltas".to_string());
201 per_instrument_types.insert("order_book_depths".to_string());
202 per_instrument_types.insert("quotes".to_string());
203 per_instrument_types.insert("trades".to_string());
204
205 let clock_rc = clock.clock_rc();
208 let _cache = cache;
211
212 let writer = FeatherWriter::new(
214 path,
215 object_store,
216 clock_rc,
217 rotation_config,
218 included_types,
219 Some(per_instrument_types),
220 flush_interval_ms, );
222
223 Ok(Self {
224 writer: Rc::new(RefCell::new(writer)),
225 handler: None,
226 })
227 }
228
229 pub fn subscribe(&mut self) -> PyResult<()> {
234 if self.handler.is_some() {
235 return Ok(());
237 }
238
239 let handler = FeatherWriter::subscribe_to_message_bus(self.writer.clone())
240 .map_err(|e| PyIOError::new_err(format!("Failed to subscribe to message bus: {e}")))?;
241
242 self.handler = Some(handler);
243 Ok(())
244 }
245
246 pub fn unsubscribe(&mut self) -> PyResult<()> {
248 if let Some(handler) = self.handler.take() {
249 FeatherWriter::unsubscribe_from_message_bus(&handler);
250 }
251 Ok(())
252 }
253
254 #[expect(clippy::needless_pass_by_value)]
261 pub fn write(&self, py: Python, data: Py<PyAny>) -> PyResult<()> {
262 macro_rules! try_write {
263 ($type:ty, $name:literal) => {
264 if let Ok(value) = data.extract::<$type>(py) {
265 let mut writer = self.writer.borrow_mut();
266 let runtime = get_runtime();
267 return runtime
268 .block_on(async { writer.write(value).await })
269 .map_err(|e| {
270 PyIOError::new_err(format!("Failed to write {}: {e}", $name))
271 });
272 }
273 };
274 }
275
276 if let Ok(quote) = data.extract::<QuoteTick>(py) {
278 let mut writer = self.writer.borrow_mut();
279 let runtime = get_runtime();
280 return runtime
281 .block_on(async { writer.write_data(Data::Quote(quote)).await })
282 .map_err(|e| PyIOError::new_err(format!("Failed to write QuoteTick: {e}")));
283 }
284
285 if let Ok(trade) = data.extract::<TradeTick>(py) {
286 let mut writer = self.writer.borrow_mut();
287 let runtime = get_runtime();
288 return runtime
289 .block_on(async { writer.write_data(Data::Trade(trade)).await })
290 .map_err(|e| PyIOError::new_err(format!("Failed to write TradeTick: {e}")));
291 }
292
293 if let Ok(bar) = data.extract::<Bar>(py) {
294 let mut writer = self.writer.borrow_mut();
295 let runtime = get_runtime();
296 return runtime
297 .block_on(async { writer.write_data(Data::Bar(bar)).await })
298 .map_err(|e| PyIOError::new_err(format!("Failed to write Bar: {e}")));
299 }
300
301 if let Ok(delta) = data.extract::<OrderBookDelta>(py) {
302 let mut writer = self.writer.borrow_mut();
303 let runtime = get_runtime();
304 return runtime
305 .block_on(async { writer.write_data(Data::Delta(delta)).await })
306 .map_err(|e| PyIOError::new_err(format!("Failed to write OrderBookDelta: {e}")));
307 }
308
309 if let Ok(depth) = data.extract::<OrderBookDepth10>(py) {
310 let mut writer = self.writer.borrow_mut();
311 let runtime = get_runtime();
312 return runtime
313 .block_on(async { writer.write_data(Data::Depth10(Box::new(depth))).await })
314 .map_err(|e| PyIOError::new_err(format!("Failed to write OrderBookDepth10: {e}")));
315 }
316
317 if let Ok(price) = data.extract::<IndexPriceUpdate>(py) {
318 let mut writer = self.writer.borrow_mut();
319 let runtime = get_runtime();
320 return runtime
321 .block_on(async { writer.write_data(Data::IndexPriceUpdate(price)).await })
322 .map_err(|e| PyIOError::new_err(format!("Failed to write IndexPriceUpdate: {e}")));
323 }
324
325 if let Ok(price) = data.extract::<MarkPriceUpdate>(py) {
326 let mut writer = self.writer.borrow_mut();
327 let runtime = get_runtime();
328 return runtime
329 .block_on(async { writer.write_data(Data::MarkPriceUpdate(price)).await })
330 .map_err(|e| PyIOError::new_err(format!("Failed to write MarkPriceUpdate: {e}")));
331 }
332
333 if let Ok(close) = data.extract::<InstrumentClose>(py) {
334 let mut writer = self.writer.borrow_mut();
335 let runtime = get_runtime();
336 return runtime
337 .block_on(async { writer.write_data(Data::InstrumentClose(close)).await })
338 .map_err(|e| PyIOError::new_err(format!("Failed to write InstrumentClose: {e}")));
339 }
340
341 try_write!(FundingRateUpdate, "FundingRateUpdate");
342 try_write!(InstrumentStatus, "InstrumentStatus");
343 try_write!(AccountState, "AccountState");
344 try_write!(OrderInitialized, "OrderInitialized");
345 try_write!(OrderDenied, "OrderDenied");
346 try_write!(OrderEmulated, "OrderEmulated");
347 try_write!(OrderSubmitted, "OrderSubmitted");
348 try_write!(OrderAccepted, "OrderAccepted");
349 try_write!(OrderRejected, "OrderRejected");
350 try_write!(OrderPendingCancel, "OrderPendingCancel");
351 try_write!(OrderCanceled, "OrderCanceled");
352 try_write!(OrderCancelRejected, "OrderCancelRejected");
353 try_write!(OrderExpired, "OrderExpired");
354 try_write!(OrderTriggered, "OrderTriggered");
355 try_write!(OrderPendingUpdate, "OrderPendingUpdate");
356 try_write!(OrderReleased, "OrderReleased");
357 try_write!(OrderModifyRejected, "OrderModifyRejected");
358 try_write!(OrderUpdated, "OrderUpdated");
359 try_write!(OrderFilled, "OrderFilled");
360 try_write!(PositionOpened, "PositionOpened");
361 try_write!(PositionChanged, "PositionChanged");
362 try_write!(PositionClosed, "PositionClosed");
363 try_write!(PositionAdjusted, "PositionAdjusted");
364 try_write!(OrderSnapshot, "OrderSnapshot");
365 try_write!(PositionSnapshot, "PositionSnapshot");
366 try_write!(OrderStatusReport, "OrderStatusReport");
367 try_write!(FillReport, "FillReport");
368 try_write!(PositionStatusReport, "PositionStatusReport");
369 try_write!(ExecutionMassStatus, "ExecutionMassStatus");
370
371 if let Ok(instrument) = pyobject_to_instrument_any(py, data.clone_ref(py)) {
373 let mut writer = self.writer.borrow_mut();
374 let runtime = get_runtime();
375 return runtime
376 .block_on(async { writer.write_instrument(instrument).await })
377 .map_err(|e| PyIOError::new_err(format!("Failed to write instrument: {e}")));
378 }
379
380 Err(PyIOError::new_err(
381 "Unsupported data type for feather writer",
382 ))
383 }
384
385 pub fn flush(&self) -> PyResult<()> {
390 let mut writer = self.writer.borrow_mut();
391 let runtime = get_runtime();
392
393 runtime
394 .block_on(async { writer.flush().await })
395 .map_err(|e| PyIOError::new_err(format!("Failed to flush: {e}")))
396 }
397
398 pub fn close(&self) -> PyResult<()> {
402 let mut writer = self.writer.borrow_mut();
403 let runtime = get_runtime();
404
405 runtime
406 .block_on(async { writer.close().await })
407 .map_err(|e| PyIOError::new_err(format!("Failed to close: {e}")))
408 }
409
410 #[getter]
412 pub fn is_closed(&self) -> bool {
413 self.writer.borrow().is_closed()
414 }
415
416 pub fn get_current_file_info(&self) -> HashMap<String, (u64, String)> {
420 self.writer.borrow().get_current_file_info()
421 }
422
423 #[pyo3(signature = (type_str, instrument_id=None))]
425 pub fn get_next_rotation_time(
426 &self,
427 type_str: &str,
428 instrument_id: Option<&str>,
429 ) -> Option<u64> {
430 self.writer
431 .borrow()
432 .get_next_rotation_time(type_str, instrument_id)
433 .map(|ns| ns.as_u64())
434 }
435}