nautilus_common/msgbus/
message.rs1use std::fmt::Display;
17
18use bytes::Bytes;
19use serde::{Deserialize, Serialize};
20use ustr::Ustr;
21
22use super::switchboard::CLOSE_TOPIC;
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
26#[cfg_attr(
27 feature = "python",
28 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
29)]
30#[cfg_attr(
31 feature = "python",
32 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
33)]
34pub struct BusMessage {
35 pub topic: Ustr,
37 pub payload: Bytes,
39}
40
41impl BusMessage {
42 pub fn new(topic: Ustr, payload: Bytes) -> Self {
44 debug_assert!(!topic.is_empty());
45 Self { topic, payload }
46 }
47
48 pub fn with_str_topic<T: AsRef<str>>(topic: T, payload: Bytes) -> Self {
53 Self::new(Ustr::from(topic.as_ref()), payload)
54 }
55
56 pub fn new_close() -> Self {
58 Self::with_str_topic(CLOSE_TOPIC, Bytes::new())
59 }
60}
61
62impl Display for BusMessage {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 write!(
65 f,
66 "[{}] {}",
67 self.topic,
68 String::from_utf8_lossy(&self.payload)
69 )
70 }
71}
72
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rstest::rstest;

    use super::*;

    /// A borrowed `&str` topic is accepted and both parts are stored unchanged.
    #[rstest]
    #[case("test/topic", "payload data")]
    #[case("events/trading", "Another payload")]
    fn test_with_str_topic_str(#[case] topic: &str, #[case] payload_str: &str) {
        let expected_payload = Bytes::from(payload_str.to_owned());

        let msg = BusMessage::with_str_topic(topic, expected_payload.clone());

        assert_eq!(msg.topic.as_str(), topic);
        assert_eq!(msg.payload, expected_payload);
    }

    /// An owned `String` topic is accepted as well.
    #[rstest]
    fn test_with_str_topic_string() {
        let owned_topic = String::from("orders/new");
        let expected_payload = Bytes::from("order payload data");

        let msg = BusMessage::with_str_topic(owned_topic.clone(), expected_payload.clone());

        assert_eq!(msg.topic.as_str(), owned_topic);
        assert_eq!(msg.payload, expected_payload);
    }

    /// The close message carries the `CLOSE` topic and no payload bytes.
    #[rstest]
    fn test_new_close() {
        let close_msg = BusMessage::new_close();

        assert_eq!(close_msg.topic.as_str(), "CLOSE");
        assert!(close_msg.payload.is_empty());
    }
}