1use std::{
32 collections::{HashMap, VecDeque},
33 fmt::Debug,
34 sync::{
35 Arc,
36 atomic::{AtomicBool, Ordering},
37 },
38 time::Duration,
39};
40
41use bytes::Bytes;
42use futures::stream::Stream;
43use nautilus_common::{
44 live::get_runtime,
45 logging::{log_task_error, log_task_started, log_task_stopped},
46 msgbus::{
47 BusMessage,
48 database::{DatabaseConfig, MessageBusConfig, MessageBusDatabaseAdapter},
49 switchboard::CLOSE_TOPIC,
50 },
51};
52use nautilus_core::{
53 UUID4,
54 time::{duration_since_unix_epoch, get_atomic_clock_realtime},
55};
56use nautilus_cryptography::providers::install_cryptographic_provider;
57use nautilus_model::identifiers::TraderId;
58use redis::{AsyncCommands, streams};
59use streams::StreamReadOptions;
60use ustr::Ustr;
61
62use super::{REDIS_MINID, REDIS_XTRIM, await_handle};
63use crate::redis::{create_redis_connection, get_stream_key};
64
/// Name of the publishing task, used for its Redis connection and task logging.
const MSGBUS_PUBLISH: &str = "msgbus-publish";
/// Name of the external stream reading task, used for its Redis connection and task logging.
const MSGBUS_STREAM: &str = "msgbus-stream";
/// Name passed to `await_handle` when joining the heartbeat task.
const MSGBUS_HEARTBEAT: &str = "msgbus-heartbeat";
/// Topic under which heartbeat messages are published.
const HEARTBEAT_TOPIC: &str = "health:heartbeat";
/// Minimum number of seconds between two XTRIM calls for the same stream key.
const TRIM_BUFFER_SECS: u64 = 60;

