Skip to main content

nautilus_infrastructure/python/redis/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use bytes::Bytes;
17use nautilus_common::live::get_runtime;
18use nautilus_core::{
19    UUID4,
20    python::{to_pyruntime_err, to_pyvalue_err},
21};
22use nautilus_model::{
23    data::{CustomData, DataType},
24    identifiers::{AccountId, ClientOrderId, PositionId, TraderId},
25    python::{
26        account::account_any_to_pyobject, instruments::instrument_any_to_pyobject,
27        orders::order_any_to_pyobject,
28    },
29};
30use pyo3::{
31    IntoPyObjectExt,
32    prelude::*,
33    types::{PyBytes, PyDict},
34};
35
36use crate::redis::{cache::RedisCacheDatabase, queries::DatabaseQueries};
37
38#[pymethods]
39impl RedisCacheDatabase {
40    /// Creates a new `RedisCacheDatabase` instance for the given `trader_id`, `instance_id`, and `config`.
41    ///
42    /// # Errors
43    ///
44    /// Returns an error if:
45    /// - The database configuration is missing in `config`.
46    /// - Establishing the Redis connection fails.
47    /// - The command processing task cannot be spawned.
48    #[new]
49    fn py_new(trader_id: TraderId, instance_id: UUID4, config_json: &[u8]) -> PyResult<Self> {
50        let config = serde_json::from_slice(config_json).map_err(to_pyvalue_err)?;
51        let result =
52            get_runtime().block_on(async { Self::new(trader_id, instance_id, config).await });
53        result.map_err(to_pyruntime_err)
54    }
55
56    #[pyo3(name = "close")]
57    fn py_close(&mut self) {
58        self.close();
59    }
60
61    #[pyo3(name = "flushdb")]
62    fn py_flushdb(&mut self) {
63        get_runtime().block_on(async { self.flushdb().await });
64    }
65
66    /// Retrieves all keys matching the given `pattern` from Redis for this trader.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if the underlying Redis scan operation fails.
71    #[pyo3(name = "keys")]
72    fn py_keys(&mut self, pattern: &str) -> PyResult<Vec<String>> {
73        let result = get_runtime().block_on(async { self.keys(pattern).await });
74        result.map_err(to_pyruntime_err)
75    }
76
77    #[pyo3(name = "load_all")]
78    fn py_load_all(&mut self) -> PyResult<Py<PyAny>> {
79        let result = get_runtime().block_on(async {
80            DatabaseQueries::load_all(&self.con, self.get_encoding(), self.get_trader_key()).await
81        });
82
83        match result {
84            Ok(cache_map) => Python::attach(|py| {
85                let dict = PyDict::new(py);
86
87                // Load currencies
88                let currencies_dict = PyDict::new(py);
89                for (key, value) in cache_map.currencies {
90                    currencies_dict
91                        .set_item(key.to_string(), value)
92                        .map_err(to_pyvalue_err)?;
93                }
94                dict.set_item("currencies", currencies_dict)
95                    .map_err(to_pyvalue_err)?;
96
97                // Load instruments
98                let instruments_dict = PyDict::new(py);
99                for (key, value) in cache_map.instruments {
100                    let py_object = instrument_any_to_pyobject(py, value)?;
101                    instruments_dict
102                        .set_item(key, py_object)
103                        .map_err(to_pyvalue_err)?;
104                }
105                dict.set_item("instruments", instruments_dict)
106                    .map_err(to_pyvalue_err)?;
107
108                // Load synthetics
109                let synthetics_dict = PyDict::new(py);
110                for (key, value) in cache_map.synthetics {
111                    synthetics_dict
112                        .set_item(key, value)
113                        .map_err(to_pyvalue_err)?;
114                }
115                dict.set_item("synthetics", synthetics_dict)
116                    .map_err(to_pyvalue_err)?;
117
118                // Load accounts
119                let accounts_dict = PyDict::new(py);
120                for (key, value) in cache_map.accounts {
121                    let py_object = account_any_to_pyobject(py, value)?;
122                    accounts_dict
123                        .set_item(key, py_object)
124                        .map_err(to_pyvalue_err)?;
125                }
126                dict.set_item("accounts", accounts_dict)
127                    .map_err(to_pyvalue_err)?;
128
129                // Load orders
130                let orders_dict = PyDict::new(py);
131                for (key, value) in cache_map.orders {
132                    let py_object = order_any_to_pyobject(py, value)?;
133                    orders_dict
134                        .set_item(key, py_object)
135                        .map_err(to_pyvalue_err)?;
136                }
137                dict.set_item("orders", orders_dict)
138                    .map_err(to_pyvalue_err)?;
139
140                // Load positions
141                let positions_dict = PyDict::new(py);
142                for (key, value) in cache_map.positions {
143                    positions_dict
144                        .set_item(key, value)
145                        .map_err(to_pyvalue_err)?;
146                }
147                dict.set_item("positions", positions_dict)
148                    .map_err(to_pyvalue_err)?;
149
150                dict.into_py_any(py)
151            }),
152            Err(e) => Err(to_pyruntime_err(e)),
153        }
154    }
155
156    /// Reads the value(s) associated with `key` for this trader from Redis.
157    ///
158    /// # Errors
159    ///
160    /// Returns an error if the underlying Redis read operation fails.
161    #[pyo3(name = "read")]
162    fn py_read(&mut self, py: Python, key: &str) -> PyResult<Vec<Py<PyAny>>> {
163        let result = get_runtime().block_on(async { self.read(key).await });
164        match result {
165            Ok(result) => {
166                let vec_py_bytes = result
167                    .into_iter()
168                    .map(|r| PyBytes::new(py, r.as_ref()).into())
169                    .collect::<Vec<Py<PyAny>>>();
170                Ok(vec_py_bytes)
171            }
172            Err(e) => Err(to_pyruntime_err(e)),
173        }
174    }
175
176    /// Reads multiple values using bulk operations for efficiency.
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if the underlying Redis read operation fails.
181    #[pyo3(name = "read_bulk")]
182    #[expect(clippy::needless_pass_by_value)]
183    fn py_read_bulk(&mut self, py: Python, keys: Vec<String>) -> PyResult<Vec<Option<Py<PyAny>>>> {
184        let result = get_runtime().block_on(async { self.read_bulk(&keys).await });
185        match result {
186            Ok(results) => {
187                let vec_py_bytes = results
188                    .into_iter()
189                    .map(|opt| opt.map(|bytes| PyBytes::new(py, bytes.as_ref()).into()))
190                    .collect::<Vec<Option<Py<PyAny>>>>();
191                Ok(vec_py_bytes)
192            }
193            Err(e) => Err(to_pyruntime_err(e)),
194        }
195    }
196
197    /// Sends an insert command for `key` with optional `payload` to Redis via the background task.
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if the command cannot be sent to the background task channel.
202    #[pyo3(name = "insert")]
203    fn py_insert(&mut self, key: String, payload: Vec<Vec<u8>>) -> PyResult<()> {
204        let payload: Vec<Bytes> = payload.into_iter().map(Bytes::from).collect();
205        self.insert(key, Some(payload)).map_err(to_pyvalue_err)
206    }
207
208    /// Sends an update command for `key` with optional `payload` to Redis via the background task.
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if the command cannot be sent to the background task channel.
213    #[pyo3(name = "update")]
214    fn py_update(&mut self, key: String, payload: Vec<Vec<u8>>) -> PyResult<()> {
215        let payload: Vec<Bytes> = payload.into_iter().map(Bytes::from).collect();
216        self.update(key, Some(payload)).map_err(to_pyvalue_err)
217    }
218
219    /// Sends a delete command for `key` with optional `payload` to Redis via the background task.
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if the command cannot be sent to the background task channel.
224    #[pyo3(name = "delete")]
225    #[pyo3(signature = (key, payload=None))]
226    fn py_delete(&mut self, key: String, payload: Option<Vec<Vec<u8>>>) -> PyResult<()> {
227        let payload: Option<Vec<Bytes>> =
228            payload.map(|vec| vec.into_iter().map(Bytes::from).collect());
229        self.delete(key, payload).map_err(to_pyvalue_err)
230    }
231
232    /// Delete the given order from the database with full index cleanup.
233    ///
234    /// # Errors
235    ///
236    /// Returns an error if the command cannot be sent to the background task channel.
237    #[pyo3(name = "delete_order")]
238    fn py_delete_order(&mut self, client_order_id: &str) -> PyResult<()> {
239        let client_order_id = ClientOrderId::new(client_order_id);
240        self.delete_order(&client_order_id).map_err(to_pyvalue_err)
241    }
242
243    /// Delete the given position from the database with full index cleanup.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the command cannot be sent to the background task channel.
248    #[pyo3(name = "delete_position")]
249    fn py_delete_position(&mut self, position_id: &str) -> PyResult<()> {
250        let position_id = PositionId::new(position_id);
251        self.delete_position(&position_id).map_err(to_pyvalue_err)
252    }
253
254    /// Delete the given account event from the database.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if the command cannot be sent to the background task channel.
259    #[pyo3(name = "delete_account_event")]
260    fn py_delete_account_event(&mut self, account_id: &str, event_id: &str) -> PyResult<()> {
261        let account_id = AccountId::new(account_id);
262        self.delete_account_event(&account_id, event_id)
263            .map_err(to_pyvalue_err)
264    }
265
266    /// Stores custom data in Redis (key format: `custom:<ts_init_020>:<uuid>`, value: full JSON).
267    ///
268    /// # Errors
269    ///
270    /// Returns an error if serialization fails or the insert command cannot be sent.
271    #[pyo3(name = "add_custom_data")]
272    #[expect(clippy::needless_pass_by_value)]
273    fn py_add_custom_data(&mut self, data: CustomData) -> PyResult<()> {
274        self.add_custom_data(&data).map_err(to_pyvalue_err)
275    }
276
277    /// Loads custom data from Redis matching the given `data_type` (blocking).
278    ///
279    /// Spawns the async query on the global Nautilus runtime and blocks until
280    /// the result arrives via a channel. Safe from any thread context (Python,
281    /// test runtimes, plain threads).
282    #[pyo3(name = "load_custom_data")]
283    #[expect(clippy::needless_pass_by_value)]
284    fn py_load_custom_data(
285        &mut self,
286        py: Python<'_>,
287        data_type: DataType,
288    ) -> PyResult<Vec<CustomData>> {
289        py.detach(|| self.load_custom_data(&data_type).map_err(to_pyvalue_err))
290    }
291}