Skip to main content

nautilus_persistence/python/
feather.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Rust `FeatherWriter` as `StreamingFeatherWriter`.
17
18use std::{
19    cell::RefCell,
20    collections::{HashMap, HashSet},
21    rc::Rc,
22};
23
24use nautilus_common::{
25    live::get_runtime,
26    msgbus::typed_handler::ShareableMessageHandler,
27    python::{cache::PyCache, clock::PyClock},
28};
29use nautilus_core::UnixNanos;
30use nautilus_model::{
31    data::{
32        Bar, Data, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate,
33        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose,
34    },
35    events::{
36        AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied,
37        OrderEmulated, OrderExpired, OrderFilled, OrderInitialized, OrderModifyRejected,
38        OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased, OrderSnapshot,
39        OrderSubmitted, OrderTriggered, OrderUpdated, PositionAdjusted, PositionChanged,
40        PositionClosed, PositionOpened, PositionSnapshot,
41    },
42    python::instruments::pyobject_to_instrument_any,
43    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
44};
45use object_store::ObjectStoreExt;
46use pyo3::{exceptions::PyIOError, prelude::*};
47
48use crate::{
49    backend::feather::{FeatherWriter, RotationConfig},
50    parquet::create_object_store_from_path,
51};
52
/// Python binding for the Rust `FeatherWriter`.
///
/// This provides a streaming writer of Nautilus objects into feather files with rotation
/// capabilities, matching the interface of Python's `StreamingFeatherWriter`.
///
/// The class is declared `unsendable` because it holds an `Rc<RefCell<_>>`, which is not
/// thread-safe.
#[pyclass(
    name = "StreamingFeatherWriter",
    module = "nautilus_trader.core.nautilus_pyo3.persistence",
    unsendable
)]
#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.persistence")]
pub struct PyStreamingFeatherWriter {
    // Shared handle to the underlying writer; a clone of this `Rc` is handed to the
    // message bus subscription, so the writer is mutated through the `RefCell`.
    writer: Rc<RefCell<FeatherWriter>>,
    // Message bus handler: `Some` after a successful `subscribe`, reset to `None` by
    // `unsubscribe`.
    handler: Option<ShareableMessageHandler>,
}
67
68#[pymethods]
69#[pyo3_stub_gen::derive::gen_stub_pymethods]
70impl PyStreamingFeatherWriter {
71    /// Creates a new `StreamingFeatherWriter` instance.
72    ///
73    /// # Parameters
74    ///
75    /// - `path`: The path to persist the stream to. Must be a directory.
76    /// - `cache`: The cache for query info (PyCache).
77    /// - `clock`: The clock to use for time-related operations (PyClock).
78    /// - `fs_protocol`: Optional filesystem protocol (default: "file").
79    /// - `fs_storage_options`: Optional storage options for cloud backends.
80    /// - `include_types`: Optional list of type names to include (e.g., ["quotes", "trades"]).
81    /// - `rotation_mode`: Rotation mode (0=SIZE, 1=INTERVAL, 2=SCHEDULED_DATES, 3=NO_ROTATION).
82    /// - `max_file_size`: Maximum file size in bytes before rotation (for SIZE mode).
83    /// - `rotation_interval_ns`: Rotation interval in nanoseconds (for INTERVAL/SCHEDULED_DATES modes).
84    /// - `rotation_time_ns`: Scheduled rotation time in nanoseconds (for SCHEDULED_DATES mode).
85    /// - `flush_interval_ms`: Flush interval in milliseconds (default: 1000). Set to 0 to disable auto-flush.
86    /// - `replace`: If existing files at the given path should be replaced (default: False).
87    #[new]
88    #[pyo3(signature = (
89        path,
90        cache,
91        clock,
92        fs_protocol=None,
93        fs_storage_options=None,
94        include_types=None,
95        rotation_mode=3,
96        max_file_size=1024*1024*1024,
97        rotation_interval_ns=None,
98        rotation_time_ns=None,
99        rotation_timezone="UTC",
100        flush_interval_ms=None,
101        replace=false
102    ))]
103    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
104    pub fn new(
105        path: String,
106        cache: PyCache,
107        clock: PyClock,
108        fs_protocol: Option<&str>,
109        fs_storage_options: Option<HashMap<String, String>>,
110        include_types: Option<Vec<String>>,
111        rotation_mode: u8,
112        max_file_size: u64,
113        rotation_interval_ns: Option<u64>,
114        rotation_time_ns: Option<u64>,
115        rotation_timezone: &str,
116        flush_interval_ms: Option<u64>,
117        replace: bool,
118    ) -> PyResult<Self> {
119        // Create object store from path
120        // Use fs_protocol to construct the full path if it's a cloud protocol
121        let full_path = if let Some(protocol) = fs_protocol {
122            if protocol != "file" && !path.contains("://") {
123                format!("{protocol}://{path}")
124            } else {
125                path.clone()
126            }
127        } else {
128            path.clone()
129        };
130
131        let storage_options = fs_storage_options
132            .map(|map| map.into_iter().collect::<ahash::AHashMap<String, String>>());
133
134        let (object_store, _base_path, _original_uri) =
135            create_object_store_from_path(&full_path, storage_options)
136                .map_err(|e| PyIOError::new_err(format!("Failed to create object store: {e}")))?;
137
138        // Handle replace parameter - delete existing files if requested
139        if replace {
140            let runtime = get_runtime();
141            let store_ref = object_store.clone();
142            runtime
143                .block_on(async {
144                    let prefix =
145                        object_store::path::Path::from(path.trim_start_matches('/').to_string());
146                    let mut stream = store_ref.list(Some(&prefix));
147                    let mut to_delete = Vec::new();
148
149                    while let Some(result) = futures::StreamExt::next(&mut stream).await {
150                        if let Ok(meta) = result {
151                            to_delete.push(meta.location);
152                        }
153                    }
154
155                    for path in to_delete {
156                        let _ = store_ref.delete(&path).await;
157                    }
158                    Ok::<(), anyhow::Error>(())
159                })
160                .map_err(|e| {
161                    PyIOError::new_err(format!("Failed to replace existing files: {e}"))
162                })?;
163        }
164
165        // Convert rotation mode to RotationConfig
166        // Python RotationMode: 0=SIZE, 1=INTERVAL, 2=SCHEDULED_DATES, 3=NO_ROTATION
167        let rotation_config = match rotation_mode {
168            0 => RotationConfig::Size {
169                max_size: max_file_size,
170            },
171            1 => {
172                let interval = rotation_interval_ns.unwrap_or(86_400_000_000_000); // Default 1 day
173                RotationConfig::Interval {
174                    interval_ns: interval,
175                }
176            }
177            2 => {
178                let interval = rotation_interval_ns.unwrap_or(86_400_000_000_000); // Default 1 day
179                let tz = rotation_timezone.parse::<chrono_tz::Tz>().map_err(|e| {
180                    PyIOError::new_err(format!("Failed to parse rotation_timezone: {e}"))
181                })?;
182                let time_ns = rotation_time_ns.unwrap_or(0);
183                RotationConfig::ScheduledDates {
184                    interval_ns: interval,
185                    rotation_time: UnixNanos::from(time_ns),
186                    rotation_timezone: tz,
187                }
188            }
189            3 => RotationConfig::NoRotation,
190            _ => RotationConfig::NoRotation, // Default to no rotation for invalid values
191        };
192
193        // Convert include_types to HashSet
194        let included_types =
195            include_types.map(|types| types.into_iter().collect::<HashSet<String>>());
196
197        // Set up per-instrument types (matching Python's _per_instrument_writers)
198        let mut per_instrument_types = HashSet::new();
199        per_instrument_types.insert("bars".to_string());
200        per_instrument_types.insert("order_book_deltas".to_string());
201        per_instrument_types.insert("order_book_depths".to_string());
202        per_instrument_types.insert("quotes".to_string());
203        per_instrument_types.insert("trades".to_string());
204
205        // Extract Clock from Python wrapper
206        // PyClock wraps Rc<RefCell<dyn Clock>>, we get the inner Rc
207        let clock_rc = clock.clock_rc();
208        // Note: Cache parameter is kept for API compatibility with Python StreamingFeatherWriter
209        // but is not directly used by FeatherWriter
210        let _cache = cache;
211
212        // Create FeatherWriter
213        let writer = FeatherWriter::new(
214            path,
215            object_store,
216            clock_rc,
217            rotation_config,
218            included_types,
219            Some(per_instrument_types),
220            flush_interval_ms, // Auto-flush interval in milliseconds
221        );
222
223        Ok(Self {
224            writer: Rc::new(RefCell::new(writer)),
225            handler: None,
226        })
227    }
228
229    /// Subscribes to all messages on the message bus (pattern "*").
230    ///
231    /// This matches the behavior of Python's StreamingFeatherWriter when subscribed
232    /// via `trader.subscribe("*", writer.write)`.
233    pub fn subscribe(&mut self) -> PyResult<()> {
234        if self.handler.is_some() {
235            // Already subscribed
236            return Ok(());
237        }
238
239        let handler = FeatherWriter::subscribe_to_message_bus(self.writer.clone())
240            .map_err(|e| PyIOError::new_err(format!("Failed to subscribe to message bus: {e}")))?;
241
242        self.handler = Some(handler);
243        Ok(())
244    }
245
246    /// Unsubscribes from the message bus.
247    pub fn unsubscribe(&mut self) -> PyResult<()> {
248        if let Some(handler) = self.handler.take() {
249            FeatherWriter::unsubscribe_from_message_bus(&handler);
250        }
251        Ok(())
252    }
253
254    /// Writes a data object to the stream.
255    ///
256    /// # Parameters
257    ///
258    /// - `data`: The data object to write (must be a Nautilus data type from pyo3).
259    ///
260    #[expect(clippy::needless_pass_by_value)]
261    pub fn write(&self, py: Python, data: Py<PyAny>) -> PyResult<()> {
262        macro_rules! try_write {
263            ($type:ty, $name:literal) => {
264                if let Ok(value) = data.extract::<$type>(py) {
265                    let mut writer = self.writer.borrow_mut();
266                    let runtime = get_runtime();
267                    return runtime
268                        .block_on(async { writer.write(value).await })
269                        .map_err(|e| {
270                            PyIOError::new_err(format!("Failed to write {}: {e}", $name))
271                        });
272                }
273            };
274        }
275
276        // Try to convert from common pyo3 data types
277        if let Ok(quote) = data.extract::<QuoteTick>(py) {
278            let mut writer = self.writer.borrow_mut();
279            let runtime = get_runtime();
280            return runtime
281                .block_on(async { writer.write_data(Data::Quote(quote)).await })
282                .map_err(|e| PyIOError::new_err(format!("Failed to write QuoteTick: {e}")));
283        }
284
285        if let Ok(trade) = data.extract::<TradeTick>(py) {
286            let mut writer = self.writer.borrow_mut();
287            let runtime = get_runtime();
288            return runtime
289                .block_on(async { writer.write_data(Data::Trade(trade)).await })
290                .map_err(|e| PyIOError::new_err(format!("Failed to write TradeTick: {e}")));
291        }
292
293        if let Ok(bar) = data.extract::<Bar>(py) {
294            let mut writer = self.writer.borrow_mut();
295            let runtime = get_runtime();
296            return runtime
297                .block_on(async { writer.write_data(Data::Bar(bar)).await })
298                .map_err(|e| PyIOError::new_err(format!("Failed to write Bar: {e}")));
299        }
300
301        if let Ok(delta) = data.extract::<OrderBookDelta>(py) {
302            let mut writer = self.writer.borrow_mut();
303            let runtime = get_runtime();
304            return runtime
305                .block_on(async { writer.write_data(Data::Delta(delta)).await })
306                .map_err(|e| PyIOError::new_err(format!("Failed to write OrderBookDelta: {e}")));
307        }
308
309        if let Ok(depth) = data.extract::<OrderBookDepth10>(py) {
310            let mut writer = self.writer.borrow_mut();
311            let runtime = get_runtime();
312            return runtime
313                .block_on(async { writer.write_data(Data::Depth10(Box::new(depth))).await })
314                .map_err(|e| PyIOError::new_err(format!("Failed to write OrderBookDepth10: {e}")));
315        }
316
317        if let Ok(price) = data.extract::<IndexPriceUpdate>(py) {
318            let mut writer = self.writer.borrow_mut();
319            let runtime = get_runtime();
320            return runtime
321                .block_on(async { writer.write_data(Data::IndexPriceUpdate(price)).await })
322                .map_err(|e| PyIOError::new_err(format!("Failed to write IndexPriceUpdate: {e}")));
323        }
324
325        if let Ok(price) = data.extract::<MarkPriceUpdate>(py) {
326            let mut writer = self.writer.borrow_mut();
327            let runtime = get_runtime();
328            return runtime
329                .block_on(async { writer.write_data(Data::MarkPriceUpdate(price)).await })
330                .map_err(|e| PyIOError::new_err(format!("Failed to write MarkPriceUpdate: {e}")));
331        }
332
333        if let Ok(close) = data.extract::<InstrumentClose>(py) {
334            let mut writer = self.writer.borrow_mut();
335            let runtime = get_runtime();
336            return runtime
337                .block_on(async { writer.write_data(Data::InstrumentClose(close)).await })
338                .map_err(|e| PyIOError::new_err(format!("Failed to write InstrumentClose: {e}")));
339        }
340
341        try_write!(FundingRateUpdate, "FundingRateUpdate");
342        try_write!(InstrumentStatus, "InstrumentStatus");
343        try_write!(AccountState, "AccountState");
344        try_write!(OrderInitialized, "OrderInitialized");
345        try_write!(OrderDenied, "OrderDenied");
346        try_write!(OrderEmulated, "OrderEmulated");
347        try_write!(OrderSubmitted, "OrderSubmitted");
348        try_write!(OrderAccepted, "OrderAccepted");
349        try_write!(OrderRejected, "OrderRejected");
350        try_write!(OrderPendingCancel, "OrderPendingCancel");
351        try_write!(OrderCanceled, "OrderCanceled");
352        try_write!(OrderCancelRejected, "OrderCancelRejected");
353        try_write!(OrderExpired, "OrderExpired");
354        try_write!(OrderTriggered, "OrderTriggered");
355        try_write!(OrderPendingUpdate, "OrderPendingUpdate");
356        try_write!(OrderReleased, "OrderReleased");
357        try_write!(OrderModifyRejected, "OrderModifyRejected");
358        try_write!(OrderUpdated, "OrderUpdated");
359        try_write!(OrderFilled, "OrderFilled");
360        try_write!(PositionOpened, "PositionOpened");
361        try_write!(PositionChanged, "PositionChanged");
362        try_write!(PositionClosed, "PositionClosed");
363        try_write!(PositionAdjusted, "PositionAdjusted");
364        try_write!(OrderSnapshot, "OrderSnapshot");
365        try_write!(PositionSnapshot, "PositionSnapshot");
366        try_write!(OrderStatusReport, "OrderStatusReport");
367        try_write!(FillReport, "FillReport");
368        try_write!(PositionStatusReport, "PositionStatusReport");
369        try_write!(ExecutionMassStatus, "ExecutionMassStatus");
370
371        // Try instrument types (uses type_str attribute for dispatch)
372        if let Ok(instrument) = pyobject_to_instrument_any(py, data.clone_ref(py)) {
373            let mut writer = self.writer.borrow_mut();
374            let runtime = get_runtime();
375            return runtime
376                .block_on(async { writer.write_instrument(instrument).await })
377                .map_err(|e| PyIOError::new_err(format!("Failed to write instrument: {e}")));
378        }
379
380        Err(PyIOError::new_err(
381            "Unsupported data type for feather writer",
382        ))
383    }
384
385    /// Flushes all active buffers by writing any remaining buffered bytes to the object store.
386    ///
387    /// This is called automatically based on `flush_interval_ms` if configured, but can also
388    /// be called manually by the client.
389    pub fn flush(&self) -> PyResult<()> {
390        let mut writer = self.writer.borrow_mut();
391        let runtime = get_runtime();
392
393        runtime
394            .block_on(async { writer.flush().await })
395            .map_err(|e| PyIOError::new_err(format!("Failed to flush: {e}")))
396    }
397
398    /// Closes all writers by flushing and removing them.
399    ///
400    /// After calling this, no further writes should be performed.
401    pub fn close(&self) -> PyResult<()> {
402        let mut writer = self.writer.borrow_mut();
403        let runtime = get_runtime();
404
405        runtime
406            .block_on(async { writer.close().await })
407            .map_err(|e| PyIOError::new_err(format!("Failed to close: {e}")))
408    }
409
410    /// Returns whether the writer has been closed (no active writers).
411    #[getter]
412    pub fn is_closed(&self) -> bool {
413        self.writer.borrow().is_closed()
414    }
415
416    /// Returns information about the current files being written.
417    ///
418    /// Returns a dictionary mapping writer keys to (size, path) tuples.
419    pub fn get_current_file_info(&self) -> HashMap<String, (u64, String)> {
420        self.writer.borrow().get_current_file_info()
421    }
422
423    /// Returns the next rotation time for a writer, or None if not set.
424    #[pyo3(signature = (type_str, instrument_id=None))]
425    pub fn get_next_rotation_time(
426        &self,
427        type_str: &str,
428        instrument_id: Option<&str>,
429    ) -> Option<u64> {
430        self.writer
431            .borrow()
432            .get_next_rotation_time(type_str, instrument_id)
433            .map(|ns| ns.as_u64())
434    }
435}