1use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20 enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TrailingOffsetType, TriggerType},
21 identifiers::{ClientOrderId, InstrumentId, OrderListId},
22 python::instruments::pyobject_to_instrument_any,
23 types::{Price, Quantity},
24};
25use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
26
27use crate::{
28 broadcast::submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
29 common::enums::{BitmexEnvironment, BitmexPegPriceType},
30};
31
32#[pymethods]
33#[pyo3_stub_gen::derive::gen_stub_pymethods]
34impl SubmitBroadcaster {
35 #[new]
41 #[pyo3(signature = (
42 pool_size,
43 api_key=None,
44 api_secret=None,
45 base_url=None,
46 environment=BitmexEnvironment::Mainnet,
47 timeout_secs=60,
48 max_retries=3,
49 retry_delay_ms=1_000,
50 retry_delay_max_ms=5_000,
51 recv_window_ms=10_000,
52 max_requests_per_second=10,
53 max_requests_per_minute=120,
54 health_check_interval_secs=30,
55 health_check_timeout_secs=5,
56 expected_reject_patterns=None,
57 proxy_urls=None,
58 ))]
59 #[expect(clippy::too_many_arguments)]
60 fn py_new(
61 pool_size: usize,
62 api_key: Option<String>,
63 api_secret: Option<String>,
64 base_url: Option<String>,
65 environment: BitmexEnvironment,
66 timeout_secs: u64,
67 max_retries: u32,
68 retry_delay_ms: u64,
69 retry_delay_max_ms: u64,
70 recv_window_ms: u64,
71 max_requests_per_second: u32,
72 max_requests_per_minute: u32,
73 health_check_interval_secs: u64,
74 health_check_timeout_secs: u64,
75 expected_reject_patterns: Option<Vec<String>>,
76 proxy_urls: Option<Vec<Option<String>>>,
77 ) -> PyResult<Self> {
78 let config = SubmitBroadcasterConfig {
79 pool_size,
80 api_key,
81 api_secret,
82 base_url,
83 environment,
84 timeout_secs,
85 max_retries,
86 retry_delay_ms,
87 retry_delay_max_ms,
88 recv_window_ms,
89 max_requests_per_second,
90 max_requests_per_minute,
91 health_check_interval_secs,
92 health_check_timeout_secs,
93 expected_reject_patterns: expected_reject_patterns
94 .unwrap_or_else(|| SubmitBroadcasterConfig::default().expected_reject_patterns),
95 proxy_urls: proxy_urls.unwrap_or_default(),
96 };
97
98 Self::new(config).map_err(to_pyvalue_err)
99 }
100
101 #[pyo3(name = "start")]
107 fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
108 let broadcaster = self.clone_for_async();
109 pyo3_async_runtimes::tokio::future_into_py(py, async move {
110 broadcaster.start().await.map_err(to_pyvalue_err)
111 })
112 }
113
114 #[pyo3(name = "stop")]
116 fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
117 let broadcaster = self.clone_for_async();
118 pyo3_async_runtimes::tokio::future_into_py(py, async move {
119 broadcaster.stop().await;
120 Ok(())
121 })
122 }
123
124 #[pyo3(name = "broadcast_submit")]
131 #[pyo3(signature = (
132 instrument_id,
133 client_order_id,
134 order_side,
135 order_type,
136 quantity,
137 time_in_force,
138 price=None,
139 trigger_price=None,
140 trigger_type=None,
141 trailing_offset=None,
142 trailing_offset_type=None,
143 display_qty=None,
144 post_only=false,
145 reduce_only=false,
146 order_list_id=None,
147 contingency_type=None,
148 submit_tries=None,
149 peg_price_type=None,
150 peg_offset_value=None
151 ))]
152 #[expect(clippy::too_many_arguments)]
153 fn py_broadcast_submit<'py>(
154 &self,
155 py: Python<'py>,
156 instrument_id: InstrumentId,
157 client_order_id: ClientOrderId,
158 order_side: OrderSide,
159 order_type: OrderType,
160 quantity: Quantity,
161 time_in_force: TimeInForce,
162 price: Option<Price>,
163 trigger_price: Option<Price>,
164 trigger_type: Option<TriggerType>,
165 trailing_offset: Option<f64>,
166 trailing_offset_type: Option<TrailingOffsetType>,
167 display_qty: Option<Quantity>,
168 post_only: bool,
169 reduce_only: bool,
170 order_list_id: Option<OrderListId>,
171 contingency_type: Option<ContingencyType>,
172 submit_tries: Option<usize>,
173 peg_price_type: Option<String>,
174 peg_offset_value: Option<f64>,
175 ) -> PyResult<Bound<'py, PyAny>> {
176 let broadcaster = self.clone_for_async();
177
178 let peg_price_type: Option<BitmexPegPriceType> = peg_price_type
179 .map(|s| {
180 s.parse::<BitmexPegPriceType>()
181 .map_err(|_| to_pyvalue_err(format!("Invalid peg_price_type: {s}")))
182 })
183 .transpose()?;
184
185 pyo3_async_runtimes::tokio::future_into_py(py, async move {
186 let report = broadcaster
187 .broadcast_submit(
188 instrument_id,
189 client_order_id,
190 order_side,
191 order_type,
192 quantity,
193 time_in_force,
194 price,
195 trigger_price,
196 trigger_type,
197 trailing_offset,
198 trailing_offset_type,
199 display_qty,
200 post_only,
201 reduce_only,
202 order_list_id,
203 contingency_type,
204 submit_tries,
205 peg_price_type,
206 peg_offset_value,
207 )
208 .await
209 .map_err(to_pyvalue_err)?;
210
211 Python::attach(|py| report.into_py_any(py))
212 })
213 }
214
215 #[pyo3(name = "get_metrics")]
217 fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
218 let metrics = self.get_metrics();
219 let dict = PyDict::new(py);
220 dict.set_item("total_submits", metrics.total_submits)?;
221 dict.set_item("successful_submits", metrics.successful_submits)?;
222 dict.set_item("failed_submits", metrics.failed_submits)?;
223 dict.set_item("expected_rejects", metrics.expected_rejects)?;
224 dict.set_item("healthy_clients", metrics.healthy_clients)?;
225 dict.set_item("total_clients", metrics.total_clients)?;
226 Ok(dict.into())
227 }
228
229 #[pyo3(name = "get_client_stats")]
231 fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
232 let stats = self.get_client_stats();
233 let list = pyo3::types::PyList::empty(py);
234 for stat in stats {
235 let dict = PyDict::new(py);
236 dict.set_item("client_id", stat.client_id.clone())?;
237 dict.set_item("healthy", stat.healthy)?;
238 dict.set_item("submit_count", stat.submit_count)?;
239 dict.set_item("error_count", stat.error_count)?;
240 list.append(dict)?;
241 }
242 Ok(list.into())
243 }
244
245 #[pyo3(name = "cache_instrument")]
247 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
248 let inst_any = pyobject_to_instrument_any(py, instrument)?;
249 self.cache_instrument(&inst_any);
250 Ok(())
251 }
252}