/// Deserialized shape of a stream read reply: a list of maps keyed by stream key, where each
/// value is a list of maps from entry ID to the raw entry value.
type RedisStreamBulk = Vec<HashMap<String, Vec<HashMap<String, redis::Value>>>>;
72
/// A message bus database adapter backed by Redis streams.
///
/// Outgoing messages are handed to a background publishing task over an unbounded channel.
/// Optionally, a second task reads configured external streams and a third task emits
/// periodic heartbeat messages.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct RedisMessageBusDatabase {
    /// The trader ID associated with this database.
    pub trader_id: TraderId,
    /// The instance ID associated with this database.
    pub instance_id: UUID4,
    /// Sender feeding messages (including the close message) to the publishing task.
    pub_tx: tokio::sync::mpsc::UnboundedSender<BusMessage>,
    /// Join handle of the publishing task; taken when closing.
    pub_handle: Option<tokio::task::JoinHandle<()>>,
    /// Receiver for messages read from external streams; `None` when no external streams are
    /// configured or after the receiver has been taken.
    stream_rx: Option<tokio::sync::mpsc::Receiver<BusMessage>>,
    /// Join handle of the stream reading task, if one was spawned.
    stream_handle: Option<tokio::task::JoinHandle<()>>,
    /// Set to `true` on close to ask the stream reading task to stop.
    stream_signal: Arc<AtomicBool>,
    /// Join handle of the heartbeat task, if one was spawned.
    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
    /// Set to `true` on close to ask the heartbeat task to stop.
    heartbeat_signal: Arc<AtomicBool>,
}
90
91impl Debug for RedisMessageBusDatabase {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct(stringify!(RedisMessageBusDatabase))
94 .field("trader_id", &self.trader_id)
95 .field("instance_id", &self.instance_id)
96 .finish()
97 }
98}
99
100impl MessageBusDatabaseAdapter for RedisMessageBusDatabase {
101 type DatabaseType = Self;
102
103 fn new(
111 trader_id: TraderId,
112 instance_id: UUID4,
113 config: MessageBusConfig,
114 ) -> anyhow::Result<Self> {
115 install_cryptographic_provider();
116
117 let config_clone = config.clone();
118 let db_config = config
119 .database
120 .clone()
121 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
122
123 let (pub_tx, pub_rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();
124
125 let pub_handle = Some(get_runtime().spawn(async move {
127 if let Err(e) = publish_messages(pub_rx, trader_id, instance_id, config_clone).await {
128 log_task_error(MSGBUS_PUBLISH, &e);
129 }
130 }));
131
132 let external_streams = config.external_streams.clone().unwrap_or_default();
134 let stream_signal = Arc::new(AtomicBool::new(false));
135 let (stream_rx, stream_handle) = if external_streams.is_empty() {
136 (None, None)
137 } else {
138 let stream_signal_clone = stream_signal.clone();
139 let (stream_tx, stream_rx) = tokio::sync::mpsc::channel::<BusMessage>(100_000);
140 (
141 Some(stream_rx),
142 Some(get_runtime().spawn(async move {
143 if let Err(e) =
144 stream_messages(stream_tx, db_config, external_streams, stream_signal_clone)
145 .await
146 {
147 log_task_error(MSGBUS_STREAM, &e);
148 }
149 })),
150 )
151 };
152
153 let heartbeat_signal = Arc::new(AtomicBool::new(false));
155 let heartbeat_handle = if let Some(heartbeat_interval_secs) = config.heartbeat_interval_secs
156 {
157 let signal = heartbeat_signal.clone();
158 let pub_tx_clone = pub_tx.clone();
159
160 Some(get_runtime().spawn(async move {
161 run_heartbeat(heartbeat_interval_secs, signal, pub_tx_clone).await;
162 }))
163 } else {
164 None
165 };
166
167 Ok(Self {
168 trader_id,
169 instance_id,
170 pub_tx,
171 pub_handle,
172 stream_rx,
173 stream_handle,
174 stream_signal,
175 heartbeat_handle,
176 heartbeat_signal,
177 })
178 }
179
180 fn is_closed(&self) -> bool {
182 self.pub_tx.is_closed()
183 }
184
185 fn publish(&self, topic: Ustr, payload: Bytes) {
187 let msg = BusMessage::new(topic, payload);
188 if let Err(e) = self.pub_tx.send(msg) {
189 log::error!("Failed to send message: {e}");
190 }
191 }
192
193 fn close(&mut self) {
195 log::debug!("Closing");
196
197 self.stream_signal.store(true, Ordering::Relaxed);
198 self.heartbeat_signal.store(true, Ordering::Relaxed);
199
200 if !self.pub_tx.is_closed() {
201 let msg = BusMessage::new_close();
202
203 if let Err(e) = self.pub_tx.send(msg) {
204 log::error!("Failed to send close message: {e:?}");
205 }
206 }
207
208 tokio::task::block_in_place(|| {
210 get_runtime().block_on(async {
211 self.close_async().await;
212 });
213 });
214
215 log::debug!("Closed");
216 }
217}
218
219impl RedisMessageBusDatabase {
220 pub fn get_stream_receiver(
226 &mut self,
227 ) -> anyhow::Result<tokio::sync::mpsc::Receiver<BusMessage>> {
228 self.stream_rx
229 .take()
230 .ok_or_else(|| anyhow::anyhow!("Stream receiver already taken"))
231 }
232
233 pub fn stream(
235 mut stream_rx: tokio::sync::mpsc::Receiver<BusMessage>,
236 ) -> impl Stream<Item = BusMessage> + 'static {
237 async_stream::stream! {
238 while let Some(msg) = stream_rx.recv().await {
239 yield msg;
240 }
241 }
242 }
243
244 pub async fn close_async(&mut self) {
245 await_handle(self.pub_handle.take(), MSGBUS_PUBLISH).await;
246 await_handle(self.stream_handle.take(), MSGBUS_STREAM).await;
247 await_handle(self.heartbeat_handle.take(), MSGBUS_HEARTBEAT).await;
248 }
249}
250
251pub async fn publish_messages(
260 mut rx: tokio::sync::mpsc::UnboundedReceiver<BusMessage>,
261 trader_id: TraderId,
262 instance_id: UUID4,
263 config: MessageBusConfig,
264) -> anyhow::Result<()> {
265 log_task_started(MSGBUS_PUBLISH);
266
267 let db_config = config
268 .database
269 .as_ref()
270 .ok_or_else(|| anyhow::anyhow!("No database config"))?;
271 let mut con = create_redis_connection(MSGBUS_PUBLISH, db_config.clone()).await?;
272 let stream_key = get_stream_key(trader_id, instance_id, &config);
273
274 let autotrim_duration = config
276 .autotrim_mins
277 .filter(|&mins| mins > 0)
278 .map(|mins| Duration::from_secs(u64::from(mins) * 60));
279 let mut last_trim_index: HashMap<String, usize> = HashMap::new();
280
281 let mut buffer: VecDeque<BusMessage> = VecDeque::new();
283 let buffer_interval = Duration::from_millis(u64::from(config.buffer_interval_ms.unwrap_or(0)));
284
285 let flush_timer = tokio::time::sleep(buffer_interval);
289 tokio::pin!(flush_timer);
290
291 loop {
292 tokio::select! {
293 maybe_msg = rx.recv() => {
294 if let Some(msg) = maybe_msg {
295 if msg.topic == CLOSE_TOPIC {
296 log::debug!("Received close message");
297 if !buffer.is_empty() {
299 drain_buffer(
300 &mut con,
301 &stream_key,
302 config.stream_per_topic,
303 autotrim_duration,
304 &mut last_trim_index,
305 &mut buffer,
306 ).await?;
307 }
308 break;
309 }
310
311 buffer.push_back(msg);
312
313 if buffer_interval.is_zero() {
314 drain_buffer(
316 &mut con,
317 &stream_key,
318 config.stream_per_topic,
319 autotrim_duration,
320 &mut last_trim_index,
321 &mut buffer,
322 ).await?;
323 }
324 } else {
325 log::debug!("Channel hung up");
326 break;
327 }
328 }
329 () = &mut flush_timer, if !buffer_interval.is_zero() => {
332 if !buffer.is_empty() {
333 drain_buffer(
334 &mut con,
335 &stream_key,
336 config.stream_per_topic,
337 autotrim_duration,
338 &mut last_trim_index,
339 &mut buffer,
340 ).await?;
341 }
342
343 flush_timer.as_mut().reset(tokio::time::Instant::now() + buffer_interval);
345 }
346 }
347 }
348
349 if !buffer.is_empty() {
351 drain_buffer(
352 &mut con,
353 &stream_key,
354 config.stream_per_topic,
355 autotrim_duration,
356 &mut last_trim_index,
357 &mut buffer,
358 )
359 .await?;
360 }
361
362 log_task_stopped(MSGBUS_PUBLISH);
363 Ok(())
364}
365
366async fn drain_buffer(
367 conn: &mut redis::aio::ConnectionManager,
368 stream_key: &str,
369 stream_per_topic: bool,
370 autotrim_duration: Option<Duration>,
371 last_trim_index: &mut HashMap<String, usize>,
372 buffer: &mut VecDeque<BusMessage>,
373) -> anyhow::Result<()> {
374 let mut pipe = redis::pipe();
375 pipe.atomic();
376
377 for msg in buffer.drain(..) {
378 let items: Vec<(&str, &[u8])> = vec![
379 ("topic", msg.topic.as_ref()),
380 ("payload", msg.payload.as_ref()),
381 ];
382 let stream_key = if stream_per_topic {
383 format!("{stream_key}:{}", &msg.topic)
384 } else {
385 stream_key.to_string()
386 };
387 pipe.xadd(&stream_key, "*", &items);
388
389 if autotrim_duration.is_none() {
390 continue; }
392
393 let last_trim_ms = last_trim_index.entry(stream_key.clone()).or_insert(0); let unix_duration_now = duration_since_unix_epoch();
396 let trim_buffer = Duration::from_secs(TRIM_BUFFER_SECS);
397
398 if *last_trim_ms < (unix_duration_now - trim_buffer).as_millis() as usize {
400 let min_timestamp_ms =
401 (unix_duration_now - autotrim_duration.unwrap()).as_millis() as usize;
402 let result: Result<(), redis::RedisError> = redis::cmd(REDIS_XTRIM)
403 .arg(stream_key.clone())
404 .arg(REDIS_MINID)
405 .arg(min_timestamp_ms)
406 .query_async(conn)
407 .await;
408
409 if let Err(e) = result {
410 log::error!("Error trimming stream '{stream_key}': {e}");
411 } else {
412 last_trim_index.insert(stream_key.clone(), unix_duration_now.as_millis() as usize);
413 }
414 }
415 }
416
417 pipe.query_async(conn).await.map_err(anyhow::Error::from)
418}
419
420pub async fn stream_messages(
428 tx: tokio::sync::mpsc::Sender<BusMessage>,
429 config: DatabaseConfig,
430 stream_keys: Vec<String>,
431 stream_signal: Arc<AtomicBool>,
432) -> anyhow::Result<()> {
433 log_task_started(MSGBUS_STREAM);
434
435 let mut con = create_redis_connection(MSGBUS_STREAM, config).await?;
436
437 let stream_keys = &stream_keys
438 .iter()
439 .map(String::as_str)
440 .collect::<Vec<&str>>();
441
442 log::debug!("Listening to streams: [{}]", stream_keys.join(", "));
443
444 let clock = get_atomic_clock_realtime();
446 let timestamp_ms = clock.get_time_ms();
447 let initial_id = timestamp_ms.to_string();
448
449 let mut last_ids: HashMap<String, String> = stream_keys
450 .iter()
451 .map(|&key| (key.to_string(), initial_id.clone()))
452 .collect();
453
454 let opts = StreamReadOptions::default().block(100);
455
456 'outer: loop {
457 if stream_signal.load(Ordering::Relaxed) {
458 log::debug!("Received streaming terminate signal");
459 break;
460 }
461
462 let ids: Vec<String> = stream_keys
463 .iter()
464 .map(|&key| last_ids[key].clone())
465 .collect();
466 let id_refs: Vec<&str> = ids.iter().map(String::as_str).collect();
467
468 let result: Result<RedisStreamBulk, _> =
469 con.xread_options(&[&stream_keys], &[&id_refs], &opts).await;
470
471 match result {
472 Ok(stream_bulk) => {
473 if stream_bulk.is_empty() {
474 continue;
476 }
477
478 for entry in &stream_bulk {
479 for (stream_key, stream_msgs) in entry {
480 for stream_msg in stream_msgs {
481 for (id, array) in stream_msg {
482 last_ids.insert(stream_key.clone(), id.clone());
483
484 match decode_bus_message(array) {
485 Ok(msg) => {
486 if let Err(e) = tx.send(msg).await {
487 log::debug!("Channel closed: {e:?}");
488 break 'outer; }
490 }
491 Err(e) => {
492 log::error!("{e:?}");
493 }
494 }
495 }
496 }
497 }
498 }
499 }
500 Err(e) => {
501 anyhow::bail!("Error reading from stream: {e:?}");
502 }
503 }
504 }
505
506 log_task_stopped(MSGBUS_STREAM);
507 Ok(())
508}
509
510fn decode_bus_message(stream_msg: &redis::Value) -> anyhow::Result<BusMessage> {
519 if let redis::Value::Array(stream_msg) = stream_msg {
520 if stream_msg.len() < 4 {
521 anyhow::bail!("Invalid stream message format: {stream_msg:?}");
522 }
523
524 let topic = match &stream_msg[1] {
525 redis::Value::BulkString(bytes) => match String::from_utf8(bytes.clone()) {
526 Ok(topic) => topic,
527 Err(e) => anyhow::bail!("Error parsing topic: {e}"),
528 },
529 _ => {
530 anyhow::bail!("Invalid topic format: {stream_msg:?}");
531 }
532 };
533
534 let payload = match &stream_msg[3] {
535 redis::Value::BulkString(bytes) => Bytes::copy_from_slice(bytes),
536 _ => {
537 anyhow::bail!("Invalid payload format: {stream_msg:?}");
538 }
539 };
540
541 Ok(BusMessage::with_str_topic(topic, payload))
542 } else {
543 anyhow::bail!("Invalid stream message format: {stream_msg:?}")
544 }
545}
546
547async fn run_heartbeat(
548 heartbeat_interval_secs: u16,
549 signal: Arc<AtomicBool>,
550 pub_tx: tokio::sync::mpsc::UnboundedSender<BusMessage>,
551) {
552 log_task_started("heartbeat");
553 log::debug!("Heartbeat at {heartbeat_interval_secs} second intervals");
554
555 let heartbeat_interval = Duration::from_secs(u64::from(heartbeat_interval_secs));
556 let heartbeat_timer = tokio::time::interval(heartbeat_interval);
557
558 let check_interval = Duration::from_millis(100);
559 let check_timer = tokio::time::interval(check_interval);
560
561 tokio::pin!(heartbeat_timer);
562 tokio::pin!(check_timer);
563
564 loop {
565 if signal.load(Ordering::Relaxed) {
566 log::debug!("Received heartbeat terminate signal");
567 break;
568 }
569
570 tokio::select! {
571 _ = heartbeat_timer.tick() => {
572 let heartbeat = create_heartbeat_msg();
573 if let Err(e) = pub_tx.send(heartbeat) {
574 log::debug!("Error sending heartbeat: {e}");
576 }
577 },
578 _ = check_timer.tick() => {}
579 }
580 }
581
582 log_task_stopped("heartbeat");
583}
584
585fn create_heartbeat_msg() -> BusMessage {
586 let payload = Bytes::from(chrono::Utc::now().to_rfc3339().into_bytes());
587 BusMessage::with_str_topic(HEARTBEAT_TOPIC, payload)
588}
589
#[cfg(test)]
mod tests {
    use redis::Value;
    use rstest::*;

