1use nautilus_common::live::get_runtime;
19use nautilus_core::python::{call_python_threadsafe, to_pyruntime_err};
20use nautilus_model::{
21 data::{BarType, Data, OrderBookDeltas_API},
22 identifiers::{AccountId, ClientOrderId, InstrumentId},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use nautilus_network::websocket::TransportBackend;
26use pyo3::{conversion::IntoPyObjectExt, prelude::*};
27
28use crate::{
29 common::enums::HyperliquidEnvironment,
30 websocket::{
31 HyperliquidWebSocketClient,
32 messages::{ExecutionReport, NautilusWsMessage},
33 },
34};
35
36#[pymethods]
37#[pyo3_stub_gen::derive::gen_stub_pymethods]
38impl HyperliquidWebSocketClient {
39 #[new]
44 #[pyo3(signature = (url=None, environment=HyperliquidEnvironment::Mainnet, account_id=None, proxy_url=None))]
45 fn py_new(
46 url: Option<String>,
47 environment: HyperliquidEnvironment,
48 account_id: Option<String>,
49 proxy_url: Option<String>,
50 ) -> Self {
51 let account_id = account_id.map(|s| AccountId::from(s.as_str()));
52 Self::new(
53 url,
54 environment,
55 account_id,
56 TransportBackend::default(),
57 proxy_url,
58 )
59 }
60
61 #[getter]
63 #[pyo3(name = "url")]
64 #[must_use]
65 pub fn py_url(&self) -> String {
66 self.url().to_string()
67 }
68
69 #[pyo3(name = "is_active")]
71 fn py_is_active(&self) -> bool {
72 self.is_active()
73 }
74
75 #[pyo3(name = "is_closed")]
76 fn py_is_closed(&self) -> bool {
77 !self.is_active()
78 }
79
80 #[pyo3(name = "cache_spot_fill_coins")]
86 fn py_cache_spot_fill_coins(&self, mapping: std::collections::HashMap<String, String>) {
87 let ahash_mapping: ahash::AHashMap<ustr::Ustr, ustr::Ustr> = mapping
88 .into_iter()
89 .map(|(k, v)| (ustr::Ustr::from(&k), ustr::Ustr::from(&v)))
90 .collect();
91 self.cache_spot_fill_coins(ahash_mapping);
92 }
93
94 #[pyo3(name = "cache_cloid_mapping")]
103 fn py_cache_cloid_mapping(&self, cloid: &str, client_order_id: ClientOrderId) {
104 self.cache_cloid_mapping(ustr::Ustr::from(cloid), client_order_id);
105 }
106
107 #[pyo3(name = "remove_cloid_mapping")]
112 fn py_remove_cloid_mapping(&self, cloid: &str) {
113 self.remove_cloid_mapping(&ustr::Ustr::from(cloid));
114 }
115
116 #[pyo3(name = "clear_cloid_cache")]
120 fn py_clear_cloid_cache(&self) {
121 self.clear_cloid_cache();
122 }
123
124 #[pyo3(name = "cloid_cache_len")]
126 fn py_cloid_cache_len(&self) -> usize {
127 self.cloid_cache_len()
128 }
129
130 #[pyo3(name = "get_cloid_mapping")]
134 fn py_get_cloid_mapping(&self, cloid: &str) -> Option<ClientOrderId> {
135 self.get_cloid_mapping(&ustr::Ustr::from(cloid))
136 }
137
138 #[pyo3(name = "connect")]
140 #[expect(clippy::needless_pass_by_value)]
141 fn py_connect<'py>(
142 &self,
143 py: Python<'py>,
144 loop_: Py<PyAny>,
145 instruments: Vec<Py<PyAny>>,
146 callback: Py<PyAny>,
147 ) -> PyResult<Bound<'py, PyAny>> {
148 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
149
150 for inst in instruments {
151 let inst_any = pyobject_to_instrument_any(py, inst)?;
152 self.cache_instrument(inst_any);
153 }
154
155 let mut client = self.clone();
156
157 pyo3_async_runtimes::tokio::future_into_py(py, async move {
158 client.connect().await.map_err(to_pyruntime_err)?;
159
160 get_runtime().spawn(async move {
161 loop {
162 let event = client.next_event().await;
163
164 match event {
165 Some(msg) => {
166 log::trace!("Received WebSocket message: {msg:?}");
167
168 match msg {
169 NautilusWsMessage::Trades(trade_ticks) => {
170 Python::attach(|py| {
171 for tick in trade_ticks {
172 let py_obj = data_to_pycapsule(py, Data::Trade(tick));
173 call_python_threadsafe(py, &call_soon, &callback, py_obj);
174 }
175 });
176 }
177 NautilusWsMessage::Quote(quote_tick) => {
178 Python::attach(|py| {
179 let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
180 call_python_threadsafe(py, &call_soon, &callback, py_obj);
181 });
182 }
183 NautilusWsMessage::Deltas(deltas) => {
184 Python::attach(|py| {
185 let py_obj = data_to_pycapsule(
186 py,
187 Data::Deltas(OrderBookDeltas_API::new(deltas)),
188 );
189 call_python_threadsafe(py, &call_soon, &callback, py_obj);
190 });
191 }
192 NautilusWsMessage::Depth10(depth) => {
193 Python::attach(|py| {
194 let py_obj = data_to_pycapsule(py, Data::Depth10(depth));
195 call_python_threadsafe(py, &call_soon, &callback, py_obj);
196 });
197 }
198 NautilusWsMessage::Candle(bar) => {
199 Python::attach(|py| {
200 let py_obj = data_to_pycapsule(py, Data::Bar(bar));
201 call_python_threadsafe(py, &call_soon, &callback, py_obj);
202 });
203 }
204 NautilusWsMessage::MarkPrice(mark_price) => {
205 Python::attach(|py| {
206 let py_obj = data_to_pycapsule(
207 py,
208 Data::MarkPriceUpdate(mark_price),
209 );
210 call_python_threadsafe(py, &call_soon, &callback, py_obj);
211 });
212 }
213 NautilusWsMessage::IndexPrice(index_price) => {
214 Python::attach(|py| {
215 let py_obj = data_to_pycapsule(
216 py,
217 Data::IndexPriceUpdate(index_price),
218 );
219 call_python_threadsafe(py, &call_soon, &callback, py_obj);
220 });
221 }
222 NautilusWsMessage::FundingRate(funding_rate) => {
223 Python::attach(|py| {
224 if let Ok(py_obj) = funding_rate.into_py_any(py) {
225 call_python_threadsafe(py, &call_soon, &callback, py_obj);
226 }
227 });
228 }
229 NautilusWsMessage::ExecutionReports(reports) => {
230 Python::attach(|py| {
231 for report in reports {
232 match report {
233 ExecutionReport::Order(order_report) => {
234 log::debug!(
235 "Forwarding order status report: order_id={}, status={:?}",
236 order_report.venue_order_id,
237 order_report.order_status
238 );
239
240 match Py::new(py, order_report) {
241 Ok(py_obj) => {
242 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
243 }
244 Err(e) => {
245 log::error!("Error converting OrderStatusReport to Python: {e}");
246 }
247 }
248 }
249 ExecutionReport::Fill(fill_report) => {
250 log::debug!(
251 "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
252 fill_report.trade_id,
253 fill_report.order_side,
254 fill_report.last_qty,
255 fill_report.last_px
256 );
257
258 match Py::new(py, fill_report) {
259 Ok(py_obj) => {
260 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
261 }
262 Err(e) => {
263 log::error!("Error converting FillReport to Python: {e}");
264 }
265 }
266 }
267 }
268 }
269 });
270 }
271 _ => {
272 log::debug!("Unhandled message type: {msg:?}");
273 }
274 }
275 }
276 None => {
277 log::debug!("WebSocket connection closed");
278 break;
279 }
280 }
281 }
282 });
283
284 Ok(())
285 })
286 }
287
288 #[pyo3(name = "wait_until_active")]
289 fn py_wait_until_active<'py>(
290 &self,
291 py: Python<'py>,
292 timeout_secs: f64,
293 ) -> PyResult<Bound<'py, PyAny>> {
294 let client = self.clone();
295
296 pyo3_async_runtimes::tokio::future_into_py(py, async move {
297 let start = std::time::Instant::now();
298
299 loop {
300 if client.is_active() {
301 return Ok(());
302 }
303
304 if start.elapsed().as_secs_f64() >= timeout_secs {
305 return Err(to_pyruntime_err(format!(
306 "WebSocket connection did not become active within {timeout_secs} seconds"
307 )));
308 }
309
310 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
311 }
312 })
313 }
314
315 #[pyo3(name = "close")]
316 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
317 let mut client = self.clone();
318
319 pyo3_async_runtimes::tokio::future_into_py(py, async move {
320 if let Err(e) = client.disconnect().await {
321 log::error!("Error on close: {e}");
322 }
323 Ok(())
324 })
325 }
326
327 #[pyo3(name = "subscribe_trades")]
329 fn py_subscribe_trades<'py>(
330 &self,
331 py: Python<'py>,
332 instrument_id: InstrumentId,
333 ) -> PyResult<Bound<'py, PyAny>> {
334 let client = self.clone();
335
336 pyo3_async_runtimes::tokio::future_into_py(py, async move {
337 client
338 .subscribe_trades(instrument_id)
339 .await
340 .map_err(to_pyruntime_err)?;
341 Ok(())
342 })
343 }
344
345 #[pyo3(name = "unsubscribe_trades")]
347 fn py_unsubscribe_trades<'py>(
348 &self,
349 py: Python<'py>,
350 instrument_id: InstrumentId,
351 ) -> PyResult<Bound<'py, PyAny>> {
352 let client = self.clone();
353
354 pyo3_async_runtimes::tokio::future_into_py(py, async move {
355 client
356 .unsubscribe_trades(instrument_id)
357 .await
358 .map_err(to_pyruntime_err)?;
359 Ok(())
360 })
361 }
362
363 #[pyo3(name = "subscribe_book")]
365 fn py_subscribe_book<'py>(
366 &self,
367 py: Python<'py>,
368 instrument_id: InstrumentId,
369 ) -> PyResult<Bound<'py, PyAny>> {
370 let client = self.clone();
371
372 pyo3_async_runtimes::tokio::future_into_py(py, async move {
373 client
374 .subscribe_book(instrument_id)
375 .await
376 .map_err(to_pyruntime_err)?;
377 Ok(())
378 })
379 }
380
381 #[pyo3(name = "unsubscribe_book")]
383 fn py_unsubscribe_book<'py>(
384 &self,
385 py: Python<'py>,
386 instrument_id: InstrumentId,
387 ) -> PyResult<Bound<'py, PyAny>> {
388 let client = self.clone();
389
390 pyo3_async_runtimes::tokio::future_into_py(py, async move {
391 client
392 .unsubscribe_book(instrument_id)
393 .await
394 .map_err(to_pyruntime_err)?;
395 Ok(())
396 })
397 }
398
399 #[pyo3(name = "subscribe_book_deltas")]
400 fn py_subscribe_book_deltas<'py>(
401 &self,
402 py: Python<'py>,
403 instrument_id: InstrumentId,
404 _book_type: u8,
405 _depth: u64,
406 ) -> PyResult<Bound<'py, PyAny>> {
407 let client = self.clone();
408
409 pyo3_async_runtimes::tokio::future_into_py(py, async move {
410 client
411 .subscribe_book(instrument_id)
412 .await
413 .map_err(to_pyruntime_err)?;
414 Ok(())
415 })
416 }
417
418 #[pyo3(name = "unsubscribe_book_deltas")]
419 fn py_unsubscribe_book_deltas<'py>(
420 &self,
421 py: Python<'py>,
422 instrument_id: InstrumentId,
423 ) -> PyResult<Bound<'py, PyAny>> {
424 let client = self.clone();
425
426 pyo3_async_runtimes::tokio::future_into_py(py, async move {
427 client
428 .unsubscribe_book(instrument_id)
429 .await
430 .map_err(to_pyruntime_err)?;
431 Ok(())
432 })
433 }
434
435 #[pyo3(name = "subscribe_book_snapshots")]
436 fn py_subscribe_book_snapshots<'py>(
437 &self,
438 py: Python<'py>,
439 instrument_id: InstrumentId,
440 _book_type: u8,
441 _depth: u64,
442 ) -> PyResult<Bound<'py, PyAny>> {
443 let client = self.clone();
444
445 pyo3_async_runtimes::tokio::future_into_py(py, async move {
446 client
447 .subscribe_book(instrument_id)
448 .await
449 .map_err(to_pyruntime_err)?;
450 Ok(())
451 })
452 }
453
454 #[pyo3(name = "subscribe_quotes")]
456 fn py_subscribe_quotes<'py>(
457 &self,
458 py: Python<'py>,
459 instrument_id: InstrumentId,
460 ) -> PyResult<Bound<'py, PyAny>> {
461 let client = self.clone();
462
463 pyo3_async_runtimes::tokio::future_into_py(py, async move {
464 client
465 .subscribe_quotes(instrument_id)
466 .await
467 .map_err(to_pyruntime_err)?;
468 Ok(())
469 })
470 }
471
472 #[pyo3(name = "unsubscribe_quotes")]
474 fn py_unsubscribe_quotes<'py>(
475 &self,
476 py: Python<'py>,
477 instrument_id: InstrumentId,
478 ) -> PyResult<Bound<'py, PyAny>> {
479 let client = self.clone();
480
481 pyo3_async_runtimes::tokio::future_into_py(py, async move {
482 client
483 .unsubscribe_quotes(instrument_id)
484 .await
485 .map_err(to_pyruntime_err)?;
486 Ok(())
487 })
488 }
489
490 #[pyo3(name = "subscribe_bars")]
492 fn py_subscribe_bars<'py>(
493 &self,
494 py: Python<'py>,
495 bar_type: BarType,
496 ) -> PyResult<Bound<'py, PyAny>> {
497 let client = self.clone();
498
499 pyo3_async_runtimes::tokio::future_into_py(py, async move {
500 client
501 .subscribe_bars(bar_type)
502 .await
503 .map_err(to_pyruntime_err)?;
504 Ok(())
505 })
506 }
507
508 #[pyo3(name = "unsubscribe_bars")]
510 fn py_unsubscribe_bars<'py>(
511 &self,
512 py: Python<'py>,
513 bar_type: BarType,
514 ) -> PyResult<Bound<'py, PyAny>> {
515 let client = self.clone();
516
517 pyo3_async_runtimes::tokio::future_into_py(py, async move {
518 client
519 .unsubscribe_bars(bar_type)
520 .await
521 .map_err(to_pyruntime_err)?;
522 Ok(())
523 })
524 }
525
526 #[pyo3(name = "subscribe_order_updates")]
528 fn py_subscribe_order_updates<'py>(
529 &self,
530 py: Python<'py>,
531 user: String,
532 ) -> PyResult<Bound<'py, PyAny>> {
533 let client = self.clone();
534
535 pyo3_async_runtimes::tokio::future_into_py(py, async move {
536 client
537 .subscribe_order_updates(&user)
538 .await
539 .map_err(to_pyruntime_err)?;
540 Ok(())
541 })
542 }
543
544 #[pyo3(name = "subscribe_user_events")]
546 fn py_subscribe_user_events<'py>(
547 &self,
548 py: Python<'py>,
549 user: String,
550 ) -> PyResult<Bound<'py, PyAny>> {
551 let client = self.clone();
552
553 pyo3_async_runtimes::tokio::future_into_py(py, async move {
554 client
555 .subscribe_user_events(&user)
556 .await
557 .map_err(to_pyruntime_err)?;
558 Ok(())
559 })
560 }
561
562 #[pyo3(name = "subscribe_user_fills")]
567 fn py_subscribe_user_fills<'py>(
568 &self,
569 py: Python<'py>,
570 user: String,
571 ) -> PyResult<Bound<'py, PyAny>> {
572 let client = self.clone();
573
574 pyo3_async_runtimes::tokio::future_into_py(py, async move {
575 client
576 .subscribe_user_fills(&user)
577 .await
578 .map_err(to_pyruntime_err)?;
579 Ok(())
580 })
581 }
582
583 #[pyo3(name = "subscribe_mark_prices")]
585 fn py_subscribe_mark_prices<'py>(
586 &self,
587 py: Python<'py>,
588 instrument_id: InstrumentId,
589 ) -> PyResult<Bound<'py, PyAny>> {
590 let client = self.clone();
591
592 pyo3_async_runtimes::tokio::future_into_py(py, async move {
593 client
594 .subscribe_mark_prices(instrument_id)
595 .await
596 .map_err(to_pyruntime_err)?;
597 Ok(())
598 })
599 }
600
601 #[pyo3(name = "unsubscribe_mark_prices")]
603 fn py_unsubscribe_mark_prices<'py>(
604 &self,
605 py: Python<'py>,
606 instrument_id: InstrumentId,
607 ) -> PyResult<Bound<'py, PyAny>> {
608 let client = self.clone();
609
610 pyo3_async_runtimes::tokio::future_into_py(py, async move {
611 client
612 .unsubscribe_mark_prices(instrument_id)
613 .await
614 .map_err(to_pyruntime_err)?;
615 Ok(())
616 })
617 }
618
619 #[pyo3(name = "subscribe_index_prices")]
621 fn py_subscribe_index_prices<'py>(
622 &self,
623 py: Python<'py>,
624 instrument_id: InstrumentId,
625 ) -> PyResult<Bound<'py, PyAny>> {
626 let client = self.clone();
627
628 pyo3_async_runtimes::tokio::future_into_py(py, async move {
629 client
630 .subscribe_index_prices(instrument_id)
631 .await
632 .map_err(to_pyruntime_err)?;
633 Ok(())
634 })
635 }
636
637 #[pyo3(name = "unsubscribe_index_prices")]
639 fn py_unsubscribe_index_prices<'py>(
640 &self,
641 py: Python<'py>,
642 instrument_id: InstrumentId,
643 ) -> PyResult<Bound<'py, PyAny>> {
644 let client = self.clone();
645
646 pyo3_async_runtimes::tokio::future_into_py(py, async move {
647 client
648 .unsubscribe_index_prices(instrument_id)
649 .await
650 .map_err(to_pyruntime_err)?;
651 Ok(())
652 })
653 }
654
655 #[pyo3(name = "subscribe_funding_rates")]
657 fn py_subscribe_funding_rates<'py>(
658 &self,
659 py: Python<'py>,
660 instrument_id: InstrumentId,
661 ) -> PyResult<Bound<'py, PyAny>> {
662 let client = self.clone();
663
664 pyo3_async_runtimes::tokio::future_into_py(py, async move {
665 client
666 .subscribe_funding_rates(instrument_id)
667 .await
668 .map_err(to_pyruntime_err)?;
669 Ok(())
670 })
671 }
672
673 #[pyo3(name = "unsubscribe_funding_rates")]
675 fn py_unsubscribe_funding_rates<'py>(
676 &self,
677 py: Python<'py>,
678 instrument_id: InstrumentId,
679 ) -> PyResult<Bound<'py, PyAny>> {
680 let client = self.clone();
681
682 pyo3_async_runtimes::tokio::future_into_py(py, async move {
683 client
684 .unsubscribe_funding_rates(instrument_id)
685 .await
686 .map_err(to_pyruntime_err)?;
687 Ok(())
688 })
689 }
690}