Skip to main content

nautilus_infrastructure/redis/
queries.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use chrono::{DateTime, Utc};
21use futures::future::join_all;
22use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
23use nautilus_model::{
24    accounts::AccountAny,
25    data::{CustomData, DataType, HasTsInit},
26    identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
27    instruments::{InstrumentAny, SyntheticInstrument},
28    orders::OrderAny,
29    position::Position,
30    types::Currency,
31};
32use redis::{AsyncCommands, aio::ConnectionManager};
33use serde::{Serialize, de::DeserializeOwned};
34use serde_json::Value;
35use ustr::Ustr;
36
37use super::get_index_key;
38
39// Collection keys
40const INDEX: &str = "index";
41const GENERAL: &str = "general";
42const CURRENCIES: &str = "currencies";
43const INSTRUMENTS: &str = "instruments";
44const SYNTHETICS: &str = "synthetics";
45const ACCOUNTS: &str = "accounts";
46const ORDERS: &str = "orders";
47const POSITIONS: &str = "positions";
48const ACTORS: &str = "actors";
49const STRATEGIES: &str = "strategies";
50const CUSTOM: &str = "custom";
51const REDIS_DELIMITER: char = ':';
52
53// Index keys
54const INDEX_ORDER_IDS: &str = "index:order_ids";
55const INDEX_ORDER_POSITION: &str = "index:order_position";
56const INDEX_ORDER_CLIENT: &str = "index:order_client";
57const INDEX_ORDERS: &str = "index:orders";
58const INDEX_ORDERS_OPEN: &str = "index:orders_open";
59const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
60const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
61const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
62const INDEX_POSITIONS: &str = "index:positions";
63const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
64const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
65
66#[derive(Debug)]
67pub struct DatabaseQueries;
68
69impl DatabaseQueries {
70    /// Serializes the given `payload` using the specified `encoding` to a byte vector.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if serialization to the chosen encoding fails.
75    pub fn serialize_payload<T: Serialize>(
76        encoding: SerializationEncoding,
77        payload: &T,
78    ) -> anyhow::Result<Vec<u8>> {
79        let mut value = serde_json::to_value(payload)?;
80        convert_timestamps(&mut value);
81        match encoding {
82            SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
83                .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
84            SerializationEncoding::Json => serde_json::to_vec(&value)
85                .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
86        }
87    }
88
89    /// Deserializes the given byte slice `payload` into type `T` using the specified `encoding`.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if deserialization from the chosen encoding fails or converting to the target type fails.
94    pub fn deserialize_payload<T: DeserializeOwned>(
95        encoding: SerializationEncoding,
96        payload: &[u8],
97    ) -> anyhow::Result<T> {
98        let mut value = match encoding {
99            SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
100                .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
101            SerializationEncoding::Json => serde_json::from_slice(payload)
102                .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
103        };
104
105        convert_timestamp_strings(&mut value);
106
107        serde_json::from_value(value)
108            .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
109    }
110
111    /// Scans Redis for keys matching the given `pattern`.
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if the Redis scan operation fails.
116    pub async fn scan_keys(
117        con: &mut ConnectionManager,
118        pattern: String,
119    ) -> anyhow::Result<Vec<String>> {
120        let mut result = Vec::new();
121        let mut cursor = 0u64;
122
123        loop {
124            let scan_result: (u64, Vec<String>) = redis::cmd("SCAN")
125                .arg(cursor)
126                .arg("MATCH")
127                .arg(&pattern)
128                .arg("COUNT")
129                .arg(5000)
130                .query_async(con)
131                .await?;
132
133            let (new_cursor, keys) = scan_result;
134            result.extend(keys);
135
136            // If cursor is 0, we've completed the full scan
137            if new_cursor == 0 {
138                break;
139            }
140
141            cursor = new_cursor;
142        }
143
144        Ok(result)
145    }
146
147    /// Bulk reads multiple keys from Redis using MGET for efficiency.
148    ///
149    /// # Errors
150    ///
151    /// Returns an error if the underlying Redis MGET operation fails.
152    pub async fn read_bulk(
153        con: &ConnectionManager,
154        keys: &[String],
155    ) -> anyhow::Result<Vec<Option<Bytes>>> {
156        if keys.is_empty() {
157            return Ok(vec![]);
158        }
159
160        let mut con = con.clone();
161
162        // Use MGET to fetch all keys in a single network operation
163        let results: Vec<Option<Vec<u8>>> =
164            redis::cmd("MGET").arg(keys).query_async(&mut con).await?;
165
166        // Convert Vec<u8> to Bytes
167        let bytes_results: Vec<Option<Bytes>> = results
168            .into_iter()
169            .map(|opt| opt.map(Bytes::from))
170            .collect();
171
172        Ok(bytes_results)
173    }
174
175    /// Bulk reads multiple keys from Redis using MGET, batched into chunks.
176    ///
177    /// Keys are batched into chunks of `batch_size` to avoid exceeding Redis
178    /// request size limits on some providers.
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if `batch_size` is zero or if the underlying Redis MGET operation fails.
183    pub async fn read_bulk_batched(
184        con: &ConnectionManager,
185        keys: &[String],
186        batch_size: usize,
187    ) -> anyhow::Result<Vec<Option<Bytes>>> {
188        if batch_size == 0 {
189            anyhow::bail!("`batch_size` must be greater than zero");
190        }
191
192        if keys.is_empty() {
193            return Ok(vec![]);
194        }
195
196        let mut all_results: Vec<Option<Bytes>> = Vec::with_capacity(keys.len());
197
198        for chunk in keys.chunks(batch_size) {
199            let mut con = con.clone();
200
201            let results: Vec<Option<Vec<u8>>> =
202                redis::cmd("MGET").arg(chunk).query_async(&mut con).await?;
203
204            all_results.extend(results.into_iter().map(|opt| opt.map(Bytes::from)));
205        }
206
207        Ok(all_results)
208    }
209
210    /// Reads raw byte payloads for `key` under `trader_key` from Redis.
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if the underlying Redis read operation fails or if the collection is unsupported.
215    pub async fn read(
216        con: &ConnectionManager,
217        trader_key: &str,
218        key: &str,
219    ) -> anyhow::Result<Vec<Bytes>> {
220        let collection = Self::get_collection_key(key)?;
221        let full_key = format!("{trader_key}{REDIS_DELIMITER}{key}");
222
223        let mut con = con.clone();
224
225        match collection {
226            INDEX => Self::read_index(&mut con, &full_key).await,
227            GENERAL => Self::read_string(&mut con, &full_key).await,
228            CURRENCIES => Self::read_string(&mut con, &full_key).await,
229            INSTRUMENTS => Self::read_string(&mut con, &full_key).await,
230            SYNTHETICS => Self::read_string(&mut con, &full_key).await,
231            ACCOUNTS => Self::read_list(&mut con, &full_key).await,
232            ORDERS => Self::read_list(&mut con, &full_key).await,
233            POSITIONS => Self::read_list(&mut con, &full_key).await,
234            ACTORS => Self::read_string(&mut con, &full_key).await,
235            STRATEGIES => Self::read_string(&mut con, &full_key).await,
236            _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
237        }
238    }
239
240    /// Loads all cache data (currencies, instruments, synthetics, accounts, orders, positions) for `trader_key`.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if loading any of the individual caches fails or combining data fails.
245    pub async fn load_all(
246        con: &ConnectionManager,
247        encoding: SerializationEncoding,
248        trader_key: &str,
249    ) -> anyhow::Result<CacheMap> {
250        let (currencies, instruments, synthetics, accounts, orders, positions) = tokio::try_join!(
251            Self::load_currencies(con, trader_key, encoding),
252            Self::load_instruments(con, trader_key, encoding),
253            Self::load_synthetics(con, trader_key, encoding),
254            Self::load_accounts(con, trader_key, encoding),
255            Self::load_orders(con, trader_key, encoding),
256            Self::load_positions(con, trader_key, encoding)
257        )
258        .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
259
260        // For now, we don't load greeks and yield curves from the database
261        // This will be implemented in the future
262        let greeks = AHashMap::new();
263        let yield_curves = AHashMap::new();
264
265        Ok(CacheMap {
266            currencies,
267            instruments,
268            synthetics,
269            accounts,
270            orders,
271            positions,
272            greeks,
273            yield_curves,
274        })
275    }
276
277    /// Loads all currencies for `trader_key` using the specified `encoding`.
278    ///
279    /// # Errors
280    ///
281    /// Returns an error if scanning keys or reading currency data fails.
282    pub async fn load_currencies(
283        con: &ConnectionManager,
284        trader_key: &str,
285        encoding: SerializationEncoding,
286    ) -> anyhow::Result<AHashMap<Ustr, Currency>> {
287        let mut currencies = AHashMap::new();
288        let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
289        log::debug!("Loading {pattern}");
290
291        let mut con = con.clone();
292        let keys = Self::scan_keys(&mut con, pattern).await?;
293
294        if keys.is_empty() {
295            return Ok(currencies);
296        }
297
298        // Use bulk loading with MGET for efficiency
299        let bulk_values = Self::read_bulk(&con, &keys).await?;
300
301        // Process the bulk results
302        for (key, value_opt) in keys.iter().zip(bulk_values.iter()) {
303            let currency_code = if let Some(code) = key.as_str().rsplit(':').next() {
304                Ustr::from(code)
305            } else {
306                log::error!("Invalid key format: {key}");
307                continue;
308            };
309
310            if let Some(value_bytes) = value_opt {
311                match Self::deserialize_payload(encoding, value_bytes) {
312                    Ok(currency) => {
313                        currencies.insert(currency_code, currency);
314                    }
315                    Err(e) => {
316                        log::error!("Failed to deserialize currency {currency_code}: {e}");
317                    }
318                }
319            } else {
320                log::error!("Currency not found in Redis: {currency_code}");
321            }
322        }
323
324        log::debug!("Loaded {} currencies(s)", currencies.len());
325
326        Ok(currencies)
327    }
328
329    /// Loads all instruments for `trader_key` using the specified `encoding`.
330    ///
331    /// # Errors
332    ///
333    /// Returns an error if scanning keys or reading instrument data fails.
334    /// Loads all instruments for `trader_key` using the specified `encoding`.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if scanning keys or reading instrument data fails.
339    pub async fn load_instruments(
340        con: &ConnectionManager,
341        trader_key: &str,
342        encoding: SerializationEncoding,
343    ) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
344        let mut instruments = AHashMap::new();
345        let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
346        log::debug!("Loading {pattern}");
347
348        let mut con = con.clone();
349        let keys = Self::scan_keys(&mut con, pattern).await?;
350
351        let futures: Vec<_> = keys
352            .iter()
353            .map(|key| {
354                let con = con.clone();
355                async move {
356                    let instrument_id = key
357                        .as_str()
358                        .rsplit(':')
359                        .next()
360                        .ok_or_else(|| {
361                            log::error!("Invalid key format: {key}");
362                            "Invalid key format"
363                        })
364                        .and_then(|code| {
365                            InstrumentId::from_str(code).map_err(|e| {
366                                log::error!("Failed to convert to InstrumentId for {key}: {e}");
367                                "Invalid instrument ID"
368                            })
369                        });
370
371                    let instrument_id = match instrument_id {
372                        Ok(id) => id,
373                        Err(_) => return None,
374                    };
375
376                    match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
377                        Ok(Some(instrument)) => Some((instrument_id, instrument)),
378                        Ok(None) => {
379                            log::error!("Instrument not found: {instrument_id}");
380                            None
381                        }
382                        Err(e) => {
383                            log::error!("Failed to load instrument {instrument_id}: {e}");
384                            None
385                        }
386                    }
387                }
388            })
389            .collect();
390
391        // Insert all Instrument_id (key) and Instrument (value) into the HashMap, filtering out None values.
392        instruments.extend(join_all(futures).await.into_iter().flatten());
393        log::debug!("Loaded {} instruments(s)", instruments.len());
394
395        Ok(instruments)
396    }
397
398    /// Loads all synthetic instruments for `trader_key` using the specified `encoding`.
399    ///
400    /// # Errors
401    ///
402    /// Returns an error if scanning keys or reading synthetic instrument data fails.
403    /// Loads all synthetic instruments for `trader_key` using the specified `encoding`.
404    ///
405    /// # Errors
406    ///
407    /// Returns an error if scanning keys or reading synthetic instrument data fails.
408    pub async fn load_synthetics(
409        con: &ConnectionManager,
410        trader_key: &str,
411        encoding: SerializationEncoding,
412    ) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
413        let mut synthetics = AHashMap::new();
414        let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
415        log::debug!("Loading {pattern}");
416
417        let mut con = con.clone();
418        let keys = Self::scan_keys(&mut con, pattern).await?;
419
420        let futures: Vec<_> = keys
421            .iter()
422            .map(|key| {
423                let con = con.clone();
424                async move {
425                    let instrument_id = key
426                        .as_str()
427                        .rsplit(':')
428                        .next()
429                        .ok_or_else(|| {
430                            log::error!("Invalid key format: {key}");
431                            "Invalid key format"
432                        })
433                        .and_then(|code| {
434                            InstrumentId::from_str(code).map_err(|e| {
435                                log::error!("Failed to parse InstrumentId for {key}: {e}");
436                                "Invalid instrument ID"
437                            })
438                        });
439
440                    let instrument_id = match instrument_id {
441                        Ok(id) => id,
442                        Err(_) => return None,
443                    };
444
445                    match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
446                        Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
447                        Ok(None) => {
448                            log::error!("Synthetic not found: {instrument_id}");
449                            None
450                        }
451                        Err(e) => {
452                            log::error!("Failed to load synthetic {instrument_id}: {e}");
453                            None
454                        }
455                    }
456                }
457            })
458            .collect();
459
460        // Insert all Instrument_id (key) and Synthetic (value) into the HashMap, filtering out None values.
461        synthetics.extend(join_all(futures).await.into_iter().flatten());
462        log::debug!("Loaded {} synthetics(s)", synthetics.len());
463
464        Ok(synthetics)
465    }
466
467    /// Loads all accounts for `trader_key` using the specified `encoding`.
468    ///
469    /// # Errors
470    ///
471    /// Returns an error if scanning keys or reading account data fails.
472    /// Loads all accounts for `trader_key` using the specified `encoding`.
473    ///
474    /// # Errors
475    ///
476    /// Returns an error if scanning keys or reading account data fails.
477    pub async fn load_accounts(
478        con: &ConnectionManager,
479        trader_key: &str,
480        encoding: SerializationEncoding,
481    ) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
482        let mut accounts = AHashMap::new();
483        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
484        log::debug!("Loading {pattern}");
485
486        let mut con = con.clone();
487        let keys = Self::scan_keys(&mut con, pattern).await?;
488
489        let futures: Vec<_> = keys
490            .iter()
491            .map(|key| {
492                let con = con.clone();
493                async move {
494                    let account_id = if let Some(code) = key.as_str().rsplit(':').next() {
495                        AccountId::from(code)
496                    } else {
497                        log::error!("Invalid key format: {key}");
498                        return None;
499                    };
500
501                    match Self::load_account(&con, trader_key, &account_id, encoding).await {
502                        Ok(Some(account)) => Some((account_id, account)),
503                        Ok(None) => {
504                            log::error!("Account not found: {account_id}");
505                            None
506                        }
507                        Err(e) => {
508                            log::error!("Failed to load account {account_id}: {e}");
509                            None
510                        }
511                    }
512                }
513            })
514            .collect();
515
516        // Insert all Account_id (key) and Account (value) into the HashMap, filtering out None values.
517        accounts.extend(join_all(futures).await.into_iter().flatten());
518        log::debug!("Loaded {} accounts(s)", accounts.len());
519
520        Ok(accounts)
521    }
522
523    /// Loads all orders for `trader_key` using the specified `encoding`.
524    ///
525    /// # Errors
526    ///
527    /// Returns an error if scanning keys or reading order data fails.
528    /// Loads all orders for `trader_key` using the specified `encoding`.
529    ///
530    /// # Errors
531    ///
532    /// Returns an error if scanning keys or reading order data fails.
533    pub async fn load_orders(
534        con: &ConnectionManager,
535        trader_key: &str,
536        encoding: SerializationEncoding,
537    ) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
538        let mut orders = AHashMap::new();
539        let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
540        log::debug!("Loading {pattern}");
541
542        let mut con = con.clone();
543        let keys = Self::scan_keys(&mut con, pattern).await?;
544
545        let futures: Vec<_> = keys
546            .iter()
547            .map(|key| {
548                let con = con.clone();
549                async move {
550                    let client_order_id = if let Some(code) = key.as_str().rsplit(':').next() {
551                        ClientOrderId::from(code)
552                    } else {
553                        log::error!("Invalid key format: {key}");
554                        return None;
555                    };
556
557                    match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
558                        Ok(Some(order)) => Some((client_order_id, order)),
559                        Ok(None) => {
560                            log::error!("Order not found: {client_order_id}");
561                            None
562                        }
563                        Err(e) => {
564                            log::error!("Failed to load order {client_order_id}: {e}");
565                            None
566                        }
567                    }
568                }
569            })
570            .collect();
571
572        // Insert all Client-Order-Id (key) and Order (value) into the HashMap, filtering out None values.
573        orders.extend(join_all(futures).await.into_iter().flatten());
574        log::debug!("Loaded {} order(s)", orders.len());
575
576        Ok(orders)
577    }
578
579    /// Loads all positions for `trader_key` using the specified `encoding`.
580    ///
581    /// # Errors
582    ///
583    /// Returns an error if scanning keys or reading position data fails.
584    /// Loads all positions for `trader_key` using the specified `encoding`.
585    ///
586    /// # Errors
587    ///
588    /// Returns an error if scanning keys or reading position data fails.
589    pub async fn load_positions(
590        con: &ConnectionManager,
591        trader_key: &str,
592        encoding: SerializationEncoding,
593    ) -> anyhow::Result<AHashMap<PositionId, Position>> {
594        let mut positions = AHashMap::new();
595        let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
596        log::debug!("Loading {pattern}");
597
598        let mut con = con.clone();
599        let keys = Self::scan_keys(&mut con, pattern).await?;
600
601        let futures: Vec<_> = keys
602            .iter()
603            .map(|key| {
604                let con = con.clone();
605                async move {
606                    let position_id = if let Some(code) = key.as_str().rsplit(':').next() {
607                        PositionId::from(code)
608                    } else {
609                        log::error!("Invalid key format: {key}");
610                        return None;
611                    };
612
613                    match Self::load_position(&con, trader_key, &position_id, encoding).await {
614                        Ok(Some(position)) => Some((position_id, position)),
615                        Ok(None) => {
616                            log::error!("Position not found: {position_id}");
617                            None
618                        }
619                        Err(e) => {
620                            log::error!("Failed to load position {position_id}: {e}");
621                            None
622                        }
623                    }
624                }
625            })
626            .collect();
627
628        // Insert all Position_id (key) and Position (value) into the HashMap, filtering out None values.
629        positions.extend(join_all(futures).await.into_iter().flatten());
630        log::debug!("Loaded {} position(s)", positions.len());
631
632        Ok(positions)
633    }
634
635    /// Loads all custom data for `trader_key` matching the given `data_type`.
636    ///
637    /// Keys are stored as `custom:<ts_init_020>:<uuid>`; value is full CustomData JSON.
638    /// Scans all custom keys, deserializes, filters by type_name (full or short), metadata,
639    /// and identifier to match SQL semantics, then sorts by ts_init ascending.
640    ///
641    /// # Errors
642    ///
643    /// Returns an error if scanning, bulk read, or deserialization fails.
644    pub async fn load_custom_data(
645        con: &ConnectionManager,
646        trader_key: &str,
647        data_type: &DataType,
648    ) -> anyhow::Result<Vec<CustomData>> {
649        let pattern = format!("{trader_key}{REDIS_DELIMITER}{CUSTOM}*");
650        log::debug!("Loading custom data {pattern}");
651
652        let mut con = con.clone();
653        let keys = Self::scan_keys(&mut con, pattern).await?;
654
655        if keys.is_empty() {
656            return Ok(Vec::new());
657        }
658
659        let values = Self::read_bulk(&con, &keys).await?;
660        let request_type_name = data_type.type_name();
661        let request_short = request_type_name
662            .rsplit([':', '.'])
663            .next()
664            .unwrap_or(request_type_name);
665        let request_identifier = data_type.identifier().unwrap_or("");
666
667        let mut results = Vec::new();
668
669        for value_opt in values {
670            let Some(value_bytes) = value_opt else {
671                continue;
672            };
673            let custom = match CustomData::from_json_bytes(value_bytes.as_ref()) {
674                Ok(c) => c,
675                Err(e) => {
676                    log::warn!("Failed to deserialize custom data from Redis: {e}");
677                    continue;
678                }
679            };
680            let stored_type_name = custom.data_type.type_name();
681            let type_match =
682                stored_type_name == request_type_name || stored_type_name == request_short;
683            let identifier_match =
684                custom.data_type.identifier().unwrap_or("") == request_identifier;
685            let metadata_match = match (data_type.metadata(), custom.data_type.metadata()) {
686                (None, None) => true,
687                (Some(a), Some(b)) => serde_json::to_value(a).ok() == serde_json::to_value(b).ok(),
688                _ => false,
689            };
690
691            if type_match && identifier_match && metadata_match {
692                results.push(custom);
693            }
694        }
695
696        results.sort_by_key(|c| c.ts_init());
697        log::debug!("Loaded {} custom data item(s)", results.len());
698        Ok(results)
699    }
700
701    /// Loads a single currency for `trader_key` and `code` using the specified `encoding`.
702    ///
703    /// # Errors
704    ///
705    /// Returns an error if the underlying read or deserialization fails.
706    pub async fn load_currency(
707        con: &ConnectionManager,
708        trader_key: &str,
709        code: &Ustr,
710        encoding: SerializationEncoding,
711    ) -> anyhow::Result<Option<Currency>> {
712        let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
713        let result = Self::read(con, trader_key, &key).await?;
714
715        if result.is_empty() {
716            return Ok(None);
717        }
718
719        let currency = Self::deserialize_payload(encoding, &result[0])?;
720        Ok(currency)
721    }
722
723    /// Loads a single instrument for `trader_key` and `instrument_id` using the specified `encoding`.
724    ///
725    /// # Errors
726    ///
727    /// Returns an error if the underlying read or deserialization fails.
728    pub async fn load_instrument(
729        con: &ConnectionManager,
730        trader_key: &str,
731        instrument_id: &InstrumentId,
732        encoding: SerializationEncoding,
733    ) -> anyhow::Result<Option<InstrumentAny>> {
734        let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
735        let result = Self::read(con, trader_key, &key).await?;
736        if result.is_empty() {
737            return Ok(None);
738        }
739
740        let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
741        Ok(Some(instrument))
742    }
743
744    /// Loads a single synthetic instrument for `trader_key` and `instrument_id` using the specified `encoding`.
745    ///
746    /// # Errors
747    ///
748    /// Returns an error if the underlying read or deserialization fails.
749    pub async fn load_synthetic(
750        con: &ConnectionManager,
751        trader_key: &str,
752        instrument_id: &InstrumentId,
753        encoding: SerializationEncoding,
754    ) -> anyhow::Result<Option<SyntheticInstrument>> {
755        let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
756        let result = Self::read(con, trader_key, &key).await?;
757        if result.is_empty() {
758            return Ok(None);
759        }
760
761        let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
762        Ok(Some(synthetic))
763    }
764
765    /// Loads a single account for `trader_key` and `account_id` using the specified `encoding`.
766    ///
767    /// # Errors
768    ///
769    /// Returns an error if the underlying read or deserialization fails.
770    pub async fn load_account(
771        con: &ConnectionManager,
772        trader_key: &str,
773        account_id: &AccountId,
774        encoding: SerializationEncoding,
775    ) -> anyhow::Result<Option<AccountAny>> {
776        let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
777        let result = Self::read(con, trader_key, &key).await?;
778        if result.is_empty() {
779            return Ok(None);
780        }
781
782        let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
783        Ok(Some(account))
784    }
785
786    /// Loads a single order for `trader_key` and `client_order_id` using the specified `encoding`.
787    ///
788    /// # Errors
789    ///
790    /// Returns an error if the underlying read or deserialization fails.
791    pub async fn load_order(
792        con: &ConnectionManager,
793        trader_key: &str,
794        client_order_id: &ClientOrderId,
795        encoding: SerializationEncoding,
796    ) -> anyhow::Result<Option<OrderAny>> {
797        let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
798        let result = Self::read(con, trader_key, &key).await?;
799        if result.is_empty() {
800            return Ok(None);
801        }
802
803        let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
804        Ok(Some(order))
805    }
806
807    /// Loads a single position for `trader_key` and `position_id` using the specified `encoding`.
808    ///
809    /// # Errors
810    ///
811    /// Returns an error if the underlying read or deserialization fails.
812    pub async fn load_position(
813        con: &ConnectionManager,
814        trader_key: &str,
815        position_id: &PositionId,
816        encoding: SerializationEncoding,
817    ) -> anyhow::Result<Option<Position>> {
818        let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
819        let result = Self::read(con, trader_key, &key).await?;
820        if result.is_empty() {
821            return Ok(None);
822        }
823
824        let position: Position = Self::deserialize_payload(encoding, &result[0])?;
825        Ok(Some(position))
826    }
827
828    fn get_collection_key(key: &str) -> anyhow::Result<&str> {
829        key.split_once(REDIS_DELIMITER)
830            .map(|(collection, _)| collection)
831            .ok_or_else(|| {
832                anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
833            })
834    }
835
836    async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
837        let index_key = get_index_key(key)?;
838        match index_key {
839            INDEX_ORDER_IDS => Self::read_set(conn, key).await,
840            INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
841            INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
842            INDEX_ORDERS => Self::read_set(conn, key).await,
843            INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
844            INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
845            INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
846            INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
847            INDEX_POSITIONS => Self::read_set(conn, key).await,
848            INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
849            INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
850            _ => anyhow::bail!("Index unknown '{index_key}' on read"),
851        }
852    }
853
854    async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
855        let result: Vec<u8> = conn.get(key).await?;
856
857        if result.is_empty() {
858            Ok(vec![])
859        } else {
860            Ok(vec![Bytes::from(result)])
861        }
862    }
863
864    async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
865        let result: Vec<Bytes> = conn.smembers(key).await?;
866        Ok(result)
867    }
868
869    async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
870        let result: HashMap<String, String> = conn.hgetall(key).await?;
871        let json = serde_json::to_string(&result)?;
872        Ok(vec![Bytes::from(json.into_bytes())])
873    }
874
875    async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
876        let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
877        Ok(result)
878    }
879}
880
881fn is_timestamp_field(key: &str) -> bool {
882    let expire_match = key == "expire_time_ns";
883    let ts_match = key.starts_with("ts_");
884    expire_match || ts_match
885}
886
887fn convert_timestamps(value: &mut Value) {
888    match value {
889        Value::Object(map) => {
890            for (key, v) in map {
891                if is_timestamp_field(key)
892                    && let Value::Number(n) = v
893                    && let Some(n) = n.as_u64()
894                {
895                    let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
896                    *v = Value::String(dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true));
897                }
898                convert_timestamps(v);
899            }
900        }
901        Value::Array(arr) => {
902            for item in arr {
903                convert_timestamps(item);
904            }
905        }
906        _ => {}
907    }
908}
909
910fn convert_timestamp_strings(value: &mut Value) {
911    match value {
912        Value::Object(map) => {
913            for (key, v) in map {
914                if is_timestamp_field(key)
915                    && let Value::String(s) = v
916                    && let Ok(dt) = DateTime::parse_from_rfc3339(s)
917                {
918                    *v = Value::Number(
919                        (dt.with_timezone(&Utc)
920                            .timestamp_nanos_opt()
921                            .expect("Invalid DateTime") as u64)
922                            .into(),
923                    );
924                }
925                convert_timestamp_strings(v);
926            }
927        }
928        Value::Array(arr) => {
929            for item in arr {
930                convert_timestamp_strings(item);
931            }
932        }
933        _ => {}
934    }
935}