    use super::*;

    /// Builds a bulk-string value from raw bytes.
    fn bulk(bytes: &[u8]) -> Value {
        Value::BulkString(bytes.to_vec())
    }

    #[rstest]
    fn test_decode_bus_message_valid() {
        let stream_msg = Value::Array(vec![
            bulk(b"0"),
            bulk(b"topic1"),
            bulk(b"unused"),
            bulk(b"data1"),
        ]);

        let decoded = decode_bus_message(&stream_msg);
        assert!(decoded.is_ok());

        let msg = decoded.unwrap();
        assert_eq!(msg.topic, "topic1");
        assert_eq!(msg.payload, Bytes::from("data1"));
    }

    #[rstest]
    fn test_decode_bus_message_missing_fields() {
        // Only two of the four required elements
        let stream_msg = Value::Array(vec![bulk(b"0"), bulk(b"topic1")]);

        let decoded = decode_bus_message(&stream_msg);
        assert!(decoded.is_err());
        assert_eq!(
            decoded.unwrap_err().to_string(),
            "Invalid stream message format: [bulk-string('\"0\"'), bulk-string('\"topic1\"')]"
        );
    }

    #[rstest]
    fn test_decode_bus_message_invalid_topic_format() {
        let stream_msg = Value::Array(vec![
            bulk(b"0"),
            Value::Int(42), // Topic is not a bulk string
            bulk(b"unused"),
            bulk(b"data1"),
        ]);

        let decoded = decode_bus_message(&stream_msg);
        assert!(decoded.is_err());
        assert_eq!(
            decoded.unwrap_err().to_string(),
            "Invalid topic format: [bulk-string('\"0\"'), int(42), bulk-string('\"unused\"'), bulk-string('\"data1\"')]"
        );
    }

