Skip to main content

nautilus_bitmex/python/
submitter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the BitMEX submit broadcaster.
17
18use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20    enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TrailingOffsetType, TriggerType},
21    identifiers::{ClientOrderId, InstrumentId, OrderListId},
22    python::instruments::pyobject_to_instrument_any,
23    types::{Price, Quantity},
24};
25use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
26
27use crate::{
28    broadcast::submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
29    common::enums::{BitmexEnvironment, BitmexPegPriceType},
30};
31
32#[pymethods]
33#[pyo3_stub_gen::derive::gen_stub_pymethods]
34impl SubmitBroadcaster {
35    /// Broadcasts submit requests to multiple HTTP clients for redundancy.
36    ///
37    /// This broadcaster fans out submit requests to multiple pre-warmed HTTP clients
38    /// in parallel, short-circuits when the first successful acknowledgement is received,
39    /// and handles expected rejection patterns (duplicate clOrdID) with appropriate log levels.
40    #[new]
41    #[pyo3(signature = (
42        pool_size,
43        api_key=None,
44        api_secret=None,
45        base_url=None,
46        environment=BitmexEnvironment::Mainnet,
47        timeout_secs=60,
48        max_retries=3,
49        retry_delay_ms=1_000,
50        retry_delay_max_ms=5_000,
51        recv_window_ms=10_000,
52        max_requests_per_second=10,
53        max_requests_per_minute=120,
54        health_check_interval_secs=30,
55        health_check_timeout_secs=5,
56        expected_reject_patterns=None,
57        proxy_urls=None,
58    ))]
59    #[expect(clippy::too_many_arguments)]
60    fn py_new(
61        pool_size: usize,
62        api_key: Option<String>,
63        api_secret: Option<String>,
64        base_url: Option<String>,
65        environment: BitmexEnvironment,
66        timeout_secs: u64,
67        max_retries: u32,
68        retry_delay_ms: u64,
69        retry_delay_max_ms: u64,
70        recv_window_ms: u64,
71        max_requests_per_second: u32,
72        max_requests_per_minute: u32,
73        health_check_interval_secs: u64,
74        health_check_timeout_secs: u64,
75        expected_reject_patterns: Option<Vec<String>>,
76        proxy_urls: Option<Vec<Option<String>>>,
77    ) -> PyResult<Self> {
78        let config = SubmitBroadcasterConfig {
79            pool_size,
80            api_key,
81            api_secret,
82            base_url,
83            environment,
84            timeout_secs,
85            max_retries,
86            retry_delay_ms,
87            retry_delay_max_ms,
88            recv_window_ms,
89            max_requests_per_second,
90            max_requests_per_minute,
91            health_check_interval_secs,
92            health_check_timeout_secs,
93            expected_reject_patterns: expected_reject_patterns
94                .unwrap_or_else(|| SubmitBroadcasterConfig::default().expected_reject_patterns),
95            proxy_urls: proxy_urls.unwrap_or_default(),
96        };
97
98        Self::new(config).map_err(to_pyvalue_err)
99    }
100
101    /// Starts the broadcaster and health check loop.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the broadcaster is already running.
106    #[pyo3(name = "start")]
107    fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
108        let broadcaster = self.clone_for_async();
109        pyo3_async_runtimes::tokio::future_into_py(py, async move {
110            broadcaster.start().await.map_err(to_pyvalue_err)
111        })
112    }
113
114    /// Stops the broadcaster and health check loop.
115    #[pyo3(name = "stop")]
116    fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
117        let broadcaster = self.clone_for_async();
118        pyo3_async_runtimes::tokio::future_into_py(py, async move {
119            broadcaster.stop().await;
120            Ok(())
121        })
122    }
123
124    /// Broadcasts a submit request to all healthy clients in parallel.
125    ///
126    /// # Returns
127    ///
128    /// - `Ok(report)` if successfully submitted with a report.
129    /// - `Err` if all requests failed.
130    #[pyo3(name = "broadcast_submit")]
131    #[pyo3(signature = (
132        instrument_id,
133        client_order_id,
134        order_side,
135        order_type,
136        quantity,
137        time_in_force,
138        price=None,
139        trigger_price=None,
140        trigger_type=None,
141        trailing_offset=None,
142        trailing_offset_type=None,
143        display_qty=None,
144        post_only=false,
145        reduce_only=false,
146        order_list_id=None,
147        contingency_type=None,
148        submit_tries=None,
149        peg_price_type=None,
150        peg_offset_value=None
151    ))]
152    #[expect(clippy::too_many_arguments)]
153    fn py_broadcast_submit<'py>(
154        &self,
155        py: Python<'py>,
156        instrument_id: InstrumentId,
157        client_order_id: ClientOrderId,
158        order_side: OrderSide,
159        order_type: OrderType,
160        quantity: Quantity,
161        time_in_force: TimeInForce,
162        price: Option<Price>,
163        trigger_price: Option<Price>,
164        trigger_type: Option<TriggerType>,
165        trailing_offset: Option<f64>,
166        trailing_offset_type: Option<TrailingOffsetType>,
167        display_qty: Option<Quantity>,
168        post_only: bool,
169        reduce_only: bool,
170        order_list_id: Option<OrderListId>,
171        contingency_type: Option<ContingencyType>,
172        submit_tries: Option<usize>,
173        peg_price_type: Option<String>,
174        peg_offset_value: Option<f64>,
175    ) -> PyResult<Bound<'py, PyAny>> {
176        let broadcaster = self.clone_for_async();
177
178        let peg_price_type: Option<BitmexPegPriceType> = peg_price_type
179            .map(|s| {
180                s.parse::<BitmexPegPriceType>()
181                    .map_err(|_| to_pyvalue_err(format!("Invalid peg_price_type: {s}")))
182            })
183            .transpose()?;
184
185        pyo3_async_runtimes::tokio::future_into_py(py, async move {
186            let report = broadcaster
187                .broadcast_submit(
188                    instrument_id,
189                    client_order_id,
190                    order_side,
191                    order_type,
192                    quantity,
193                    time_in_force,
194                    price,
195                    trigger_price,
196                    trigger_type,
197                    trailing_offset,
198                    trailing_offset_type,
199                    display_qty,
200                    post_only,
201                    reduce_only,
202                    order_list_id,
203                    contingency_type,
204                    submit_tries,
205                    peg_price_type,
206                    peg_offset_value,
207                )
208                .await
209                .map_err(to_pyvalue_err)?;
210
211            Python::attach(|py| report.into_py_any(py))
212        })
213    }
214
215    /// Gets broadcaster metrics.
216    #[pyo3(name = "get_metrics")]
217    fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
218        let metrics = self.get_metrics();
219        let dict = PyDict::new(py);
220        dict.set_item("total_submits", metrics.total_submits)?;
221        dict.set_item("successful_submits", metrics.successful_submits)?;
222        dict.set_item("failed_submits", metrics.failed_submits)?;
223        dict.set_item("expected_rejects", metrics.expected_rejects)?;
224        dict.set_item("healthy_clients", metrics.healthy_clients)?;
225        dict.set_item("total_clients", metrics.total_clients)?;
226        Ok(dict.into())
227    }
228
229    /// Gets per-client statistics.
230    #[pyo3(name = "get_client_stats")]
231    fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
232        let stats = self.get_client_stats();
233        let list = pyo3::types::PyList::empty(py);
234        for stat in stats {
235            let dict = PyDict::new(py);
236            dict.set_item("client_id", stat.client_id.clone())?;
237            dict.set_item("healthy", stat.healthy)?;
238            dict.set_item("submit_count", stat.submit_count)?;
239            dict.set_item("error_count", stat.error_count)?;
240            list.append(dict)?;
241        }
242        Ok(list.into())
243    }
244
245    /// Caches an instrument in all HTTP clients in the pool.
246    #[pyo3(name = "cache_instrument")]
247    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
248        let inst_any = pyobject_to_instrument_any(py, instrument)?;
249        self.cache_instrument(&inst_any);
250        Ok(())
251    }
252}