Skip to main content

nautilus_bitmex/python/
canceller.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the BitMEX cancel broadcaster.
17
18use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20    enums::OrderSide,
21    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
22    python::instruments::pyobject_to_instrument_any,
23};
24use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
25
26use crate::{
27    broadcast::canceller::{CancelBroadcaster, CancelBroadcasterConfig},
28    common::enums::BitmexEnvironment,
29};
30
31#[pymethods]
32#[pyo3_stub_gen::derive::gen_stub_pymethods]
33impl CancelBroadcaster {
34    /// Broadcasts cancel requests to multiple HTTP clients for redundancy.
35    ///
36    /// This broadcaster fans out cancel requests to multiple pre-warmed HTTP clients
37    /// in parallel, short-circuits when the first successful acknowledgement is received,
38    /// and handles expected rejection patterns with appropriate log levels.
39    #[new]
40    #[pyo3(signature = (
41        pool_size,
42        api_key=None,
43        api_secret=None,
44        base_url=None,
45        environment=BitmexEnvironment::Mainnet,
46        timeout_secs=60,
47        max_retries=3,
48        retry_delay_ms=1_000,
49        retry_delay_max_ms=5_000,
50        recv_window_ms=10_000,
51        max_requests_per_second=10,
52        max_requests_per_minute=120,
53        health_check_interval_secs=30,
54        health_check_timeout_secs=5,
55        expected_reject_patterns=None,
56        idempotent_success_patterns=None,
57        proxy_urls=None
58    ))]
59    #[expect(clippy::too_many_arguments)]
60    fn py_new(
61        pool_size: usize,
62        api_key: Option<String>,
63        api_secret: Option<String>,
64        base_url: Option<String>,
65        environment: BitmexEnvironment,
66        timeout_secs: u64,
67        max_retries: u32,
68        retry_delay_ms: u64,
69        retry_delay_max_ms: u64,
70        recv_window_ms: u64,
71        max_requests_per_second: u32,
72        max_requests_per_minute: u32,
73        health_check_interval_secs: u64,
74        health_check_timeout_secs: u64,
75        expected_reject_patterns: Option<Vec<String>>,
76        idempotent_success_patterns: Option<Vec<String>>,
77        proxy_urls: Option<Vec<Option<String>>>,
78    ) -> PyResult<Self> {
79        let config = CancelBroadcasterConfig {
80            pool_size,
81            api_key,
82            api_secret,
83            base_url,
84            environment,
85            timeout_secs,
86            max_retries,
87            retry_delay_ms,
88            retry_delay_max_ms,
89            recv_window_ms,
90            max_requests_per_second,
91            max_requests_per_minute,
92            health_check_interval_secs,
93            health_check_timeout_secs,
94            expected_reject_patterns: expected_reject_patterns
95                .unwrap_or_else(|| CancelBroadcasterConfig::default().expected_reject_patterns),
96            idempotent_success_patterns: idempotent_success_patterns
97                .unwrap_or_else(|| CancelBroadcasterConfig::default().idempotent_success_patterns),
98            proxy_urls: proxy_urls.unwrap_or_default(),
99        };
100
101        Self::new(config).map_err(to_pyvalue_err)
102    }
103
104    /// Caches an instrument in all HTTP clients in the pool.
105    #[pyo3(name = "cache_instrument")]
106    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
107        let inst_any = pyobject_to_instrument_any(py, instrument)?;
108        self.cache_instrument(&inst_any);
109        Ok(())
110    }
111
112    /// Starts the broadcaster and health check loop.
113    ///
114    /// # Errors
115    ///
116    /// Returns an error if the broadcaster is already running.
117    #[pyo3(name = "start")]
118    fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
119        let broadcaster = self.clone_for_async();
120        pyo3_async_runtimes::tokio::future_into_py(py, async move {
121            broadcaster.start().await.map_err(to_pyvalue_err)
122        })
123    }
124
125    /// Stops the broadcaster and health check loop.
126    #[pyo3(name = "stop")]
127    fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
128        let broadcaster = self.clone_for_async();
129        pyo3_async_runtimes::tokio::future_into_py(py, async move {
130            broadcaster.stop().await;
131            Ok(())
132        })
133    }
134
135    /// Broadcasts a single cancel request to all healthy clients in parallel.
136    ///
137    /// # Returns
138    ///
139    /// - `Ok(Some(report))` if successfully cancelled with a report.
140    /// - `Ok(None)` if the order was already cancelled (idempotent success).
141    /// - `Err` if all requests failed.
142    #[pyo3(name = "broadcast_cancel")]
143    fn py_broadcast_cancel<'py>(
144        &self,
145        py: Python<'py>,
146        instrument_id: InstrumentId,
147        client_order_id: Option<ClientOrderId>,
148        venue_order_id: Option<VenueOrderId>,
149    ) -> PyResult<Bound<'py, PyAny>> {
150        let broadcaster = self.clone_for_async();
151        pyo3_async_runtimes::tokio::future_into_py(py, async move {
152            let report = broadcaster
153                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
154                .await
155                .map_err(to_pyvalue_err)?;
156
157            Python::attach(|py| match report {
158                Some(r) => r.into_py_any(py),
159                None => Ok(py.None()),
160            })
161        })
162    }
163
164    /// Broadcasts a batch cancel request to all healthy clients in parallel.
165    #[pyo3(name = "broadcast_batch_cancel")]
166    fn py_broadcast_batch_cancel<'py>(
167        &self,
168        py: Python<'py>,
169        instrument_id: InstrumentId,
170        client_order_ids: Option<Vec<ClientOrderId>>,
171        venue_order_ids: Option<Vec<VenueOrderId>>,
172    ) -> PyResult<Bound<'py, PyAny>> {
173        let broadcaster = self.clone_for_async();
174        pyo3_async_runtimes::tokio::future_into_py(py, async move {
175            let reports = broadcaster
176                .broadcast_batch_cancel(instrument_id, client_order_ids, venue_order_ids)
177                .await
178                .map_err(to_pyvalue_err)?;
179
180            Python::attach(|py| {
181                let py_reports: PyResult<Vec<_>> = reports
182                    .into_iter()
183                    .map(|report| report.into_py_any(py))
184                    .collect();
185                let pylist = pyo3::types::PyList::new(py, py_reports?)
186                    .unwrap()
187                    .into_any()
188                    .unbind();
189                Ok(pylist)
190            })
191        })
192    }
193
194    /// Broadcasts a cancel all request to all healthy clients in parallel.
195    #[pyo3(name = "broadcast_cancel_all")]
196    fn py_broadcast_cancel_all<'py>(
197        &self,
198        py: Python<'py>,
199        instrument_id: InstrumentId,
200        order_side: Option<OrderSide>,
201    ) -> PyResult<Bound<'py, PyAny>> {
202        let broadcaster = self.clone_for_async();
203        pyo3_async_runtimes::tokio::future_into_py(py, async move {
204            let reports = broadcaster
205                .broadcast_cancel_all(instrument_id, order_side)
206                .await
207                .map_err(to_pyvalue_err)?;
208
209            Python::attach(|py| {
210                let py_reports: PyResult<Vec<_>> = reports
211                    .into_iter()
212                    .map(|report| report.into_py_any(py))
213                    .collect();
214                let pylist = pyo3::types::PyList::new(py, py_reports?)
215                    .unwrap()
216                    .into_any()
217                    .unbind();
218                Ok(pylist)
219            })
220        })
221    }
222
223    /// Gets broadcaster metrics.
224    #[pyo3(name = "get_metrics")]
225    fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
226        let metrics = self.get_metrics();
227        let dict = PyDict::new(py);
228        dict.set_item("total_cancels", metrics.total_cancels)?;
229        dict.set_item("successful_cancels", metrics.successful_cancels)?;
230        dict.set_item("failed_cancels", metrics.failed_cancels)?;
231        dict.set_item("expected_rejects", metrics.expected_rejects)?;
232        dict.set_item("idempotent_successes", metrics.idempotent_successes)?;
233        dict.set_item("healthy_clients", metrics.healthy_clients)?;
234        dict.set_item("total_clients", metrics.total_clients)?;
235        Ok(dict.into())
236    }
237
238    /// Gets per-client statistics.
239    #[pyo3(name = "get_client_stats")]
240    fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
241        let stats = self.get_client_stats();
242        let list = pyo3::types::PyList::empty(py);
243        for stat in stats {
244            let dict = PyDict::new(py);
245            dict.set_item("client_id", stat.client_id.clone())?;
246            dict.set_item("healthy", stat.healthy)?;
247            dict.set_item("cancel_count", stat.cancel_count)?;
248            dict.set_item("error_count", stat.error_count)?;
249            list.append(dict)?;
250        }
251        Ok(list.into())
252    }
253}