1use std::{collections::HashMap, str::FromStr};
17
18use ahash::AHashMap;
19use bytes::Bytes;
20use chrono::{DateTime, Utc};
21use futures::future::join_all;
22use nautilus_common::{cache::database::CacheMap, enums::SerializationEncoding};
23use nautilus_model::{
24 accounts::AccountAny,
25 data::{CustomData, DataType, HasTsInit},
26 identifiers::{AccountId, ClientOrderId, InstrumentId, PositionId},
27 instruments::{InstrumentAny, SyntheticInstrument},
28 orders::OrderAny,
29 position::Position,
30 types::Currency,
31};
32use redis::{AsyncCommands, aio::ConnectionManager};
33use serde::{Serialize, de::DeserializeOwned};
34use serde_json::Value;
35use ustr::Ustr;
36
37use super::get_index_key;
38
39const INDEX: &str = "index";
41const GENERAL: &str = "general";
42const CURRENCIES: &str = "currencies";
43const INSTRUMENTS: &str = "instruments";
44const SYNTHETICS: &str = "synthetics";
45const ACCOUNTS: &str = "accounts";
46const ORDERS: &str = "orders";
47const POSITIONS: &str = "positions";
48const ACTORS: &str = "actors";
49const STRATEGIES: &str = "strategies";
50const CUSTOM: &str = "custom";
51const REDIS_DELIMITER: char = ':';
52
53const INDEX_ORDER_IDS: &str = "index:order_ids";
55const INDEX_ORDER_POSITION: &str = "index:order_position";
56const INDEX_ORDER_CLIENT: &str = "index:order_client";
57const INDEX_ORDERS: &str = "index:orders";
58const INDEX_ORDERS_OPEN: &str = "index:orders_open";
59const INDEX_ORDERS_CLOSED: &str = "index:orders_closed";
60const INDEX_ORDERS_EMULATED: &str = "index:orders_emulated";
61const INDEX_ORDERS_INFLIGHT: &str = "index:orders_inflight";
62const INDEX_POSITIONS: &str = "index:positions";
63const INDEX_POSITIONS_OPEN: &str = "index:positions_open";
64const INDEX_POSITIONS_CLOSED: &str = "index:positions_closed";
65
66#[derive(Debug)]
67pub struct DatabaseQueries;
68
69impl DatabaseQueries {
70 pub fn serialize_payload<T: Serialize>(
76 encoding: SerializationEncoding,
77 payload: &T,
78 ) -> anyhow::Result<Vec<u8>> {
79 let mut value = serde_json::to_value(payload)?;
80 convert_timestamps(&mut value);
81 match encoding {
82 SerializationEncoding::MsgPack => rmp_serde::to_vec(&value)
83 .map_err(|e| anyhow::anyhow!("Failed to serialize msgpack `payload`: {e}")),
84 SerializationEncoding::Json => serde_json::to_vec(&value)
85 .map_err(|e| anyhow::anyhow!("Failed to serialize json `payload`: {e}")),
86 }
87 }
88
89 pub fn deserialize_payload<T: DeserializeOwned>(
95 encoding: SerializationEncoding,
96 payload: &[u8],
97 ) -> anyhow::Result<T> {
98 let mut value = match encoding {
99 SerializationEncoding::MsgPack => rmp_serde::from_slice(payload)
100 .map_err(|e| anyhow::anyhow!("Failed to deserialize msgpack `payload`: {e}"))?,
101 SerializationEncoding::Json => serde_json::from_slice(payload)
102 .map_err(|e| anyhow::anyhow!("Failed to deserialize json `payload`: {e}"))?,
103 };
104
105 convert_timestamp_strings(&mut value);
106
107 serde_json::from_value(value)
108 .map_err(|e| anyhow::anyhow!("Failed to convert value to target type: {e}"))
109 }
110
111 pub async fn scan_keys(
117 con: &mut ConnectionManager,
118 pattern: String,
119 ) -> anyhow::Result<Vec<String>> {
120 let mut result = Vec::new();
121 let mut cursor = 0u64;
122
123 loop {
124 let scan_result: (u64, Vec<String>) = redis::cmd("SCAN")
125 .arg(cursor)
126 .arg("MATCH")
127 .arg(&pattern)
128 .arg("COUNT")
129 .arg(5000)
130 .query_async(con)
131 .await?;
132
133 let (new_cursor, keys) = scan_result;
134 result.extend(keys);
135
136 if new_cursor == 0 {
138 break;
139 }
140
141 cursor = new_cursor;
142 }
143
144 Ok(result)
145 }
146
147 pub async fn read_bulk(
153 con: &ConnectionManager,
154 keys: &[String],
155 ) -> anyhow::Result<Vec<Option<Bytes>>> {
156 if keys.is_empty() {
157 return Ok(vec![]);
158 }
159
160 let mut con = con.clone();
161
162 let results: Vec<Option<Vec<u8>>> =
164 redis::cmd("MGET").arg(keys).query_async(&mut con).await?;
165
166 let bytes_results: Vec<Option<Bytes>> = results
168 .into_iter()
169 .map(|opt| opt.map(Bytes::from))
170 .collect();
171
172 Ok(bytes_results)
173 }
174
175 pub async fn read_bulk_batched(
184 con: &ConnectionManager,
185 keys: &[String],
186 batch_size: usize,
187 ) -> anyhow::Result<Vec<Option<Bytes>>> {
188 if batch_size == 0 {
189 anyhow::bail!("`batch_size` must be greater than zero");
190 }
191
192 if keys.is_empty() {
193 return Ok(vec![]);
194 }
195
196 let mut all_results: Vec<Option<Bytes>> = Vec::with_capacity(keys.len());
197
198 for chunk in keys.chunks(batch_size) {
199 let mut con = con.clone();
200
201 let results: Vec<Option<Vec<u8>>> =
202 redis::cmd("MGET").arg(chunk).query_async(&mut con).await?;
203
204 all_results.extend(results.into_iter().map(|opt| opt.map(Bytes::from)));
205 }
206
207 Ok(all_results)
208 }
209
210 pub async fn read(
216 con: &ConnectionManager,
217 trader_key: &str,
218 key: &str,
219 ) -> anyhow::Result<Vec<Bytes>> {
220 let collection = Self::get_collection_key(key)?;
221 let full_key = format!("{trader_key}{REDIS_DELIMITER}{key}");
222
223 let mut con = con.clone();
224
225 match collection {
226 INDEX => Self::read_index(&mut con, &full_key).await,
227 GENERAL => Self::read_string(&mut con, &full_key).await,
228 CURRENCIES => Self::read_string(&mut con, &full_key).await,
229 INSTRUMENTS => Self::read_string(&mut con, &full_key).await,
230 SYNTHETICS => Self::read_string(&mut con, &full_key).await,
231 ACCOUNTS => Self::read_list(&mut con, &full_key).await,
232 ORDERS => Self::read_list(&mut con, &full_key).await,
233 POSITIONS => Self::read_list(&mut con, &full_key).await,
234 ACTORS => Self::read_string(&mut con, &full_key).await,
235 STRATEGIES => Self::read_string(&mut con, &full_key).await,
236 _ => anyhow::bail!("Unsupported operation: `read` for collection '{collection}'"),
237 }
238 }
239
240 pub async fn load_all(
246 con: &ConnectionManager,
247 encoding: SerializationEncoding,
248 trader_key: &str,
249 ) -> anyhow::Result<CacheMap> {
250 let (currencies, instruments, synthetics, accounts, orders, positions) = tokio::try_join!(
251 Self::load_currencies(con, trader_key, encoding),
252 Self::load_instruments(con, trader_key, encoding),
253 Self::load_synthetics(con, trader_key, encoding),
254 Self::load_accounts(con, trader_key, encoding),
255 Self::load_orders(con, trader_key, encoding),
256 Self::load_positions(con, trader_key, encoding)
257 )
258 .map_err(|e| anyhow::anyhow!("Error loading cache data: {e}"))?;
259
260 let greeks = AHashMap::new();
263 let yield_curves = AHashMap::new();
264
265 Ok(CacheMap {
266 currencies,
267 instruments,
268 synthetics,
269 accounts,
270 orders,
271 positions,
272 greeks,
273 yield_curves,
274 })
275 }
276
277 pub async fn load_currencies(
283 con: &ConnectionManager,
284 trader_key: &str,
285 encoding: SerializationEncoding,
286 ) -> anyhow::Result<AHashMap<Ustr, Currency>> {
287 let mut currencies = AHashMap::new();
288 let pattern = format!("{trader_key}{REDIS_DELIMITER}{CURRENCIES}*");
289 log::debug!("Loading {pattern}");
290
291 let mut con = con.clone();
292 let keys = Self::scan_keys(&mut con, pattern).await?;
293
294 if keys.is_empty() {
295 return Ok(currencies);
296 }
297
298 let bulk_values = Self::read_bulk(&con, &keys).await?;
300
301 for (key, value_opt) in keys.iter().zip(bulk_values.iter()) {
303 let currency_code = if let Some(code) = key.as_str().rsplit(':').next() {
304 Ustr::from(code)
305 } else {
306 log::error!("Invalid key format: {key}");
307 continue;
308 };
309
310 if let Some(value_bytes) = value_opt {
311 match Self::deserialize_payload(encoding, value_bytes) {
312 Ok(currency) => {
313 currencies.insert(currency_code, currency);
314 }
315 Err(e) => {
316 log::error!("Failed to deserialize currency {currency_code}: {e}");
317 }
318 }
319 } else {
320 log::error!("Currency not found in Redis: {currency_code}");
321 }
322 }
323
324 log::debug!("Loaded {} currencies(s)", currencies.len());
325
326 Ok(currencies)
327 }
328
329 pub async fn load_instruments(
340 con: &ConnectionManager,
341 trader_key: &str,
342 encoding: SerializationEncoding,
343 ) -> anyhow::Result<AHashMap<InstrumentId, InstrumentAny>> {
344 let mut instruments = AHashMap::new();
345 let pattern = format!("{trader_key}{REDIS_DELIMITER}{INSTRUMENTS}*");
346 log::debug!("Loading {pattern}");
347
348 let mut con = con.clone();
349 let keys = Self::scan_keys(&mut con, pattern).await?;
350
351 let futures: Vec<_> = keys
352 .iter()
353 .map(|key| {
354 let con = con.clone();
355 async move {
356 let instrument_id = key
357 .as_str()
358 .rsplit(':')
359 .next()
360 .ok_or_else(|| {
361 log::error!("Invalid key format: {key}");
362 "Invalid key format"
363 })
364 .and_then(|code| {
365 InstrumentId::from_str(code).map_err(|e| {
366 log::error!("Failed to convert to InstrumentId for {key}: {e}");
367 "Invalid instrument ID"
368 })
369 });
370
371 let instrument_id = match instrument_id {
372 Ok(id) => id,
373 Err(_) => return None,
374 };
375
376 match Self::load_instrument(&con, trader_key, &instrument_id, encoding).await {
377 Ok(Some(instrument)) => Some((instrument_id, instrument)),
378 Ok(None) => {
379 log::error!("Instrument not found: {instrument_id}");
380 None
381 }
382 Err(e) => {
383 log::error!("Failed to load instrument {instrument_id}: {e}");
384 None
385 }
386 }
387 }
388 })
389 .collect();
390
391 instruments.extend(join_all(futures).await.into_iter().flatten());
393 log::debug!("Loaded {} instruments(s)", instruments.len());
394
395 Ok(instruments)
396 }
397
398 pub async fn load_synthetics(
409 con: &ConnectionManager,
410 trader_key: &str,
411 encoding: SerializationEncoding,
412 ) -> anyhow::Result<AHashMap<InstrumentId, SyntheticInstrument>> {
413 let mut synthetics = AHashMap::new();
414 let pattern = format!("{trader_key}{REDIS_DELIMITER}{SYNTHETICS}*");
415 log::debug!("Loading {pattern}");
416
417 let mut con = con.clone();
418 let keys = Self::scan_keys(&mut con, pattern).await?;
419
420 let futures: Vec<_> = keys
421 .iter()
422 .map(|key| {
423 let con = con.clone();
424 async move {
425 let instrument_id = key
426 .as_str()
427 .rsplit(':')
428 .next()
429 .ok_or_else(|| {
430 log::error!("Invalid key format: {key}");
431 "Invalid key format"
432 })
433 .and_then(|code| {
434 InstrumentId::from_str(code).map_err(|e| {
435 log::error!("Failed to parse InstrumentId for {key}: {e}");
436 "Invalid instrument ID"
437 })
438 });
439
440 let instrument_id = match instrument_id {
441 Ok(id) => id,
442 Err(_) => return None,
443 };
444
445 match Self::load_synthetic(&con, trader_key, &instrument_id, encoding).await {
446 Ok(Some(synthetic)) => Some((instrument_id, synthetic)),
447 Ok(None) => {
448 log::error!("Synthetic not found: {instrument_id}");
449 None
450 }
451 Err(e) => {
452 log::error!("Failed to load synthetic {instrument_id}: {e}");
453 None
454 }
455 }
456 }
457 })
458 .collect();
459
460 synthetics.extend(join_all(futures).await.into_iter().flatten());
462 log::debug!("Loaded {} synthetics(s)", synthetics.len());
463
464 Ok(synthetics)
465 }
466
467 pub async fn load_accounts(
478 con: &ConnectionManager,
479 trader_key: &str,
480 encoding: SerializationEncoding,
481 ) -> anyhow::Result<AHashMap<AccountId, AccountAny>> {
482 let mut accounts = AHashMap::new();
483 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ACCOUNTS}*");
484 log::debug!("Loading {pattern}");
485
486 let mut con = con.clone();
487 let keys = Self::scan_keys(&mut con, pattern).await?;
488
489 let futures: Vec<_> = keys
490 .iter()
491 .map(|key| {
492 let con = con.clone();
493 async move {
494 let account_id = if let Some(code) = key.as_str().rsplit(':').next() {
495 AccountId::from(code)
496 } else {
497 log::error!("Invalid key format: {key}");
498 return None;
499 };
500
501 match Self::load_account(&con, trader_key, &account_id, encoding).await {
502 Ok(Some(account)) => Some((account_id, account)),
503 Ok(None) => {
504 log::error!("Account not found: {account_id}");
505 None
506 }
507 Err(e) => {
508 log::error!("Failed to load account {account_id}: {e}");
509 None
510 }
511 }
512 }
513 })
514 .collect();
515
516 accounts.extend(join_all(futures).await.into_iter().flatten());
518 log::debug!("Loaded {} accounts(s)", accounts.len());
519
520 Ok(accounts)
521 }
522
523 pub async fn load_orders(
534 con: &ConnectionManager,
535 trader_key: &str,
536 encoding: SerializationEncoding,
537 ) -> anyhow::Result<AHashMap<ClientOrderId, OrderAny>> {
538 let mut orders = AHashMap::new();
539 let pattern = format!("{trader_key}{REDIS_DELIMITER}{ORDERS}*");
540 log::debug!("Loading {pattern}");
541
542 let mut con = con.clone();
543 let keys = Self::scan_keys(&mut con, pattern).await?;
544
545 let futures: Vec<_> = keys
546 .iter()
547 .map(|key| {
548 let con = con.clone();
549 async move {
550 let client_order_id = if let Some(code) = key.as_str().rsplit(':').next() {
551 ClientOrderId::from(code)
552 } else {
553 log::error!("Invalid key format: {key}");
554 return None;
555 };
556
557 match Self::load_order(&con, trader_key, &client_order_id, encoding).await {
558 Ok(Some(order)) => Some((client_order_id, order)),
559 Ok(None) => {
560 log::error!("Order not found: {client_order_id}");
561 None
562 }
563 Err(e) => {
564 log::error!("Failed to load order {client_order_id}: {e}");
565 None
566 }
567 }
568 }
569 })
570 .collect();
571
572 orders.extend(join_all(futures).await.into_iter().flatten());
574 log::debug!("Loaded {} order(s)", orders.len());
575
576 Ok(orders)
577 }
578
579 pub async fn load_positions(
590 con: &ConnectionManager,
591 trader_key: &str,
592 encoding: SerializationEncoding,
593 ) -> anyhow::Result<AHashMap<PositionId, Position>> {
594 let mut positions = AHashMap::new();
595 let pattern = format!("{trader_key}{REDIS_DELIMITER}{POSITIONS}*");
596 log::debug!("Loading {pattern}");
597
598 let mut con = con.clone();
599 let keys = Self::scan_keys(&mut con, pattern).await?;
600
601 let futures: Vec<_> = keys
602 .iter()
603 .map(|key| {
604 let con = con.clone();
605 async move {
606 let position_id = if let Some(code) = key.as_str().rsplit(':').next() {
607 PositionId::from(code)
608 } else {
609 log::error!("Invalid key format: {key}");
610 return None;
611 };
612
613 match Self::load_position(&con, trader_key, &position_id, encoding).await {
614 Ok(Some(position)) => Some((position_id, position)),
615 Ok(None) => {
616 log::error!("Position not found: {position_id}");
617 None
618 }
619 Err(e) => {
620 log::error!("Failed to load position {position_id}: {e}");
621 None
622 }
623 }
624 }
625 })
626 .collect();
627
628 positions.extend(join_all(futures).await.into_iter().flatten());
630 log::debug!("Loaded {} position(s)", positions.len());
631
632 Ok(positions)
633 }
634
635 pub async fn load_custom_data(
645 con: &ConnectionManager,
646 trader_key: &str,
647 data_type: &DataType,
648 ) -> anyhow::Result<Vec<CustomData>> {
649 let pattern = format!("{trader_key}{REDIS_DELIMITER}{CUSTOM}*");
650 log::debug!("Loading custom data {pattern}");
651
652 let mut con = con.clone();
653 let keys = Self::scan_keys(&mut con, pattern).await?;
654
655 if keys.is_empty() {
656 return Ok(Vec::new());
657 }
658
659 let values = Self::read_bulk(&con, &keys).await?;
660 let request_type_name = data_type.type_name();
661 let request_short = request_type_name
662 .rsplit([':', '.'])
663 .next()
664 .unwrap_or(request_type_name);
665 let request_identifier = data_type.identifier().unwrap_or("");
666
667 let mut results = Vec::new();
668
669 for value_opt in values {
670 let Some(value_bytes) = value_opt else {
671 continue;
672 };
673 let custom = match CustomData::from_json_bytes(value_bytes.as_ref()) {
674 Ok(c) => c,
675 Err(e) => {
676 log::warn!("Failed to deserialize custom data from Redis: {e}");
677 continue;
678 }
679 };
680 let stored_type_name = custom.data_type.type_name();
681 let type_match =
682 stored_type_name == request_type_name || stored_type_name == request_short;
683 let identifier_match =
684 custom.data_type.identifier().unwrap_or("") == request_identifier;
685 let metadata_match = match (data_type.metadata(), custom.data_type.metadata()) {
686 (None, None) => true,
687 (Some(a), Some(b)) => serde_json::to_value(a).ok() == serde_json::to_value(b).ok(),
688 _ => false,
689 };
690
691 if type_match && identifier_match && metadata_match {
692 results.push(custom);
693 }
694 }
695
696 results.sort_by_key(|c| c.ts_init());
697 log::debug!("Loaded {} custom data item(s)", results.len());
698 Ok(results)
699 }
700
701 pub async fn load_currency(
707 con: &ConnectionManager,
708 trader_key: &str,
709 code: &Ustr,
710 encoding: SerializationEncoding,
711 ) -> anyhow::Result<Option<Currency>> {
712 let key = format!("{CURRENCIES}{REDIS_DELIMITER}{code}");
713 let result = Self::read(con, trader_key, &key).await?;
714
715 if result.is_empty() {
716 return Ok(None);
717 }
718
719 let currency = Self::deserialize_payload(encoding, &result[0])?;
720 Ok(currency)
721 }
722
723 pub async fn load_instrument(
729 con: &ConnectionManager,
730 trader_key: &str,
731 instrument_id: &InstrumentId,
732 encoding: SerializationEncoding,
733 ) -> anyhow::Result<Option<InstrumentAny>> {
734 let key = format!("{INSTRUMENTS}{REDIS_DELIMITER}{instrument_id}");
735 let result = Self::read(con, trader_key, &key).await?;
736 if result.is_empty() {
737 return Ok(None);
738 }
739
740 let instrument: InstrumentAny = Self::deserialize_payload(encoding, &result[0])?;
741 Ok(Some(instrument))
742 }
743
744 pub async fn load_synthetic(
750 con: &ConnectionManager,
751 trader_key: &str,
752 instrument_id: &InstrumentId,
753 encoding: SerializationEncoding,
754 ) -> anyhow::Result<Option<SyntheticInstrument>> {
755 let key = format!("{SYNTHETICS}{REDIS_DELIMITER}{instrument_id}");
756 let result = Self::read(con, trader_key, &key).await?;
757 if result.is_empty() {
758 return Ok(None);
759 }
760
761 let synthetic: SyntheticInstrument = Self::deserialize_payload(encoding, &result[0])?;
762 Ok(Some(synthetic))
763 }
764
765 pub async fn load_account(
771 con: &ConnectionManager,
772 trader_key: &str,
773 account_id: &AccountId,
774 encoding: SerializationEncoding,
775 ) -> anyhow::Result<Option<AccountAny>> {
776 let key = format!("{ACCOUNTS}{REDIS_DELIMITER}{account_id}");
777 let result = Self::read(con, trader_key, &key).await?;
778 if result.is_empty() {
779 return Ok(None);
780 }
781
782 let account: AccountAny = Self::deserialize_payload(encoding, &result[0])?;
783 Ok(Some(account))
784 }
785
786 pub async fn load_order(
792 con: &ConnectionManager,
793 trader_key: &str,
794 client_order_id: &ClientOrderId,
795 encoding: SerializationEncoding,
796 ) -> anyhow::Result<Option<OrderAny>> {
797 let key = format!("{ORDERS}{REDIS_DELIMITER}{client_order_id}");
798 let result = Self::read(con, trader_key, &key).await?;
799 if result.is_empty() {
800 return Ok(None);
801 }
802
803 let order: OrderAny = Self::deserialize_payload(encoding, &result[0])?;
804 Ok(Some(order))
805 }
806
807 pub async fn load_position(
813 con: &ConnectionManager,
814 trader_key: &str,
815 position_id: &PositionId,
816 encoding: SerializationEncoding,
817 ) -> anyhow::Result<Option<Position>> {
818 let key = format!("{POSITIONS}{REDIS_DELIMITER}{position_id}");
819 let result = Self::read(con, trader_key, &key).await?;
820 if result.is_empty() {
821 return Ok(None);
822 }
823
824 let position: Position = Self::deserialize_payload(encoding, &result[0])?;
825 Ok(Some(position))
826 }
827
828 fn get_collection_key(key: &str) -> anyhow::Result<&str> {
829 key.split_once(REDIS_DELIMITER)
830 .map(|(collection, _)| collection)
831 .ok_or_else(|| {
832 anyhow::anyhow!("Invalid `key`, missing a '{REDIS_DELIMITER}' delimiter, was {key}")
833 })
834 }
835
836 async fn read_index(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
837 let index_key = get_index_key(key)?;
838 match index_key {
839 INDEX_ORDER_IDS => Self::read_set(conn, key).await,
840 INDEX_ORDER_POSITION => Self::read_hset(conn, key).await,
841 INDEX_ORDER_CLIENT => Self::read_hset(conn, key).await,
842 INDEX_ORDERS => Self::read_set(conn, key).await,
843 INDEX_ORDERS_OPEN => Self::read_set(conn, key).await,
844 INDEX_ORDERS_CLOSED => Self::read_set(conn, key).await,
845 INDEX_ORDERS_EMULATED => Self::read_set(conn, key).await,
846 INDEX_ORDERS_INFLIGHT => Self::read_set(conn, key).await,
847 INDEX_POSITIONS => Self::read_set(conn, key).await,
848 INDEX_POSITIONS_OPEN => Self::read_set(conn, key).await,
849 INDEX_POSITIONS_CLOSED => Self::read_set(conn, key).await,
850 _ => anyhow::bail!("Index unknown '{index_key}' on read"),
851 }
852 }
853
854 async fn read_string(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
855 let result: Vec<u8> = conn.get(key).await?;
856
857 if result.is_empty() {
858 Ok(vec![])
859 } else {
860 Ok(vec![Bytes::from(result)])
861 }
862 }
863
864 async fn read_set(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
865 let result: Vec<Bytes> = conn.smembers(key).await?;
866 Ok(result)
867 }
868
869 async fn read_hset(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
870 let result: HashMap<String, String> = conn.hgetall(key).await?;
871 let json = serde_json::to_string(&result)?;
872 Ok(vec![Bytes::from(json.into_bytes())])
873 }
874
875 async fn read_list(conn: &mut ConnectionManager, key: &str) -> anyhow::Result<Vec<Bytes>> {
876 let result: Vec<Bytes> = conn.lrange(key, 0, -1).await?;
877 Ok(result)
878 }
879}
880
881fn is_timestamp_field(key: &str) -> bool {
882 let expire_match = key == "expire_time_ns";
883 let ts_match = key.starts_with("ts_");
884 expire_match || ts_match
885}
886
887fn convert_timestamps(value: &mut Value) {
888 match value {
889 Value::Object(map) => {
890 for (key, v) in map {
891 if is_timestamp_field(key)
892 && let Value::Number(n) = v
893 && let Some(n) = n.as_u64()
894 {
895 let dt = DateTime::<Utc>::from_timestamp_nanos(n as i64);
896 *v = Value::String(dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true));
897 }
898 convert_timestamps(v);
899 }
900 }
901 Value::Array(arr) => {
902 for item in arr {
903 convert_timestamps(item);
904 }
905 }
906 _ => {}
907 }
908}
909
910fn convert_timestamp_strings(value: &mut Value) {
911 match value {
912 Value::Object(map) => {
913 for (key, v) in map {
914 if is_timestamp_field(key)
915 && let Value::String(s) = v
916 && let Ok(dt) = DateTime::parse_from_rfc3339(s)
917 {
918 *v = Value::Number(
919 (dt.with_timezone(&Utc)
920 .timestamp_nanos_opt()
921 .expect("Invalid DateTime") as u64)
922 .into(),
923 );
924 }
925 convert_timestamp_strings(v);
926 }
927 }
928 Value::Array(arr) => {
929 for item in arr {
930 convert_timestamp_strings(item);
931 }
932 }
933 _ => {}
934 }
935}