1use std::{
17 any::Any,
18 cell::RefCell,
19 collections::{HashMap, HashSet},
20 rc::Rc,
21 sync::Arc,
22};
23
24use ahash::AHashMap;
25use chrono_tz::Tz;
26use datafusion::arrow::{
27 datatypes::Schema, error::ArrowError, ipc::writer::StreamWriter, record_batch::RecordBatch,
28};
29use nautilus_common::{
30 cache::fifo::FifoCache,
31 clock::Clock,
32 msgbus::{mstr::MStr, subscribe_any, typed_handler::ShareableMessageHandler, unsubscribe_any},
33};
34use nautilus_core::{UUID4, UnixNanos};
35use nautilus_model::{
36 data::{
37 Bar, CatalogPathPrefix, CustomData, CustomDataTrait, Data, FundingRateUpdate,
38 IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
39 OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose, encode_custom_to_arrow,
40 get_arrow_schema,
41 },
42 events::{
43 AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied,
44 OrderEmulated, OrderExpired, OrderFilled, OrderInitialized, OrderModifyRejected,
45 OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased, OrderSnapshot,
46 OrderSubmitted, OrderTriggered, OrderUpdated, PositionAdjusted, PositionChanged,
47 PositionClosed, PositionOpened, PositionSnapshot,
48 },
49 instruments::InstrumentAny,
50 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
51};
52use nautilus_serialization::arrow::{EncodeToRecordBatch, KEY_INSTRUMENT_ID};
53use object_store::{ObjectStore, ObjectStoreExt, path::Path};
54
55use super::catalog::urisafe_instrument_id;
56use crate::backend::{
57 catalog::safe_directory_identifier,
58 custom::{augment_batch_with_data_type_column, schema_with_data_type_column},
59};
60
61#[derive(Debug, Default, PartialEq, PartialOrd, Hash, Eq, Clone)]
62pub struct FileWriterPath {
63 path: Path,
64 type_str: String,
65 instrument_id: Option<String>,
66}
67
68pub struct FeatherBuffer {
72 writer: StreamWriter<Vec<u8>>,
74 size: u64,
76 schema: Schema,
80 max_buffer_size: u64,
82 rotation_config: RotationConfig,
84}
85
86impl FeatherBuffer {
87 pub fn new(schema: &Schema, rotation_config: RotationConfig) -> Result<Self, ArrowError> {
89 let writer = StreamWriter::try_new(Vec::new(), schema)?;
90 let mut max_buffer_size = 1_000_000_000_000; if let RotationConfig::Size { max_size } = &rotation_config {
93 max_buffer_size = *max_size;
94 }
95
96 Ok(Self {
97 writer,
98 size: 0,
99 max_buffer_size,
101 schema: schema.clone(),
102 rotation_config,
103 })
104 }
105
106 pub fn write_record_batch(&mut self, batch: &RecordBatch) -> Result<bool, ArrowError> {
110 self.writer.write(batch)?;
111 self.size += batch.get_array_memory_size() as u64;
112 Ok(self.size >= self.max_buffer_size)
113 }
114
115 pub fn take_buffer(&mut self) -> Result<Vec<u8>, ArrowError> {
117 let mut writer = StreamWriter::try_new(Vec::new(), &self.schema)?;
118 std::mem::swap(&mut self.writer, &mut writer);
119 let buffer = writer.into_inner()?;
120 self.size = 0;
122 Ok(buffer)
123 }
124
125 #[must_use]
127 pub const fn should_rotate(&self) -> bool {
128 match &self.rotation_config {
129 RotationConfig::Size { max_size } => self.size >= *max_size,
130 _ => false,
131 }
132 }
133}
134
135#[derive(Debug, Clone)]
137pub enum RotationConfig {
138 Size {
140 max_size: u64,
142 },
143 Interval {
145 interval_ns: u64,
147 },
148 ScheduledDates {
150 interval_ns: u64,
152 rotation_time: UnixNanos,
154 rotation_timezone: Tz,
156 },
157 NoRotation,
159}
160
161pub struct FeatherWriter {
168 base_path: String,
170 store: Arc<dyn ObjectStore>,
172 clock: Rc<RefCell<dyn Clock>>,
174 rotation_config: RotationConfig,
176 included_types: Option<HashSet<String>>,
178 per_instrument_types: HashSet<String>,
180 writers: HashMap<FileWriterPath, FeatherBuffer>,
182 next_rotation_times: HashMap<FileWriterPath, UnixNanos>,
184 runtime: tokio::runtime::Handle,
186 flush_interval_ms: u64,
188 last_flush_ns: UnixNanos,
190 seen_event_ids: Box<FifoCache<UUID4, 10_000>>,
192}
193
194impl FeatherWriter {
195 pub fn new(
197 base_path: String,
198 store: Arc<dyn ObjectStore>,
199 clock: Rc<RefCell<dyn Clock>>,
200 rotation_config: RotationConfig,
201 included_types: Option<HashSet<String>>,
202 per_instrument_types: Option<HashSet<String>>,
203 flush_interval_ms: Option<u64>,
204 ) -> Self {
205 let runtime = nautilus_common::live::get_runtime().handle().clone();
207 let flush_interval_ms = flush_interval_ms.unwrap_or(1000); let last_flush_ns = clock.borrow().timestamp_ns();
209
210 Self {
211 base_path,
212 store,
213 clock,
214 rotation_config,
215 included_types,
216 per_instrument_types: per_instrument_types.unwrap_or_default(),
217 writers: HashMap::new(),
218 next_rotation_times: HashMap::new(),
219 runtime,
220 flush_interval_ms,
221 last_flush_ns,
222 seen_event_ids: Box::new(FifoCache::new()),
223 }
224 }
225
226 pub async fn write<T>(&mut self, data: T) -> Result<(), Box<dyn std::error::Error>>
231 where
232 T: EncodeToRecordBatch + CatalogPathPrefix + 'static,
233 {
234 if !self.should_write::<T>() {
235 return Ok(());
236 }
237
238 let path = self.get_writer_path(&data)?;
239
240 if !self.writers.contains_key(&path) {
242 self.create_writer::<T>(path.clone(), &data)?;
243 }
244
245 let batch = T::encode_batch(&T::metadata(&data), &[data])?;
247
248 if let Some(writer) = self.writers.get_mut(&path) {
250 let should_rotate = writer.write_record_batch(&batch)?;
251 if should_rotate || self.check_scheduled_rotation(&path) {
252 self.rotate_writer(&path).await?;
253 }
254 }
255
256 self.check_flush().await?;
258
259 Ok(())
260 }
261
262 pub async fn write_batch<T>(&mut self, data: Vec<T>) -> Result<(), Box<dyn std::error::Error>>
272 where
273 T: EncodeToRecordBatch + CatalogPathPrefix + 'static,
274 {
275 if data.is_empty() || !self.should_write::<T>() {
276 return Ok(());
277 }
278
279 let type_str = T::path_prefix();
283 let needs_instrument =
284 self.per_instrument_types.contains(type_str) || type_str.starts_with("custom_");
285
286 let mut groups: AHashMap<Option<String>, Vec<T>> = AHashMap::new();
287
288 for item in data {
289 let instrument_id = if needs_instrument {
290 T::metadata(&item).get(KEY_INSTRUMENT_ID).cloned()
291 } else {
292 None
293 };
294 groups.entry(instrument_id).or_default().push(item);
295 }
296
297 for group in groups.into_values() {
298 let path = self.get_writer_path(&group[0])?;
299 let metadata = T::chunk_metadata(&group);
300
301 if !self.writers.contains_key(&path) {
302 self.create_writer_with_metadata::<T>(path.clone(), metadata.clone())?;
303 }
304
305 let batch = T::encode_batch(&metadata, &group)?;
306
307 if let Some(writer) = self.writers.get_mut(&path) {
308 let should_rotate = writer.write_record_batch(&batch)?;
309 if should_rotate || self.check_scheduled_rotation(&path) {
310 self.rotate_writer(&path).await?;
311 }
312 }
313 }
314
315 self.check_flush().await?;
316
317 Ok(())
318 }
319
320 async fn check_flush(&mut self) -> Result<(), Box<dyn std::error::Error>> {
322 if self.flush_interval_ms == 0 {
323 return Ok(()); }
325
326 let now_ns = self.clock.borrow().timestamp_ns();
327 let elapsed_ms = (now_ns.as_u64() - self.last_flush_ns.as_u64()) / 1_000_000;
328
329 if elapsed_ms >= self.flush_interval_ms {
330 self.flush().await?;
331 self.last_flush_ns = now_ns;
332 }
333
334 Ok(())
335 }
336
337 fn check_scheduled_rotation(&mut self, path: &FileWriterPath) -> bool {
338 match self.rotation_config {
339 RotationConfig::Interval { interval_ns } => {
340 let now = self.clock.borrow().timestamp_ns();
341 let next_rotation = self.next_rotation_times.get(path).copied();
342
343 match next_rotation {
344 None => {
345 self.next_rotation_times
346 .insert(path.clone(), now + interval_ns);
347 false
348 }
349 Some(next) if now >= next => {
350 self.next_rotation_times
351 .insert(path.clone(), now + interval_ns);
352 true
353 }
354 _ => false,
355 }
356 }
357 RotationConfig::ScheduledDates {
358 interval_ns,
359 rotation_time,
360 rotation_timezone,
361 } => {
362 let now = self.clock.borrow().timestamp_ns();
363 let next_rotation = self.next_rotation_times.get(path).copied();
364
365 match next_rotation {
366 None => {
367 let next = self.calculate_next_scheduled_rotation(
368 rotation_time,
369 rotation_timezone,
370 interval_ns,
371 );
372 self.next_rotation_times.insert(path.clone(), next);
373 false
374 }
375 Some(next) if now >= next => {
376 self.next_rotation_times
377 .insert(path.clone(), now + interval_ns);
378 true
379 }
380 _ => false,
381 }
382 }
383 _ => false,
384 }
385 }
386
387 fn calculate_next_scheduled_rotation(
388 &self,
389 rotation_time: UnixNanos,
390 rotation_timezone: Tz,
391 interval_ns: u64,
392 ) -> UnixNanos {
393 use chrono::TimeZone;
394 let now_utc = self.clock.borrow().utc_now();
395 let now_tz = now_utc.with_timezone(&rotation_timezone);
396
397 let rotation_time_secs = (*rotation_time / 1_000_000_000) as u32;
398 let rotation_time_nanos = (*rotation_time % 1_000_000_000) as u32;
399 let rotation_time_naive = chrono::NaiveTime::from_num_seconds_from_midnight_opt(
400 rotation_time_secs,
401 rotation_time_nanos,
402 )
403 .unwrap_or_else(|| chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
404
405 let mut next_rotation_tz = rotation_timezone
406 .from_local_datetime(&now_tz.date_naive().and_time(rotation_time_naive))
407 .earliest()
408 .unwrap_or(now_tz);
409
410 if next_rotation_tz <= now_tz {
411 while next_rotation_tz <= now_tz {
414 let secs = (interval_ns / 1_000_000_000) as i64;
418 let nanos = (interval_ns % 1_000_000_000) as u32;
419 next_rotation_tz = next_rotation_tz
420 + chrono::Duration::seconds(secs)
421 + chrono::Duration::nanoseconds(nanos as i64);
422 }
423 }
424
425 UnixNanos::from(
426 next_rotation_tz
427 .with_timezone(&chrono::Utc)
428 .timestamp_nanos_opt()
429 .unwrap_or(0) as u64,
430 )
431 }
432
433 async fn rotate_writer(
436 &mut self,
437 path: &FileWriterPath,
438 ) -> Result<(), Box<dyn std::error::Error>> {
439 let mut writer = self.writers.remove(path).unwrap();
440 let bytes = writer.take_buffer()?;
441 self.store.put(&path.path, bytes.into()).await?;
442 let new_path = self.regen_writer_path(path);
443 self.writers.insert(new_path, writer);
444 Ok(())
445 }
446
447 fn create_writer<T>(&mut self, path: FileWriterPath, data: &T) -> Result<(), ArrowError>
449 where
450 T: EncodeToRecordBatch + CatalogPathPrefix + 'static,
451 {
452 self.create_writer_with_metadata::<T>(path, T::metadata(data))
453 }
454
455 fn create_writer_with_metadata<T>(
460 &mut self,
461 path: FileWriterPath,
462 metadata: HashMap<String, String>,
463 ) -> Result<(), ArrowError>
464 where
465 T: EncodeToRecordBatch + CatalogPathPrefix + 'static,
466 {
467 let schema = if self.per_instrument_types.contains(T::path_prefix()) {
468 T::get_schema(Some(metadata))
469 } else {
470 T::get_schema(None)
471 };
472
473 let writer = FeatherBuffer::new(&schema, self.rotation_config.clone())?;
474 self.writers.insert(path, writer);
475 Ok(())
476 }
477
478 fn create_custom_writer(
480 &mut self,
481 path: FileWriterPath,
482 type_name: &str,
483 ) -> Result<(), Box<dyn std::error::Error>> {
484 if self.writers.contains_key(&path) {
485 return Ok(());
486 }
487 let base_schema = get_arrow_schema(type_name).ok_or_else(|| {
488 format!("Custom data type \"{type_name}\" is not registered for Arrow encoding")
489 })?;
490 let schema = schema_with_data_type_column(base_schema.as_ref(), type_name);
491 let writer = FeatherBuffer::new(&schema, self.rotation_config.clone())
492 .map_err(|e| format!("Failed to create feather buffer for custom {type_name}: {e}"))?;
493 self.writers.insert(path, writer);
494 Ok(())
495 }
496
497 fn encode_custom_to_batch(
499 custom: &CustomData,
500 ) -> Result<RecordBatch, Box<dyn std::error::Error>> {
501 let type_name = custom.data.type_name();
502 let data_type_json = custom
503 .data_type
504 .to_persistence_json()
505 .map_err(|e| format!("Failed to serialize data_type for persistence: {e}"))?;
506 let dt_meta = custom.data_type.metadata_string_map();
507 let items: [Arc<dyn CustomDataTrait>; 1] = [Arc::clone(&custom.data)];
508 let batch = encode_custom_to_arrow(type_name, &items)
509 .map_err(|e| format!("Failed to encode custom data: {e}"))?
510 .ok_or_else(|| {
511 format!("Custom data type \"{type_name}\" is not registered for Arrow")
512 })?;
513 let batch = augment_batch_with_data_type_column(
514 &batch,
515 &data_type_json,
516 type_name,
517 dt_meta.as_ref(),
518 )
519 .map_err(|e| e.to_string())?;
520 Ok(batch)
521 }
522
523 pub async fn flush(&mut self) -> Result<(), Box<dyn std::error::Error>> {
532 let paths_to_flush: Vec<FileWriterPath> = self.writers.keys().cloned().collect();
534
535 for path in paths_to_flush {
537 if let Some(mut writer) = self.writers.remove(&path) {
538 let bytes = writer.take_buffer()?;
539 if !bytes.is_empty() {
540 self.store.put(&path.path, bytes.into()).await?;
542 }
543
544 }
548 }
549
550 self.last_flush_ns = self.clock.borrow().timestamp_ns();
551 Ok(())
552 }
553
554 pub async fn close(&mut self) -> Result<(), Box<dyn std::error::Error>> {
558 self.flush().await?;
559 self.writers.clear();
560 Ok(())
561 }
562
563 pub fn is_closed(&self) -> bool {
565 self.writers.is_empty()
566 }
567
568 pub fn get_current_file_info(&self) -> HashMap<String, (u64, String)> {
573 let mut info = HashMap::new();
574
575 for (path, buffer) in &self.writers {
576 let key = match &path.instrument_id {
577 Some(id) => format!("{}:{}", path.type_str, id),
578 None => path.type_str.clone(),
579 };
580 info.insert(key, (buffer.size, path.path.to_string()));
581 }
582 info
583 }
584
585 pub fn get_next_rotation_time(
587 &self,
588 type_str: &str,
589 instrument_id: Option<&str>,
590 ) -> Option<UnixNanos> {
591 self.next_rotation_times
592 .iter()
593 .find(|(k, _)| k.type_str == type_str && k.instrument_id.as_deref() == instrument_id)
594 .map(|(_, &v)| v)
595 }
596
597 fn should_write<T: CatalogPathPrefix>(&self) -> bool {
599 self.included_types.as_ref().is_none_or(|included| {
600 let path = T::path_prefix();
601 included.contains(path)
602 })
603 }
604
605 pub fn is_duplicate_event_id(&mut self, event_id: &UUID4) -> bool {
608 if self.seen_event_ids.contains(event_id) {
609 return true;
610 }
611
612 self.seen_event_ids.add(*event_id);
613
614 false
615 }
616
617 fn regen_writer_path(&self, path: &FileWriterPath) -> FileWriterPath {
618 let type_str = path.type_str.clone();
619 let instrument_id = path.instrument_id.clone();
620 let timestamp = self.clock.borrow().timestamp_ns();
621 let mut path = Path::from(self.base_path.clone());
623
624 if type_str.starts_with("data/custom/") {
625 let type_name = type_str.strip_prefix("data/custom/").unwrap_or(&type_str);
627 path = path.join("data").join("custom").join(type_name.to_string());
628
629 if let Some(ref id) = instrument_id {
630 let safe = safe_directory_identifier(id);
631 if !safe.is_empty() {
632 for segment in safe.split('/') {
633 path = path.join(segment.to_string());
634 }
635 }
636 }
637 let file_stem = instrument_id.as_deref().unwrap_or(type_name);
638 path = path.join(format!("{file_stem}_{timestamp}.feather"));
639 } else if let Some(ref instrument_id) = instrument_id {
640 let safe_id = urisafe_instrument_id(instrument_id);
641 path = path.join(type_str.clone());
642 path = path.join(safe_id.clone());
643 path = path.join(format!("{safe_id}_{timestamp}.feather"));
644 } else {
645 path = path.join(format!("{type_str}_{timestamp}.feather"));
646 }
647
648 FileWriterPath {
649 path,
650 type_str,
651 instrument_id,
652 }
653 }
654
655 fn get_writer_path_custom(&self, type_name: &str, identifier: Option<&str>) -> FileWriterPath {
657 let timestamp = self.clock.borrow().timestamp_ns();
658 let type_str = format!("data/custom/{type_name}");
659 let instrument_id = identifier.map(String::from);
660
661 let mut path = Path::from(self.base_path.clone());
662 path = path.join("data").join("custom").join(type_name.to_string());
663
664 if let Some(id) = &identifier {
665 let safe = safe_directory_identifier(id);
666 if !safe.is_empty() {
667 for segment in safe.split('/') {
668 path = path.join(segment.to_string());
669 }
670 }
671 }
672 let file_stem = identifier.unwrap_or(type_name);
673 path = path.join(format!("{file_stem}_{timestamp}.feather"));
674
675 FileWriterPath {
676 path,
677 type_str,
678 instrument_id,
679 }
680 }
681
682 fn get_writer_path<T>(&self, data: &T) -> Result<FileWriterPath, Box<dyn std::error::Error>>
686 where
687 T: EncodeToRecordBatch + CatalogPathPrefix,
688 {
689 let type_str = T::path_prefix();
690 let metadata = T::metadata(data);
691
692 let instrument_id = if self.per_instrument_types.contains(type_str)
693 || (type_str.starts_with("custom_") && metadata.contains_key(KEY_INSTRUMENT_ID))
694 {
695 Some(metadata.get(KEY_INSTRUMENT_ID).cloned().ok_or_else(|| {
696 format!("Data {type_str} expected instrument_id metadata for per instrument writer")
697 })?)
698 } else {
699 None
700 };
701
702 if let Some(existing) = self
704 .writers
705 .keys()
706 .find(|k| k.type_str == type_str && k.instrument_id == instrument_id)
707 {
708 return Ok(existing.clone());
709 }
710
711 let timestamp = self.clock.borrow().timestamp_ns();
712 let mut path = Path::from(self.base_path.clone());
713
714 if let Some(ref instrument_id) = instrument_id {
715 let safe_id = urisafe_instrument_id(instrument_id);
716 path = path.join(type_str);
717 path = path.join(safe_id.clone());
718 path = path.join(format!("{safe_id}_{timestamp}.feather"));
719 } else {
720 path = path.join(format!("{type_str}_{timestamp}.feather"));
721 }
722
723 Ok(FileWriterPath {
724 path,
725 type_str: type_str.to_string(),
726 instrument_id,
727 })
728 }
729
730 pub async fn write_data(&mut self, data: Data) -> Result<(), Box<dyn std::error::Error>> {
735 match data {
736 Data::Quote(quote) => self.write(quote).await,
737 Data::Trade(trade) => self.write(trade).await,
738 Data::Bar(bar) => self.write(bar).await,
739 Data::Delta(delta) => self.write(delta).await,
740 Data::Depth10(depth) => self.write(*depth).await,
741 Data::IndexPriceUpdate(price) => self.write(price).await,
742 Data::MarkPriceUpdate(price) => self.write(price).await,
743 Data::InstrumentStatus(status) => self.write(status).await,
744 Data::InstrumentClose(close) => self.write(close).await,
745 Data::Custom(custom) => self.write_custom_data(&custom).await,
746 Data::Deltas(deltas_api) => {
747 self.write_batch(deltas_api.deltas.clone()).await
749 }
750 }
751 }
752
753 async fn write_custom_data(
755 &mut self,
756 custom: &CustomData,
757 ) -> Result<(), Box<dyn std::error::Error>> {
758 let type_name = custom.data.type_name();
759 let identifier = custom.data_type.identifier().map(String::from);
760
761 if !self.should_write_custom(type_name) {
762 return Ok(());
763 }
764
765 let path = self.get_writer_path_custom(type_name, identifier.as_deref());
766 if !self.writers.contains_key(&path) {
767 self.create_custom_writer(path.clone(), type_name)?;
768 }
769
770 let batch = Self::encode_custom_to_batch(custom)?;
771
772 if let Some(writer) = self.writers.get_mut(&path) {
773 let should_rotate = writer.write_record_batch(&batch)?;
774 if should_rotate || self.check_scheduled_rotation(&path) {
775 self.rotate_writer(&path).await?;
776 }
777 }
778
779 self.check_flush().await?;
780 Ok(())
781 }
782
783 fn should_write_custom(&self, type_name: &str) -> bool {
784 self.included_types.as_ref().is_none_or(|included| {
785 included.contains(type_name)
786 || included.contains("custom")
787 || included.contains(&format!("custom/{type_name}"))
788 })
789 }
790
791 pub async fn write_instrument(
796 &mut self,
797 instrument: InstrumentAny,
798 ) -> Result<(), Box<dyn std::error::Error>> {
799 self.write(instrument).await
800 }
801
802 pub fn subscribe_to_message_bus(
812 writer: Rc<RefCell<Self>>,
813 ) -> Result<ShareableMessageHandler, Box<dyn std::error::Error>> {
814 let runtime = writer.borrow().runtime.clone();
815
816 let handler = ShareableMessageHandler::from_any(move |message: &dyn Any| {
820 let _guard = runtime.enter();
822
823 macro_rules! try_write {
825 ($message:expr, $type:ty, $name:literal) => {
826 if let Some(value) = $message.downcast_ref::<$type>() {
827 let mut writer = writer.borrow_mut();
828 if let Err(e) = runtime.block_on(writer.write(value.clone())) {
829 log::warn!("Failed to write {}: {e}", $name);
830 }
831 return;
832 }
833 };
834 }
835
836 try_write!(message, QuoteTick, "QuoteTick");
837 try_write!(message, TradeTick, "TradeTick");
838 try_write!(message, Bar, "Bar");
839 try_write!(message, OrderBookDelta, "OrderBookDelta");
840 try_write!(message, OrderBookDepth10, "OrderBookDepth10");
841 try_write!(message, IndexPriceUpdate, "IndexPriceUpdate");
842 try_write!(message, MarkPriceUpdate, "MarkPriceUpdate");
843 try_write!(message, InstrumentStatus, "InstrumentStatus");
844 try_write!(message, InstrumentClose, "InstrumentClose");
845 try_write!(message, FundingRateUpdate, "FundingRateUpdate");
846 try_write!(message, AccountState, "AccountState");
847 try_write!(message, OrderInitialized, "OrderInitialized");
848 try_write!(message, OrderDenied, "OrderDenied");
849 try_write!(message, OrderEmulated, "OrderEmulated");
850 try_write!(message, OrderSubmitted, "OrderSubmitted");
851 try_write!(message, OrderAccepted, "OrderAccepted");
852 try_write!(message, OrderRejected, "OrderRejected");
853 try_write!(message, OrderPendingCancel, "OrderPendingCancel");
854 try_write!(message, OrderCanceled, "OrderCanceled");
855 try_write!(message, OrderCancelRejected, "OrderCancelRejected");
856 try_write!(message, OrderExpired, "OrderExpired");
857 try_write!(message, OrderTriggered, "OrderTriggered");
858 try_write!(message, OrderPendingUpdate, "OrderPendingUpdate");
859 try_write!(message, OrderReleased, "OrderReleased");
860 try_write!(message, OrderModifyRejected, "OrderModifyRejected");
861 try_write!(message, OrderUpdated, "OrderUpdated");
862 try_write!(message, OrderFilled, "OrderFilled");
863 try_write!(message, PositionOpened, "PositionOpened");
864 try_write!(message, PositionChanged, "PositionChanged");
865 try_write!(message, PositionClosed, "PositionClosed");
866 try_write!(message, PositionAdjusted, "PositionAdjusted");
867 try_write!(message, OrderSnapshot, "OrderSnapshot");
868 try_write!(message, PositionSnapshot, "PositionSnapshot");
869 try_write!(message, OrderStatusReport, "OrderStatusReport");
870 try_write!(message, FillReport, "FillReport");
871 try_write!(message, PositionStatusReport, "PositionStatusReport");
872 try_write!(message, ExecutionMassStatus, "ExecutionMassStatus");
873
874 if let Some(deltas) = message.downcast_ref::<OrderBookDeltas>() {
875 let mut writer = writer.borrow_mut();
877 if let Err(e) = runtime.block_on(writer.write_batch(deltas.deltas.clone())) {
878 log::warn!("Failed to write OrderBookDeltas: {e}");
879 }
880 } else if let Some(custom) = message.downcast_ref::<CustomData>() {
881 let mut writer = writer.borrow_mut();
882 if let Err(e) = runtime.block_on(writer.write_data(Data::Custom(custom.clone()))) {
883 log::warn!("Failed to write CustomData: {e}");
884 }
885 } else if let Some(instrument) = message.downcast_ref::<InstrumentAny>() {
886 let mut writer = writer.borrow_mut();
887 if let Err(e) = runtime.block_on(writer.write_instrument(instrument.clone())) {
888 log::warn!("Failed to write InstrumentAny: {e}");
889 }
890 }
891 });
893
894 subscribe_any(
896 MStr::pattern("*"),
897 handler.clone(),
898 None, );
900
901 Ok(handler)
902 }
903
904 pub fn unsubscribe_from_message_bus(handler: &ShareableMessageHandler) {
906 unsubscribe_any(MStr::pattern("*"), handler);
907 }
908}
909
910#[cfg(test)]
911mod tests {
912 use std::{io::Cursor, sync::Arc};
913
914 use datafusion::arrow::ipc::reader::StreamReader;
915 use nautilus_common::clock::TestClock;
916 use nautilus_model::{
917 data::{Data, OrderBookDeltas_API, QuoteTick, TradeTick},
918 enums::AggressorSide,
919 identifiers::{InstrumentId, TradeId},
920 types::{Price, Quantity},
921 };
922 use nautilus_serialization::arrow::{
923 ArrowSchemaProvider, DecodeDataFromRecordBatch, EncodeToRecordBatch,
924 };
925 use object_store::{ObjectStore, local::LocalFileSystem};
926 use rstest::rstest;
927 use tempfile::TempDir;
928
929 use super::*;
930
931 #[tokio::test]
932 async fn test_writer_manager_keys() {
933 let temp_dir = TempDir::new().unwrap();
935 let base_path = temp_dir.path().to_str().unwrap().to_string();
936
937 let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
939 let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
940
941 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
943 let timestamp = clock.borrow().timestamp_ns();
944
945 let quote_type_str = QuoteTick::path_prefix();
946
947 let mut per_instrument = HashSet::new();
948 per_instrument.insert(quote_type_str.to_string());
949
950 let mut manager = FeatherWriter::new(
951 base_path.clone(),
952 store,
953 clock,
954 RotationConfig::NoRotation,
955 None,
956 Some(per_instrument),
957 None, );
959
960 let instrument_id = "AAPL.AAPL";
961 let quote = QuoteTick::new(
963 InstrumentId::from(instrument_id),
964 Price::from("100.0"),
965 Price::from("100.0"),
966 Quantity::from("100.0"),
967 Quantity::from("100.0"),
968 UnixNanos::from(1000000000000000000),
969 UnixNanos::from(1000000000000000000),
970 );
971
972 let trade = TradeTick::new(
973 InstrumentId::from(instrument_id),
974 Price::from("100.0"),
975 Quantity::from("100.0"),
976 AggressorSide::Buyer,
977 TradeId::from("1"),
978 UnixNanos::from(1000000000000000000),
979 UnixNanos::from(1000000000000000000),
980 );
981
982 manager.write(quote).await.unwrap();
983 manager.write(trade).await.unwrap();
984
985 let path = manager.get_writer_path("e).unwrap();
987 let safe_id = instrument_id.replace('/', "");
988 let expected_path = Path::from(format!(
989 "{base_path}/quotes/{safe_id}/{safe_id}_{timestamp}.feather"
990 ));
991 assert_eq!(path.path, expected_path);
992 assert!(manager.writers.contains_key(&path));
993 let writer = manager.writers.get(&path).unwrap();
994 assert!(writer.size > 0);
995
996 let path = manager.get_writer_path(&trade).unwrap();
997 let expected_path = Path::from(format!("{base_path}/trades_{timestamp}.feather"));
998 assert_eq!(path.path, expected_path);
999 assert!(manager.writers.contains_key(&path));
1000 let writer = manager.writers.get(&path).unwrap();
1001 assert!(writer.size > 0);
1002 }
1003
1004 #[rstest]
1005 fn test_file_writer_round_trip() {
1006 let instrument_id = "AAPL.AAPL";
1007 let quote = QuoteTick::new(
1009 InstrumentId::from(instrument_id),
1010 Price::from("100.0"),
1011 Price::from("100.0"),
1012 Quantity::from("100.0"),
1013 Quantity::from("100.0"),
1014 UnixNanos::from(100),
1015 UnixNanos::from(100),
1016 );
1017 let metadata = QuoteTick::metadata("e);
1018 let schema = QuoteTick::get_schema(Some(metadata.clone()));
1019 let batch = QuoteTick::encode_batch(&QuoteTick::metadata("e), &[quote]).unwrap();
1020
1021 let mut writer = FeatherBuffer::new(&schema, RotationConfig::NoRotation).unwrap();
1022 writer.write_record_batch(&batch).unwrap();
1023
1024 let buffer = writer.take_buffer().unwrap();
1025 let mut reader = StreamReader::try_new(Cursor::new(buffer.as_slice()), None).unwrap();
1026
1027 let read_metadata = reader.schema().metadata().clone();
1028 assert_eq!(read_metadata, metadata);
1029
1030 let read_batch = reader.next().unwrap().unwrap();
1031 assert_eq!(read_batch.column(0), batch.column(0));
1032
1033 let decoded = QuoteTick::decode_data_batch(&metadata, batch).unwrap();
1034 assert_eq!(decoded[0], Data::from(quote));
1035 }
1036
1037 #[tokio::test]
1038 async fn test_round_trip() {
1039 let temp_dir = TempDir::new_in(".").unwrap();
1041 let base_path = temp_dir.path().to_str().unwrap().to_string();
1042
1043 let local_fs = LocalFileSystem::new_with_prefix(&base_path).unwrap();
1045 let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
1046
1047 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1049
1050 let quote_type_str = QuoteTick::path_prefix();
1051 let trade_type_str = TradeTick::path_prefix();
1052
1053 let mut per_instrument = HashSet::new();
1054 per_instrument.insert(quote_type_str.to_string());
1055 per_instrument.insert(trade_type_str.to_string());
1056
1057 let mut manager = FeatherWriter::new(
1058 base_path.clone(),
1059 store,
1060 clock,
1061 RotationConfig::NoRotation,
1062 None,
1063 Some(per_instrument),
1064 None, );
1066
1067 let instrument_id = "AAPL.AAPL";
1068 let quote = QuoteTick::new(
1070 InstrumentId::from(instrument_id),
1071 Price::from("100.0"),
1072 Price::from("100.0"),
1073 Quantity::from("100.0"),
1074 Quantity::from("100.0"),
1075 UnixNanos::from(100),
1076 UnixNanos::from(100),
1077 );
1078
1079 let trade = TradeTick::new(
1080 InstrumentId::from(instrument_id),
1081 Price::from("100.0"),
1082 Quantity::from("100.0"),
1083 AggressorSide::Buyer,
1084 TradeId::from("1"),
1085 UnixNanos::from(100),
1086 UnixNanos::from(100),
1087 );
1088
1089 manager.write(quote).await.unwrap();
1090 manager.write(trade).await.unwrap();
1091
1092 let paths = manager.writers.keys().cloned().collect::<Vec<_>>();
1093 assert_eq!(paths.len(), 2);
1094
1095 manager.flush().await.unwrap();
1097
1098 let mut recovered_quotes = Vec::new();
1100 let mut recovered_trades = Vec::new();
1101 let local_fs = LocalFileSystem::new_with_prefix(&base_path).unwrap();
1102 for path in paths {
1103 let path_str = local_fs.path_to_filesystem(&path.path).unwrap();
1104 let buffer = std::fs::File::open(&path_str).unwrap();
1105 let reader = StreamReader::try_new(buffer, None).unwrap();
1106 let metadata = reader.schema().metadata().clone();
1107 for batch in reader {
1108 let batch = batch.unwrap();
1109 if path_str.to_str().unwrap().contains("quotes") {
1110 let decoded = QuoteTick::decode_data_batch(&metadata, batch).unwrap();
1111 recovered_quotes.extend(decoded);
1112 } else if path_str.to_str().unwrap().contains("trades") {
1113 let decoded = TradeTick::decode_data_batch(&metadata, batch).unwrap();
1114 recovered_trades.extend(decoded);
1115 }
1116 }
1117 }
1118
1119 assert_eq!(recovered_quotes.len(), 1, "Expected one QuoteTick record");
1121 assert_eq!(recovered_trades.len(), 1, "Expected one TradeTick record");
1122
1123 assert_eq!(recovered_quotes[0], Data::from(quote));
1125 assert_eq!(recovered_trades[0], Data::from(trade));
1126 }
1127
1128 #[tokio::test]
1129 async fn test_write_data_enum() {
1130 let temp_dir = TempDir::new().unwrap();
1131 let base_path = temp_dir.path().to_str().unwrap().to_string();
1132 let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
1133 let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
1134 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1135
1136 let mut writer = FeatherWriter::new(
1137 base_path,
1138 store,
1139 clock,
1140 RotationConfig::NoRotation,
1141 None,
1142 None,
1143 None,
1144 );
1145
1146 let quote = QuoteTick::new(
1147 InstrumentId::from("AUD/USD.SIM"),
1148 Price::from("1.0"),
1149 Price::from("1.0"),
1150 Quantity::from("1000"),
1151 Quantity::from("1000"),
1152 UnixNanos::from(1000),
1153 UnixNanos::from(1000),
1154 );
1155
1156 writer.write_data(Data::Quote(quote)).await.unwrap();
1158 writer.flush().await.unwrap();
1159
1160 assert!(!writer.writers.is_empty() || temp_dir.path().read_dir().unwrap().count() > 0);
1162 }
1163
1164 #[tokio::test]
1165 async fn test_write_data_all_types() {
1166 let temp_dir = TempDir::new().unwrap();
1167 let base_path = temp_dir.path().to_str().unwrap().to_string();
1168 let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
1169 let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
1170 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1171
1172 let mut writer = FeatherWriter::new(
1173 base_path,
1174 store,
1175 clock,
1176 RotationConfig::NoRotation,
1177 None,
1178 None,
1179 None,
1180 );
1181
1182 let instrument_id = InstrumentId::from("AUD/USD.SIM");
1183
1184 let quote = QuoteTick::new(
1186 instrument_id,
1187 Price::from("1.0"),
1188 Price::from("1.0"),
1189 Quantity::from("1000"),
1190 Quantity::from("1000"),
1191 UnixNanos::from(1000),
1192 UnixNanos::from(1000),
1193 );
1194 writer.write_data(Data::Quote(quote)).await.unwrap();
1195
1196 let trade = TradeTick::new(
1197 instrument_id,
1198 Price::from("1.0"),
1199 Quantity::from("1000"),
1200 AggressorSide::Buyer,
1201 TradeId::from("1"),
1202 UnixNanos::from(2000),
1203 UnixNanos::from(2000),
1204 );
1205 writer.write_data(Data::Trade(trade)).await.unwrap();
1206
1207 let delta = OrderBookDelta::clear(
1208 instrument_id,
1209 0,
1210 UnixNanos::from(3000),
1211 UnixNanos::from(3000),
1212 );
1213 writer.write_data(Data::Delta(delta)).await.unwrap();
1214
1215 writer.flush().await.unwrap();
1216 }
1217
1218 #[tokio::test]
1219 async fn test_auto_flush() {
1220 let temp_dir = TempDir::new().unwrap();
1221 let base_path = temp_dir.path().to_str().unwrap().to_string();
1222 let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
1223 let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
1224 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1225
1226 let mut writer = FeatherWriter::new(
1227 base_path,
1228 store,
1229 clock.clone(),
1230 RotationConfig::NoRotation,
1231 None,
1232 None,
1233 Some(100), );
1235
1236 let quote = QuoteTick::new(
1237 InstrumentId::from("AUD/USD.SIM"),
1238 Price::from("1.0"),
1239 Price::from("1.0"),
1240 Quantity::from("1000"),
1241 Quantity::from("1000"),
1242 UnixNanos::from(1000),
1243 UnixNanos::from(1000),
1244 );
1245
1246 writer.write(quote).await.unwrap();
1248
1249 let quote2 = QuoteTick::new(
1255 InstrumentId::from("AUD/USD.SIM"),
1256 Price::from("1.1"),
1257 Price::from("1.1"),
1258 Quantity::from("1000"),
1259 Quantity::from("1000"),
1260 UnixNanos::from(2000),
1261 UnixNanos::from(2000),
1262 );
1263 writer.write(quote2).await.unwrap();
1264
1265 }
1268
1269 #[tokio::test]
1270 async fn test_close() {
1271 let temp_dir = TempDir::new().unwrap();
1272 let base_path = temp_dir.path().to_str().unwrap().to_string();
1273 let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
1274 let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
1275 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1276
1277 let mut writer = FeatherWriter::new(
1278 base_path,
1279 store,
1280 clock,
1281 RotationConfig::NoRotation,
1282 None,
1283 None,
1284 None,
1285 );
1286
1287 let quote = QuoteTick::new(
1288 InstrumentId::from("AUD/USD.SIM"),
1289 Price::from("1.0"),
1290 Price::from("1.0"),
1291 Quantity::from("1000"),
1292 Quantity::from("1000"),
1293 UnixNanos::from(1000),
1294 UnixNanos::from(1000),
1295 );
1296
1297 writer.write(quote).await.unwrap();
1298 assert!(!writer.writers.is_empty());
1299
1300 writer.close().await.unwrap();
1301 assert!(writer.writers.is_empty());
1302 }
1303
1304 #[tokio::test]
1310 async fn test_write_data_orderbook_deltas() {
1311 let temp_dir = TempDir::new().unwrap();
1312 let base_path = temp_dir.path().to_str().unwrap().to_string();
1313 let local_fs = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
1314 let store: Arc<dyn ObjectStore> = Arc::new(local_fs);
1315 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1316
1317 let mut writer = FeatherWriter::new(
1318 base_path,
1319 store,
1320 clock,
1321 RotationConfig::NoRotation,
1322 None,
1323 None,
1324 None,
1325 );
1326
1327 let instrument_id = InstrumentId::from("AUD/USD.SIM");
1328 let delta1 = OrderBookDelta::clear(
1329 instrument_id,
1330 0,
1331 UnixNanos::from(1000),
1332 UnixNanos::from(1000),
1333 );
1334 let delta2 = OrderBookDelta::clear(
1335 instrument_id,
1336 0,
1337 UnixNanos::from(2000),
1338 UnixNanos::from(2000),
1339 );
1340
1341 let deltas = OrderBookDeltas::new(instrument_id, vec![delta1, delta2]);
1342 let deltas_api = OrderBookDeltas_API::new(deltas);
1343
1344 writer.write_data(Data::Deltas(deltas_api)).await.unwrap();
1346 writer.flush().await.unwrap();
1347 }
1348
/// Round-trips a registered Rust custom data type through the writer. The test writes it as
/// `Data::Custom` and flushes. It then reads the first object listed under the custom-data
/// prefix back with an Arrow IPC `StreamReader` and decodes it with `CustomDataDecoder`.
/// Finally it checks that the decoded value equals the original.
#[tokio::test]
#[cfg(feature = "python")]
async fn test_write_custom_data_round_trip() {
    use std::sync::Arc;

    use futures::StreamExt;
    use nautilus_model::{
        data::{CustomData, Data, DataType},
        identifiers::InstrumentId,
    };
    use nautilus_serialization::{
        arrow::custom::CustomDataDecoder, ensure_custom_data_registered,
    };

    use crate::test_data::RustTestCustomData;

    ensure_custom_data_registered::<RustTestCustomData>();

    let temp_dir = TempDir::new().unwrap();
    let base_path = temp_dir.path().to_str().unwrap().to_string();
    let store: Arc<dyn ObjectStore> =
        Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap());
    let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));

    let mut writer = FeatherWriter::new(
        base_path.clone(),
        Arc::clone(&store),
        clock,
        RotationConfig::NoRotation,
        None,
        None,
        None,
    );

    let instrument_id = InstrumentId::from("RUST.TEST");
    let data_type = DataType::new("RustTestCustomData", None, Some(instrument_id.to_string()));
    let original = RustTestCustomData {
        instrument_id,
        value: 1.23,
        flag: true,
        ts_event: UnixNanos::from(1000),
        ts_init: UnixNanos::from(1000),
    };

    writer
        .write_data(Data::Custom(CustomData::new(
            Arc::new(original.clone()),
            data_type,
        )))
        .await
        .expect("write_data CustomData");
    writer.flush().await.expect("flush");

    // Fetch the first object stored under the custom-data prefix for this type.
    let prefix = Path::from(format!("{base_path}/data/custom/RustTestCustomData"));
    let meta = store
        .list(Some(&prefix))
        .next()
        .await
        .expect("at least one object")
        .expect("list item");
    let payload = store
        .get(&meta.location)
        .await
        .expect("get")
        .bytes()
        .await
        .expect("bytes");

    // Decode the IPC stream using the schema metadata the writer embedded.
    let mut reader =
        StreamReader::try_new(Cursor::new(payload.as_ref()), None).expect("StreamReader");
    let metadata: std::collections::HashMap<String, String> =
        reader.schema().metadata().clone();
    let batch = reader.next().expect("batch").expect("batch ok");
    let decoded =
        CustomDataDecoder::decode_data_batch(&metadata, batch).expect("decode_data_batch");
    assert_eq!(decoded.len(), 1);

    let Data::Custom(decoded_custom) = &decoded[0] else {
        panic!("Expected Data::Custom");
    };
    assert_eq!(decoded_custom.data_type.type_name(), "RustTestCustomData");
    let rust = decoded_custom
        .data
        .as_any()
        .downcast_ref::<RustTestCustomData>()
        .expect("RustTestCustomData");
    assert_eq!(rust, &original);
}
1435}