1use std::{
17 fmt::Display,
18 sync::{Mutex, OnceLock, atomic::Ordering, mpsc::SendError},
19};
20
21use ahash::AHashMap;
22use indexmap::IndexMap;
23use log::{
24 Level, LevelFilter, Log, STATIC_MAX_LEVEL,
25 kv::{ToValue, Value},
26 set_boxed_logger, set_max_level,
27};
28use nautilus_core::{
29 UUID4, UnixNanos,
30 datetime::unix_nanos_to_iso8601,
31 time::{get_atomic_clock_realtime, get_atomic_clock_static},
32};
33use nautilus_model::identifiers::TraderId;
34use serde::{Deserialize, Serialize, Serializer};
35use ustr::Ustr;
36
37pub use super::config::LoggerConfig;
38use super::{LOGGING_BYPASSED, LOGGING_GUARDS_ACTIVE, LOGGING_INITIALIZED, LOGGING_REALTIME};
39#[cfg(not(all(feature = "simulation", madsim)))]
40use crate::logging::writer::{FileWriter, LogWriter, StderrWriter, StdoutWriter};
41use crate::{
42 enums::{LogColor, LogLevel},
43 logging::writer::FileWriterConfig,
44};
45
46#[cfg(not(all(feature = "simulation", madsim)))]
47const LOGGING: &str = "logging";
48const KV_COLOR: &str = "color";
49const KV_COMPONENT: &str = "component";
50
51static LOGGER_TX: OnceLock<std::sync::mpsc::Sender<LogEvent>> = OnceLock::new();
53
54static LOGGER_HANDLE: Mutex<Option<std::thread::JoinHandle<()>>> = Mutex::new(None);
56
57#[derive(Debug)]
63pub struct Logger {
64 pub config: LoggerConfig,
66 tx: std::sync::mpsc::Sender<LogEvent>,
68}
69
70#[derive(Debug)]
72pub enum LogEvent {
73 Log(LogLine),
75 Flush,
77 Close,
79}
80
81#[derive(Clone, Debug, Serialize, Deserialize)]
83pub struct LogLine {
84 pub timestamp: UnixNanos,
86 pub level: Level,
88 pub color: LogColor,
90 pub component: Ustr,
92 pub message: String,
94}
95
96impl Display for LogLine {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 write!(f, "[{}] {}: {}", self.level, self.component, self.message)
99 }
100}
101
102#[derive(Clone, Debug)]
109pub struct LogLineWrapper {
110 line: LogLine,
112 cache: Option<String>,
114 colored: Option<String>,
116 trader_id: Ustr,
118}
119
120impl LogLineWrapper {
121 #[must_use]
123 pub const fn new(line: LogLine, trader_id: Ustr) -> Self {
124 Self {
125 line,
126 cache: None,
127 colored: None,
128 trader_id,
129 }
130 }
131
132 pub fn get_string(&mut self) -> &str {
137 self.cache.get_or_insert_with(|| {
138 format!(
139 "{} [{}] {}.{}: {}\n",
140 unix_nanos_to_iso8601(self.line.timestamp),
141 self.line.level,
142 self.trader_id,
143 &self.line.component,
144 &self.line.message,
145 )
146 })
147 }
148
149 pub fn get_colored(&mut self) -> &str {
155 self.colored.get_or_insert_with(|| {
156 format!(
157 "\x1b[1m{}\x1b[0m {}[{}] {}.{}: {}\x1b[0m\n",
158 unix_nanos_to_iso8601(self.line.timestamp),
159 &self.line.color.as_ansi(),
160 self.line.level,
161 self.trader_id,
162 &self.line.component,
163 &self.line.message,
164 )
165 })
166 }
167
168 #[must_use]
177 pub fn get_json(&self) -> String {
178 let json_string =
179 serde_json::to_string(&self).expect("Error serializing log event to string");
180 format!("{json_string}\n")
181 }
182}
183
184impl Serialize for LogLineWrapper {
185 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
186 where
187 S: Serializer,
188 {
189 let mut json_obj = IndexMap::new();
190 let timestamp = unix_nanos_to_iso8601(self.line.timestamp);
191 json_obj.insert("timestamp".to_string(), timestamp);
192 json_obj.insert("trader_id".to_string(), self.trader_id.to_string());
193 json_obj.insert("level".to_string(), self.line.level.to_string());
194 json_obj.insert("color".to_string(), self.line.color.to_string());
195 json_obj.insert("component".to_string(), self.line.component.to_string());
196 json_obj.insert("message".to_string(), self.line.message.clone());
197
198 json_obj.serialize(serializer)
199 }
200}
201
202impl Log for Logger {
203 fn enabled(&self, metadata: &log::Metadata) -> bool {
204 !LOGGING_BYPASSED.load(Ordering::Relaxed)
205 && (metadata.level() == Level::Error
206 || metadata.level() <= self.config.stdout_level
207 || metadata.level() <= self.config.fileout_level)
208 }
209
210 fn log(&self, record: &log::Record) {
211 if self.enabled(record.metadata()) {
212 let timestamp = if LOGGING_REALTIME.load(Ordering::Relaxed) {
213 get_atomic_clock_realtime().get_time_ns()
214 } else {
215 get_atomic_clock_static().get_time_ns()
216 };
217 let level = record.level();
218 let key_values = record.key_values();
219 let color: LogColor = key_values
220 .get(KV_COLOR.into())
221 .and_then(|v| v.to_u64().map(|v| (v as u8).into()))
222 .unwrap_or(level.into());
223 let component = key_values.get(KV_COMPONENT.into()).map_or_else(
224 || Ustr::from(record.metadata().target()),
225 |v| Ustr::from(&v.to_string()),
226 );
227
228 let line = LogLine {
229 timestamp,
230 level,
231 color,
232 component,
233 message: format!("{}", record.args()),
234 };
235
236 if let Err(SendError(LogEvent::Log(line))) = self.tx.send(LogEvent::Log(line)) {
237 eprintln!("Error sending log event (receiver closed): {line}");
238 }
239 }
240 }
241
242 fn flush(&self) {
243 if LOGGING_BYPASSED.load(Ordering::Relaxed) {
245 return;
246 }
247
248 if let Err(e) = self.tx.send(LogEvent::Flush) {
249 eprintln!("Error sending flush log event: {e}");
250 }
251 }
252}
253
254impl Logger {
255 pub fn init_with_env(
261 trader_id: TraderId,
262 instance_id: UUID4,
263 file_config: FileWriterConfig,
264 ) -> anyhow::Result<LogGuard> {
265 let config = LoggerConfig::from_env()?;
266 Self::init_with_config(trader_id, instance_id, config, file_config)
267 }
268
269 pub fn init_with_config(
275 trader_id: TraderId,
276 instance_id: UUID4,
277 config: LoggerConfig,
278 file_config: FileWriterConfig,
279 ) -> anyhow::Result<LogGuard> {
280 if super::LOGGING_INITIALIZED.load(Ordering::SeqCst) {
282 return LogGuard::new()
283 .ok_or_else(|| anyhow::anyhow!("Logging already initialized but sender missing"));
284 }
285
286 let (tx, rx) = std::sync::mpsc::channel::<LogEvent>();
287
288 let logger_tx = tx.clone();
289 let logger = Self {
290 tx: logger_tx,
291 config: config.clone(),
292 };
293
294 set_boxed_logger(Box::new(logger))?;
295
296 if LOGGER_TX.set(tx).is_err() {
298 debug_assert!(
299 false,
300 "LOGGER_TX already set - re-initialization not supported"
301 );
302 }
303
304 if config.bypass_logging {
305 super::logging_set_bypass();
306 }
307
308 let is_colored = config.is_colored;
309
310 let print_config = config.print_config;
311 if print_config {
312 println!("STATIC_MAX_LEVEL={STATIC_MAX_LEVEL}");
313 println!("Logger initialized with {config:?} {file_config:?}");
314 }
315
316 #[cfg(not(all(feature = "simulation", madsim)))]
317 {
318 let handle = std::thread::Builder::new()
319 .name(LOGGING.to_string())
320 .spawn(move || {
321 Self::handle_messages(
322 trader_id.to_string(),
323 instance_id.to_string(),
324 config,
325 file_config,
326 rx,
327 );
328 })?;
329
330 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock() {
332 debug_assert!(
333 handle_guard.is_none(),
334 "LOGGER_HANDLE already set - re-initialization not supported"
335 );
336 *handle_guard = Some(handle);
337 }
338 }
339
340 #[cfg(all(feature = "simulation", madsim))]
341 {
342 let _ = (trader_id, instance_id, config, file_config, rx);
347 super::logging_set_bypass();
348 }
349
350 let max_level = log::LevelFilter::Trace;
351 set_max_level(max_level);
352
353 if print_config {
354 println!("Logger set as `log` implementation with max level {max_level}");
355 }
356
357 super::LOGGING_INITIALIZED.store(true, Ordering::SeqCst);
358 super::LOGGING_COLORED.store(is_colored, Ordering::SeqCst);
359
360 LogGuard::new()
361 .ok_or_else(|| anyhow::anyhow!("Failed to create LogGuard from global sender"))
362 }
363
364 #[cfg(not(all(feature = "simulation", madsim)))]
365 #[expect(clippy::needless_pass_by_value)]
366 fn handle_messages(
367 trader_id: String,
368 instance_id: String,
369 config: LoggerConfig,
370 file_config: FileWriterConfig,
371 rx: std::sync::mpsc::Receiver<LogEvent>,
372 ) {
373 let LoggerConfig {
374 stdout_level,
375 fileout_level,
376 component_level,
377 module_level,
378 log_components_only,
379 is_colored,
380 print_config: _,
381 use_tracing: _,
382 bypass_logging: _,
383 file_config: _,
384 clear_log_file: _,
385 } = config;
386
387 let mut module_filters_sorted: Vec<(Ustr, LevelFilter)> =
389 module_level.into_iter().collect();
390 module_filters_sorted.sort_by_key(|b| std::cmp::Reverse(b.0.len()));
391
392 let trader_id_cache = Ustr::from(&trader_id);
393
394 let mut stdout_writer = StdoutWriter::new(stdout_level, is_colored);
396 let mut stderr_writer = StderrWriter::new(is_colored);
397
398 let mut file_writer_opt = if fileout_level == LevelFilter::Off {
400 None
401 } else {
402 FileWriter::new(trader_id, instance_id, file_config, fileout_level)
403 };
404
405 let process_event = |event: LogEvent,
406 stdout_writer: &mut StdoutWriter,
407 stderr_writer: &mut StderrWriter,
408 file_writer_opt: &mut Option<FileWriter>| {
409 match event {
410 LogEvent::Log(line) => {
411 if should_filter_log(
412 &line.component,
413 line.level,
414 &module_filters_sorted,
415 &component_level,
416 log_components_only,
417 ) {
418 return;
419 }
420
421 let mut wrapper = LogLineWrapper::new(line, trader_id_cache);
422
423 if stderr_writer.enabled(&wrapper.line) {
424 if is_colored {
425 stderr_writer.write(wrapper.get_colored());
426 } else {
427 stderr_writer.write(wrapper.get_string());
428 }
429 }
430
431 if stdout_writer.enabled(&wrapper.line) {
432 if is_colored {
433 stdout_writer.write(wrapper.get_colored());
434 } else {
435 stdout_writer.write(wrapper.get_string());
436 }
437 }
438
439 if let Some(file_writer) = file_writer_opt
440 && file_writer.enabled(&wrapper.line)
441 {
442 if file_writer.json_format {
443 file_writer.write(&wrapper.get_json());
444 } else {
445 file_writer.write(wrapper.get_string());
446 }
447 }
448 }
449 LogEvent::Flush => {
450 stdout_writer.flush();
451 stderr_writer.flush();
452
453 if let Some(file_writer) = file_writer_opt {
454 file_writer.flush();
455 }
456 }
457 LogEvent::Close => {
458 }
460 }
461 };
462
463 while let Ok(event) = rx.recv() {
465 match event {
466 LogEvent::Log(_) | LogEvent::Flush => process_event(
467 event,
468 &mut stdout_writer,
469 &mut stderr_writer,
470 &mut file_writer_opt,
471 ),
472 LogEvent::Close => {
473 stdout_writer.flush();
475 stderr_writer.flush();
476
477 if let Some(ref mut file_writer) = file_writer_opt {
478 file_writer.flush();
479 }
480
481 while let Ok(evt) = rx.try_recv() {
484 match evt {
485 LogEvent::Close => (), _ => process_event(
487 evt,
488 &mut stdout_writer,
489 &mut stderr_writer,
490 &mut file_writer_opt,
491 ),
492 }
493 }
494
495 stdout_writer.flush();
497 stderr_writer.flush();
498
499 if let Some(ref mut file_writer) = file_writer_opt {
500 file_writer.flush();
501 }
502
503 break;
504 }
505 }
506 }
507 }
508}
509
510#[must_use]
517pub fn should_filter_log(
518 component: &Ustr,
519 line_level: log::Level,
520 module_filters_sorted: &[(Ustr, LevelFilter)],
521 component_level: &AHashMap<Ustr, LevelFilter>,
522 log_components_only: bool,
523) -> bool {
524 if module_filters_sorted.is_empty() && component_level.is_empty() {
525 return log_components_only;
526 }
527
528 let module_filter = module_filters_sorted
530 .iter()
531 .find(|(path, _)| component.starts_with(path.as_str()))
532 .map(|(_, level)| *level);
533
534 let component_filter = component_level.get(component).copied();
535
536 if log_components_only && module_filter.is_none() && component_filter.is_none() {
537 return true;
538 }
539
540 if let Some(filter_level) = module_filter.or(component_filter)
542 && line_level > filter_level
543 {
544 return true;
545 }
546
547 false
548}
549
550pub(crate) fn shutdown_graceful() {
559 LOGGING_BYPASSED.store(true, Ordering::SeqCst);
561 log::set_max_level(log::LevelFilter::Off);
562
563 if let Some(tx) = LOGGER_TX.get() {
565 let _ = tx.send(LogEvent::Close);
566 }
567
568 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
569 && let Some(handle) = handle_guard.take()
570 && handle.thread().id() != std::thread::current().id()
571 {
572 let _ = handle.join();
573 }
574
575 LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
576}
577
578pub fn log<T: AsRef<str>>(level: LogLevel, color: LogColor, component: Ustr, message: T) {
579 let color = Value::from(color as u8);
580
581 match level {
582 LogLevel::Off => {}
583 LogLevel::Trace => {
584 log::trace!(component = component.to_value(), color = color; "{}", message.as_ref());
585 }
586 LogLevel::Debug => {
587 log::debug!(component = component.to_value(), color = color; "{}", message.as_ref());
588 }
589 LogLevel::Info => {
590 log::info!(component = component.to_value(), color = color; "{}", message.as_ref());
591 }
592 LogLevel::Warning => {
593 log::warn!(component = component.to_value(), color = color; "{}", message.as_ref());
594 }
595 LogLevel::Error => {
596 log::error!(component = component.to_value(), color = color; "{}", message.as_ref());
597 }
598 }
599}
600
601#[cfg_attr(
628 feature = "python",
629 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
630)]
631#[cfg_attr(
632 feature = "python",
633 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
634)]
635#[derive(Debug)]
636pub struct LogGuard {
637 tx: std::sync::mpsc::Sender<LogEvent>,
638}
639
640impl LogGuard {
641 #[must_use]
649 pub fn new() -> Option<Self> {
650 LOGGER_TX.get().map(|tx| {
651 LOGGING_GUARDS_ACTIVE
652 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
653 if count == u8::MAX {
654 None } else {
656 Some(count + 1)
657 }
658 })
659 .expect("Maximum number of active LogGuards (255) exceeded");
660
661 Self { tx: tx.clone() }
662 })
663 }
664}
665
666impl Drop for LogGuard {
667 fn drop(&mut self) {
672 let previous_count = LOGGING_GUARDS_ACTIVE
673 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
674 assert!(count != 0, "LogGuard reference count underflow");
675 Some(count - 1)
676 })
677 .expect("Failed to decrement LogGuard count");
678
679 if previous_count == 1 && LOGGING_GUARDS_ACTIVE.load(Ordering::SeqCst) == 0 {
681 LOGGING_BYPASSED.store(true, Ordering::SeqCst);
685
686 log::set_max_level(log::LevelFilter::Off);
688
689 let _ = self.tx.send(LogEvent::Close);
691
692 if let Ok(mut handle_guard) = LOGGER_HANDLE.lock()
694 && let Some(handle) = handle_guard.take()
695 {
696 if handle.thread().id() != std::thread::current().id() {
698 let _ = handle.join();
699 }
700 }
701
702 LOGGING_INITIALIZED.store(false, Ordering::SeqCst);
704 } else {
705 let _ = self.tx.send(LogEvent::Flush);
707 }
708 }
709}
710
711#[cfg(test)]
712mod tests {
713 use ahash::AHashMap;
714 use log::LevelFilter;
715 use nautilus_core::UUID4;
716 use nautilus_model::identifiers::TraderId;
717 use rstest::*;
718 use serde_json::Value;
719 use tempfile::tempdir;
720 use ustr::Ustr;
721
722 use super::*;
723 use crate::enums::LogColor;
724
725 #[rstest]
726 fn log_message_serialization() {
727 let log_message = LogLine {
728 timestamp: UnixNanos::default(),
729 level: log::Level::Info,
730 color: LogColor::Normal,
731 component: Ustr::from("Portfolio"),
732 message: "This is a log message".to_string(),
733 };
734
735 let serialized_json = serde_json::to_string(&log_message).unwrap();
736 let deserialized_value: Value = serde_json::from_str(&serialized_json).unwrap();
737
738 assert_eq!(deserialized_value["level"], "INFO");
739 assert_eq!(deserialized_value["component"], "Portfolio");
740 assert_eq!(deserialized_value["message"], "This is a log message");
741 }
742
743 #[rstest]
744 fn log_config_parsing() {
745 let config =
746 LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Error")
747 .unwrap();
748 assert_eq!(
749 config,
750 LoggerConfig {
751 stdout_level: LevelFilter::Info,
752 fileout_level: LevelFilter::Debug,
753 component_level: AHashMap::from_iter(vec![(
754 Ustr::from("RiskEngine"),
755 LevelFilter::Error
756 )]),
757 module_level: AHashMap::new(),
758 log_components_only: false,
759 is_colored: true,
760 print_config: false,
761 use_tracing: false,
762 ..Default::default()
763 }
764 );
765 }
766
767 #[rstest]
768 fn log_config_parsing2() {
769 let config = LoggerConfig::from_spec("stdout=Warn;print_config;fileout=Error;").unwrap();
770 assert_eq!(
771 config,
772 LoggerConfig {
773 stdout_level: LevelFilter::Warn,
774 fileout_level: LevelFilter::Error,
775 component_level: AHashMap::new(),
776 module_level: AHashMap::new(),
777 log_components_only: false,
778 is_colored: true,
779 print_config: true,
780 use_tracing: false,
781 ..Default::default()
782 }
783 );
784 }
785
786 #[rstest]
787 fn log_config_parsing_with_log_components_only() {
788 let config =
789 LoggerConfig::from_spec("stdout=Info;log_components_only;RiskEngine=Debug").unwrap();
790 assert_eq!(
791 config,
792 LoggerConfig {
793 stdout_level: LevelFilter::Info,
794 fileout_level: LevelFilter::Off,
795 component_level: AHashMap::from_iter(vec![(
796 Ustr::from("RiskEngine"),
797 LevelFilter::Debug
798 )]),
799 module_level: AHashMap::new(),
800 log_components_only: true,
801 is_colored: true,
802 print_config: false,
803 use_tracing: false,
804 ..Default::default()
805 }
806 );
807 }
808
809 #[rstest]
810 fn test_log_line_wrapper_plain_string() {
811 let line = LogLine {
812 timestamp: 1_650_000_000_000_000_000.into(),
813 level: log::Level::Info,
814 color: LogColor::Normal,
815 component: Ustr::from("TestComponent"),
816 message: "Test message".to_string(),
817 };
818
819 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
820 let result = wrapper.get_string();
821
822 assert!(result.contains("TRADER-001"));
823 assert!(result.contains("TestComponent"));
824 assert!(result.contains("Test message"));
825 assert!(result.contains("[INFO]"));
826 assert!(result.ends_with('\n'));
827 assert!(!result.contains("\x1b["));
829 }
830
831 #[rstest]
832 fn test_log_line_wrapper_colored_string() {
833 let line = LogLine {
834 timestamp: 1_650_000_000_000_000_000.into(),
835 level: log::Level::Info,
836 color: LogColor::Green,
837 component: Ustr::from("TestComponent"),
838 message: "Test message".to_string(),
839 };
840
841 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-001"));
842 let result = wrapper.get_colored();
843
844 assert!(result.contains("TRADER-001"));
845 assert!(result.contains("TestComponent"));
846 assert!(result.contains("Test message"));
847 assert!(result.contains("\x1b["));
849 assert!(result.ends_with('\n'));
850 }
851
852 #[rstest]
853 fn test_log_line_wrapper_json_output() {
854 let line = LogLine {
855 timestamp: 1_650_000_000_000_000_000.into(),
856 level: log::Level::Warn,
857 color: LogColor::Yellow,
858 component: Ustr::from("RiskEngine"),
859 message: "Warning message".to_string(),
860 };
861
862 let wrapper = LogLineWrapper::new(line, Ustr::from("TRADER-002"));
863 let json = wrapper.get_json();
864
865 let parsed: Value = serde_json::from_str(json.trim()).unwrap();
866 assert_eq!(parsed["trader_id"], "TRADER-002");
867 assert_eq!(parsed["component"], "RiskEngine");
868 assert_eq!(parsed["message"], "Warning message");
869 assert_eq!(parsed["level"], "WARN");
870 assert_eq!(parsed["color"], "YELLOW");
871 }
872
873 #[rstest]
874 fn test_log_line_wrapper_caches_string() {
875 let line = LogLine {
876 timestamp: 1_650_000_000_000_000_000.into(),
877 level: log::Level::Info,
878 color: LogColor::Normal,
879 component: Ustr::from("Test"),
880 message: "Cached".to_string(),
881 };
882
883 let mut wrapper = LogLineWrapper::new(line, Ustr::from("TRADER"));
884 let first = wrapper.get_string().to_string();
885 let second = wrapper.get_string().to_string();
886
887 assert_eq!(first, second);
888 }
889
890 #[rstest]
891 fn test_log_line_display() {
892 let line = LogLine {
893 timestamp: 0.into(),
894 level: log::Level::Error,
895 color: LogColor::Red,
896 component: Ustr::from("Component"),
897 message: "Error occurred".to_string(),
898 };
899
900 let display = format!("{line}");
901 assert_eq!(display, "[ERROR] Component: Error occurred");
902 }
903
904 fn sorted_module_filters(map: AHashMap<Ustr, LevelFilter>) -> Vec<(Ustr, LevelFilter)> {
906 let mut v: Vec<_> = map.into_iter().collect();
907 v.sort_by_key(|b| std::cmp::Reverse(b.0.len()));
908 v
909 }
910
911 #[rstest]
912 fn test_filter_no_filters_passes_all() {
913 let module_filters = vec![];
914 let component_level = AHashMap::new();
915
916 assert!(!should_filter_log(
917 &Ustr::from("anything"),
918 Level::Trace,
919 &module_filters,
920 &component_level,
921 false
922 ));
923 }
924
925 #[rstest]
926 fn test_filter_component_exact_match() {
927 let module_filters = vec![];
928 let component_level = AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Error)]);
929
930 assert!(should_filter_log(
931 &Ustr::from("RiskEngine"),
932 Level::Info,
933 &module_filters,
934 &component_level,
935 false
936 ));
937 assert!(!should_filter_log(
938 &Ustr::from("RiskEngine"),
939 Level::Error,
940 &module_filters,
941 &component_level,
942 false
943 ));
944 assert!(!should_filter_log(
945 &Ustr::from("Portfolio"),
946 Level::Info,
947 &module_filters,
948 &component_level,
949 false
950 ));
951 }
952
953 #[rstest]
954 fn test_filter_module_prefix_match() {
955 let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
956 let component_level = AHashMap::new();
957
958 assert!(!should_filter_log(
959 &Ustr::from("nautilus_okx::websocket"),
960 Level::Debug,
961 &module_filters,
962 &component_level,
963 false
964 ));
965 assert!(!should_filter_log(
966 &Ustr::from("nautilus_okx::websocket::handler"),
967 Level::Debug,
968 &module_filters,
969 &component_level,
970 false
971 ));
972 assert!(should_filter_log(
973 &Ustr::from("nautilus_okx::websocket::handler"),
974 Level::Trace,
975 &module_filters,
976 &component_level,
977 false
978 ));
979 assert!(!should_filter_log(
980 &Ustr::from("nautilus_binance::data"),
981 Level::Trace,
982 &module_filters,
983 &component_level,
984 false
985 ));
986 }
987
988 #[rstest]
989 fn test_filter_longest_prefix_wins() {
990 let module_filters = sorted_module_filters(AHashMap::from_iter([
991 (Ustr::from("nautilus_okx"), LevelFilter::Error),
992 (Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug),
993 ]));
994 let component_level = AHashMap::new();
995
996 assert!(!should_filter_log(
997 &Ustr::from("nautilus_okx::websocket::handler"),
998 Level::Debug,
999 &module_filters,
1000 &component_level,
1001 false
1002 ));
1003 assert!(should_filter_log(
1004 &Ustr::from("nautilus_okx::data"),
1005 Level::Debug,
1006 &module_filters,
1007 &component_level,
1008 false
1009 ));
1010 assert!(!should_filter_log(
1011 &Ustr::from("nautilus_okx::data"),
1012 Level::Error,
1013 &module_filters,
1014 &component_level,
1015 false
1016 ));
1017 }
1018
1019 #[rstest]
1020 fn test_filter_module_precedence_over_component() {
1021 let module_filters = vec![(Ustr::from("nautilus_okx::websocket"), LevelFilter::Debug)];
1022 let component_level =
1023 AHashMap::from_iter([(Ustr::from("nautilus_okx::websocket"), LevelFilter::Error)]);
1024
1025 assert!(!should_filter_log(
1026 &Ustr::from("nautilus_okx::websocket"),
1027 Level::Debug,
1028 &module_filters,
1029 &component_level,
1030 false
1031 ));
1032 }
1033
1034 #[rstest]
1035 fn test_filter_log_components_only_blocks_unknown() {
1036 let module_filters = vec![];
1037 let component_level = AHashMap::from_iter([(Ustr::from("RiskEngine"), LevelFilter::Debug)]);
1038
1039 assert!(should_filter_log(
1040 &Ustr::from("Portfolio"),
1041 Level::Info,
1042 &module_filters,
1043 &component_level,
1044 true
1045 ));
1046 assert!(!should_filter_log(
1047 &Ustr::from("RiskEngine"),
1048 Level::Info,
1049 &module_filters,
1050 &component_level,
1051 true
1052 ));
1053 }
1054
1055 #[rstest]
1056 fn test_filter_log_components_only_with_module() {
1057 let module_filters = vec![(Ustr::from("nautilus_okx"), LevelFilter::Debug)];
1058 let component_level = AHashMap::new();
1059
1060 assert!(!should_filter_log(
1061 &Ustr::from("nautilus_okx::websocket"),
1062 Level::Debug,
1063 &module_filters,
1064 &component_level,
1065 true
1066 ));
1067 assert!(should_filter_log(
1068 &Ustr::from("nautilus_binance::data"),
1069 Level::Debug,
1070 &module_filters,
1071 &component_level,
1072 true
1073 ));
1074 }
1075
1076 #[rstest]
1077 fn test_filter_level_comparison() {
1078 let module_filters = vec![];
1079 let component_level = AHashMap::from_iter([(Ustr::from("Test"), LevelFilter::Warn)]);
1080
1081 assert!(!should_filter_log(
1082 &Ustr::from("Test"),
1083 Level::Error,
1084 &module_filters,
1085 &component_level,
1086 false
1087 ));
1088 assert!(!should_filter_log(
1089 &Ustr::from("Test"),
1090 Level::Warn,
1091 &module_filters,
1092 &component_level,
1093 false
1094 ));
1095 assert!(should_filter_log(
1096 &Ustr::from("Test"),
1097 Level::Info,
1098 &module_filters,
1099 &component_level,
1100 false
1101 ));
1102 assert!(should_filter_log(
1103 &Ustr::from("Test"),
1104 Level::Debug,
1105 &module_filters,
1106 &component_level,
1107 false
1108 ));
1109 assert!(should_filter_log(
1110 &Ustr::from("Test"),
1111 Level::Trace,
1112 &module_filters,
1113 &component_level,
1114 false
1115 ));
1116 }
1117
1118 #[cfg(not(all(feature = "simulation", madsim)))]
1126 mod serial_tests {
1127 use std::{sync::atomic::Ordering, time::Duration};
1128
1129 use super::*;
1130 use crate::{
1131 logging::{
1132 LOGGING_BYPASSED, logging_clock_set_static_mode, logging_clock_set_static_time,
1133 logging_is_initialized, logging_set_bypass,
1134 },
1135 testing::wait_until,
1136 };
1137
1138 #[rstest]
1139 fn test_logging_to_file() {
1140 let config = LoggerConfig {
1141 fileout_level: LevelFilter::Debug,
1142 ..Default::default()
1143 };
1144
1145 let temp_dir = tempdir().expect("Failed to create temporary directory");
1146 let file_config = FileWriterConfig {
1147 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1148 ..Default::default()
1149 };
1150
1151 let log_guard = Logger::init_with_config(
1152 TraderId::from("TRADER-001"),
1153 UUID4::new(),
1154 config,
1155 file_config,
1156 );
1157
1158 logging_clock_set_static_mode();
1159 logging_clock_set_static_time(1_650_000_000_000_000);
1160
1161 log::info!(
1162 component = "RiskEngine";
1163 "This is a test"
1164 );
1165
1166 let mut log_contents = String::new();
1167
1168 wait_until(
1169 || {
1170 std::fs::read_dir(&temp_dir)
1171 .expect("Failed to read directory")
1172 .filter_map(Result::ok)
1173 .any(|entry| entry.path().is_file())
1174 },
1175 Duration::from_secs(3),
1176 );
1177
1178 drop(log_guard); wait_until(
1181 || {
1182 let log_file_path = std::fs::read_dir(&temp_dir)
1183 .expect("Failed to read directory")
1184 .filter_map(Result::ok)
1185 .find(|entry| entry.path().is_file())
1186 .expect("No files found in directory")
1187 .path();
1188 log_contents = std::fs::read_to_string(log_file_path)
1189 .expect("Error while reading log file");
1190 !log_contents.is_empty()
1191 },
1192 Duration::from_secs(3),
1193 );
1194
1195 assert_eq!(
1196 log_contents,
1197 "1970-01-20T02:20:00.000000000Z [INFO] TRADER-001.RiskEngine: This is a test\n"
1198 );
1199 }
1200
1201 #[rstest]
1202 fn test_shutdown_drains_backlog_tail() {
1203 const N: usize = 1000;
1204
1205 let config = LoggerConfig {
1207 stdout_level: LevelFilter::Off,
1208 fileout_level: LevelFilter::Info,
1209 ..Default::default()
1210 };
1211
1212 let temp_dir = tempdir().expect("Failed to create temporary directory");
1213 let file_config = FileWriterConfig {
1214 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1215 ..Default::default()
1216 };
1217
1218 let log_guard = Logger::init_with_config(
1219 TraderId::from("TRADER-TAIL"),
1220 UUID4::new(),
1221 config,
1222 file_config,
1223 )
1224 .expect("Failed to initialize logger");
1225
1226 logging_clock_set_static_mode();
1228 logging_clock_set_static_time(1_700_000_000_000_000);
1229
1230 for i in 0..N {
1232 log::info!(component = "TailDrain"; "BacklogTest {i}");
1233 }
1234
1235 drop(log_guard);
1237
1238 let mut count = 0usize;
1240 wait_until(
1241 || {
1242 if let Some(log_file) = std::fs::read_dir(&temp_dir)
1243 .expect("Failed to read directory")
1244 .filter_map(Result::ok)
1245 .find(|entry| entry.path().is_file())
1246 {
1247 let log_file_path = log_file.path();
1248 if let Ok(contents) = std::fs::read_to_string(log_file_path) {
1249 count = contents
1250 .lines()
1251 .filter(|l| l.contains("BacklogTest "))
1252 .count();
1253 count >= N
1254 } else {
1255 false
1256 }
1257 } else {
1258 false
1259 }
1260 },
1261 Duration::from_secs(5),
1262 );
1263
1264 assert_eq!(count, N, "Expected all pre-shutdown messages to be written");
1265 }
1266
1267 #[rstest]
1268 fn test_log_component_level_filtering() {
1269 let config =
1270 LoggerConfig::from_spec("stdout=Info;fileout=Debug;RiskEngine=Error").unwrap();
1271
1272 let temp_dir = tempdir().expect("Failed to create temporary directory");
1273 let file_config = FileWriterConfig {
1274 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1275 ..Default::default()
1276 };
1277
1278 let log_guard = Logger::init_with_config(
1279 TraderId::from("TRADER-001"),
1280 UUID4::new(),
1281 config,
1282 file_config,
1283 );
1284
1285 logging_clock_set_static_mode();
1286 logging_clock_set_static_time(1_650_000_000_000_000);
1287
1288 log::info!(
1289 component = "RiskEngine";
1290 "This is a test"
1291 );
1292
1293 drop(log_guard); wait_until(
1296 || {
1297 if let Some(log_file) = std::fs::read_dir(&temp_dir)
1298 .expect("Failed to read directory")
1299 .filter_map(Result::ok)
1300 .find(|entry| entry.path().is_file())
1301 {
1302 let log_file_path = log_file.path();
1303 let log_contents = std::fs::read_to_string(log_file_path)
1304 .expect("Error while reading log file");
1305 !log_contents.contains("RiskEngine")
1306 } else {
1307 false
1308 }
1309 },
1310 Duration::from_secs(3),
1311 );
1312
1313 assert!(
1314 std::fs::read_dir(&temp_dir)
1315 .expect("Failed to read directory")
1316 .filter_map(Result::ok)
1317 .any(|entry| entry.path().is_file()),
1318 "Log file exists"
1319 );
1320 }
1321
1322 #[rstest]
1323 fn test_logging_to_file_in_json_format() {
1324 let config =
1325 LoggerConfig::from_spec("stdout=Info;is_colored;fileout=Debug;RiskEngine=Info")
1326 .unwrap();
1327
1328 let temp_dir = tempdir().expect("Failed to create temporary directory");
1329 let file_config = FileWriterConfig {
1330 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1331 file_format: Some("json".to_string()),
1332 ..Default::default()
1333 };
1334
1335 let log_guard = Logger::init_with_config(
1336 TraderId::from("TRADER-001"),
1337 UUID4::new(),
1338 config,
1339 file_config,
1340 );
1341
1342 logging_clock_set_static_mode();
1343 logging_clock_set_static_time(1_650_000_000_000_000);
1344
1345 log::info!(
1346 component = "RiskEngine";
1347 "This is a test"
1348 );
1349
1350 let mut log_contents = String::new();
1351
1352 drop(log_guard); wait_until(
1355 || {
1356 if let Some(log_file) = std::fs::read_dir(&temp_dir)
1357 .expect("Failed to read directory")
1358 .filter_map(Result::ok)
1359 .find(|entry| entry.path().is_file())
1360 {
1361 let log_file_path = log_file.path();
1362 log_contents = std::fs::read_to_string(log_file_path)
1363 .expect("Error while reading log file");
1364 !log_contents.is_empty()
1365 } else {
1366 false
1367 }
1368 },
1369 Duration::from_secs(3),
1370 );
1371
1372 assert_eq!(
1373 log_contents,
1374 "{\"timestamp\":\"1970-01-20T02:20:00.000000000Z\",\"trader_id\":\"TRADER-001\",\"level\":\"INFO\",\"color\":\"NORMAL\",\"component\":\"RiskEngine\",\"message\":\"This is a test\"}\n"
1375 );
1376 }
1377
1378 #[rstest]
1379 fn test_init_sets_logging_is_initialized_flag() {
1380 let config = LoggerConfig::default();
1381 let file_config = FileWriterConfig::default();
1382
1383 let guard = Logger::init_with_config(
1384 TraderId::from("TRADER-001"),
1385 UUID4::new(),
1386 config,
1387 file_config,
1388 );
1389 assert!(guard.is_ok());
1390 assert!(logging_is_initialized());
1391
1392 drop(guard);
1393 assert!(!logging_is_initialized());
1394 }
1395
1396 #[rstest]
1397 fn test_reinit_after_guard_drop_fails() {
1398 let config = LoggerConfig::default();
1399 let file_config = FileWriterConfig::default();
1400
1401 let guard1 = Logger::init_with_config(
1402 TraderId::from("TRADER-001"),
1403 UUID4::new(),
1404 config.clone(),
1405 file_config.clone(),
1406 );
1407 assert!(guard1.is_ok());
1408 drop(guard1);
1409
1410 let guard2 = Logger::init_with_config(
1412 TraderId::from("TRADER-002"),
1413 UUID4::new(),
1414 config,
1415 file_config,
1416 );
1417 assert!(guard2.is_err());
1418 }
1419
1420 #[rstest]
1421 fn test_bypass_before_init_prevents_logging() {
1422 logging_set_bypass();
1423 assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
1424
1425 let temp_dir = tempdir().expect("Failed to create temporary directory");
1426 let config = LoggerConfig {
1427 fileout_level: LevelFilter::Debug,
1428 ..Default::default()
1429 };
1430 let file_config = FileWriterConfig {
1431 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1432 ..Default::default()
1433 };
1434
1435 let guard = Logger::init_with_config(
1436 TraderId::from("TRADER-001"),
1437 UUID4::new(),
1438 config,
1439 file_config,
1440 );
1441 assert!(guard.is_ok());
1442
1443 log::info!(
1444 component = "TestComponent";
1445 "This should be bypassed"
1446 );
1447 std::thread::sleep(Duration::from_millis(100));
1448 drop(guard);
1449
1450 assert!(LOGGING_BYPASSED.load(Ordering::Relaxed));
1452 }
1453
1454 #[rstest]
1455 fn test_module_level_filtering() {
1456 let config = LoggerConfig::from_spec(
1460 "stdout=Off;fileout=Trace;nautilus::adapters=Warn;nautilus::adapters::okx=Debug",
1461 )
1462 .unwrap();
1463
1464 let temp_dir = tempdir().expect("Failed to create temporary directory");
1465 let file_config = FileWriterConfig {
1466 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1467 ..Default::default()
1468 };
1469
1470 let log_guard = Logger::init_with_config(
1471 TraderId::from("TRADER-MOD"),
1472 UUID4::new(),
1473 config,
1474 file_config,
1475 )
1476 .expect("Failed to initialize logger");
1477
1478 logging_clock_set_static_mode();
1479 logging_clock_set_static_time(1_650_000_000_000_000);
1480
1481 log::debug!(
1483 component = "nautilus::adapters::okx::websocket";
1484 "OKX debug message"
1485 );
1486
1487 log::info!(
1489 component = "nautilus::adapters::okx";
1490 "OKX info message"
1491 );
1492
1493 log::info!(
1495 component = "nautilus::adapters::binance";
1496 "Binance info message SHOULD NOT APPEAR"
1497 );
1498
1499 log::warn!(
1501 component = "nautilus::adapters::binance";
1502 "Binance warn message"
1503 );
1504
1505 log::trace!(
1507 component = "Portfolio";
1508 "Portfolio trace message"
1509 );
1510
1511 drop(log_guard);
1512
1513 wait_until(
1514 || {
1515 std::fs::read_dir(&temp_dir)
1516 .expect("Failed to read directory")
1517 .filter_map(Result::ok)
1518 .any(|entry| entry.path().is_file())
1519 },
1520 Duration::from_secs(3),
1521 );
1522
1523 let log_file_path = std::fs::read_dir(&temp_dir)
1524 .expect("Failed to read directory")
1525 .filter_map(Result::ok)
1526 .find(|entry| entry.path().is_file())
1527 .expect("No log file found")
1528 .path();
1529
1530 let log_contents =
1531 std::fs::read_to_string(log_file_path).expect("Error reading log file");
1532
1533 assert!(
1534 log_contents.contains("OKX debug message"),
1535 "OKX debug should pass (longer prefix wins)"
1536 );
1537 assert!(
1538 log_contents.contains("OKX info message"),
1539 "OKX info should pass"
1540 );
1541 assert!(
1542 log_contents.contains("Binance warn message"),
1543 "Binance warn should pass"
1544 );
1545 assert!(
1546 log_contents.contains("Portfolio trace message"),
1547 "Unfiltered component should pass"
1548 );
1549 assert!(
1550 !log_contents.contains("SHOULD NOT APPEAR"),
1551 "Binance info should be filtered (adapters=Warn)"
1552 );
1553 }
1554 }
1555
1556 #[cfg(all(feature = "simulation", madsim))]
1557 mod sim_tests {
1558 use std::sync::atomic::Ordering;
1559
1560 use super::*;
1561 use crate::logging::LOGGING_BYPASSED;
1562
1563 #[rstest]
1564 fn test_init_under_madsim_skips_writer_thread_and_forces_bypass() {
1565 let config = LoggerConfig {
1566 bypass_logging: false,
1567 ..Default::default()
1568 };
1569 let temp_dir = tempdir().expect("Failed to create temporary directory");
1570 let file_config = FileWriterConfig {
1571 directory: Some(temp_dir.path().to_str().unwrap().to_string()),
1572 ..Default::default()
1573 };
1574
1575 let _guard = Logger::init_with_config(
1576 TraderId::from("TRADER-SIM"),
1577 UUID4::new(),
1578 config,
1579 file_config,
1580 )
1581 .expect("init should succeed under simulation");
1582
1583 assert!(LOGGING_INITIALIZED.load(Ordering::SeqCst));
1584 assert!(
1585 LOGGING_BYPASSED.load(Ordering::SeqCst),
1586 "bypass must be forced under cfg(madsim) even when config disables it"
1587 );
1588 assert!(
1589 LOGGER_HANDLE
1590 .lock()
1591 .expect("LOGGER_HANDLE mutex should not be poisoned")
1592 .is_none(),
1593 "writer thread must not be spawned under cfg(madsim)"
1594 );
1595 }
1596 }
1597}