1use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20 enums::OrderSide,
21 identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
22 python::instruments::pyobject_to_instrument_any,
23};
24use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
25
26use crate::{
27 broadcast::canceller::{CancelBroadcaster, CancelBroadcasterConfig},
28 common::enums::BitmexEnvironment,
29};
30
31#[pymethods]
32#[pyo3_stub_gen::derive::gen_stub_pymethods]
33impl CancelBroadcaster {
34 #[new]
40 #[pyo3(signature = (
41 pool_size,
42 api_key=None,
43 api_secret=None,
44 base_url=None,
45 environment=BitmexEnvironment::Mainnet,
46 timeout_secs=60,
47 max_retries=3,
48 retry_delay_ms=1_000,
49 retry_delay_max_ms=5_000,
50 recv_window_ms=10_000,
51 max_requests_per_second=10,
52 max_requests_per_minute=120,
53 health_check_interval_secs=30,
54 health_check_timeout_secs=5,
55 expected_reject_patterns=None,
56 idempotent_success_patterns=None,
57 proxy_urls=None
58 ))]
59 #[expect(clippy::too_many_arguments)]
60 fn py_new(
61 pool_size: usize,
62 api_key: Option<String>,
63 api_secret: Option<String>,
64 base_url: Option<String>,
65 environment: BitmexEnvironment,
66 timeout_secs: u64,
67 max_retries: u32,
68 retry_delay_ms: u64,
69 retry_delay_max_ms: u64,
70 recv_window_ms: u64,
71 max_requests_per_second: u32,
72 max_requests_per_minute: u32,
73 health_check_interval_secs: u64,
74 health_check_timeout_secs: u64,
75 expected_reject_patterns: Option<Vec<String>>,
76 idempotent_success_patterns: Option<Vec<String>>,
77 proxy_urls: Option<Vec<Option<String>>>,
78 ) -> PyResult<Self> {
79 let config = CancelBroadcasterConfig {
80 pool_size,
81 api_key,
82 api_secret,
83 base_url,
84 environment,
85 timeout_secs,
86 max_retries,
87 retry_delay_ms,
88 retry_delay_max_ms,
89 recv_window_ms,
90 max_requests_per_second,
91 max_requests_per_minute,
92 health_check_interval_secs,
93 health_check_timeout_secs,
94 expected_reject_patterns: expected_reject_patterns
95 .unwrap_or_else(|| CancelBroadcasterConfig::default().expected_reject_patterns),
96 idempotent_success_patterns: idempotent_success_patterns
97 .unwrap_or_else(|| CancelBroadcasterConfig::default().idempotent_success_patterns),
98 proxy_urls: proxy_urls.unwrap_or_default(),
99 };
100
101 Self::new(config).map_err(to_pyvalue_err)
102 }
103
104 #[pyo3(name = "cache_instrument")]
106 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
107 let inst_any = pyobject_to_instrument_any(py, instrument)?;
108 self.cache_instrument(&inst_any);
109 Ok(())
110 }
111
112 #[pyo3(name = "start")]
118 fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
119 let broadcaster = self.clone_for_async();
120 pyo3_async_runtimes::tokio::future_into_py(py, async move {
121 broadcaster.start().await.map_err(to_pyvalue_err)
122 })
123 }
124
125 #[pyo3(name = "stop")]
127 fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
128 let broadcaster = self.clone_for_async();
129 pyo3_async_runtimes::tokio::future_into_py(py, async move {
130 broadcaster.stop().await;
131 Ok(())
132 })
133 }
134
135 #[pyo3(name = "broadcast_cancel")]
143 fn py_broadcast_cancel<'py>(
144 &self,
145 py: Python<'py>,
146 instrument_id: InstrumentId,
147 client_order_id: Option<ClientOrderId>,
148 venue_order_id: Option<VenueOrderId>,
149 ) -> PyResult<Bound<'py, PyAny>> {
150 let broadcaster = self.clone_for_async();
151 pyo3_async_runtimes::tokio::future_into_py(py, async move {
152 let report = broadcaster
153 .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
154 .await
155 .map_err(to_pyvalue_err)?;
156
157 Python::attach(|py| match report {
158 Some(r) => r.into_py_any(py),
159 None => Ok(py.None()),
160 })
161 })
162 }
163
164 #[pyo3(name = "broadcast_batch_cancel")]
166 fn py_broadcast_batch_cancel<'py>(
167 &self,
168 py: Python<'py>,
169 instrument_id: InstrumentId,
170 client_order_ids: Option<Vec<ClientOrderId>>,
171 venue_order_ids: Option<Vec<VenueOrderId>>,
172 ) -> PyResult<Bound<'py, PyAny>> {
173 let broadcaster = self.clone_for_async();
174 pyo3_async_runtimes::tokio::future_into_py(py, async move {
175 let reports = broadcaster
176 .broadcast_batch_cancel(instrument_id, client_order_ids, venue_order_ids)
177 .await
178 .map_err(to_pyvalue_err)?;
179
180 Python::attach(|py| {
181 let py_reports: PyResult<Vec<_>> = reports
182 .into_iter()
183 .map(|report| report.into_py_any(py))
184 .collect();
185 let pylist = pyo3::types::PyList::new(py, py_reports?)
186 .unwrap()
187 .into_any()
188 .unbind();
189 Ok(pylist)
190 })
191 })
192 }
193
194 #[pyo3(name = "broadcast_cancel_all")]
196 fn py_broadcast_cancel_all<'py>(
197 &self,
198 py: Python<'py>,
199 instrument_id: InstrumentId,
200 order_side: Option<OrderSide>,
201 ) -> PyResult<Bound<'py, PyAny>> {
202 let broadcaster = self.clone_for_async();
203 pyo3_async_runtimes::tokio::future_into_py(py, async move {
204 let reports = broadcaster
205 .broadcast_cancel_all(instrument_id, order_side)
206 .await
207 .map_err(to_pyvalue_err)?;
208
209 Python::attach(|py| {
210 let py_reports: PyResult<Vec<_>> = reports
211 .into_iter()
212 .map(|report| report.into_py_any(py))
213 .collect();
214 let pylist = pyo3::types::PyList::new(py, py_reports?)
215 .unwrap()
216 .into_any()
217 .unbind();
218 Ok(pylist)
219 })
220 })
221 }
222
223 #[pyo3(name = "get_metrics")]
225 fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
226 let metrics = self.get_metrics();
227 let dict = PyDict::new(py);
228 dict.set_item("total_cancels", metrics.total_cancels)?;
229 dict.set_item("successful_cancels", metrics.successful_cancels)?;
230 dict.set_item("failed_cancels", metrics.failed_cancels)?;
231 dict.set_item("expected_rejects", metrics.expected_rejects)?;
232 dict.set_item("idempotent_successes", metrics.idempotent_successes)?;
233 dict.set_item("healthy_clients", metrics.healthy_clients)?;
234 dict.set_item("total_clients", metrics.total_clients)?;
235 Ok(dict.into())
236 }
237
238 #[pyo3(name = "get_client_stats")]
240 fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
241 let stats = self.get_client_stats();
242 let list = pyo3::types::PyList::empty(py);
243 for stat in stats {
244 let dict = PyDict::new(py);
245 dict.set_item("client_id", stat.client_id.clone())?;
246 dict.set_item("healthy", stat.healthy)?;
247 dict.set_item("cancel_count", stat.cancel_count)?;
248 dict.set_item("error_count", stat.error_count)?;
249 list.append(dict)?;
250 }
251 Ok(list.into())
252 }
253}