Skip to main content

nautilus_infrastructure/redis/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Provides Redis-backed `CacheDatabase` and `MessageBusDatabase` implementations.
17
18pub mod cache;
19pub mod msgbus;
20pub mod queries;
21
22use std::time::Duration;
23
24use nautilus_common::{
25    logging::log_task_awaiting,
26    msgbus::database::{DatabaseConfig, MessageBusConfig},
27};
28use nautilus_core::{UUID4, string::semver::SemVer};
29use nautilus_model::identifiers::TraderId;
30use redis::RedisError;
31
/// Minimum supported Redis server version; connecting to an older server is logged as an error.
const REDIS_MIN_VERSION: &str = "6.2.0";
/// Separator placed between the components of a Redis key.
const REDIS_DELIMITER: char = ':';
/// Marker that locates the index portion inside a (possibly prefixed) Redis key.
const REDIS_INDEX_PATTERN: &str = ":index:";
/// Redis `XTRIM` command name (not referenced in this file; presumably used by a submodule).
const REDIS_XTRIM: &str = "XTRIM";
/// Redis `MINID` keyword (not referenced in this file; presumably the `XTRIM` trimming strategy).
const REDIS_MINID: &str = "MINID";
/// Redis `FLUSHDB` command name, issued by `flush_redis`.
const REDIS_FLUSHDB: &str = "FLUSHDB";
38
39/// Extracts the index key from a full Redis key.
40///
41/// Handles keys with instance_id prefix by finding the `:index:` pattern.
42/// e.g., "trader-id:uuid:index:order_position" -> "index:order_position"
43pub(crate) fn get_index_key(key: &str) -> anyhow::Result<&str> {
44    if let Some(pos) = key.find(REDIS_INDEX_PATTERN) {
45        return Ok(&key[pos + 1..]);
46    }
47
48    if key.starts_with("index:") {
49        return Ok(key);
50    }
51
52    anyhow::bail!("Invalid index key format: {key}")
53}
54
55async fn await_handle(handle: Option<tokio::task::JoinHandle<()>>, task_name: &str) {
56    if let Some(handle) = handle {
57        log_task_awaiting(task_name);
58
59        let timeout = Duration::from_secs(2);
60        match tokio::time::timeout(timeout, handle).await {
61            Ok(result) => {
62                if let Err(e) = result {
63                    log::error!("Error awaiting task '{task_name}': {e:?}");
64                }
65            }
66            Err(_) => {
67                log::error!("Timeout {timeout:?} awaiting task '{task_name}'");
68            }
69        }
70    }
71}
72
73/// Parses a Redis connection URL from the given database config, returning the
74/// full URL and a redacted version with the password obfuscated.
75///
76/// Authentication matrix handled:
77/// ┌───────────┬───────────┬────────────────────────────┐
78/// │ Username  │ Password  │ Resulting user-info part   │
79/// ├───────────┼───────────┼────────────────────────────┤
80/// │ non-empty │ non-empty │ user:pass@                 │
81/// │ empty     │ non-empty │ :pass@                     │
82/// │ empty     │ empty     │ (omitted)                  │
83/// └───────────┴───────────┴────────────────────────────┘
84///
85/// # Panics
86///
87/// Panics if a username is provided without a corresponding password.
88#[must_use]
89pub fn get_redis_url(config: DatabaseConfig) -> (String, String) {
90    let host = config.host.unwrap_or("127.0.0.1".to_string());
91    let port = config.port.unwrap_or(6379);
92    let username = config.username.unwrap_or_default();
93    let password = config.password.unwrap_or_default();
94    let ssl = config.ssl;
95
96    // Redact the password for logging/metrics: keep the first & last two chars.
97    let redact_pw = |pw: &str| {
98        if pw.len() > 4 {
99            format!("{}...{}", &pw[..2], &pw[pw.len() - 2..])
100        } else {
101            pw.to_owned()
102        }
103    };
104
105    // Build the `userinfo@` portion for both the real and redacted URLs.
106    let (auth, auth_redacted) = match (username.is_empty(), password.is_empty()) {
107        // user:pass@
108        (false, false) => (
109            format!("{username}:{password}@"),
110            format!("{username}:{}@", redact_pw(&password)),
111        ),
112        // :pass@
113        (true, false) => (
114            format!(":{password}@"),
115            format!(":{}@", redact_pw(&password)),
116        ),
117        // username but no password ⇒  configuration error
118        (false, true) => panic!(
119            "Redis config error: username supplied without password. \
120            Either supply a password or omit the username."
121        ),
122        // no credentials
123        (true, true) => (String::new(), String::new()),
124    };
125
126    let scheme = if ssl { "rediss" } else { "redis" };
127
128    let url = format!("{scheme}://{auth}{host}:{port}");
129    let redacted_url = format!("{scheme}://{auth_redacted}{host}:{port}");
130
131    (url, redacted_url)
132}
133
134/// Creates a new Redis connection manager based on the provided database `config` and connection name.
135///
136/// # Errors
137///
138/// Returns an error if:
139/// - Constructing the Redis client fails.
140/// - Establishing or configuring the connection manager fails.
141///
142/// In case of reconnection issues, the connection will retry reconnection
143/// `number_of_retries` times, with an exponentially increasing delay, calculated as
144/// `factor * (exponent_base ^ current-try)`, bounded by `max_delay`.
145///
146/// The new connection will time out operations after `response_timeout` has passed.
147/// Each connection attempt to the server will time out after `connection_timeout`.
148pub async fn create_redis_connection(
149    con_name: &str,
150    config: DatabaseConfig,
151) -> anyhow::Result<redis::aio::ConnectionManager> {
152    log::debug!("Creating {con_name} redis connection");
153    let (redis_url, redacted_url) = get_redis_url(config.clone());
154    log::debug!("Connecting to {redacted_url}");
155
156    let connection_timeout = Duration::from_secs(u64::from(config.connection_timeout));
157    let response_timeout = Duration::from_secs(u64::from(config.response_timeout));
158    let number_of_retries = config.number_of_retries;
159    let exponent_base = config.exponent_base as f32;
160
161    // Use factor as min_delay base for backoff: factor * (exponent_base ^ tries)
162    let min_delay = Duration::from_millis(config.factor);
163    let max_delay = Duration::from_secs(config.max_delay);
164
165    let client = redis::Client::open(redis_url)?;
166
167    let connection_manager_config = redis::aio::ConnectionManagerConfig::new()
168        .set_exponent_base(exponent_base)
169        .set_number_of_retries(number_of_retries)
170        .set_response_timeout(Some(response_timeout))
171        .set_connection_timeout(Some(connection_timeout))
172        .set_min_delay(min_delay)
173        .set_max_delay(max_delay);
174
175    let mut con = client
176        .get_connection_manager_with_config(connection_manager_config)
177        .await?;
178
179    let version = get_redis_version(&mut con).await?;
180    let min_version = SemVer::parse(REDIS_MIN_VERSION)?;
181    let con_msg = format!("Connected to redis v{version}");
182
183    if version >= min_version {
184        log::info!("{con_msg}");
185    } else {
186        log::error!("{con_msg}, but minimum supported version is {REDIS_MIN_VERSION}");
187    }
188
189    Ok(con)
190}
191
192/// Flushes the entire Redis database for the specified connection.
193///
194/// # Errors
195///
196/// Returns an error if the FLUSHDB command fails.
197pub async fn flush_redis(
198    con: &mut redis::aio::ConnectionManager,
199) -> anyhow::Result<(), RedisError> {
200    redis::cmd(REDIS_FLUSHDB).exec_async(con).await
201}
202
203/// Parse the stream key from the given identifiers and config.
204#[must_use]
205pub fn get_stream_key(
206    trader_id: TraderId,
207    instance_id: UUID4,
208    config: &MessageBusConfig,
209) -> String {
210    let mut stream_key = String::new();
211
212    if config.use_trader_prefix {
213        stream_key.push_str("trader-");
214    }
215
216    if config.use_trader_id {
217        stream_key.push_str(trader_id.as_str());
218        stream_key.push(REDIS_DELIMITER);
219    }
220
221    if config.use_instance_id {
222        stream_key.push_str(&format!("{instance_id}"));
223        stream_key.push(REDIS_DELIMITER);
224    }
225
226    stream_key.push_str(&config.streams_prefix);
227    stream_key
228}
229
230async fn get_redis_version(conn: &mut redis::aio::ConnectionManager) -> anyhow::Result<SemVer> {
231    let info: String = redis::cmd("INFO").query_async(conn).await?;
232    let version_str = match info.lines().find_map(|line| {
233        if line.starts_with("redis_version:") {
234            line.split(':').nth(1).map(|s| s.trim().to_string())
235        } else {
236            None
237        }
238    }) {
239        Some(info) => info,
240        None => {
241            anyhow::bail!("Redis version not available");
242        }
243    };
244
245    SemVer::parse(&version_str)
246}
247
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    /// Deserializes a `DatabaseConfig` from a JSON value, panicking on invalid input.
    fn config_from(value: serde_json::Value) -> DatabaseConfig {
        serde_json::from_value(value).unwrap()
    }

    #[rstest]
    fn test_get_redis_url_default_values() {
        let (url, redacted_url) = get_redis_url(config_from(json!({})));
        assert_eq!(url, "redis://127.0.0.1:6379");
        assert_eq!(redacted_url, "redis://127.0.0.1:6379");
    }

    #[rstest]
    fn test_get_redis_url_password_only() {
        // Username omitted, but password present
        let config = config_from(json!({
            "host": "example.com",
            "port": 6380,
            "password": "secretpw",   // >4 chars ⇒ will be redacted
        }));
        let (url, redacted_url) = get_redis_url(config);
        assert_eq!(url, "redis://:secretpw@example.com:6380");
        assert_eq!(redacted_url, "redis://:se...pw@example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_full_config_with_ssl() {
        // Four-character password ⇒ short enough to be left unredacted
        let config = config_from(json!({
            "host": "example.com",
            "port": 6380,
            "username": "user",
            "password": "pass",
            "ssl": true,
        }));
        let (url, redacted_url) = get_redis_url(config);
        assert_eq!(url, "rediss://user:pass@example.com:6380");
        assert_eq!(redacted_url, "rediss://user:pass@example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_full_config_without_ssl() {
        let config = config_from(json!({
            "host": "example.com",
            "port": 6380,
            "username": "username",
            "password": "password",
            "ssl": false,
        }));
        let (url, redacted_url) = get_redis_url(config);
        assert_eq!(url, "redis://username:password@example.com:6380");
        assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_missing_username_and_password() {
        let config = config_from(json!({
            "host": "example.com",
            "port": 6380,
            "ssl": false,
        }));
        let (url, redacted_url) = get_redis_url(config);
        assert_eq!(url, "redis://example.com:6380");
        assert_eq!(redacted_url, "redis://example.com:6380");
    }

    #[rstest]
    fn test_get_redis_url_ssl_default_false() {
        let config = config_from(json!({
            "host": "example.com",
            "port": 6380,
            "username": "username",
            "password": "password",
            // "ssl" is intentionally omitted to test default behavior
        }));
        let (url, redacted_url) = get_redis_url(config);
        assert_eq!(url, "redis://username:password@example.com:6380");
        assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
    }

    #[rstest]
    fn test_get_stream_key_with_trader_prefix_and_instance_id() {
        let trader_id = TraderId::from("tester-123");
        let instance_id = UUID4::new();
        let config = MessageBusConfig {
            use_instance_id: true,
            ..Default::default()
        };

        let expected = format!("trader-tester-123:{instance_id}:stream");
        assert_eq!(get_stream_key(trader_id, instance_id, &config), expected);
    }

    #[rstest]
    fn test_get_stream_key_without_trader_prefix_or_instance_id() {
        let trader_id = TraderId::from("tester-123");
        let instance_id = UUID4::new();
        let config = MessageBusConfig {
            use_trader_prefix: false,
            use_trader_id: false,
            ..Default::default()
        };

        assert_eq!(get_stream_key(trader_id, instance_id, &config), "stream");
    }

    #[rstest]
    fn test_get_index_key_without_prefix() {
        let index_key = get_index_key("index:order_position").unwrap();
        assert_eq!(index_key, "index:order_position");
    }

    #[rstest]
    fn test_get_index_key_with_trader_prefix() {
        let index_key = get_index_key("trader-tester-123:index:order_position").unwrap();
        assert_eq!(index_key, "index:order_position");
    }

    #[rstest]
    fn test_get_index_key_with_instance_id() {
        let index_key =
            get_index_key("trader-tester-123:abc-uuid-123:index:order_position").unwrap();
        assert_eq!(index_key, "index:order_position");
    }

    #[rstest]
    fn test_get_index_key_invalid() {
        assert!(get_index_key("no_index_pattern").is_err());
    }
}