Skip to main content

nautilus_hyperliquid/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Hyperliquid WebSocket client.
17
18use nautilus_common::live::get_runtime;
19use nautilus_core::python::{call_python_threadsafe, to_pyruntime_err};
20use nautilus_model::{
21    data::{BarType, Data, OrderBookDeltas_API},
22    identifiers::{AccountId, ClientOrderId, InstrumentId},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use nautilus_network::websocket::TransportBackend;
26use pyo3::{conversion::IntoPyObjectExt, prelude::*};
27
28use crate::{
29    common::enums::HyperliquidEnvironment,
30    websocket::{
31        HyperliquidWebSocketClient,
32        messages::{ExecutionReport, NautilusWsMessage},
33    },
34};
35
36#[pymethods]
37#[pyo3_stub_gen::derive::gen_stub_pymethods]
38impl HyperliquidWebSocketClient {
39    /// Hyperliquid WebSocket client following the BitMEX pattern.
40    ///
41    /// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
42    /// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
43    #[new]
44    #[pyo3(signature = (url=None, environment=HyperliquidEnvironment::Mainnet, account_id=None, proxy_url=None))]
45    fn py_new(
46        url: Option<String>,
47        environment: HyperliquidEnvironment,
48        account_id: Option<String>,
49        proxy_url: Option<String>,
50    ) -> Self {
51        let account_id = account_id.map(|s| AccountId::from(s.as_str()));
52        Self::new(
53            url,
54            environment,
55            account_id,
56            TransportBackend::default(),
57            proxy_url,
58        )
59    }
60
61    /// Returns the URL of this WebSocket client.
62    #[getter]
63    #[pyo3(name = "url")]
64    #[must_use]
65    pub fn py_url(&self) -> String {
66        self.url().to_string()
67    }
68
69    /// Returns true if the WebSocket is actively connected.
70    #[pyo3(name = "is_active")]
71    fn py_is_active(&self) -> bool {
72        self.is_active()
73    }
74
75    #[pyo3(name = "is_closed")]
76    fn py_is_closed(&self) -> bool {
77        !self.is_active()
78    }
79
80    /// Caches spot fill coin mappings for instrument lookup.
81    ///
82    /// Hyperliquid WebSocket fills for spot use `@{pair_index}` format (e.g., `@107`),
83    /// while instruments are identified by full symbols (e.g., `HYPE-USDC-SPOT`).
84    /// This mapping allows the handler to look up instruments from spot fills.
85    #[pyo3(name = "cache_spot_fill_coins")]
86    fn py_cache_spot_fill_coins(&self, mapping: std::collections::HashMap<String, String>) {
87        let ahash_mapping: ahash::AHashMap<ustr::Ustr, ustr::Ustr> = mapping
88            .into_iter()
89            .map(|(k, v)| (ustr::Ustr::from(&k), ustr::Ustr::from(&v)))
90            .collect();
91        self.cache_spot_fill_coins(ahash_mapping);
92    }
93
94    /// Caches a cloid (hex hash) to client_order_id mapping for order/fill resolution.
95    ///
96    /// The cloid is a keccak256 hash of the client_order_id that Hyperliquid uses internally.
97    /// This mapping allows WebSocket order status and fill reports to be resolved back to
98    /// the original client_order_id.
99    ///
100    /// This writes directly to a shared cache that the handler reads from, avoiding any
101    /// race conditions between caching and WebSocket message processing.
102    #[pyo3(name = "cache_cloid_mapping")]
103    fn py_cache_cloid_mapping(&self, cloid: &str, client_order_id: ClientOrderId) {
104        self.cache_cloid_mapping(ustr::Ustr::from(cloid), client_order_id);
105    }
106
107    /// Removes a cloid mapping from the cache.
108    ///
109    /// Should be called when an order reaches a terminal state (filled, canceled, expired)
110    /// to prevent unbounded memory growth in long-running sessions.
111    #[pyo3(name = "remove_cloid_mapping")]
112    fn py_remove_cloid_mapping(&self, cloid: &str) {
113        self.remove_cloid_mapping(&ustr::Ustr::from(cloid));
114    }
115
116    /// Clears all cloid mappings from the cache.
117    ///
118    /// Useful for cleanup during reconnection or shutdown.
119    #[pyo3(name = "clear_cloid_cache")]
120    fn py_clear_cloid_cache(&self) {
121        self.clear_cloid_cache();
122    }
123
124    /// Returns the number of cloid mappings in the cache.
125    #[pyo3(name = "cloid_cache_len")]
126    fn py_cloid_cache_len(&self) -> usize {
127        self.cloid_cache_len()
128    }
129
130    /// Looks up a client_order_id by its cloid hash.
131    ///
132    /// Returns `Some(ClientOrderId)` if the mapping exists, `None` otherwise.
133    #[pyo3(name = "get_cloid_mapping")]
134    fn py_get_cloid_mapping(&self, cloid: &str) -> Option<ClientOrderId> {
135        self.get_cloid_mapping(&ustr::Ustr::from(cloid))
136    }
137
138    /// Establishes WebSocket connection and spawns the message handler.
139    #[pyo3(name = "connect")]
140    #[expect(clippy::needless_pass_by_value)]
141    fn py_connect<'py>(
142        &self,
143        py: Python<'py>,
144        loop_: Py<PyAny>,
145        instruments: Vec<Py<PyAny>>,
146        callback: Py<PyAny>,
147    ) -> PyResult<Bound<'py, PyAny>> {
148        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
149
150        for inst in instruments {
151            let inst_any = pyobject_to_instrument_any(py, inst)?;
152            self.cache_instrument(inst_any);
153        }
154
155        let mut client = self.clone();
156
157        pyo3_async_runtimes::tokio::future_into_py(py, async move {
158            client.connect().await.map_err(to_pyruntime_err)?;
159
160            get_runtime().spawn(async move {
161                loop {
162                    let event = client.next_event().await;
163
164                    match event {
165                        Some(msg) => {
166                            log::trace!("Received WebSocket message: {msg:?}");
167
168                            match msg {
169                                NautilusWsMessage::Trades(trade_ticks) => {
170                                    Python::attach(|py| {
171                                        for tick in trade_ticks {
172                                            let py_obj = data_to_pycapsule(py, Data::Trade(tick));
173                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
174                                        }
175                                    });
176                                }
177                                NautilusWsMessage::Quote(quote_tick) => {
178                                    Python::attach(|py| {
179                                        let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
180                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
181                                    });
182                                }
183                                NautilusWsMessage::Deltas(deltas) => {
184                                    Python::attach(|py| {
185                                        let py_obj = data_to_pycapsule(
186                                            py,
187                                            Data::Deltas(OrderBookDeltas_API::new(deltas)),
188                                        );
189                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
190                                    });
191                                }
192                                NautilusWsMessage::Depth10(depth) => {
193                                    Python::attach(|py| {
194                                        let py_obj = data_to_pycapsule(py, Data::Depth10(depth));
195                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
196                                    });
197                                }
198                                NautilusWsMessage::Candle(bar) => {
199                                    Python::attach(|py| {
200                                        let py_obj = data_to_pycapsule(py, Data::Bar(bar));
201                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
202                                    });
203                                }
204                                NautilusWsMessage::MarkPrice(mark_price) => {
205                                    Python::attach(|py| {
206                                        let py_obj = data_to_pycapsule(
207                                            py,
208                                            Data::MarkPriceUpdate(mark_price),
209                                        );
210                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
211                                    });
212                                }
213                                NautilusWsMessage::IndexPrice(index_price) => {
214                                    Python::attach(|py| {
215                                        let py_obj = data_to_pycapsule(
216                                            py,
217                                            Data::IndexPriceUpdate(index_price),
218                                        );
219                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
220                                    });
221                                }
222                                NautilusWsMessage::FundingRate(funding_rate) => {
223                                    Python::attach(|py| {
224                                        if let Ok(py_obj) = funding_rate.into_py_any(py) {
225                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
226                                        }
227                                    });
228                                }
229                                NautilusWsMessage::ExecutionReports(reports) => {
230                                    Python::attach(|py| {
231                                        for report in reports {
232                                            match report {
233                                                ExecutionReport::Order(order_report) => {
234                                                    log::debug!(
235                                                        "Forwarding order status report: order_id={}, status={:?}",
236                                                        order_report.venue_order_id,
237                                                        order_report.order_status
238                                                    );
239
240                                                    match Py::new(py, order_report) {
241                                                        Ok(py_obj) => {
242                                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
243                                                        }
244                                                        Err(e) => {
245                                                            log::error!("Error converting OrderStatusReport to Python: {e}");
246                                                        }
247                                                    }
248                                                }
249                                                ExecutionReport::Fill(fill_report) => {
250                                                    log::debug!(
251                                                        "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
252                                                        fill_report.trade_id,
253                                                        fill_report.order_side,
254                                                        fill_report.last_qty,
255                                                        fill_report.last_px
256                                                    );
257
258                                                    match Py::new(py, fill_report) {
259                                                        Ok(py_obj) => {
260                                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
261                                                        }
262                                                        Err(e) => {
263                                                            log::error!("Error converting FillReport to Python: {e}");
264                                                        }
265                                                    }
266                                                }
267                                            }
268                                        }
269                                    });
270                                }
271                                _ => {
272                                    log::debug!("Unhandled message type: {msg:?}");
273                                }
274                            }
275                        }
276                        None => {
277                            log::debug!("WebSocket connection closed");
278                            break;
279                        }
280                    }
281                }
282            });
283
284            Ok(())
285        })
286    }
287
288    #[pyo3(name = "wait_until_active")]
289    fn py_wait_until_active<'py>(
290        &self,
291        py: Python<'py>,
292        timeout_secs: f64,
293    ) -> PyResult<Bound<'py, PyAny>> {
294        let client = self.clone();
295
296        pyo3_async_runtimes::tokio::future_into_py(py, async move {
297            let start = std::time::Instant::now();
298
299            loop {
300                if client.is_active() {
301                    return Ok(());
302                }
303
304                if start.elapsed().as_secs_f64() >= timeout_secs {
305                    return Err(to_pyruntime_err(format!(
306                        "WebSocket connection did not become active within {timeout_secs} seconds"
307                    )));
308                }
309
310                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
311            }
312        })
313    }
314
315    #[pyo3(name = "close")]
316    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
317        let mut client = self.clone();
318
319        pyo3_async_runtimes::tokio::future_into_py(py, async move {
320            if let Err(e) = client.disconnect().await {
321                log::error!("Error on close: {e}");
322            }
323            Ok(())
324        })
325    }
326
327    /// Subscribe to trades for an instrument.
328    #[pyo3(name = "subscribe_trades")]
329    fn py_subscribe_trades<'py>(
330        &self,
331        py: Python<'py>,
332        instrument_id: InstrumentId,
333    ) -> PyResult<Bound<'py, PyAny>> {
334        let client = self.clone();
335
336        pyo3_async_runtimes::tokio::future_into_py(py, async move {
337            client
338                .subscribe_trades(instrument_id)
339                .await
340                .map_err(to_pyruntime_err)?;
341            Ok(())
342        })
343    }
344
345    /// Unsubscribe from trades for an instrument.
346    #[pyo3(name = "unsubscribe_trades")]
347    fn py_unsubscribe_trades<'py>(
348        &self,
349        py: Python<'py>,
350        instrument_id: InstrumentId,
351    ) -> PyResult<Bound<'py, PyAny>> {
352        let client = self.clone();
353
354        pyo3_async_runtimes::tokio::future_into_py(py, async move {
355            client
356                .unsubscribe_trades(instrument_id)
357                .await
358                .map_err(to_pyruntime_err)?;
359            Ok(())
360        })
361    }
362
363    /// Subscribe to L2 order book for an instrument.
364    #[pyo3(name = "subscribe_book")]
365    fn py_subscribe_book<'py>(
366        &self,
367        py: Python<'py>,
368        instrument_id: InstrumentId,
369    ) -> PyResult<Bound<'py, PyAny>> {
370        let client = self.clone();
371
372        pyo3_async_runtimes::tokio::future_into_py(py, async move {
373            client
374                .subscribe_book(instrument_id)
375                .await
376                .map_err(to_pyruntime_err)?;
377            Ok(())
378        })
379    }
380
381    /// Unsubscribe from L2 order book for an instrument.
382    #[pyo3(name = "unsubscribe_book")]
383    fn py_unsubscribe_book<'py>(
384        &self,
385        py: Python<'py>,
386        instrument_id: InstrumentId,
387    ) -> PyResult<Bound<'py, PyAny>> {
388        let client = self.clone();
389
390        pyo3_async_runtimes::tokio::future_into_py(py, async move {
391            client
392                .unsubscribe_book(instrument_id)
393                .await
394                .map_err(to_pyruntime_err)?;
395            Ok(())
396        })
397    }
398
399    #[pyo3(name = "subscribe_book_deltas")]
400    fn py_subscribe_book_deltas<'py>(
401        &self,
402        py: Python<'py>,
403        instrument_id: InstrumentId,
404        _book_type: u8,
405        _depth: u64,
406    ) -> PyResult<Bound<'py, PyAny>> {
407        let client = self.clone();
408
409        pyo3_async_runtimes::tokio::future_into_py(py, async move {
410            client
411                .subscribe_book(instrument_id)
412                .await
413                .map_err(to_pyruntime_err)?;
414            Ok(())
415        })
416    }
417
418    #[pyo3(name = "unsubscribe_book_deltas")]
419    fn py_unsubscribe_book_deltas<'py>(
420        &self,
421        py: Python<'py>,
422        instrument_id: InstrumentId,
423    ) -> PyResult<Bound<'py, PyAny>> {
424        let client = self.clone();
425
426        pyo3_async_runtimes::tokio::future_into_py(py, async move {
427            client
428                .unsubscribe_book(instrument_id)
429                .await
430                .map_err(to_pyruntime_err)?;
431            Ok(())
432        })
433    }
434
435    #[pyo3(name = "subscribe_book_snapshots")]
436    fn py_subscribe_book_snapshots<'py>(
437        &self,
438        py: Python<'py>,
439        instrument_id: InstrumentId,
440        _book_type: u8,
441        _depth: u64,
442    ) -> PyResult<Bound<'py, PyAny>> {
443        let client = self.clone();
444
445        pyo3_async_runtimes::tokio::future_into_py(py, async move {
446            client
447                .subscribe_book(instrument_id)
448                .await
449                .map_err(to_pyruntime_err)?;
450            Ok(())
451        })
452    }
453
454    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
455    #[pyo3(name = "subscribe_quotes")]
456    fn py_subscribe_quotes<'py>(
457        &self,
458        py: Python<'py>,
459        instrument_id: InstrumentId,
460    ) -> PyResult<Bound<'py, PyAny>> {
461        let client = self.clone();
462
463        pyo3_async_runtimes::tokio::future_into_py(py, async move {
464            client
465                .subscribe_quotes(instrument_id)
466                .await
467                .map_err(to_pyruntime_err)?;
468            Ok(())
469        })
470    }
471
472    /// Unsubscribe from quote ticks for an instrument.
473    #[pyo3(name = "unsubscribe_quotes")]
474    fn py_unsubscribe_quotes<'py>(
475        &self,
476        py: Python<'py>,
477        instrument_id: InstrumentId,
478    ) -> PyResult<Bound<'py, PyAny>> {
479        let client = self.clone();
480
481        pyo3_async_runtimes::tokio::future_into_py(py, async move {
482            client
483                .unsubscribe_quotes(instrument_id)
484                .await
485                .map_err(to_pyruntime_err)?;
486            Ok(())
487        })
488    }
489
490    /// Subscribe to candle/bar data for a specific coin and interval.
491    #[pyo3(name = "subscribe_bars")]
492    fn py_subscribe_bars<'py>(
493        &self,
494        py: Python<'py>,
495        bar_type: BarType,
496    ) -> PyResult<Bound<'py, PyAny>> {
497        let client = self.clone();
498
499        pyo3_async_runtimes::tokio::future_into_py(py, async move {
500            client
501                .subscribe_bars(bar_type)
502                .await
503                .map_err(to_pyruntime_err)?;
504            Ok(())
505        })
506    }
507
508    /// Unsubscribe from candle/bar data.
509    #[pyo3(name = "unsubscribe_bars")]
510    fn py_unsubscribe_bars<'py>(
511        &self,
512        py: Python<'py>,
513        bar_type: BarType,
514    ) -> PyResult<Bound<'py, PyAny>> {
515        let client = self.clone();
516
517        pyo3_async_runtimes::tokio::future_into_py(py, async move {
518            client
519                .unsubscribe_bars(bar_type)
520                .await
521                .map_err(to_pyruntime_err)?;
522            Ok(())
523        })
524    }
525
526    /// Subscribe to order updates for a specific user address.
527    #[pyo3(name = "subscribe_order_updates")]
528    fn py_subscribe_order_updates<'py>(
529        &self,
530        py: Python<'py>,
531        user: String,
532    ) -> PyResult<Bound<'py, PyAny>> {
533        let client = self.clone();
534
535        pyo3_async_runtimes::tokio::future_into_py(py, async move {
536            client
537                .subscribe_order_updates(&user)
538                .await
539                .map_err(to_pyruntime_err)?;
540            Ok(())
541        })
542    }
543
544    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
545    #[pyo3(name = "subscribe_user_events")]
546    fn py_subscribe_user_events<'py>(
547        &self,
548        py: Python<'py>,
549        user: String,
550    ) -> PyResult<Bound<'py, PyAny>> {
551        let client = self.clone();
552
553        pyo3_async_runtimes::tokio::future_into_py(py, async move {
554            client
555                .subscribe_user_events(&user)
556                .await
557                .map_err(to_pyruntime_err)?;
558            Ok(())
559        })
560    }
561
562    /// Subscribe to user fills for a specific user address.
563    ///
564    /// Note: This channel is redundant with `userEvents` which already includes fills.
565    /// Prefer using `subscribe_user_events` or `subscribe_all_user_channels` instead.
566    #[pyo3(name = "subscribe_user_fills")]
567    fn py_subscribe_user_fills<'py>(
568        &self,
569        py: Python<'py>,
570        user: String,
571    ) -> PyResult<Bound<'py, PyAny>> {
572        let client = self.clone();
573
574        pyo3_async_runtimes::tokio::future_into_py(py, async move {
575            client
576                .subscribe_user_fills(&user)
577                .await
578                .map_err(to_pyruntime_err)?;
579            Ok(())
580        })
581    }
582
583    /// Subscribe to mark price updates for an instrument.
584    #[pyo3(name = "subscribe_mark_prices")]
585    fn py_subscribe_mark_prices<'py>(
586        &self,
587        py: Python<'py>,
588        instrument_id: InstrumentId,
589    ) -> PyResult<Bound<'py, PyAny>> {
590        let client = self.clone();
591
592        pyo3_async_runtimes::tokio::future_into_py(py, async move {
593            client
594                .subscribe_mark_prices(instrument_id)
595                .await
596                .map_err(to_pyruntime_err)?;
597            Ok(())
598        })
599    }
600
601    /// Unsubscribe from mark price updates for an instrument.
602    #[pyo3(name = "unsubscribe_mark_prices")]
603    fn py_unsubscribe_mark_prices<'py>(
604        &self,
605        py: Python<'py>,
606        instrument_id: InstrumentId,
607    ) -> PyResult<Bound<'py, PyAny>> {
608        let client = self.clone();
609
610        pyo3_async_runtimes::tokio::future_into_py(py, async move {
611            client
612                .unsubscribe_mark_prices(instrument_id)
613                .await
614                .map_err(to_pyruntime_err)?;
615            Ok(())
616        })
617    }
618
619    /// Subscribe to index/oracle price updates for an instrument.
620    #[pyo3(name = "subscribe_index_prices")]
621    fn py_subscribe_index_prices<'py>(
622        &self,
623        py: Python<'py>,
624        instrument_id: InstrumentId,
625    ) -> PyResult<Bound<'py, PyAny>> {
626        let client = self.clone();
627
628        pyo3_async_runtimes::tokio::future_into_py(py, async move {
629            client
630                .subscribe_index_prices(instrument_id)
631                .await
632                .map_err(to_pyruntime_err)?;
633            Ok(())
634        })
635    }
636
637    /// Unsubscribe from index/oracle price updates for an instrument.
638    #[pyo3(name = "unsubscribe_index_prices")]
639    fn py_unsubscribe_index_prices<'py>(
640        &self,
641        py: Python<'py>,
642        instrument_id: InstrumentId,
643    ) -> PyResult<Bound<'py, PyAny>> {
644        let client = self.clone();
645
646        pyo3_async_runtimes::tokio::future_into_py(py, async move {
647            client
648                .unsubscribe_index_prices(instrument_id)
649                .await
650                .map_err(to_pyruntime_err)?;
651            Ok(())
652        })
653    }
654
655    /// Subscribe to funding rate updates for an instrument.
656    #[pyo3(name = "subscribe_funding_rates")]
657    fn py_subscribe_funding_rates<'py>(
658        &self,
659        py: Python<'py>,
660        instrument_id: InstrumentId,
661    ) -> PyResult<Bound<'py, PyAny>> {
662        let client = self.clone();
663
664        pyo3_async_runtimes::tokio::future_into_py(py, async move {
665            client
666                .subscribe_funding_rates(instrument_id)
667                .await
668                .map_err(to_pyruntime_err)?;
669            Ok(())
670        })
671    }
672
673    /// Unsubscribe from funding rate updates for an instrument.
674    #[pyo3(name = "unsubscribe_funding_rates")]
675    fn py_unsubscribe_funding_rates<'py>(
676        &self,
677        py: Python<'py>,
678        instrument_id: InstrumentId,
679    ) -> PyResult<Bound<'py, PyAny>> {
680        let client = self.clone();
681
682        pyo3_async_runtimes::tokio::future_into_py(py, async move {
683            client
684                .unsubscribe_funding_rates(instrument_id)
685                .await
686                .map_err(to_pyruntime_err)?;
687            Ok(())
688        })
689    }
690}