nautilus_infrastructure/redis/
mod.rs1pub mod cache;
19pub mod msgbus;
20pub mod queries;
21
22use std::time::Duration;
23
24use nautilus_common::{
25 logging::log_task_awaiting,
26 msgbus::database::{DatabaseConfig, MessageBusConfig},
27};
28use nautilus_core::{UUID4, string::semver::SemVer};
29use nautilus_model::identifiers::TraderId;
30use redis::RedisError;
31
32const REDIS_MIN_VERSION: &str = "6.2.0";
33const REDIS_DELIMITER: char = ':';
34const REDIS_INDEX_PATTERN: &str = ":index:";
35const REDIS_XTRIM: &str = "XTRIM";
36const REDIS_MINID: &str = "MINID";
37const REDIS_FLUSHDB: &str = "FLUSHDB";
38
39pub(crate) fn get_index_key(key: &str) -> anyhow::Result<&str> {
44 if let Some(pos) = key.find(REDIS_INDEX_PATTERN) {
45 return Ok(&key[pos + 1..]);
46 }
47
48 if key.starts_with("index:") {
49 return Ok(key);
50 }
51
52 anyhow::bail!("Invalid index key format: {key}")
53}
54
55async fn await_handle(handle: Option<tokio::task::JoinHandle<()>>, task_name: &str) {
56 if let Some(handle) = handle {
57 log_task_awaiting(task_name);
58
59 let timeout = Duration::from_secs(2);
60 match tokio::time::timeout(timeout, handle).await {
61 Ok(result) => {
62 if let Err(e) = result {
63 log::error!("Error awaiting task '{task_name}': {e:?}");
64 }
65 }
66 Err(_) => {
67 log::error!("Timeout {timeout:?} awaiting task '{task_name}'");
68 }
69 }
70 }
71}
72
73#[must_use]
89pub fn get_redis_url(config: DatabaseConfig) -> (String, String) {
90 let host = config.host.unwrap_or("127.0.0.1".to_string());
91 let port = config.port.unwrap_or(6379);
92 let username = config.username.unwrap_or_default();
93 let password = config.password.unwrap_or_default();
94 let ssl = config.ssl;
95
96 let redact_pw = |pw: &str| {
98 if pw.len() > 4 {
99 format!("{}...{}", &pw[..2], &pw[pw.len() - 2..])
100 } else {
101 pw.to_owned()
102 }
103 };
104
105 let (auth, auth_redacted) = match (username.is_empty(), password.is_empty()) {
107 (false, false) => (
109 format!("{username}:{password}@"),
110 format!("{username}:{}@", redact_pw(&password)),
111 ),
112 (true, false) => (
114 format!(":{password}@"),
115 format!(":{}@", redact_pw(&password)),
116 ),
117 (false, true) => panic!(
119 "Redis config error: username supplied without password. \
120 Either supply a password or omit the username."
121 ),
122 (true, true) => (String::new(), String::new()),
124 };
125
126 let scheme = if ssl { "rediss" } else { "redis" };
127
128 let url = format!("{scheme}://{auth}{host}:{port}");
129 let redacted_url = format!("{scheme}://{auth_redacted}{host}:{port}");
130
131 (url, redacted_url)
132}
133
134pub async fn create_redis_connection(
149 con_name: &str,
150 config: DatabaseConfig,
151) -> anyhow::Result<redis::aio::ConnectionManager> {
152 log::debug!("Creating {con_name} redis connection");
153 let (redis_url, redacted_url) = get_redis_url(config.clone());
154 log::debug!("Connecting to {redacted_url}");
155
156 let connection_timeout = Duration::from_secs(u64::from(config.connection_timeout));
157 let response_timeout = Duration::from_secs(u64::from(config.response_timeout));
158 let number_of_retries = config.number_of_retries;
159 let exponent_base = config.exponent_base as f32;
160
161 let min_delay = Duration::from_millis(config.factor);
163 let max_delay = Duration::from_secs(config.max_delay);
164
165 let client = redis::Client::open(redis_url)?;
166
167 let connection_manager_config = redis::aio::ConnectionManagerConfig::new()
168 .set_exponent_base(exponent_base)
169 .set_number_of_retries(number_of_retries)
170 .set_response_timeout(Some(response_timeout))
171 .set_connection_timeout(Some(connection_timeout))
172 .set_min_delay(min_delay)
173 .set_max_delay(max_delay);
174
175 let mut con = client
176 .get_connection_manager_with_config(connection_manager_config)
177 .await?;
178
179 let version = get_redis_version(&mut con).await?;
180 let min_version = SemVer::parse(REDIS_MIN_VERSION)?;
181 let con_msg = format!("Connected to redis v{version}");
182
183 if version >= min_version {
184 log::info!("{con_msg}");
185 } else {
186 log::error!("{con_msg}, but minimum supported version is {REDIS_MIN_VERSION}");
187 }
188
189 Ok(con)
190}
191
192pub async fn flush_redis(
198 con: &mut redis::aio::ConnectionManager,
199) -> anyhow::Result<(), RedisError> {
200 redis::cmd(REDIS_FLUSHDB).exec_async(con).await
201}
202
203#[must_use]
205pub fn get_stream_key(
206 trader_id: TraderId,
207 instance_id: UUID4,
208 config: &MessageBusConfig,
209) -> String {
210 let mut stream_key = String::new();
211
212 if config.use_trader_prefix {
213 stream_key.push_str("trader-");
214 }
215
216 if config.use_trader_id {
217 stream_key.push_str(trader_id.as_str());
218 stream_key.push(REDIS_DELIMITER);
219 }
220
221 if config.use_instance_id {
222 stream_key.push_str(&format!("{instance_id}"));
223 stream_key.push(REDIS_DELIMITER);
224 }
225
226 stream_key.push_str(&config.streams_prefix);
227 stream_key
228}
229
230async fn get_redis_version(conn: &mut redis::aio::ConnectionManager) -> anyhow::Result<SemVer> {
231 let info: String = redis::cmd("INFO").query_async(conn).await?;
232 let version_str = match info.lines().find_map(|line| {
233 if line.starts_with("redis_version:") {
234 line.split(':').nth(1).map(|s| s.trim().to_string())
235 } else {
236 None
237 }
238 }) {
239 Some(info) => info,
240 None => {
241 anyhow::bail!("Redis version not available");
242 }
243 };
244
245 SemVer::parse(&version_str)
246}
247
248#[cfg(test)]
249mod tests {
250 use rstest::rstest;
251 use serde_json::json;
252
253 use super::*;
254
255 #[rstest]
256 fn test_get_redis_url_default_values() {
257 let config: DatabaseConfig = serde_json::from_value(json!({})).unwrap();
258 let (url, redacted_url) = get_redis_url(config);
259 assert_eq!(url, "redis://127.0.0.1:6379");
260 assert_eq!(redacted_url, "redis://127.0.0.1:6379");
261 }
262
263 #[rstest]
264 fn test_get_redis_url_password_only() {
265 let config_json = json!({
267 "host": "example.com",
268 "port": 6380,
269 "password": "secretpw", });
271 let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
272 let (url, redacted_url) = get_redis_url(config);
273 assert_eq!(url, "redis://:secretpw@example.com:6380");
274 assert_eq!(redacted_url, "redis://:se...pw@example.com:6380");
275 }
276
277 #[rstest]
278 fn test_get_redis_url_full_config_with_ssl() {
279 let config_json = json!({
280 "host": "example.com",
281 "port": 6380,
282 "username": "user",
283 "password": "pass",
284 "ssl": true,
285 });
286 let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
287 let (url, redacted_url) = get_redis_url(config);
288 assert_eq!(url, "rediss://user:pass@example.com:6380");
289 assert_eq!(redacted_url, "rediss://user:pass@example.com:6380");
290 }
291
292 #[rstest]
293 fn test_get_redis_url_full_config_without_ssl() {
294 let config_json = json!({
295 "host": "example.com",
296 "port": 6380,
297 "username": "username",
298 "password": "password",
299 "ssl": false,
300 });
301 let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
302 let (url, redacted_url) = get_redis_url(config);
303 assert_eq!(url, "redis://username:password@example.com:6380");
304 assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
305 }
306
307 #[rstest]
308 fn test_get_redis_url_missing_username_and_password() {
309 let config_json = json!({
310 "host": "example.com",
311 "port": 6380,
312 "ssl": false,
313 });
314 let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
315 let (url, redacted_url) = get_redis_url(config);
316 assert_eq!(url, "redis://example.com:6380");
317 assert_eq!(redacted_url, "redis://example.com:6380");
318 }
319
320 #[rstest]
321 fn test_get_redis_url_ssl_default_false() {
322 let config_json = json!({
323 "host": "example.com",
324 "port": 6380,
325 "username": "username",
326 "password": "password",
327 });
329 let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
330 let (url, redacted_url) = get_redis_url(config);
331 assert_eq!(url, "redis://username:password@example.com:6380");
332 assert_eq!(redacted_url, "redis://username:pa...rd@example.com:6380");
333 }
334
335 #[rstest]
336 fn test_get_stream_key_with_trader_prefix_and_instance_id() {
337 let trader_id = TraderId::from("tester-123");
338 let instance_id = UUID4::new();
339 let config = MessageBusConfig {
340 use_instance_id: true,
341 ..Default::default()
342 };
343
344 let key = get_stream_key(trader_id, instance_id, &config);
345 assert_eq!(key, format!("trader-tester-123:{instance_id}:stream"));
346 }
347
348 #[rstest]
349 fn test_get_stream_key_without_trader_prefix_or_instance_id() {
350 let trader_id = TraderId::from("tester-123");
351 let instance_id = UUID4::new();
352 let config = MessageBusConfig {
353 use_trader_prefix: false,
354 use_trader_id: false,
355 ..Default::default()
356 };
357
358 let key = get_stream_key(trader_id, instance_id, &config);
359 assert_eq!(key, format!("stream"));
360 }
361
362 #[rstest]
363 fn test_get_index_key_without_prefix() {
364 let key = "index:order_position";
365 assert_eq!(get_index_key(key).unwrap(), "index:order_position");
366 }
367
368 #[rstest]
369 fn test_get_index_key_with_trader_prefix() {
370 let key = "trader-tester-123:index:order_position";
371 assert_eq!(get_index_key(key).unwrap(), "index:order_position");
372 }
373
374 #[rstest]
375 fn test_get_index_key_with_instance_id() {
376 let key = "trader-tester-123:abc-uuid-123:index:order_position";
377 assert_eq!(get_index_key(key).unwrap(), "index:order_position");
378 }
379
380 #[rstest]
381 fn test_get_index_key_invalid() {
382 let key = "no_index_pattern";
383 assert!(get_index_key(key).is_err());
384 }
385}