Skip to main content

nautilus_infrastructure/redis/
msgbus.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Redis-backed message bus database for the system.
17//!
18//! # Architecture
19//!
20//! Runs background tasks on `get_runtime()` for publishing, stream reading,
21//! and heartbeats. Messages are sent via an unbounded `tokio::sync::mpsc`
22//! channel to the publish task, which buffers and writes them to Redis
23//! streams. Each background task owns its own Redis connection created on
24//! the Nautilus runtime.
25//!
26//! Handles are stored as `Option<JoinHandle>` for idempotent shutdown via
27//! `close_async()`. The synchronous `close()` uses `block_in_place` to
28//! bridge into the async shutdown path and must be called from outside any
29//! `current_thread` Tokio runtime.
30
31use std::{
32    collections::{HashMap, VecDeque},
33    fmt::Debug,
34    sync::{
35        Arc,
36        atomic::{AtomicBool, Ordering},
37    },
38    time::Duration,
39};
40
41use bytes::Bytes;
42use futures::stream::Stream;
43use nautilus_common::{
44    live::get_runtime,
45    logging::{log_task_error, log_task_started, log_task_stopped},
46    msgbus::{
47        BusMessage,
48        database::{DatabaseConfig, MessageBusConfig, MessageBusDatabaseAdapter},
49        switchboard::CLOSE_TOPIC,
50    },
51};
52use nautilus_core::{
53    UUID4,
54    time::{duration_since_unix_epoch, get_atomic_clock_realtime},
55};
56use nautilus_cryptography::providers::install_cryptographic_provider;
57use nautilus_model::identifiers::TraderId;
58use redis::{AsyncCommands, streams};
59use streams::StreamReadOptions;
60use ustr::Ustr;
61
62use super::{REDIS_MINID, REDIS_XTRIM, await_handle};
63use crate::redis::{create_redis_connection, get_stream_key};
64
65const MSGBUS_PUBLISH: &str = "msgbus-publish";
66const MSGBUS_STREAM: &str = "msgbus-stream";
67const MSGBUS_HEARTBEAT: &str = "msgbus-heartbeat";
68const HEARTBEAT_TOPIC: &str = "health:heartbeat";
69const TRIM_BUFFER_SECS: u64 = 60;
70
71type RedisStreamBulk = Vec<HashMap<String, Vec<HashMap<String, redis::Value>>>>;
72
73#[cfg_attr(
74    feature = "python",
75    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
76)]
77pub struct RedisMessageBusDatabase {
78    /// The trader ID for this message bus database.
79    pub trader_id: TraderId,
80    /// The instance ID for this message bus database.
81    pub instance_id: UUID4,
82    pub_tx: tokio::sync::mpsc::UnboundedSender<BusMessage>,
83    pub_handle: Option<tokio::task::JoinHandle<()>>,
84    stream_rx: Option<tokio::sync::mpsc::Receiver<BusMessage>>,
85    stream_handle: Option<tokio::task::JoinHandle<()>>,
86    stream_signal: Arc<AtomicBool>,
87    heartbeat_handle: Option<tokio::task::JoinHandle<()>>,
88    heartbeat_signal: Arc<AtomicBool>,
89}
90
91impl Debug for RedisMessageBusDatabase {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct(stringify!(RedisMessageBusDatabase))
94            .field("trader_id", &self.trader_id)
95            .field("instance_id", &self.instance_id)
96            .finish()
97    }
98}
99
100impl MessageBusDatabaseAdapter for RedisMessageBusDatabase {
101    type DatabaseType = Self;
102
103    /// Creates a new [`RedisMessageBusDatabase`] instance for the given `trader_id`, `instance_id`, and `config`.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if:
108    /// - The database configuration is missing in `config`.
109    /// - Establishing the Redis connection for publishing fails.
110    fn new(
111        trader_id: TraderId,
112        instance_id: UUID4,
113        config: MessageBusConfig,
114    ) -> anyhow::Result<Self> {
115        install_cryptographic_provider();
116
117        let config_clone = config.clone();
118        let db_config = config
119            .database
120            .clone()
121            .ok_or_else(|| anyhow::anyhow!("No database config"))?;
122
123        let (pub_tx, pub_rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();
124
125        // Create publish task (start the runtime here for now)
126        let pub_handle = Some(get_runtime().spawn(async move {
127            if let Err(e) = publish_messages(pub_rx, trader_id, instance_id, config_clone).await {
128                log_task_error(MSGBUS_PUBLISH, &e);
129            }
130        }));
131
132        // Conditionally create stream task and channel if external streams configured
133        let external_streams = config.external_streams.clone().unwrap_or_default();
134        let stream_signal = Arc::new(AtomicBool::new(false));
135        let (stream_rx, stream_handle) = if external_streams.is_empty() {
136            (None, None)
137        } else {
138            let stream_signal_clone = stream_signal.clone();
139            let (stream_tx, stream_rx) = tokio::sync::mpsc::channel::<BusMessage>(100_000);
140            (
141                Some(stream_rx),
142                Some(get_runtime().spawn(async move {
143                    if let Err(e) =
144                        stream_messages(stream_tx, db_config, external_streams, stream_signal_clone)
145                            .await
146                    {
147                        log_task_error(MSGBUS_STREAM, &e);
148                    }
149                })),
150            )
151        };
152
153        // Create heartbeat task
154        let heartbeat_signal = Arc::new(AtomicBool::new(false));
155        let heartbeat_handle = if let Some(heartbeat_interval_secs) = config.heartbeat_interval_secs
156        {
157            let signal = heartbeat_signal.clone();
158            let pub_tx_clone = pub_tx.clone();
159
160            Some(get_runtime().spawn(async move {
161                run_heartbeat(heartbeat_interval_secs, signal, pub_tx_clone).await;
162            }))
163        } else {
164            None
165        };
166
167        Ok(Self {
168            trader_id,
169            instance_id,
170            pub_tx,
171            pub_handle,
172            stream_rx,
173            stream_handle,
174            stream_signal,
175            heartbeat_handle,
176            heartbeat_signal,
177        })
178    }
179
180    /// Returns whether the message bus database adapter publishing channel is closed.
181    fn is_closed(&self) -> bool {
182        self.pub_tx.is_closed()
183    }
184
185    /// Publishes a message with the given `topic` and `payload`.
186    fn publish(&self, topic: Ustr, payload: Bytes) {
187        let msg = BusMessage::new(topic, payload);
188        if let Err(e) = self.pub_tx.send(msg) {
189            log::error!("Failed to send message: {e}");
190        }
191    }
192
193    /// Closes the message bus database adapter.
194    fn close(&mut self) {
195        log::debug!("Closing");
196
197        self.stream_signal.store(true, Ordering::Relaxed);
198        self.heartbeat_signal.store(true, Ordering::Relaxed);
199
200        if !self.pub_tx.is_closed() {
201            let msg = BusMessage::new_close();
202
203            if let Err(e) = self.pub_tx.send(msg) {
204                log::error!("Failed to send close message: {e:?}");
205            }
206        }
207
208        // Keep close sync for now to avoid async trait method
209        tokio::task::block_in_place(|| {
210            get_runtime().block_on(async {
211                self.close_async().await;
212            });
213        });
214
215        log::debug!("Closed");
216    }
217}
218
219impl RedisMessageBusDatabase {
220    /// Retrieves the Redis stream receiver for this message bus instance.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the stream receiver has already been taken.
225    pub fn get_stream_receiver(
226        &mut self,
227    ) -> anyhow::Result<tokio::sync::mpsc::Receiver<BusMessage>> {
228        self.stream_rx
229            .take()
230            .ok_or_else(|| anyhow::anyhow!("Stream receiver already taken"))
231    }
232
233    /// Streams messages arriving on the stream receiver channel.
234    pub fn stream(
235        mut stream_rx: tokio::sync::mpsc::Receiver<BusMessage>,
236    ) -> impl Stream<Item = BusMessage> + 'static {
237        async_stream::stream! {
238            while let Some(msg) = stream_rx.recv().await {
239                yield msg;
240            }
241        }
242    }
243
244    pub async fn close_async(&mut self) {
245        await_handle(self.pub_handle.take(), MSGBUS_PUBLISH).await;
246        await_handle(self.stream_handle.take(), MSGBUS_STREAM).await;
247        await_handle(self.heartbeat_handle.take(), MSGBUS_HEARTBEAT).await;
248    }
249}
250
251/// Publishes messages received on `rx` to Redis streams for the given `trader_id` and `instance_id`, using `config`.
252///
253/// # Errors
254///
255/// Returns an error if:
256/// - The database configuration is missing in `config`.
257/// - Establishing the Redis connection fails.
258/// - Any Redis command fails during publishing.
259pub async fn publish_messages(
260    mut rx: tokio::sync::mpsc::UnboundedReceiver<BusMessage>,
261    trader_id: TraderId,
262    instance_id: UUID4,
263    config: MessageBusConfig,
264) -> anyhow::Result<()> {
265    log_task_started(MSGBUS_PUBLISH);
266
267    let db_config = config
268        .database
269        .as_ref()
270        .ok_or_else(|| anyhow::anyhow!("No database config"))?;
271    let mut con = create_redis_connection(MSGBUS_PUBLISH, db_config.clone()).await?;
272    let stream_key = get_stream_key(trader_id, instance_id, &config);
273
274    // Auto-trimming
275    let autotrim_duration = config
276        .autotrim_mins
277        .filter(|&mins| mins > 0)
278        .map(|mins| Duration::from_secs(u64::from(mins) * 60));
279    let mut last_trim_index: HashMap<String, usize> = HashMap::new();
280
281    // Buffering
282    let mut buffer: VecDeque<BusMessage> = VecDeque::new();
283    let buffer_interval = Duration::from_millis(u64::from(config.buffer_interval_ms.unwrap_or(0)));
284
285    // A sleep used to trigger periodic flushing of the buffer.
286    // When `buffer_interval` is zero we skip using the timer and flush immediately
287    // after every message.
288    let flush_timer = tokio::time::sleep(buffer_interval);
289    tokio::pin!(flush_timer);
290
291    loop {
292        tokio::select! {
293            maybe_msg = rx.recv() => {
294                if let Some(msg) = maybe_msg {
295                    if msg.topic == CLOSE_TOPIC {
296                        log::debug!("Received close message");
297                        // Ensure we exit the loop after flushing any remaining messages.
298                        if !buffer.is_empty() {
299                            drain_buffer(
300                                &mut con,
301                                &stream_key,
302                                config.stream_per_topic,
303                                autotrim_duration,
304                                &mut last_trim_index,
305                                &mut buffer,
306                            ).await?;
307                        }
308                        break;
309                    }
310
311                    buffer.push_back(msg);
312
313                    if buffer_interval.is_zero() {
314                        // Immediate flush mode
315                        drain_buffer(
316                            &mut con,
317                            &stream_key,
318                            config.stream_per_topic,
319                            autotrim_duration,
320                            &mut last_trim_index,
321                            &mut buffer,
322                        ).await?;
323                    }
324                } else {
325                    log::debug!("Channel hung up");
326                    break;
327                }
328            }
329            // Only poll the timer when the interval is non-zero. This avoids
330            // unnecessarily waking the task when immediate flushing is enabled.
331            () = &mut flush_timer, if !buffer_interval.is_zero() => {
332                if !buffer.is_empty() {
333                    drain_buffer(
334                        &mut con,
335                        &stream_key,
336                        config.stream_per_topic,
337                        autotrim_duration,
338                        &mut last_trim_index,
339                        &mut buffer,
340                    ).await?;
341                }
342
343                // Schedule the next tick
344                flush_timer.as_mut().reset(tokio::time::Instant::now() + buffer_interval);
345            }
346        }
347    }
348
349    // Drain any remaining messages
350    if !buffer.is_empty() {
351        drain_buffer(
352            &mut con,
353            &stream_key,
354            config.stream_per_topic,
355            autotrim_duration,
356            &mut last_trim_index,
357            &mut buffer,
358        )
359        .await?;
360    }
361
362    log_task_stopped(MSGBUS_PUBLISH);
363    Ok(())
364}
365
366async fn drain_buffer(
367    conn: &mut redis::aio::ConnectionManager,
368    stream_key: &str,
369    stream_per_topic: bool,
370    autotrim_duration: Option<Duration>,
371    last_trim_index: &mut HashMap<String, usize>,
372    buffer: &mut VecDeque<BusMessage>,
373) -> anyhow::Result<()> {
374    let mut pipe = redis::pipe();
375    pipe.atomic();
376
377    for msg in buffer.drain(..) {
378        let items: Vec<(&str, &[u8])> = vec![
379            ("topic", msg.topic.as_ref()),
380            ("payload", msg.payload.as_ref()),
381        ];
382        let stream_key = if stream_per_topic {
383            format!("{stream_key}:{}", &msg.topic)
384        } else {
385            stream_key.to_string()
386        };
387        pipe.xadd(&stream_key, "*", &items);
388
389        if autotrim_duration.is_none() {
390            continue; // Nothing else to do
391        }
392
393        // Autotrim stream
394        let last_trim_ms = last_trim_index.entry(stream_key.clone()).or_insert(0); // Remove clone
395        let unix_duration_now = duration_since_unix_epoch();
396        let trim_buffer = Duration::from_secs(TRIM_BUFFER_SECS);
397
398        // Improve efficiency of this by batching
399        if *last_trim_ms < (unix_duration_now - trim_buffer).as_millis() as usize {
400            let min_timestamp_ms =
401                (unix_duration_now - autotrim_duration.unwrap()).as_millis() as usize;
402            let result: Result<(), redis::RedisError> = redis::cmd(REDIS_XTRIM)
403                .arg(stream_key.clone())
404                .arg(REDIS_MINID)
405                .arg(min_timestamp_ms)
406                .query_async(conn)
407                .await;
408
409            if let Err(e) = result {
410                log::error!("Error trimming stream '{stream_key}': {e}");
411            } else {
412                last_trim_index.insert(stream_key.clone(), unix_duration_now.as_millis() as usize);
413            }
414        }
415    }
416
417    pipe.query_async(conn).await.map_err(anyhow::Error::from)
418}
419
420/// Streams messages from Redis streams and sends them over the provided `tx` channel.
421///
422/// # Errors
423///
424/// Returns an error if:
425/// - Establishing the Redis connection fails.
426/// - Any Redis read operation fails.
427pub async fn stream_messages(
428    tx: tokio::sync::mpsc::Sender<BusMessage>,
429    config: DatabaseConfig,
430    stream_keys: Vec<String>,
431    stream_signal: Arc<AtomicBool>,
432) -> anyhow::Result<()> {
433    log_task_started(MSGBUS_STREAM);
434
435    let mut con = create_redis_connection(MSGBUS_STREAM, config).await?;
436
437    let stream_keys = &stream_keys
438        .iter()
439        .map(String::as_str)
440        .collect::<Vec<&str>>();
441
442    log::debug!("Listening to streams: [{}]", stream_keys.join(", "));
443
444    // Start streaming from current timestamp
445    let clock = get_atomic_clock_realtime();
446    let timestamp_ms = clock.get_time_ms();
447    let initial_id = timestamp_ms.to_string();
448
449    let mut last_ids: HashMap<String, String> = stream_keys
450        .iter()
451        .map(|&key| (key.to_string(), initial_id.clone()))
452        .collect();
453
454    let opts = StreamReadOptions::default().block(100);
455
456    'outer: loop {
457        if stream_signal.load(Ordering::Relaxed) {
458            log::debug!("Received streaming terminate signal");
459            break;
460        }
461
462        let ids: Vec<String> = stream_keys
463            .iter()
464            .map(|&key| last_ids[key].clone())
465            .collect();
466        let id_refs: Vec<&str> = ids.iter().map(String::as_str).collect();
467
468        let result: Result<RedisStreamBulk, _> =
469            con.xread_options(&[&stream_keys], &[&id_refs], &opts).await;
470
471        match result {
472            Ok(stream_bulk) => {
473                if stream_bulk.is_empty() {
474                    // Timeout occurred: no messages received
475                    continue;
476                }
477
478                for entry in &stream_bulk {
479                    for (stream_key, stream_msgs) in entry {
480                        for stream_msg in stream_msgs {
481                            for (id, array) in stream_msg {
482                                last_ids.insert(stream_key.clone(), id.clone());
483
484                                match decode_bus_message(array) {
485                                    Ok(msg) => {
486                                        if let Err(e) = tx.send(msg).await {
487                                            log::debug!("Channel closed: {e:?}");
488                                            break 'outer; // End streaming
489                                        }
490                                    }
491                                    Err(e) => {
492                                        log::error!("{e:?}");
493                                    }
494                                }
495                            }
496                        }
497                    }
498                }
499            }
500            Err(e) => {
501                anyhow::bail!("Error reading from stream: {e:?}");
502            }
503        }
504    }
505
506    log_task_stopped(MSGBUS_STREAM);
507    Ok(())
508}
509
510/// Decodes a Redis stream message value into a `BusMessage`.
511///
512/// # Errors
513///
514/// Returns an error if:
515/// - The incoming `stream_msg` is not an array.
516/// - The array has fewer than four elements (invalid format).
517/// - Parsing the topic or payload fails.
518fn decode_bus_message(stream_msg: &redis::Value) -> anyhow::Result<BusMessage> {
519    if let redis::Value::Array(stream_msg) = stream_msg {
520        if stream_msg.len() < 4 {
521            anyhow::bail!("Invalid stream message format: {stream_msg:?}");
522        }
523
524        let topic = match &stream_msg[1] {
525            redis::Value::BulkString(bytes) => match String::from_utf8(bytes.clone()) {
526                Ok(topic) => topic,
527                Err(e) => anyhow::bail!("Error parsing topic: {e}"),
528            },
529            _ => {
530                anyhow::bail!("Invalid topic format: {stream_msg:?}");
531            }
532        };
533
534        let payload = match &stream_msg[3] {
535            redis::Value::BulkString(bytes) => Bytes::copy_from_slice(bytes),
536            _ => {
537                anyhow::bail!("Invalid payload format: {stream_msg:?}");
538            }
539        };
540
541        Ok(BusMessage::with_str_topic(topic, payload))
542    } else {
543        anyhow::bail!("Invalid stream message format: {stream_msg:?}")
544    }
545}
546
547async fn run_heartbeat(
548    heartbeat_interval_secs: u16,
549    signal: Arc<AtomicBool>,
550    pub_tx: tokio::sync::mpsc::UnboundedSender<BusMessage>,
551) {
552    log_task_started("heartbeat");
553    log::debug!("Heartbeat at {heartbeat_interval_secs} second intervals");
554
555    let heartbeat_interval = Duration::from_secs(u64::from(heartbeat_interval_secs));
556    let heartbeat_timer = tokio::time::interval(heartbeat_interval);
557
558    let check_interval = Duration::from_millis(100);
559    let check_timer = tokio::time::interval(check_interval);
560
561    tokio::pin!(heartbeat_timer);
562    tokio::pin!(check_timer);
563
564    loop {
565        if signal.load(Ordering::Relaxed) {
566            log::debug!("Received heartbeat terminate signal");
567            break;
568        }
569
570        tokio::select! {
571            _ = heartbeat_timer.tick() => {
572                let heartbeat = create_heartbeat_msg();
573                if let Err(e) = pub_tx.send(heartbeat) {
574                    // We expect an error if the channel is closed during shutdown
575                    log::debug!("Error sending heartbeat: {e}");
576                }
577            },
578            _ = check_timer.tick() => {}
579        }
580    }
581
582    log_task_stopped("heartbeat");
583}
584
585fn create_heartbeat_msg() -> BusMessage {
586    let payload = Bytes::from(chrono::Utc::now().to_rfc3339().into_bytes());
587    BusMessage::with_str_topic(HEARTBEAT_TOPIC, payload)
588}
589
590#[cfg(test)]
591mod tests {
592    use redis::Value;
593    use rstest::*;
594
595    use super::*;
596
597    #[rstest]
598    fn test_decode_bus_message_valid() {
599        let stream_msg = Value::Array(vec![
600            Value::BulkString(b"0".to_vec()),
601            Value::BulkString(b"topic1".to_vec()),
602            Value::BulkString(b"unused".to_vec()),
603            Value::BulkString(b"data1".to_vec()),
604        ]);
605
606        let result = decode_bus_message(&stream_msg);
607        assert!(result.is_ok());
608        let msg = result.unwrap();
609        assert_eq!(msg.topic, "topic1");
610        assert_eq!(msg.payload, Bytes::from("data1"));
611    }
612
613    #[rstest]
614    fn test_decode_bus_message_missing_fields() {
615        let stream_msg = Value::Array(vec![
616            Value::BulkString(b"0".to_vec()),
617            Value::BulkString(b"topic1".to_vec()),
618        ]);
619
620        let result = decode_bus_message(&stream_msg);
621        assert!(result.is_err());
622        assert_eq!(
623            format!("{}", result.unwrap_err()),
624            "Invalid stream message format: [bulk-string('\"0\"'), bulk-string('\"topic1\"')]"
625        );
626    }
627
628    #[rstest]
629    fn test_decode_bus_message_invalid_topic_format() {
630        let stream_msg = Value::Array(vec![
631            Value::BulkString(b"0".to_vec()),
632            Value::Int(42), // Invalid topic format
633            Value::BulkString(b"unused".to_vec()),
634            Value::BulkString(b"data1".to_vec()),
635        ]);
636
637        let result = decode_bus_message(&stream_msg);
638        assert!(result.is_err());
639        assert_eq!(
640            format!("{}", result.unwrap_err()),
641            "Invalid topic format: [bulk-string('\"0\"'), int(42), bulk-string('\"unused\"'), bulk-string('\"data1\"')]"
642        );
643    }
644
645    #[rstest]
646    fn test_decode_bus_message_invalid_payload_format() {
647        let stream_msg = Value::Array(vec![
648            Value::BulkString(b"0".to_vec()),
649            Value::BulkString(b"topic1".to_vec()),
650            Value::BulkString(b"unused".to_vec()),
651            Value::Int(42), // Invalid payload format
652        ]);
653
654        let result = decode_bus_message(&stream_msg);
655        assert!(result.is_err());
656        assert_eq!(
657            format!("{}", result.unwrap_err()),
658            "Invalid payload format: [bulk-string('\"0\"'), bulk-string('\"topic1\"'), bulk-string('\"unused\"'), int(42)]"
659        );
660    }
661
662    #[rstest]
663    fn test_decode_bus_message_invalid_stream_msg_format() {
664        let stream_msg = Value::BulkString(b"not an array".to_vec());
665
666        let result = decode_bus_message(&stream_msg);
667        assert!(result.is_err());
668        assert_eq!(
669            format!("{}", result.unwrap_err()),
670            "Invalid stream message format: bulk-string('\"not an array\"')"
671        );
672    }
673}
674
675#[cfg(target_os = "linux")] // Run Redis tests on Linux platforms only
676#[cfg(test)]
677mod serial_tests {
678    use nautilus_common::testing::wait_until_async;
679    use redis::aio::ConnectionManager;
680    use rstest::*;
681
682    use super::*;
683    use crate::redis::flush_redis;
684
685    #[fixture]
686    async fn redis_connection() -> ConnectionManager {
687        let config = DatabaseConfig::default();
688        let mut con = create_redis_connection(MSGBUS_STREAM, config)
689            .await
690            .unwrap();
691        flush_redis(&mut con).await.unwrap();
692        con
693    }
694
695    #[rstest]
696    #[tokio::test(flavor = "multi_thread")]
697    async fn test_stream_messages_terminate_signal(#[future] redis_connection: ConnectionManager) {
698        let mut con = redis_connection.await;
699        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
700
701        let trader_id = TraderId::from("tester-001");
702        let instance_id = UUID4::new();
703        let config = MessageBusConfig {
704            database: Some(DatabaseConfig::default()),
705            ..Default::default()
706        };
707
708        let stream_key = get_stream_key(trader_id, instance_id, &config);
709        let external_streams = vec![stream_key.clone()];
710        let stream_signal = Arc::new(AtomicBool::new(false));
711        let stream_signal_clone = stream_signal.clone();
712
713        // Start the message streaming task
714        let handle = tokio::spawn(async move {
715            stream_messages(
716                tx,
717                DatabaseConfig::default(),
718                external_streams,
719                stream_signal_clone,
720            )
721            .await
722            .unwrap();
723        });
724
725        stream_signal.store(true, Ordering::Relaxed);
726        let _ = rx.recv().await; // Wait for the tx to close
727
728        // Shutdown and cleanup
729        rx.close();
730        handle.await.unwrap();
731        flush_redis(&mut con).await.unwrap();
732    }
733
734    #[rstest]
735    #[tokio::test(flavor = "multi_thread")]
736    async fn test_stream_messages_when_receiver_closed(
737        #[future] redis_connection: ConnectionManager,
738    ) {
739        let mut con = redis_connection.await;
740        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
741
742        let trader_id = TraderId::from("tester-001");
743        let instance_id = UUID4::new();
744        let config = MessageBusConfig {
745            database: Some(DatabaseConfig::default()),
746            ..Default::default()
747        };
748
749        let stream_key = get_stream_key(trader_id, instance_id, &config);
750        let external_streams = vec![stream_key.clone()];
751        let stream_signal = Arc::new(AtomicBool::new(false));
752        let stream_signal_clone = stream_signal.clone();
753
754        // Use a message ID in the future, as streaming begins
755        // around the timestamp the task is spawned.
756        let clock = get_atomic_clock_realtime();
757        let future_id = (clock.get_time_ms() + 1_000_000).to_string();
758
759        // Publish test message
760        let _: () = con
761            .xadd(
762                stream_key,
763                future_id,
764                &[("topic", "topic1"), ("payload", "data1")],
765            )
766            .await
767            .unwrap();
768
769        // Immediately close channel
770        rx.close();
771
772        // Start the message streaming task
773        let handle = tokio::spawn(async move {
774            stream_messages(
775                tx,
776                DatabaseConfig::default(),
777                external_streams,
778                stream_signal_clone,
779            )
780            .await
781            .unwrap();
782        });
783
784        // Shutdown and cleanup
785        handle.await.unwrap();
786        flush_redis(&mut con).await.unwrap();
787    }
788
789    #[rstest]
790    #[tokio::test(flavor = "multi_thread")]
791    async fn test_stream_messages(#[future] redis_connection: ConnectionManager) {
792        let mut con = redis_connection.await;
793        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
794
795        let trader_id = TraderId::from("tester-001");
796        let instance_id = UUID4::new();
797        let config = MessageBusConfig {
798            database: Some(DatabaseConfig::default()),
799            ..Default::default()
800        };
801
802        let stream_key = get_stream_key(trader_id, instance_id, &config);
803        let external_streams = vec![stream_key.clone()];
804        let stream_signal = Arc::new(AtomicBool::new(false));
805        let stream_signal_clone = stream_signal.clone();
806
807        // Use a message ID in the future, as streaming begins
808        // around the timestamp the task is spawned.
809        let clock = get_atomic_clock_realtime();
810        let future_id = (clock.get_time_ms() + 1_000_000).to_string();
811
812        // Publish test message
813        let _: () = con
814            .xadd(
815                stream_key,
816                future_id,
817                &[("topic", "topic1"), ("payload", "data1")],
818            )
819            .await
820            .unwrap();
821
822        // Start the message streaming task
823        let handle = tokio::spawn(async move {
824            stream_messages(
825                tx,
826                DatabaseConfig::default(),
827                external_streams,
828                stream_signal_clone,
829            )
830            .await
831            .unwrap();
832        });
833
834        // Receive and verify the message
835        let msg = rx.recv().await.unwrap();
836        assert_eq!(msg.topic, "topic1");
837        assert_eq!(msg.payload, Bytes::from("data1"));
838
839        // Shutdown and cleanup
840        rx.close();
841        stream_signal.store(true, Ordering::Relaxed);
842        handle.await.unwrap();
843        flush_redis(&mut con).await.unwrap();
844    }
845
846    #[rstest]
847    #[tokio::test(flavor = "multi_thread")]
848    async fn test_publish_messages(#[future] redis_connection: ConnectionManager) {
849        let mut con = redis_connection.await;
850        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();
851
852        let trader_id = TraderId::from("tester-001");
853        let instance_id = UUID4::new();
854        let config = MessageBusConfig {
855            database: Some(DatabaseConfig::default()),
856            stream_per_topic: false,
857            ..Default::default()
858        };
859        let stream_key = get_stream_key(trader_id, instance_id, &config);
860
861        // Start the publish_messages task
862        let handle = tokio::spawn(async move {
863            publish_messages(rx, trader_id, instance_id, config)
864                .await
865                .unwrap();
866        });
867
868        // Send a test message
869        let msg = BusMessage::with_str_topic("test_topic", Bytes::from("test_payload"));
870        tx.send(msg).unwrap();
871
872        // Wait until the message is published to Redis
873        wait_until_async(
874            || {
875                let mut con = con.clone();
876                let stream_key = stream_key.clone();
877                async move {
878                    let messages: RedisStreamBulk =
879                        con.xread(&[&stream_key], &["0"]).await.unwrap();
880                    !messages.is_empty()
881                }
882            },
883            Duration::from_secs(3),
884        )
885        .await;
886
887        // Verify the message was published to Redis
888        let messages: RedisStreamBulk = con.xread(&[&stream_key], &["0"]).await.unwrap();
889        assert_eq!(messages.len(), 1);
890        let stream_msgs = messages[0].get(&stream_key).unwrap();
891        let stream_msg_array = &stream_msgs[0].values().next().unwrap();
892        let decoded_message = decode_bus_message(stream_msg_array).unwrap();
893        assert_eq!(decoded_message.topic, "test_topic");
894        assert_eq!(decoded_message.payload, Bytes::from("test_payload"));
895
896        // Stop publishing task
897        let msg = BusMessage::new_close();
898        tx.send(msg).unwrap();
899
900        // Shutdown and cleanup
901        handle.await.unwrap();
902        flush_redis(&mut con).await.unwrap();
903    }
904
905    #[rstest]
906    #[tokio::test(flavor = "multi_thread")]
907    async fn test_stream_messages_multiple_streams(#[future] redis_connection: ConnectionManager) {
908        let mut con = redis_connection.await;
909        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
910
911        // Setup multiple stream keys
912        let stream_key1 = "test:stream:1".to_string();
913        let stream_key2 = "test:stream:2".to_string();
914        let external_streams = vec![stream_key1.clone(), stream_key2.clone()];
915        let stream_signal = Arc::new(AtomicBool::new(false));
916        let stream_signal_clone = stream_signal.clone();
917
918        let clock = get_atomic_clock_realtime();
919        let base_id = clock.get_time_ms() + 1_000_000;
920
921        // Start streaming task
922        let handle = tokio::spawn(async move {
923            stream_messages(
924                tx,
925                DatabaseConfig::default(),
926                external_streams,
927                stream_signal_clone,
928            )
929            .await
930            .unwrap();
931        });
932
933        tokio::time::sleep(Duration::from_millis(200)).await;
934
935        // Publish to stream 1 at higher ID
936        let _: () = con
937            .xadd(
938                &stream_key1,
939                format!("{}", base_id + 100),
940                &[("topic", "stream1-first"), ("payload", "data")],
941            )
942            .await
943            .unwrap();
944
945        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
946            .await
947            .expect("Stream 1 message should be received")
948            .unwrap();
949        assert_eq!(msg.topic, "stream1-first");
950
951        // Publish to stream 2 at lower ID (tests independent cursor tracking)
952        let _: () = con
953            .xadd(
954                &stream_key2,
955                format!("{}", base_id + 50),
956                &[("topic", "stream2-second"), ("payload", "data")],
957            )
958            .await
959            .unwrap();
960
961        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
962            .await
963            .expect("Stream 2 message should be received")
964            .unwrap();
965        assert_eq!(msg.topic, "stream2-second");
966
967        // Shutdown and cleanup
968        rx.close();
969        stream_signal.store(true, Ordering::Relaxed);
970        handle.await.unwrap();
971        flush_redis(&mut con).await.unwrap();
972    }
973
974    #[rstest]
975    #[tokio::test(flavor = "multi_thread")]
976    async fn test_stream_messages_interleaved_at_different_rates(
977        #[future] redis_connection: ConnectionManager,
978    ) {
979        let mut con = redis_connection.await;
980        let (tx, mut rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
981
982        // Setup multiple stream keys
983        let stream_key1 = "test:stream:interleaved:1".to_string();
984        let stream_key2 = "test:stream:interleaved:2".to_string();
985        let stream_key3 = "test:stream:interleaved:3".to_string();
986        let external_streams = vec![
987            stream_key1.clone(),
988            stream_key2.clone(),
989            stream_key3.clone(),
990        ];
991        let stream_signal = Arc::new(AtomicBool::new(false));
992        let stream_signal_clone = stream_signal.clone();
993
994        let clock = get_atomic_clock_realtime();
995        let base_id = clock.get_time_ms() + 1_000_000;
996
997        let handle = tokio::spawn(async move {
998            stream_messages(
999                tx,
1000                DatabaseConfig::default(),
1001                external_streams,
1002                stream_signal_clone,
1003            )
1004            .await
1005            .unwrap();
1006        });
1007
1008        tokio::time::sleep(Duration::from_millis(200)).await;
1009
1010        // Stream 1 advances with high ID
1011        let _: () = con
1012            .xadd(
1013                &stream_key1,
1014                format!("{}", base_id + 100),
1015                &[("topic", "s1m1"), ("payload", "data")],
1016            )
1017            .await
1018            .unwrap();
1019        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
1020            .await
1021            .expect("Stream 1 message should be received")
1022            .unwrap();
1023        assert_eq!(msg.topic, "s1m1");
1024
1025        // Stream 2 gets message at lower ID - would be skipped with global cursor
1026        let _: () = con
1027            .xadd(
1028                &stream_key2,
1029                format!("{}", base_id + 50),
1030                &[("topic", "s2m1"), ("payload", "data")],
1031            )
1032            .await
1033            .unwrap();
1034        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
1035            .await
1036            .expect("Stream 2 message should be received")
1037            .unwrap();
1038        assert_eq!(msg.topic, "s2m1");
1039
1040        // Stream 3 gets message at even lower ID
1041        let _: () = con
1042            .xadd(
1043                &stream_key3,
1044                format!("{}", base_id + 25),
1045                &[("topic", "s3m1"), ("payload", "data")],
1046            )
1047            .await
1048            .unwrap();
1049        let msg = tokio::time::timeout(Duration::from_secs(2), rx.recv())
1050            .await
1051            .expect("Stream 3 message should be received")
1052            .unwrap();
1053        assert_eq!(msg.topic, "s3m1");
1054
1055        // Shutdown and cleanup
1056        rx.close();
1057        stream_signal.store(true, Ordering::Relaxed);
1058        handle.await.unwrap();
1059        flush_redis(&mut con).await.unwrap();
1060    }
1061
1062    #[rstest]
1063    #[tokio::test(flavor = "multi_thread")]
1064    async fn test_close() {
1065        let trader_id = TraderId::from("tester-001");
1066        let instance_id = UUID4::new();
1067        let config = MessageBusConfig {
1068            database: Some(DatabaseConfig::default()),
1069            ..Default::default()
1070        };
1071
1072        let mut db = RedisMessageBusDatabase::new(trader_id, instance_id, config).unwrap();
1073
1074        // Close the message bus database (test should not hang)
1075        db.close();
1076    }
1077
1078    #[rstest]
1079    #[tokio::test(flavor = "multi_thread")]
1080    async fn test_heartbeat_task() {
1081        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<BusMessage>();
1082        let signal = Arc::new(AtomicBool::new(false));
1083
1084        // Start the heartbeat task with a short interval
1085        let handle = tokio::spawn(run_heartbeat(1, signal.clone(), tx));
1086
1087        // Wait for a couple of heartbeats
1088        tokio::time::sleep(Duration::from_secs(2)).await;
1089
1090        // Stop the heartbeat task
1091        signal.store(true, Ordering::Relaxed);
1092        handle.await.unwrap();
1093
1094        // Ensure heartbeats were sent
1095        let mut heartbeats: Vec<BusMessage> = Vec::new();
1096        while let Ok(hb) = rx.try_recv() {
1097            heartbeats.push(hb);
1098        }
1099
1100        assert!(!heartbeats.is_empty());
1101
1102        for hb in heartbeats {
1103            assert_eq!(hb.topic, HEARTBEAT_TOPIC);
1104        }
1105    }
1106}