Skip to main content

nautilus_persistence/backend/
feather.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::RefCell,
19    collections::{HashMap, HashSet},
20    rc::Rc,
21    sync::Arc,
22};
23
24use ahash::AHashMap;
25use chrono_tz::Tz;
26use datafusion::arrow::{
27    datatypes::Schema, error::ArrowError, ipc::writer::StreamWriter, record_batch::RecordBatch,
28};
29use nautilus_common::{
30    cache::fifo::FifoCache,
31    clock::Clock,
32    msgbus::{mstr::MStr, subscribe_any, typed_handler::ShareableMessageHandler, unsubscribe_any},
33};
34use nautilus_core::{UUID4, UnixNanos};
35use nautilus_model::{
36    data::{
37        Bar, CatalogPathPrefix, CustomData, CustomDataTrait, Data, FundingRateUpdate,
38        IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
39        OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose, encode_custom_to_arrow,
40        get_arrow_schema,
41    },
42    events::{
43        AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied,
44        OrderEmulated, OrderExpired, OrderFilled, OrderInitialized, OrderModifyRejected,
45        OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased, OrderSnapshot,
46        OrderSubmitted, OrderTriggered, OrderUpdated, PositionAdjusted, PositionChanged,
47        PositionClosed, PositionOpened, PositionSnapshot,
48    },
49    instruments::InstrumentAny,
50    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
51};
52use nautilus_serialization::arrow::{EncodeToRecordBatch, KEY_INSTRUMENT_ID};
53use object_store::{ObjectStore, ObjectStoreExt, path::Path};
54
55use super::catalog::urisafe_instrument_id;
56use crate::backend::{
57    catalog::safe_directory_identifier,
58    custom::{augment_batch_with_data_type_column, schema_with_data_type_column},
59};
60
/// Key identifying an active writer: the full object-store path plus the
/// logical (type, optional instrument) pair used to route data to it.
#[derive(Debug, Default, PartialEq, PartialOrd, Hash, Eq, Clone)]
pub struct FileWriterPath {
    /// Object store path of the current feather file (includes a timestamp suffix).
    path: Path,
    /// Catalog path prefix for the data type (from `CatalogPathPrefix::path_prefix`,
    /// or `data/custom/{type_name}` for custom data).
    type_str: String,
    /// Set for per-instrument partitioned types; `None` for single-file types.
    instrument_id: Option<String>,
}
67
/// A `FeatherBuffer` encodes data via an Arrow `StreamWriter`.
///
/// It flushes the internal byte buffer according to rotation policy.
pub struct FeatherBuffer {
    /// Arrow `StreamWriter` that writes to an in-memory `Vec<u8>`.
    writer: StreamWriter<Vec<u8>>,
    /// Current size in bytes, accumulated from each batch's in-memory footprint
    /// (`get_array_memory_size`), not the serialized IPC byte count.
    size: u64,
    /// TODO: Optional next rotation timestamp.
    // next_rotation: Option<UnixNanos>,
    /// Schema of the data being written; retained so a fresh `StreamWriter`
    /// can be created when the buffer is taken.
    schema: Schema,
    /// Maximum buffer size in bytes before writes report that rotation is due.
    max_buffer_size: u64,
    /// Rotation policy governing when this buffer should be flushed/rotated.
    rotation_config: RotationConfig,
}
85
86impl FeatherBuffer {
87    /// Creates a new [`FeatherBuffer`] using the given path, schema and maximum buffer size.
88    pub fn new(schema: &Schema, rotation_config: RotationConfig) -> Result<Self, ArrowError> {
89        let writer = StreamWriter::try_new(Vec::new(), schema)?;
90        let mut max_buffer_size = 1_000_000_000_000; // 1 GB
91
92        if let RotationConfig::Size { max_size } = &rotation_config {
93            max_buffer_size = *max_size;
94        }
95
96        Ok(Self {
97            writer,
98            size: 0,
99            // next_rotation: None,
100            max_buffer_size,
101            schema: schema.clone(),
102            rotation_config,
103        })
104    }
105
106    /// Writes the given `RecordBatch` to the internal buffer.
107    ///
108    /// Returns true if it should be rotated according rotation policy
109    pub fn write_record_batch(&mut self, batch: &RecordBatch) -> Result<bool, ArrowError> {
110        self.writer.write(batch)?;
111        self.size += batch.get_array_memory_size() as u64;
112        Ok(self.size >= self.max_buffer_size)
113    }
114
115    /// Consumes the writer and returns the buffer of bytes from the `StreamWriter`
116    pub fn take_buffer(&mut self) -> Result<Vec<u8>, ArrowError> {
117        let mut writer = StreamWriter::try_new(Vec::new(), &self.schema)?;
118        std::mem::swap(&mut self.writer, &mut writer);
119        let buffer = writer.into_inner()?;
120        // TODO: Handle rotation config here
121        self.size = 0;
122        Ok(buffer)
123    }
124
125    /// Should rotate
126    #[must_use]
127    pub const fn should_rotate(&self) -> bool {
128        match &self.rotation_config {
129            RotationConfig::Size { max_size } => self.size >= *max_size,
130            _ => false,
131        }
132    }
133}
134
/// Configuration for file rotation.
#[derive(Debug, Clone)]
pub enum RotationConfig {
    /// Rotate based on file size.
    Size {
        /// Maximum buffer size in bytes before rotation.
        max_size: u64,
    },
    /// Rotate based on a time interval.
    Interval {
        /// Interval in nanoseconds.
        interval_ns: u64,
    },
    /// Rotate based on scheduled dates.
    ScheduledDates {
        /// Interval in nanoseconds.
        interval_ns: u64,
        /// Time of day for rotation, as nanoseconds since midnight (despite the
        /// `UnixNanos` type this is a time-of-day offset, not an epoch time).
        rotation_time: UnixNanos,
        /// Timezone in which the rotation time of day is interpreted.
        rotation_timezone: Tz,
    },
    /// No automatic rotation.
    NoRotation,
}
160
/// Manages multiple `FeatherBuffers` and handles encoding, rotation, and flushing to the object store.
///
/// The `write()` method is the single entry point for clients: they supply a data value (of generic type T)
/// and the manager encodes it (using T's metadata via `EncodeToRecordBatch`), routes it by `CatalogPathPrefix`,
/// and writes it to the appropriate `FileWriter`. When a writer's buffer is full or rotation criteria are met,
/// its contents are flushed to the object store and it is replaced.
pub struct FeatherWriter {
    /// Base directory for writing files.
    base_path: String,
    /// Object store for persistence.
    store: Arc<dyn ObjectStore>,
    /// Clock for timestamps and rotation.
    clock: Rc<RefCell<dyn Clock>>,
    /// Rotation configuration applied to every new `FeatherBuffer`.
    rotation_config: RotationConfig,
    /// Optional set of type names to include; `None` means all types are written.
    included_types: Option<HashSet<String>>,
    /// Set of types whose files are partitioned per instrument ID.
    per_instrument_types: HashSet<String>,
    /// Map of active `FeatherBuffers` keyed by their path.
    writers: HashMap<FileWriterPath, FeatherBuffer>,
    /// Map of next rotation times keyed by their path (time-based policies only).
    next_rotation_times: HashMap<FileWriterPath, UnixNanos>,
    /// Runtime handle for async operations.
    runtime: tokio::runtime::Handle,
    /// Flush interval in milliseconds (0 = no automatic flushing).
    flush_interval_ms: u64,
    /// Last flush timestamp in nanoseconds.
    last_flush_ns: UnixNanos,
    /// Bounded FIFO cache of recently seen event IDs for deduplication.
    seen_event_ids: Box<FifoCache<UUID4, 10_000>>,
}
193
194impl FeatherWriter {
195    /// Creates a new [`FeatherWriter`] instance.
196    pub fn new(
197        base_path: String,
198        store: Arc<dyn ObjectStore>,
199        clock: Rc<RefCell<dyn Clock>>,
200        rotation_config: RotationConfig,
201        included_types: Option<HashSet<String>>,
202        per_instrument_types: Option<HashSet<String>>,
203        flush_interval_ms: Option<u64>,
204    ) -> Self {
205        // Get the runtime handle for async operations
206        let runtime = nautilus_common::live::get_runtime().handle().clone();
207        let flush_interval_ms = flush_interval_ms.unwrap_or(1000); // Default 1 second
208        let last_flush_ns = clock.borrow().timestamp_ns();
209
210        Self {
211            base_path,
212            store,
213            clock,
214            rotation_config,
215            included_types,
216            per_instrument_types: per_instrument_types.unwrap_or_default(),
217            writers: HashMap::new(),
218            next_rotation_times: HashMap::new(),
219            runtime,
220            flush_interval_ms,
221            last_flush_ns,
222            seen_event_ids: Box::new(FifoCache::new()),
223        }
224    }
225
226    /// Writes a single data value.
227    /// This is the user entry point. The data is encoded into a `RecordBatch` and written to the appropriate `FileWriter`.
228    /// If the writer's buffer reaches capacity or meets rotation criteria (based on the rotation configuration),
229    /// the `FileWriter` is flushed to the object store and replaced.
230    pub async fn write<T>(&mut self, data: T) -> Result<(), Box<dyn std::error::Error>>
231    where
232        T: EncodeToRecordBatch + CatalogPathPrefix + 'static,
233    {
234        if !self.should_write::<T>() {
235            return Ok(());
236        }
237
238        let path = self.get_writer_path(&data)?;
239
240        // Create a new FileWriter if one does not exist.
241        if !self.writers.contains_key(&path) {
242            self.create_writer::<T>(path.clone(), &data)?;
243        }
244
245        // Encode the data into a RecordBatch using T's encoding logic.
246        let batch = T::encode_batch(&T::metadata(&data), &[data])?;
247
248        // Write the RecordBatch to the appropriate FileWriter.
249        if let Some(writer) = self.writers.get_mut(&path) {
250            let should_rotate = writer.write_record_batch(&batch)?;
251            if should_rotate || self.check_scheduled_rotation(&path) {
252                self.rotate_writer(&path).await?;
253            }
254        }
255
256        // Check if we should auto-flush based on time interval
257        self.check_flush().await?;
258
259        Ok(())
260    }
261
    /// Writes a batch of data values as one or more `RecordBatch`es.
    ///
    /// Uses `T::chunk_metadata` to derive the file schema metadata. This protects
    /// types like `OrderBookDelta` from having their file metadata poisoned by a
    /// leading sentinel row (e.g. `BookAction::Clear`, which carries
    /// `price_precision=0, size_precision=0`).
    ///
    /// Per-instrument types are partitioned by instrument so a mixed-instrument
    /// batch lands in the correct file for each instrument.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding fails, a writer cannot be created, or the
    /// object store write during rotation/flush fails.
    pub async fn write_batch<T>(&mut self, data: Vec<T>) -> Result<(), Box<dyn std::error::Error>>
    where
        T: EncodeToRecordBatch + CatalogPathPrefix + 'static,
    {
        if data.is_empty() || !self.should_write::<T>() {
            return Ok(());
        }

        // Group by logical writer identity (instrument_id for per-instrument types).
        // Grouping on FileWriterPath would split same-instrument rows across distinct
        // timestamped paths when the writer does not yet exist under a LiveClock.
        let type_str = T::path_prefix();
        let needs_instrument =
            self.per_instrument_types.contains(type_str) || type_str.starts_with("custom_");

        let mut groups: AHashMap<Option<String>, Vec<T>> = AHashMap::new();

        for item in data {
            let instrument_id = if needs_instrument {
                T::metadata(&item).get(KEY_INSTRUMENT_ID).cloned()
            } else {
                None
            };
            groups.entry(instrument_id).or_default().push(item);
        }

        for group in groups.into_values() {
            // Path is derived from the first item; every item in the group shares
            // the same (type, instrument) identity by construction above.
            let path = self.get_writer_path(&group[0])?;
            let metadata = T::chunk_metadata(&group);

            if !self.writers.contains_key(&path) {
                self.create_writer_with_metadata::<T>(path.clone(), metadata.clone())?;
            }

            let batch = T::encode_batch(&metadata, &group)?;

            if let Some(writer) = self.writers.get_mut(&path) {
                let should_rotate = writer.write_record_batch(&batch)?;
                if should_rotate || self.check_scheduled_rotation(&path) {
                    self.rotate_writer(&path).await?;
                }
            }
        }

        self.check_flush().await?;

        Ok(())
    }
319
320    /// Checks if enough time has passed since last flush and flushes if needed.
321    async fn check_flush(&mut self) -> Result<(), Box<dyn std::error::Error>> {
322        if self.flush_interval_ms == 0 {
323            return Ok(()); // Auto-flush disabled
324        }
325
326        let now_ns = self.clock.borrow().timestamp_ns();
327        let elapsed_ms = (now_ns.as_u64() - self.last_flush_ns.as_u64()) / 1_000_000;
328
329        if elapsed_ms >= self.flush_interval_ms {
330            self.flush().await?;
331            self.last_flush_ns = now_ns;
332        }
333
334        Ok(())
335    }
336
    /// Checks whether a time-based rotation is due for the writer keyed by `path`,
    /// maintaining `next_rotation_times` as a side effect.
    ///
    /// Returns `true` only when the current time has reached the stored next
    /// rotation time. `Size` and `NoRotation` policies always return `false`
    /// here (size-based rotation is decided by the buffer itself).
    fn check_scheduled_rotation(&mut self, path: &FileWriterPath) -> bool {
        match self.rotation_config {
            RotationConfig::Interval { interval_ns } => {
                let now = self.clock.borrow().timestamp_ns();
                let next_rotation = self.next_rotation_times.get(path).copied();

                match next_rotation {
                    None => {
                        // First sighting of this path: schedule, but don't rotate yet.
                        self.next_rotation_times
                            .insert(path.clone(), now + interval_ns);
                        false
                    }
                    Some(next) if now >= next => {
                        self.next_rotation_times
                            .insert(path.clone(), now + interval_ns);
                        true
                    }
                    _ => false,
                }
            }
            RotationConfig::ScheduledDates {
                interval_ns,
                rotation_time,
                rotation_timezone,
            } => {
                let now = self.clock.borrow().timestamp_ns();
                let next_rotation = self.next_rotation_times.get(path).copied();

                match next_rotation {
                    None => {
                        // First sighting: compute the next wall-clock-aligned rotation.
                        let next = self.calculate_next_scheduled_rotation(
                            rotation_time,
                            rotation_timezone,
                            interval_ns,
                        );
                        self.next_rotation_times.insert(path.clone(), next);
                        false
                    }
                    Some(next) if now >= next => {
                        // NOTE(review): subsequent rotations reschedule as `now + interval_ns`
                        // rather than recalculating the wall-clock schedule, so the rotation
                        // time of day can drift over time — confirm this is intended.
                        self.next_rotation_times
                            .insert(path.clone(), now + interval_ns);
                        true
                    }
                    _ => false,
                }
            }
            _ => false,
        }
    }
386
387    fn calculate_next_scheduled_rotation(
388        &self,
389        rotation_time: UnixNanos,
390        rotation_timezone: Tz,
391        interval_ns: u64,
392    ) -> UnixNanos {
393        use chrono::TimeZone;
394        let now_utc = self.clock.borrow().utc_now();
395        let now_tz = now_utc.with_timezone(&rotation_timezone);
396
397        let rotation_time_secs = (*rotation_time / 1_000_000_000) as u32;
398        let rotation_time_nanos = (*rotation_time % 1_000_000_000) as u32;
399        let rotation_time_naive = chrono::NaiveTime::from_num_seconds_from_midnight_opt(
400            rotation_time_secs,
401            rotation_time_nanos,
402        )
403        .unwrap_or_else(|| chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
404
405        let mut next_rotation_tz = rotation_timezone
406            .from_local_datetime(&now_tz.date_naive().and_time(rotation_time_naive))
407            .earliest()
408            .unwrap_or(now_tz);
409
410        if next_rotation_tz <= now_tz {
411            // If the time has already passed today, we would usually add the interval
412            // But let's align exactly with how Python does it:
413            while next_rotation_tz <= now_tz {
414                // Add interval_ns to next_rotation_tz
415                // Since chrono::Duration doesn't take u64 nanos directly comfortably for large values,
416                // we'll convert to seconds and nanos.
417                let secs = (interval_ns / 1_000_000_000) as i64;
418                let nanos = (interval_ns % 1_000_000_000) as u32;
419                next_rotation_tz = next_rotation_tz
420                    + chrono::Duration::seconds(secs)
421                    + chrono::Duration::nanoseconds(nanos as i64);
422            }
423        }
424
425        UnixNanos::from(
426            next_rotation_tz
427                .with_timezone(&chrono::Utc)
428                .timestamp_nanos_opt()
429                .unwrap_or(0) as u64,
430        )
431    }
432
433    /// Flushes and rotates `FileWriter` associated with `key`.
434    /// TODO: Fix error type to handle arrow error and object store error
435    async fn rotate_writer(
436        &mut self,
437        path: &FileWriterPath,
438    ) -> Result<(), Box<dyn std::error::Error>> {
439        let mut writer = self.writers.remove(path).unwrap();
440        let bytes = writer.take_buffer()?;
441        self.store.put(&path.path, bytes.into()).await?;
442        let new_path = self.regen_writer_path(path);
443        self.writers.insert(new_path, writer);
444        Ok(())
445    }
446
    /// Creates (and inserts) a new `FileWriter` for type T, deriving the schema
    /// metadata from the single `data` value.
    ///
    /// # Errors
    ///
    /// Returns an error if the writer's Arrow stream cannot be created.
    fn create_writer<T>(&mut self, path: FileWriterPath, data: &T) -> Result<(), ArrowError>
    where
        T: EncodeToRecordBatch + CatalogPathPrefix + 'static,
    {
        self.create_writer_with_metadata::<T>(path, T::metadata(data))
    }
454
455    /// Creates (and inserts) a new `FileWriter` for type T with pre-computed metadata.
456    ///
457    /// Use this variant when the caller has selected metadata from a chunk
458    /// (e.g. via `T::chunk_metadata`) to avoid schema poisoning by sentinel rows.
459    fn create_writer_with_metadata<T>(
460        &mut self,
461        path: FileWriterPath,
462        metadata: HashMap<String, String>,
463    ) -> Result<(), ArrowError>
464    where
465        T: EncodeToRecordBatch + CatalogPathPrefix + 'static,
466    {
467        let schema = if self.per_instrument_types.contains(T::path_prefix()) {
468            T::get_schema(Some(metadata))
469        } else {
470            T::get_schema(None)
471        };
472
473        let writer = FeatherBuffer::new(&schema, self.rotation_config.clone())?;
474        self.writers.insert(path, writer);
475        Ok(())
476    }
477
478    /// Creates (and inserts) a new `FeatherBuffer` for custom data at the given path.
479    fn create_custom_writer(
480        &mut self,
481        path: FileWriterPath,
482        type_name: &str,
483    ) -> Result<(), Box<dyn std::error::Error>> {
484        if self.writers.contains_key(&path) {
485            return Ok(());
486        }
487        let base_schema = get_arrow_schema(type_name).ok_or_else(|| {
488            format!("Custom data type \"{type_name}\" is not registered for Arrow encoding")
489        })?;
490        let schema = schema_with_data_type_column(base_schema.as_ref(), type_name);
491        let writer = FeatherBuffer::new(&schema, self.rotation_config.clone())
492            .map_err(|e| format!("Failed to create feather buffer for custom {type_name}: {e}"))?;
493        self.writers.insert(path, writer);
494        Ok(())
495    }
496
    /// Encodes a single `CustomData` into a `RecordBatch` with `data_type` column (catalog-compatible).
    ///
    /// The payload is encoded via its registered Arrow encoder, then the batch is
    /// augmented with the serialized `data_type` definition so readers can
    /// reconstruct the original `DataType`.
    ///
    /// # Errors
    ///
    /// Returns an error if the `data_type` cannot be serialized, if no Arrow
    /// encoder is registered for the type, or if encoding/augmentation fails.
    fn encode_custom_to_batch(
        custom: &CustomData,
    ) -> Result<RecordBatch, Box<dyn std::error::Error>> {
        let type_name = custom.data.type_name();
        // Serialize the DataType definition so it can travel inside the batch.
        let data_type_json = custom
            .data_type
            .to_persistence_json()
            .map_err(|e| format!("Failed to serialize data_type for persistence: {e}"))?;
        let dt_meta = custom.data_type.metadata_string_map();
        let items: [Arc<dyn CustomDataTrait>; 1] = [Arc::clone(&custom.data)];
        // `encode_custom_to_arrow` returns `None` when no encoder is registered.
        let batch = encode_custom_to_arrow(type_name, &items)
            .map_err(|e| format!("Failed to encode custom data: {e}"))?
            .ok_or_else(|| {
                format!("Custom data type \"{type_name}\" is not registered for Arrow")
            })?;
        let batch = augment_batch_with_data_type_column(
            &batch,
            &data_type_json,
            type_name,
            dt_meta.as_ref(),
        )
        .map_err(|e| e.to_string())?;
        Ok(batch)
    }
522
523    /// Flushes all active `FeatherBuffers` by writing any remaining buffered bytes to the object store.
524    ///
525    /// This is called automatically based on `flush_interval_ms` if configured, but can also
526    /// be called manually by the client.
527    ///
528    /// Note: In Rust, we use in-memory buffers. Flushing writes the current buffer to the
529    /// object store and creates a new buffer for continued writing. This is different from
530    /// Python which just flushes OS buffers.
531    pub async fn flush(&mut self) -> Result<(), Box<dyn std::error::Error>> {
532        // Collect paths and their current buffers before flushing
533        let paths_to_flush: Vec<FileWriterPath> = self.writers.keys().cloned().collect();
534
535        // Flush each writer and recreate it
536        for path in paths_to_flush {
537            if let Some(mut writer) = self.writers.remove(&path) {
538                let bytes = writer.take_buffer()?;
539                if !bytes.is_empty() {
540                    // Write to the object store
541                    self.store.put(&path.path, bytes.into()).await?;
542                }
543
544                // Recreate writer with same schema for continued writing
545                // We need the schema and type info - for now, we'll recreate on next write
546                // The writer will be recreated automatically when write() is called again
547            }
548        }
549
550        self.last_flush_ns = self.clock.borrow().timestamp_ns();
551        Ok(())
552    }
553
    /// Closes all writers by flushing and removing them.
    ///
    /// After calling this, no further writes should be performed.
    ///
    /// # Errors
    ///
    /// Returns an error if the final flush to the object store fails.
    pub async fn close(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        self.flush().await?;
        self.writers.clear();
        Ok(())
    }
562
    /// Returns whether the writer has been closed (all writers cleared).
    ///
    /// NOTE(review): this also reports `true` for a freshly constructed writer
    /// that has not yet received any data — confirm callers only consult it
    /// after `close()`.
    pub fn is_closed(&self) -> bool {
        self.writers.is_empty()
    }
567
568    /// Returns information about the current files being written.
569    ///
570    /// Each entry maps a writer key (type_str and optional instrument_id) to
571    /// its current buffer size and file path.
572    pub fn get_current_file_info(&self) -> HashMap<String, (u64, String)> {
573        let mut info = HashMap::new();
574
575        for (path, buffer) in &self.writers {
576            let key = match &path.instrument_id {
577                Some(id) => format!("{}:{}", path.type_str, id),
578                None => path.type_str.clone(),
579            };
580            info.insert(key, (buffer.size, path.path.to_string()));
581        }
582        info
583    }
584
585    /// Returns the next rotation time for a specific writer key, if set.
586    pub fn get_next_rotation_time(
587        &self,
588        type_str: &str,
589        instrument_id: Option<&str>,
590    ) -> Option<UnixNanos> {
591        self.next_rotation_times
592            .iter()
593            .find(|(k, _)| k.type_str == type_str && k.instrument_id.as_deref() == instrument_id)
594            .map(|(_, &v)| v)
595    }
596
597    /// Determines whether type T should be written, based on the inclusion filter.
598    fn should_write<T: CatalogPathPrefix>(&self) -> bool {
599        self.included_types.as_ref().is_none_or(|included| {
600            let path = T::path_prefix();
601            included.contains(path)
602        })
603    }
604
605    /// Returns whether the given event ID has already been seen,
606    /// adding it to the cache if new.
607    pub fn is_duplicate_event_id(&mut self, event_id: &UUID4) -> bool {
608        if self.seen_event_ids.contains(event_id) {
609            return true;
610        }
611
612        self.seen_event_ids.add(*event_id);
613
614        false
615    }
616
    /// Rebuilds a `FileWriterPath` for the same (type, instrument) identity with
    /// a fresh timestamp; used when rotating a writer so the next file gets a
    /// new name.
    fn regen_writer_path(&self, path: &FileWriterPath) -> FileWriterPath {
        let type_str = path.type_str.clone();
        let instrument_id = path.instrument_id.clone();
        let timestamp = self.clock.borrow().timestamp_ns();
        // Note: Path removes prefixing slashes
        let mut path = Path::from(self.base_path.clone());

        if type_str.starts_with("data/custom/") {
            // Custom data: data/custom/{type_name}/[{identifier_segments}/]{file_stem}_{ts}.feather
            let type_name = type_str.strip_prefix("data/custom/").unwrap_or(&type_str);
            path = path.join("data").join("custom").join(type_name.to_string());

            if let Some(ref id) = instrument_id {
                // The identifier may expand to multiple nested directory segments.
                let safe = safe_directory_identifier(id);
                if !safe.is_empty() {
                    for segment in safe.split('/') {
                        path = path.join(segment.to_string());
                    }
                }
            }
            let file_stem = instrument_id.as_deref().unwrap_or(type_name);
            path = path.join(format!("{file_stem}_{timestamp}.feather"));
        } else if let Some(ref instrument_id) = instrument_id {
            // Per-instrument layout: {type}/{safe_id}/{safe_id}_{ts}.feather
            let safe_id = urisafe_instrument_id(instrument_id);
            path = path.join(type_str.clone());
            path = path.join(safe_id.clone());
            path = path.join(format!("{safe_id}_{timestamp}.feather"));
        } else {
            // Single-file layout: {type}_{ts}.feather
            path = path.join(format!("{type_str}_{timestamp}.feather"));
        }

        FileWriterPath {
            path,
            type_str,
            instrument_id,
        }
    }
654
655    /// Builds `FileWriterPath` for custom data using DataType identifier as folder partition (catalog layout).
656    fn get_writer_path_custom(&self, type_name: &str, identifier: Option<&str>) -> FileWriterPath {
657        let timestamp = self.clock.borrow().timestamp_ns();
658        let type_str = format!("data/custom/{type_name}");
659        let instrument_id = identifier.map(String::from);
660
661        let mut path = Path::from(self.base_path.clone());
662        path = path.join("data").join("custom").join(type_name.to_string());
663
664        if let Some(id) = &identifier {
665            let safe = safe_directory_identifier(id);
666            if !safe.is_empty() {
667                for segment in safe.split('/') {
668                    path = path.join(segment.to_string());
669                }
670            }
671        }
672        let file_stem = identifier.unwrap_or(type_name);
673        path = path.join(format!("{file_stem}_{timestamp}.feather"));
674
675        FileWriterPath {
676            path,
677            type_str,
678            instrument_id,
679        }
680    }
681
682    /// Generates a key for a `FileWriter` based on type T and optional instrument ID.
683    /// Reuses an existing writer key (same type_str and instrument_id) if present, so we
684    /// buffer multiple items in the same file until rotation; otherwise creates a new path with current timestamp.
685    fn get_writer_path<T>(&self, data: &T) -> Result<FileWriterPath, Box<dyn std::error::Error>>
686    where
687        T: EncodeToRecordBatch + CatalogPathPrefix,
688    {
689        let type_str = T::path_prefix();
690        let metadata = T::metadata(data);
691
692        let instrument_id = if self.per_instrument_types.contains(type_str)
693            || (type_str.starts_with("custom_") && metadata.contains_key(KEY_INSTRUMENT_ID))
694        {
695            Some(metadata.get(KEY_INSTRUMENT_ID).cloned().ok_or_else(|| {
696                format!("Data {type_str} expected instrument_id metadata for per instrument writer")
697            })?)
698        } else {
699            None
700        };
701
702        // Reuse existing writer for same (type_str, instrument_id) so we buffer in one file until rotation
703        if let Some(existing) = self
704            .writers
705            .keys()
706            .find(|k| k.type_str == type_str && k.instrument_id == instrument_id)
707        {
708            return Ok(existing.clone());
709        }
710
711        let timestamp = self.clock.borrow().timestamp_ns();
712        let mut path = Path::from(self.base_path.clone());
713
714        if let Some(ref instrument_id) = instrument_id {
715            let safe_id = urisafe_instrument_id(instrument_id);
716            path = path.join(type_str);
717            path = path.join(safe_id.clone());
718            path = path.join(format!("{safe_id}_{timestamp}.feather"));
719        } else {
720            path = path.join(format!("{type_str}_{timestamp}.feather"));
721        }
722
723        Ok(FileWriterPath {
724            path,
725            type_str: type_str.to_string(),
726            instrument_id,
727        })
728    }
729
    /// Writes a Data enum value to the appropriate writer.
    ///
    /// This is a convenience method that routes the Data enum to the appropriate
    /// typed write method.
    ///
    /// # Errors
    ///
    /// Propagates any encoding or object-store error from the underlying write.
    pub async fn write_data(&mut self, data: Data) -> Result<(), Box<dyn std::error::Error>> {
        match data {
            Data::Quote(quote) => self.write(quote).await,
            Data::Trade(trade) => self.write(trade).await,
            Data::Bar(bar) => self.write(bar).await,
            Data::Delta(delta) => self.write(delta).await,
            // Depth10 is boxed inside the enum; unbox to write by value
            Data::Depth10(depth) => self.write(*depth).await,
            Data::IndexPriceUpdate(price) => self.write(price).await,
            Data::MarkPriceUpdate(price) => self.write(price).await,
            Data::InstrumentStatus(status) => self.write(status).await,
            Data::InstrumentClose(close) => self.write(close).await,
            // Custom data follows the catalog layout via its own write path
            Data::Custom(custom) => self.write_custom_data(&custom).await,
            Data::Deltas(deltas_api) => {
                // Batch write so chunk_metadata can skip a leading BookAction::Clear sentinel
                self.write_batch(deltas_api.deltas.clone()).await
            }
        }
    }
752
753    /// Writes a single custom data value (catalog layout: data/custom/{type_name}/[{identifier}/]).
754    async fn write_custom_data(
755        &mut self,
756        custom: &CustomData,
757    ) -> Result<(), Box<dyn std::error::Error>> {
758        let type_name = custom.data.type_name();
759        let identifier = custom.data_type.identifier().map(String::from);
760
761        if !self.should_write_custom(type_name) {
762            return Ok(());
763        }
764
765        let path = self.get_writer_path_custom(type_name, identifier.as_deref());
766        if !self.writers.contains_key(&path) {
767            self.create_custom_writer(path.clone(), type_name)?;
768        }
769
770        let batch = Self::encode_custom_to_batch(custom)?;
771
772        if let Some(writer) = self.writers.get_mut(&path) {
773            let should_rotate = writer.write_record_batch(&batch)?;
774            if should_rotate || self.check_scheduled_rotation(&path) {
775                self.rotate_writer(&path).await?;
776            }
777        }
778
779        self.check_flush().await?;
780        Ok(())
781    }
782
783    fn should_write_custom(&self, type_name: &str) -> bool {
784        self.included_types.as_ref().is_none_or(|included| {
785            included.contains(type_name)
786                || included.contains("custom")
787                || included.contains(&format!("custom/{type_name}"))
788        })
789    }
790
    /// Writes an instrument to the appropriate writer.
    ///
    /// Instruments are written to feather files and organized by instrument ID.
    /// This method supports writing instruments that implement `EncodeToRecordBatch` and `CatalogPathPrefix`.
    ///
    /// # Errors
    ///
    /// Returns any error propagated from the underlying `write` call
    /// (e.g. encoding or object-store I/O failures).
    pub async fn write_instrument(
        &mut self,
        instrument: InstrumentAny,
    ) -> Result<(), Box<dyn std::error::Error>> {
        self.write(instrument).await
    }
801
    /// Subscribes to all messages on the message bus (pattern "*").
    ///
    /// This will automatically write all supported data types that are published
    /// on the message bus to the feather files.
    ///
    /// The writer must be wrapped in `Rc<RefCell<>>` to be shareable with the message bus handler.
    ///
    /// Note: each write is driven synchronously via `Runtime::block_on` inside the
    /// handler, so the handler blocks until the write completes and must not be
    /// invoked from within an async runtime context (see the test-module note).
    ///
    /// Returns the handler so callers can later pass it to
    /// `unsubscribe_from_message_bus`.
    pub fn subscribe_to_message_bus(
        writer: Rc<RefCell<Self>>,
    ) -> Result<ShareableMessageHandler, Box<dyn std::error::Error>> {
        let runtime = writer.borrow().runtime.clone();

        // Create handler that downcasts messages and writes them.
        // We enter the runtime context so block_on can be used in the handler;
        // this works when the handler is called from outside an async runtime.
        let handler = ShareableMessageHandler::from_any(move |message: &dyn Any| {
            // Enter the runtime context to allow blocking
            let _guard = runtime.enter();

            // Attempts a downcast to $type; on success clones the value, writes it
            // (logging failures rather than propagating), and returns early.
            // Order matters: the first matching downcast wins.
            macro_rules! try_write {
                ($message:expr, $type:ty, $name:literal) => {
                    if let Some(value) = $message.downcast_ref::<$type>() {
                        let mut writer = writer.borrow_mut();
                        if let Err(e) = runtime.block_on(writer.write(value.clone())) {
                            log::warn!("Failed to write {}: {e}", $name);
                        }
                        return;
                    }
                };
            }

            try_write!(message, QuoteTick, "QuoteTick");
            try_write!(message, TradeTick, "TradeTick");
            try_write!(message, Bar, "Bar");
            try_write!(message, OrderBookDelta, "OrderBookDelta");
            try_write!(message, OrderBookDepth10, "OrderBookDepth10");
            try_write!(message, IndexPriceUpdate, "IndexPriceUpdate");
            try_write!(message, MarkPriceUpdate, "MarkPriceUpdate");
            try_write!(message, InstrumentStatus, "InstrumentStatus");
            try_write!(message, InstrumentClose, "InstrumentClose");
            try_write!(message, FundingRateUpdate, "FundingRateUpdate");
            try_write!(message, AccountState, "AccountState");
            try_write!(message, OrderInitialized, "OrderInitialized");
            try_write!(message, OrderDenied, "OrderDenied");
            try_write!(message, OrderEmulated, "OrderEmulated");
            try_write!(message, OrderSubmitted, "OrderSubmitted");
            try_write!(message, OrderAccepted, "OrderAccepted");
            try_write!(message, OrderRejected, "OrderRejected");
            try_write!(message, OrderPendingCancel, "OrderPendingCancel");
            try_write!(message, OrderCanceled, "OrderCanceled");
            try_write!(message, OrderCancelRejected, "OrderCancelRejected");
            try_write!(message, OrderExpired, "OrderExpired");
            try_write!(message, OrderTriggered, "OrderTriggered");
            try_write!(message, OrderPendingUpdate, "OrderPendingUpdate");
            try_write!(message, OrderReleased, "OrderReleased");
            try_write!(message, OrderModifyRejected, "OrderModifyRejected");
            try_write!(message, OrderUpdated, "OrderUpdated");
            try_write!(message, OrderFilled, "OrderFilled");
            try_write!(message, PositionOpened, "PositionOpened");
            try_write!(message, PositionChanged, "PositionChanged");
            try_write!(message, PositionClosed, "PositionClosed");
            try_write!(message, PositionAdjusted, "PositionAdjusted");
            try_write!(message, OrderSnapshot, "OrderSnapshot");
            try_write!(message, PositionSnapshot, "PositionSnapshot");
            try_write!(message, OrderStatusReport, "OrderStatusReport");
            try_write!(message, FillReport, "FillReport");
            try_write!(message, PositionStatusReport, "PositionStatusReport");
            try_write!(message, ExecutionMassStatus, "ExecutionMassStatus");

            // Container/special types that need dedicated write paths.
            if let Some(deltas) = message.downcast_ref::<OrderBookDeltas>() {
                // Batch write so chunk_metadata can skip a leading BookAction::Clear sentinel
                let mut writer = writer.borrow_mut();
                if let Err(e) = runtime.block_on(writer.write_batch(deltas.deltas.clone())) {
                    log::warn!("Failed to write OrderBookDeltas: {e}");
                }
            } else if let Some(custom) = message.downcast_ref::<CustomData>() {
                let mut writer = writer.borrow_mut();
                if let Err(e) = runtime.block_on(writer.write_data(Data::Custom(custom.clone()))) {
                    log::warn!("Failed to write CustomData: {e}");
                }
            } else if let Some(instrument) = message.downcast_ref::<InstrumentAny>() {
                let mut writer = writer.borrow_mut();
                if let Err(e) = runtime.block_on(writer.write_instrument(instrument.clone())) {
                    log::warn!("Failed to write InstrumentAny: {e}");
                }
            }
            // Silently ignore unsupported message types.
        });

        // Subscribe to all messages using wildcard pattern
        subscribe_any(
            MStr::pattern("*"),
            handler.clone(),
            None, // No priority
        );

        Ok(handler)
    }
903
904    /// Unsubscribes from the message bus.
905    pub fn unsubscribe_from_message_bus(handler: &ShareableMessageHandler) {
906        unsubscribe_any(MStr::pattern("*"), handler);
907    }
908}
909
#[cfg(test)]
mod tests {
    use std::{io::Cursor, sync::Arc};

    use datafusion::arrow::ipc::reader::StreamReader;
    use nautilus_common::clock::TestClock;
    use nautilus_model::{
        data::{Data, OrderBookDeltas_API, QuoteTick, TradeTick},
        enums::AggressorSide,
        identifiers::{InstrumentId, TradeId},
        types::{Price, Quantity},
    };
    use nautilus_serialization::arrow::{
        ArrowSchemaProvider, DecodeDataFromRecordBatch, EncodeToRecordBatch,
    };
    use object_store::{ObjectStore, local::LocalFileSystem};
    use rstest::rstest;
    use tempfile::TempDir;

    use super::*;

    // Verifies the writer-key/path layout: types listed in `per_instrument`
    // nest under `{type}/{safe_id}/`, other types are written flat at the base.
    #[tokio::test]
    async fn test_writer_manager_keys() {
        // Create a temporary directory for base path
        let temp_dir = TempDir::new().unwrap();
        let base_path = temp_dir.path().to_str().unwrap().to_string();

        // Create a LocalFileSystem based object store using the temp directory
        let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
        let store: Arc<dyn ObjectStore> = Arc::new(local_fs);

        // Create a test clock
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
        let timestamp = clock.borrow().timestamp_ns();

        let quote_type_str = QuoteTick::path_prefix();

        // Only quotes are configured for per-instrument paths; trades are not.
        let mut per_instrument = HashSet::new();
        per_instrument.insert(quote_type_str.to_string());

        let mut manager = FeatherWriter::new(
            base_path.clone(),
            store,
            clock,
            RotationConfig::NoRotation,
            None,
            Some(per_instrument),
            None, // flush_interval_ms
        );

        let instrument_id = "AAPL.AAPL";
        // Write a dummy value
        let quote = QuoteTick::new(
            InstrumentId::from(instrument_id),
            Price::from("100.0"),
            Price::from("100.0"),
            Quantity::from("100.0"),
            Quantity::from("100.0"),
            UnixNanos::from(1000000000000000000),
            UnixNanos::from(1000000000000000000),
        );

        let trade = TradeTick::new(
            InstrumentId::from(instrument_id),
            Price::from("100.0"),
            Quantity::from("100.0"),
            AggressorSide::Buyer,
            TradeId::from("1"),
            UnixNanos::from(1000000000000000000),
            UnixNanos::from(1000000000000000000),
        );

        manager.write(quote).await.unwrap();
        manager.write(trade).await.unwrap();

        // Check keys and paths for quotes and trades
        let path = manager.get_writer_path(&quote).unwrap();
        let safe_id = instrument_id.replace('/', "");
        let expected_path = Path::from(format!(
            "{base_path}/quotes/{safe_id}/{safe_id}_{timestamp}.feather"
        ));
        assert_eq!(path.path, expected_path);
        assert!(manager.writers.contains_key(&path));
        let writer = manager.writers.get(&path).unwrap();
        assert!(writer.size > 0);

        // Trades were not in `per_instrument`, so they land flat at the base path.
        let path = manager.get_writer_path(&trade).unwrap();
        let expected_path = Path::from(format!("{base_path}/trades_{timestamp}.feather"));
        assert_eq!(path.path, expected_path);
        assert!(manager.writers.contains_key(&path));
        let writer = manager.writers.get(&path).unwrap();
        assert!(writer.size > 0);
    }

    // Encodes a quote into a FeatherBuffer and reads it back through the Arrow
    // IPC StreamReader, checking schema metadata, column data, and decode.
    #[rstest]
    fn test_file_writer_round_trip() {
        let instrument_id = "AAPL.AAPL";
        // Write a dummy value.
        let quote = QuoteTick::new(
            InstrumentId::from(instrument_id),
            Price::from("100.0"),
            Price::from("100.0"),
            Quantity::from("100.0"),
            Quantity::from("100.0"),
            UnixNanos::from(100),
            UnixNanos::from(100),
        );
        let metadata = QuoteTick::metadata(&quote);
        let schema = QuoteTick::get_schema(Some(metadata.clone()));
        let batch = QuoteTick::encode_batch(&QuoteTick::metadata(&quote), &[quote]).unwrap();

        let mut writer = FeatherBuffer::new(&schema, RotationConfig::NoRotation).unwrap();
        writer.write_record_batch(&batch).unwrap();

        // Read the in-memory IPC stream back and compare against what was written.
        let buffer = writer.take_buffer().unwrap();
        let mut reader = StreamReader::try_new(Cursor::new(buffer.as_slice()), None).unwrap();

        let read_metadata = reader.schema().metadata().clone();
        assert_eq!(read_metadata, metadata);

        let read_batch = reader.next().unwrap().unwrap();
        assert_eq!(read_batch.column(0), batch.column(0));

        let decoded = QuoteTick::decode_data_batch(&metadata, batch).unwrap();
        assert_eq!(decoded[0], Data::from(quote));
    }

    // End-to-end: write a quote and a trade through FeatherWriter, flush to the
    // local object store, then decode the files and compare to the originals.
    #[tokio::test]
    async fn test_round_trip() {
        // Create a temporary directory for base path
        let temp_dir = TempDir::new_in(".").unwrap();
        let base_path = temp_dir.path().to_str().unwrap().to_string();

        // Create a LocalFileSystem based object store using the temp directory
        let local_fs = LocalFileSystem::new_with_prefix(&base_path).unwrap();
        let store: Arc<dyn ObjectStore> = Arc::new(local_fs);

        // Create a test clock
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));

        let quote_type_str = QuoteTick::path_prefix();
        let trade_type_str = TradeTick::path_prefix();

        let mut per_instrument = HashSet::new();
        per_instrument.insert(quote_type_str.to_string());
        per_instrument.insert(trade_type_str.to_string());

        let mut manager = FeatherWriter::new(
            base_path.clone(),
            store,
            clock,
            RotationConfig::NoRotation,
            None,
            Some(per_instrument),
            None, // flush_interval_ms
        );

        let instrument_id = "AAPL.AAPL";
        // Write a dummy value.
        let quote = QuoteTick::new(
            InstrumentId::from(instrument_id),
            Price::from("100.0"),
            Price::from("100.0"),
            Quantity::from("100.0"),
            Quantity::from("100.0"),
            UnixNanos::from(100),
            UnixNanos::from(100),
        );

        let trade = TradeTick::new(
            InstrumentId::from(instrument_id),
            Price::from("100.0"),
            Quantity::from("100.0"),
            AggressorSide::Buyer,
            TradeId::from("1"),
            UnixNanos::from(100),
            UnixNanos::from(100),
        );

        manager.write(quote).await.unwrap();
        manager.write(trade).await.unwrap();

        // One writer per (type, instrument) combination was created.
        let paths = manager.writers.keys().cloned().collect::<Vec<_>>();
        assert_eq!(paths.len(), 2);

        // Flush data
        manager.flush().await.unwrap();

        // Read files from the temporary directory
        let mut recovered_quotes = Vec::new();
        let mut recovered_trades = Vec::new();
        let local_fs = LocalFileSystem::new_with_prefix(&base_path).unwrap();
        for path in paths {
            let path_str = local_fs.path_to_filesystem(&path.path).unwrap();
            let buffer = std::fs::File::open(&path_str).unwrap();
            let reader = StreamReader::try_new(buffer, None).unwrap();
            let metadata = reader.schema().metadata().clone();
            for batch in reader {
                let batch = batch.unwrap();
                // The file's path segment identifies which decoder applies.
                if path_str.to_str().unwrap().contains("quotes") {
                    let decoded = QuoteTick::decode_data_batch(&metadata, batch).unwrap();
                    recovered_quotes.extend(decoded);
                } else if path_str.to_str().unwrap().contains("trades") {
                    let decoded = TradeTick::decode_data_batch(&metadata, batch).unwrap();
                    recovered_trades.extend(decoded);
                }
            }
        }

        // Assert that the recovered data matches the written data
        assert_eq!(recovered_quotes.len(), 1, "Expected one QuoteTick record");
        assert_eq!(recovered_trades.len(), 1, "Expected one TradeTick record");

        // Check key fields to ensure the data round-tripped correctly
        assert_eq!(recovered_quotes[0], Data::from(quote));
        assert_eq!(recovered_trades[0], Data::from(trade));
    }

    // Smoke test for the `write_data` enum dispatcher with a Quote variant.
    #[tokio::test]
    async fn test_write_data_enum() {
        let temp_dir = TempDir::new().unwrap();
        let base_path = temp_dir.path().to_str().unwrap().to_string();
        let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
        let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));

        let mut writer = FeatherWriter::new(
            base_path,
            store,
            clock,
            RotationConfig::NoRotation,
            None,
            None,
            None,
        );

        let quote = QuoteTick::new(
            InstrumentId::from("AUD/USD.SIM"),
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from("1000"),
            Quantity::from("1000"),
            UnixNanos::from(1000),
            UnixNanos::from(1000),
        );

        // Test writing via write_data
        writer.write_data(Data::Quote(quote)).await.unwrap();
        writer.flush().await.unwrap();

        // Verify file was created
        assert!(!writer.writers.is_empty() || temp_dir.path().read_dir().unwrap().count() > 0);
    }

    // Exercises `write_data` across several Data variants (Quote, Trade, Delta);
    // success is the absence of errors on write and flush.
    #[tokio::test]
    async fn test_write_data_all_types() {
        let temp_dir = TempDir::new().unwrap();
        let base_path = temp_dir.path().to_str().unwrap().to_string();
        let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
        let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));

        let mut writer = FeatherWriter::new(
            base_path,
            store,
            clock,
            RotationConfig::NoRotation,
            None,
            None,
            None,
        );

        let instrument_id = InstrumentId::from("AUD/USD.SIM");

        // Test all data types
        let quote = QuoteTick::new(
            instrument_id,
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from("1000"),
            Quantity::from("1000"),
            UnixNanos::from(1000),
            UnixNanos::from(1000),
        );
        writer.write_data(Data::Quote(quote)).await.unwrap();

        let trade = TradeTick::new(
            instrument_id,
            Price::from("1.0"),
            Quantity::from("1000"),
            AggressorSide::Buyer,
            TradeId::from("1"),
            UnixNanos::from(2000),
            UnixNanos::from(2000),
        );
        writer.write_data(Data::Trade(trade)).await.unwrap();

        let delta = OrderBookDelta::clear(
            instrument_id,
            0,
            UnixNanos::from(3000),
            UnixNanos::from(3000),
        );
        writer.write_data(Data::Delta(delta)).await.unwrap();

        writer.flush().await.unwrap();
    }

    // Verifies writes succeed with a flush interval configured; it does not
    // assert that a flush actually fired (see inline note on TestClock).
    #[tokio::test]
    async fn test_auto_flush() {
        let temp_dir = TempDir::new().unwrap();
        let base_path = temp_dir.path().to_str().unwrap().to_string();
        let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
        let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));

        let mut writer = FeatherWriter::new(
            base_path,
            store,
            clock.clone(),
            RotationConfig::NoRotation,
            None,
            None,
            Some(100), // 100ms flush interval
        );

        let quote = QuoteTick::new(
            InstrumentId::from("AUD/USD.SIM"),
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from("1000"),
            Quantity::from("1000"),
            UnixNanos::from(1000),
            UnixNanos::from(1000),
        );

        // Write first quote
        writer.write(quote).await.unwrap();

        // Note: TestClock doesn't have set_time_ns, so we can't easily test auto-flush
        // with time advancement. Instead, we test that check_flush is called during write.
        // For a proper test, we'd need a mock clock or use LiveClock with time advancement.

        // Write second quote - check_flush will be called but won't flush if time hasn't advanced
        let quote2 = QuoteTick::new(
            InstrumentId::from("AUD/USD.SIM"),
            Price::from("1.1"),
            Price::from("1.1"),
            Quantity::from("1000"),
            Quantity::from("1000"),
            UnixNanos::from(2000),
            UnixNanos::from(2000),
        );
        writer.write(quote2).await.unwrap();

        // Verify that writes succeeded (check_flush was called, even if it didn't flush)
        // The flush_interval_ms is set, so check_flush runs but won't flush without time advancement
    }

    // Verifies `close` drains all active writers.
    #[tokio::test]
    async fn test_close() {
        let temp_dir = TempDir::new().unwrap();
        let base_path = temp_dir.path().to_str().unwrap().to_string();
        let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
        let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));

        let mut writer = FeatherWriter::new(
            base_path,
            store,
            clock,
            RotationConfig::NoRotation,
            None,
            None,
            None,
        );

        let quote = QuoteTick::new(
            InstrumentId::from("AUD/USD.SIM"),
            Price::from("1.0"),
            Price::from("1.0"),
            Quantity::from("1000"),
            Quantity::from("1000"),
            UnixNanos::from(1000),
            UnixNanos::from(1000),
        );

        writer.write(quote).await.unwrap();
        assert!(!writer.writers.is_empty());

        writer.close().await.unwrap();
        assert!(writer.writers.is_empty());
    }

    // Note: Message bus subscription test is skipped due to async/sync boundary complexity.
    // The handler uses block_on which can't be used from within an async runtime.
    // This functionality is better tested via Python integration tests where the message bus
    // is used in a non-async context or via proper async task spawning.

    // Smoke test for the Deltas variant of `write_data` (batch write path).
    #[tokio::test]
    async fn test_write_data_orderbook_deltas() {
        let temp_dir = TempDir::new().unwrap();
        let base_path = temp_dir.path().to_str().unwrap().to_string();
        let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
        let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));

        let mut writer = FeatherWriter::new(
            base_path,
            store,
            clock,
            RotationConfig::NoRotation,
            None,
            None,
            None,
        );

        let instrument_id = InstrumentId::from("AUD/USD.SIM");
        let delta1 = OrderBookDelta::clear(
            instrument_id,
            0,
            UnixNanos::from(1000),
            UnixNanos::from(1000),
        );
        let delta2 = OrderBookDelta::clear(
            instrument_id,
            0,
            UnixNanos::from(2000),
            UnixNanos::from(2000),
        );

        let deltas = OrderBookDeltas::new(instrument_id, vec![delta1, delta2]);
        let deltas_api = OrderBookDeltas_API::new(deltas);

        // Test writing OrderBookDeltas via write_data
        writer.write_data(Data::Deltas(deltas_api)).await.unwrap();
        writer.flush().await.unwrap();
    }

    // Round-trips a registered custom data type through the catalog layout
    // (data/custom/{type_name}/...), then decodes it via CustomDataDecoder.
    #[tokio::test]
    #[cfg(feature = "python")]
    async fn test_write_custom_data_round_trip() {
        use std::sync::Arc;

        use futures::StreamExt;
        use nautilus_model::{
            data::{CustomData, Data, DataType},
            identifiers::InstrumentId,
        };
        use nautilus_serialization::{
            arrow::custom::CustomDataDecoder, ensure_custom_data_registered,
        };

        use crate::test_data::RustTestCustomData;

        ensure_custom_data_registered::<RustTestCustomData>();

        let temp_dir = TempDir::new().unwrap();
        let base_path = temp_dir.path().to_str().unwrap().to_string();
        let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
        let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));

        let mut writer = FeatherWriter::new(
            base_path.clone(),
            store.clone(),
            clock,
            RotationConfig::NoRotation,
            None,
            None,
            None,
        );

        let instrument_id = InstrumentId::from("RUST.TEST");
        let data_type = DataType::new("RustTestCustomData", None, Some(instrument_id.to_string()));
        let original = RustTestCustomData {
            instrument_id,
            value: 1.23,
            flag: true,
            ts_event: UnixNanos::from(1000),
            ts_init: UnixNanos::from(1000),
        };
        let custom = CustomData::new(Arc::new(original.clone()), data_type);

        writer
            .write_data(Data::Custom(custom))
            .await
            .expect("write_data CustomData");
        writer.flush().await.expect("flush");

        // Locate the flushed feather file under the expected catalog prefix.
        let prefix = Path::from(format!("{base_path}/data/custom/RustTestCustomData"));
        let mut list_stream = store.list(Some(&prefix));
        let first = list_stream.next().await.expect("at least one object");
        let meta = first.expect("list item");
        let bytes = store
            .get(&meta.location)
            .await
            .expect("get")
            .bytes()
            .await
            .expect("bytes");
        let mut reader =
            StreamReader::try_new(Cursor::new(bytes.as_ref()), None).expect("StreamReader");
        let schema = reader.schema();
        let metadata: std::collections::HashMap<String, String> = schema
            .metadata()
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        let batch = reader.next().expect("batch").expect("batch ok");
        let decoded =
            CustomDataDecoder::decode_data_batch(&metadata, batch).expect("decode_data_batch");
        assert_eq!(decoded.len(), 1);
        // Downcast back to the concrete type and check full structural equality.
        if let Data::Custom(decoded_custom) = &decoded[0] {
            assert_eq!(decoded_custom.data_type.type_name(), "RustTestCustomData");
            let rust: &RustTestCustomData = decoded_custom
                .data
                .as_any()
                .downcast_ref::<RustTestCustomData>()
                .expect("RustTestCustomData");
            assert_eq!(rust, &original);
        } else {
            panic!("Expected Data::Custom");
        }
    }
}