    #[rstest]
    fn test_decode_bus_message_invalid_payload_format() {
        let stream_msg = Value::Array(vec![
            bulk(b"0"),
            bulk(b"topic1"),
            bulk(b"unused"),
            Value::Int(42), // Payload is not a bulk string
        ]);

        let decoded = decode_bus_message(&stream_msg);
        assert!(decoded.is_err());
        assert_eq!(
            decoded.unwrap_err().to_string(),
            "Invalid payload format: [bulk-string('\"0\"'), bulk-string('\"topic1\"'), bulk-string('\"unused\"'), int(42)]"
        );
    }

    #[rstest]
    fn test_decode_bus_message_invalid_stream_msg_format() {
        let stream_msg = bulk(b"not an array");

        let decoded = decode_bus_message(&stream_msg);
        assert!(decoded.is_err());
        assert_eq!(
            decoded.unwrap_err().to_string(),
            "Invalid stream message format: bulk-string('\"not an array\"')"
        );
    }
}
674
// Integration tests which talk to a Redis server; only compiled on Linux
#[cfg(target_os = "linux")]
#[cfg(test)]
mod serial_tests {
    use nautilus_common::testing::wait_until_async;
    use redis::aio::ConnectionManager;
    use rstest::*;

    use super::*;
    use crate::redis::flush_redis;

