nautilus_infrastructure/python/redis/
cache.rs1use bytes::Bytes;
17use nautilus_common::live::get_runtime;
18use nautilus_core::{
19 UUID4,
20 python::{to_pyruntime_err, to_pyvalue_err},
21};
22use nautilus_model::{
23 data::{CustomData, DataType},
24 identifiers::{AccountId, ClientOrderId, PositionId, TraderId},
25 python::{
26 account::account_any_to_pyobject, instruments::instrument_any_to_pyobject,
27 orders::order_any_to_pyobject,
28 },
29};
30use pyo3::{
31 IntoPyObjectExt,
32 prelude::*,
33 types::{PyBytes, PyDict},
34};
35
36use crate::redis::{cache::RedisCacheDatabase, queries::DatabaseQueries};
37
38#[pymethods]
39impl RedisCacheDatabase {
40 #[new]
49 fn py_new(trader_id: TraderId, instance_id: UUID4, config_json: &[u8]) -> PyResult<Self> {
50 let config = serde_json::from_slice(config_json).map_err(to_pyvalue_err)?;
51 let result =
52 get_runtime().block_on(async { Self::new(trader_id, instance_id, config).await });
53 result.map_err(to_pyruntime_err)
54 }
55
56 #[pyo3(name = "close")]
57 fn py_close(&mut self) {
58 self.close();
59 }
60
61 #[pyo3(name = "flushdb")]
62 fn py_flushdb(&mut self) {
63 get_runtime().block_on(async { self.flushdb().await });
64 }
65
66 #[pyo3(name = "keys")]
72 fn py_keys(&mut self, pattern: &str) -> PyResult<Vec<String>> {
73 let result = get_runtime().block_on(async { self.keys(pattern).await });
74 result.map_err(to_pyruntime_err)
75 }
76
77 #[pyo3(name = "load_all")]
78 fn py_load_all(&mut self) -> PyResult<Py<PyAny>> {
79 let result = get_runtime().block_on(async {
80 DatabaseQueries::load_all(&self.con, self.get_encoding(), self.get_trader_key()).await
81 });
82
83 match result {
84 Ok(cache_map) => Python::attach(|py| {
85 let dict = PyDict::new(py);
86
87 let currencies_dict = PyDict::new(py);
89 for (key, value) in cache_map.currencies {
90 currencies_dict
91 .set_item(key.to_string(), value)
92 .map_err(to_pyvalue_err)?;
93 }
94 dict.set_item("currencies", currencies_dict)
95 .map_err(to_pyvalue_err)?;
96
97 let instruments_dict = PyDict::new(py);
99 for (key, value) in cache_map.instruments {
100 let py_object = instrument_any_to_pyobject(py, value)?;
101 instruments_dict
102 .set_item(key, py_object)
103 .map_err(to_pyvalue_err)?;
104 }
105 dict.set_item("instruments", instruments_dict)
106 .map_err(to_pyvalue_err)?;
107
108 let synthetics_dict = PyDict::new(py);
110 for (key, value) in cache_map.synthetics {
111 synthetics_dict
112 .set_item(key, value)
113 .map_err(to_pyvalue_err)?;
114 }
115 dict.set_item("synthetics", synthetics_dict)
116 .map_err(to_pyvalue_err)?;
117
118 let accounts_dict = PyDict::new(py);
120 for (key, value) in cache_map.accounts {
121 let py_object = account_any_to_pyobject(py, value)?;
122 accounts_dict
123 .set_item(key, py_object)
124 .map_err(to_pyvalue_err)?;
125 }
126 dict.set_item("accounts", accounts_dict)
127 .map_err(to_pyvalue_err)?;
128
129 let orders_dict = PyDict::new(py);
131 for (key, value) in cache_map.orders {
132 let py_object = order_any_to_pyobject(py, value)?;
133 orders_dict
134 .set_item(key, py_object)
135 .map_err(to_pyvalue_err)?;
136 }
137 dict.set_item("orders", orders_dict)
138 .map_err(to_pyvalue_err)?;
139
140 let positions_dict = PyDict::new(py);
142 for (key, value) in cache_map.positions {
143 positions_dict
144 .set_item(key, value)
145 .map_err(to_pyvalue_err)?;
146 }
147 dict.set_item("positions", positions_dict)
148 .map_err(to_pyvalue_err)?;
149
150 dict.into_py_any(py)
151 }),
152 Err(e) => Err(to_pyruntime_err(e)),
153 }
154 }
155
156 #[pyo3(name = "read")]
162 fn py_read(&mut self, py: Python, key: &str) -> PyResult<Vec<Py<PyAny>>> {
163 let result = get_runtime().block_on(async { self.read(key).await });
164 match result {
165 Ok(result) => {
166 let vec_py_bytes = result
167 .into_iter()
168 .map(|r| PyBytes::new(py, r.as_ref()).into())
169 .collect::<Vec<Py<PyAny>>>();
170 Ok(vec_py_bytes)
171 }
172 Err(e) => Err(to_pyruntime_err(e)),
173 }
174 }
175
176 #[pyo3(name = "read_bulk")]
182 #[expect(clippy::needless_pass_by_value)]
183 fn py_read_bulk(&mut self, py: Python, keys: Vec<String>) -> PyResult<Vec<Option<Py<PyAny>>>> {
184 let result = get_runtime().block_on(async { self.read_bulk(&keys).await });
185 match result {
186 Ok(results) => {
187 let vec_py_bytes = results
188 .into_iter()
189 .map(|opt| opt.map(|bytes| PyBytes::new(py, bytes.as_ref()).into()))
190 .collect::<Vec<Option<Py<PyAny>>>>();
191 Ok(vec_py_bytes)
192 }
193 Err(e) => Err(to_pyruntime_err(e)),
194 }
195 }
196
197 #[pyo3(name = "insert")]
203 fn py_insert(&mut self, key: String, payload: Vec<Vec<u8>>) -> PyResult<()> {
204 let payload: Vec<Bytes> = payload.into_iter().map(Bytes::from).collect();
205 self.insert(key, Some(payload)).map_err(to_pyvalue_err)
206 }
207
208 #[pyo3(name = "update")]
214 fn py_update(&mut self, key: String, payload: Vec<Vec<u8>>) -> PyResult<()> {
215 let payload: Vec<Bytes> = payload.into_iter().map(Bytes::from).collect();
216 self.update(key, Some(payload)).map_err(to_pyvalue_err)
217 }
218
219 #[pyo3(name = "delete")]
225 #[pyo3(signature = (key, payload=None))]
226 fn py_delete(&mut self, key: String, payload: Option<Vec<Vec<u8>>>) -> PyResult<()> {
227 let payload: Option<Vec<Bytes>> =
228 payload.map(|vec| vec.into_iter().map(Bytes::from).collect());
229 self.delete(key, payload).map_err(to_pyvalue_err)
230 }
231
232 #[pyo3(name = "delete_order")]
238 fn py_delete_order(&mut self, client_order_id: &str) -> PyResult<()> {
239 let client_order_id = ClientOrderId::new(client_order_id);
240 self.delete_order(&client_order_id).map_err(to_pyvalue_err)
241 }
242
243 #[pyo3(name = "delete_position")]
249 fn py_delete_position(&mut self, position_id: &str) -> PyResult<()> {
250 let position_id = PositionId::new(position_id);
251 self.delete_position(&position_id).map_err(to_pyvalue_err)
252 }
253
254 #[pyo3(name = "delete_account_event")]
260 fn py_delete_account_event(&mut self, account_id: &str, event_id: &str) -> PyResult<()> {
261 let account_id = AccountId::new(account_id);
262 self.delete_account_event(&account_id, event_id)
263 .map_err(to_pyvalue_err)
264 }
265
266 #[pyo3(name = "add_custom_data")]
272 #[expect(clippy::needless_pass_by_value)]
273 fn py_add_custom_data(&mut self, data: CustomData) -> PyResult<()> {
274 self.add_custom_data(&data).map_err(to_pyvalue_err)
275 }
276
277 #[pyo3(name = "load_custom_data")]
283 #[expect(clippy::needless_pass_by_value)]
284 fn py_load_custom_data(
285 &mut self,
286 py: Python<'_>,
287 data_type: DataType,
288 ) -> PyResult<Vec<CustomData>> {
289 py.detach(|| self.load_custom_data(&data_type).map_err(to_pyvalue_err))
290 }
291}