    /// Creates a connection using the default database config and calls `flush_redis` on it
    /// before handing it to the test.
    #[fixture]
    async fn redis_connection() -> ConnectionManager {
        let config = DatabaseConfig::default();
        let mut con = create_redis_connection(MSGBUS_STREAM, config)
            .await
            .unwrap();
        flush_redis(&mut con).await.unwrap();
        con
    }

    /// The streaming task must stop once the terminate signal is set.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_stream_messages_terminate_signal(#[future] redis_connection: ConnectionManager) {
        let mut con = redis_connection.await;
        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);

        let trader_id = TraderId::from("tester-001");
        let instance_id = UUID4::new();
        let config = MessageBusConfig {
            database: Some(DatabaseConfig::default()),
            ..Default::default()
        };

        let stream_key = get_stream_key(trader_id, instance_id, &config);
        let external_streams = vec![stream_key.clone()];
        let stream_signal = Arc::new(AtomicBool::new(false));
        let stream_signal_clone = stream_signal.clone();

        // Start the streaming task
        let handle = tokio::spawn(async move {
            stream_messages(
                tx,
                DatabaseConfig::default(),
                external_streams,
                stream_signal_clone,
            )
            .await
            .unwrap();
        });

        stream_signal.store(true, Ordering::Relaxed);
        // Nothing was published, so this returns once the task has dropped `tx`
        let _ = rx.recv().await;

        // Shutdown and cleanup
        rx.close();
        handle.await.unwrap();
        flush_redis(&mut con).await.unwrap();
    }

    /// The streaming task must stop when the receiving side of its channel is closed.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_stream_messages_when_receiver_closed(
        #[future] redis_connection: ConnectionManager,
    ) {
        let mut con = redis_connection.await;
        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);

        let trader_id = TraderId::from("tester-001");
        let instance_id = UUID4::new();
        let config = MessageBusConfig {
            database: Some(DatabaseConfig::default()),
            ..Default::default()
        };

        let stream_key = get_stream_key(trader_id, instance_id, &config);
        let external_streams = vec![stream_key.clone()];
        let stream_signal = Arc::new(AtomicBool::new(false));
        let stream_signal_clone = stream_signal.clone();

        // Use a message ID in the future, since the streaming task starts reading from an ID
        // derived from the current time
        let clock = get_atomic_clock_realtime();
        let future_id = (clock.get_time_ms() + 1_000_000).to_string();

        // Publish a test message
        let _: () = con
            .xadd(
                stream_key,
                future_id,
                &[("topic", "topic1"), ("payload", "data1")],
            )
            .await
            .unwrap();

        // Close the receiver before the task starts, so forwarding the message fails
        rx.close();

        // Start the streaming task
        let handle = tokio::spawn(async move {
            stream_messages(
                tx,
                DatabaseConfig::default(),
                external_streams,
                stream_signal_clone,
            )
            .await
            .unwrap();
        });

        // The task is expected to finish on its own, without the terminate signal
        handle.await.unwrap();
        flush_redis(&mut con).await.unwrap();
    }

    /// A message added to the stream is decoded and forwarded to the channel.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_stream_messages(#[future] redis_connection: ConnectionManager) {
        let mut con = redis_connection.await;
        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);

        let trader_id = TraderId::from("tester-001");
        let instance_id = UUID4::new();
        let config = MessageBusConfig {
            database: Some(DatabaseConfig::default()),
            ..Default::default()
        };

        let stream_key = get_stream_key(trader_id, instance_id, &config);
        let external_streams = vec![stream_key.clone()];
        let stream_signal = Arc::new(AtomicBool::new(false));
        let stream_signal_clone = stream_signal.clone();

        // Use a message ID in the future, since the streaming task starts reading from an ID
        // derived from the current time
        let clock = get_atomic_clock_realtime();
        let future_id = (clock.get_time_ms() + 1_000_000).to_string();

        // Publish a test message
        let _: () = con
            .xadd(
                stream_key,
                future_id,
                &[("topic", "topic1"), ("payload", "data1")],
            )
            .await
            .unwrap();

        // Start the streaming task
        let handle = tokio::spawn(async move {
            stream_messages(
                tx,
                DatabaseConfig::default(),
                external_streams,
                stream_signal_clone,
            )
            .await
            .unwrap();
        });

        // Receive and verify the message
        let msg = rx.recv().await.unwrap();
        assert_eq!(msg.topic, "topic1");
        assert_eq!(msg.payload, Bytes::from("data1"));

        // Shutdown and cleanup
        rx.close();
        stream_signal.store(true, Ordering::Relaxed);
        handle.await.unwrap();
        flush_redis(&mut con).await.unwrap();
    }

    /// A published message ends up in the Redis stream and decodes back to the original.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_publish_messages(#[future] redis_connection: ConnectionManager) {
        let mut con = redis_connection.await;
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();

        let trader_id = TraderId::from("tester-001");
        let instance_id = UUID4::new();
        let config = MessageBusConfig {
            database: Some(DatabaseConfig::default()),
            stream_per_topic: false,
            ..Default::default()
        };
        let stream_key = get_stream_key(trader_id, instance_id, &config);

        // Start the publishing task
        let handle = tokio::spawn(async move {
            publish_messages(rx, trader_id, instance_id, config)
                .await
                .unwrap();
        });

        // Send a test message
        let msg = BusMessage::with_str_topic("test_topic", Bytes::from("test_payload"));
        tx.send(msg).unwrap();

        // Wait until the message is visible in Redis
        wait_until_async(
            || {
                let mut con = con.clone();
                let stream_key = stream_key.clone();
                async move {
                    let messages: RedisStreamBulk =
                        con.xread(&[&stream_key], &["0"]).await.unwrap();
                    !messages.is_empty()
                }
            },
            Duration::from_secs(3),
        )
        .await;

        // Verify the message was written to the stream
        let messages: RedisStreamBulk = con.xread(&[&stream_key], &["0"]).await.unwrap();
        assert_eq!(messages.len(), 1);
        let stream_msgs = messages[0].get(&stream_key).unwrap();
        let stream_msg_array = &stream_msgs[0].values().next().unwrap();
        let decoded_message = decode_bus_message(stream_msg_array).unwrap();
        assert_eq!(decoded_message.topic, "test_topic");
        assert_eq!(decoded_message.payload, Bytes::from("test_payload"));

        // Stop the publishing task with a close message
        let msg = BusMessage::new_close();
        tx.send(msg).unwrap();

        // Shutdown and cleanup
        handle.await.unwrap();
        flush_redis(&mut con).await.unwrap();
    }

    /// Messages from two streams are both delivered, even though the second stream's message
    /// has a lower ID than the one already received from the first stream.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_stream_messages_multiple_streams(#[future] redis_connection: ConnectionManager) {
        let mut con = redis_connection.await;
        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);

        // Set up multiple stream keys
        let stream_key1 = "test:stream:1".to_string();
        let stream_key2 = "test:stream:2".to_string();
        let external_streams = vec![stream_key1.clone(), stream_key2.clone()];
        let stream_signal = Arc::new(AtomicBool::new(false));
        let stream_signal_clone = stream_signal.clone();

        let clock = get_atomic_clock_realtime();
        let base_id = clock.get_time_ms() + 1_000_000;

        // Start the streaming task
        let handle = tokio::spawn(async move {
            stream_messages(
                tx,
                DatabaseConfig::default(),
                external_streams,
                stream_signal_clone,
            )
            .await
            .unwrap();
        });

        // Give the streaming task time to start
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Publish to stream 1 with the higher ID
        let _: () = con
            .xadd(
                &stream_key1,
                format!("{}", base_id + 100),
                &[("topic", "stream1-first"), ("payload", "data")],
            )
            .await
            .unwrap();

        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("Stream 1 message should be received")
            .unwrap();
        assert_eq!(msg.topic, "stream1-first");

        // Publish to stream 2 with a lower ID than the message already seen on stream 1
        let _: () = con
            .xadd(
                &stream_key2,
                format!("{}", base_id + 50),
                &[("topic", "stream2-second"), ("payload", "data")],
            )
            .await
            .unwrap();

        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("Stream 2 message should be received")
            .unwrap();
        assert_eq!(msg.topic, "stream2-second");

        // Shutdown and cleanup
        rx.close();
        stream_signal.store(true, Ordering::Relaxed);
        handle.await.unwrap();
        flush_redis(&mut con).await.unwrap();
    }

    /// Messages from three streams are delivered one after another although each later
    /// message carries a lower ID than the previous one.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_stream_messages_interleaved_at_different_rates(
        #[future] redis_connection: ConnectionManager,
    ) {
        let mut con = redis_connection.await;
        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);

        // Set up three stream keys
        let stream_key1 = "test:stream:interleaved:1".to_string();
        let stream_key2 = "test:stream:interleaved:2".to_string();
        let stream_key3 = "test:stream:interleaved:3".to_string();
        let external_streams = vec![
            stream_key1.clone(),
            stream_key2.clone(),
            stream_key3.clone(),
        ];
        let stream_signal = Arc::new(AtomicBool::new(false));
        let stream_signal_clone = stream_signal.clone();

        let clock = get_atomic_clock_realtime();
        let base_id = clock.get_time_ms() + 1_000_000;

        let handle = tokio::spawn(async move {
            stream_messages(
                tx,
                DatabaseConfig::default(),
                external_streams,
                stream_signal_clone,
            )
            .await
            .unwrap();
        });

        // Give the streaming task time to start
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Stream 1: highest ID
        let _: () = con
            .xadd(
                &stream_key1,
                format!("{}", base_id + 100),
                &[("topic", "s1m1"), ("payload", "data")],
            )
            .await
            .unwrap();
        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("Stream 1 message should be received")
            .unwrap();
        assert_eq!(msg.topic, "s1m1");

        // Stream 2: lower ID than the message seen on stream 1
        let _: () = con
            .xadd(
                &stream_key2,
                format!("{}", base_id + 50),
                &[("topic", "s2m1"), ("payload", "data")],
            )
            .await
            .unwrap();
        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("Stream 2 message should be received")
            .unwrap();
        assert_eq!(msg.topic, "s2m1");

        // Stream 3: lowest ID of the three
        let _: () = con
            .xadd(
                &stream_key3,
                format!("{}", base_id + 25),
                &[("topic", "s3m1"), ("payload", "data")],
            )
            .await
            .unwrap();
        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("Stream 3 message should be received")
            .unwrap();
        assert_eq!(msg.topic, "s3m1");

        // Shutdown and cleanup
        rx.close();
        stream_signal.store(true, Ordering::Relaxed);
        handle.await.unwrap();
        flush_redis(&mut con).await.unwrap();
    }

    /// Creating the database and closing it straight away must complete.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_close() {
        let trader_id = TraderId::from("tester-001");
        let instance_id = UUID4::new();
        let config = MessageBusConfig {
            database: Some(DatabaseConfig::default()),
            ..Default::default()
        };

        let mut db = RedisMessageBusDatabase::new(trader_id, instance_id, config).unwrap();

        // Close the message bus database (the test passes if this returns)
        db.close();
    }

    /// The heartbeat task emits messages on the heartbeat topic and stops on signal.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn test_heartbeat_task() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();
        let signal = Arc::new(AtomicBool::new(false));

        // Start the heartbeat task with a one second interval
        let handle = tokio::spawn(run_heartbeat(1, signal.clone(), tx));

        // Let the task run long enough to emit heartbeats
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Stop the task and wait for it to finish
        signal.store(true, Ordering::Relaxed);
        handle.await.unwrap();

        // Collect every heartbeat sent so far
        let mut heartbeats: Vec<BusMessage> = Vec::new();
        while let Ok(hb) = rx.try_recv() {
            heartbeats.push(hb);
        }

        assert!(!heartbeats.is_empty());

        for hb in heartbeats {
            assert_eq!(hb.topic, HEARTBEAT_TOPIC);
        }
    }
}