1use std::sync::{
45 Arc,
46 atomic::{AtomicU64, Ordering},
47};
48
49use futures_util::StreamExt;
50use nautilus_common::live::get_runtime;
51use nautilus_core::{
52 AtomicMap,
53 python::{call_python_threadsafe, to_pyruntime_err},
54 time::get_atomic_clock_realtime,
55};
56use nautilus_model::{
57 data::{BarType, Data, OrderBookDeltas, OrderBookDeltas_API},
58 identifiers::{
59 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
60 },
61 instruments::{Instrument, InstrumentAny},
62 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
63 reports::{FillReport, OrderStatusReport},
64};
65use pyo3::{IntoPyObjectExt, prelude::*};
66use tokio_util::sync::CancellationToken;
67
68use crate::{
69 common::{
70 consts::KRAKEN_VENUE,
71 enums::{KrakenEnvironment, KrakenProductType},
72 urls::get_kraken_ws_private_url,
73 },
74 config::KrakenDataClientConfig,
75 websocket::spot_v2::{
76 client::KrakenSpotWebSocketClient,
77 messages::KrakenSpotWsMessage,
78 parse::{
79 parse_book_deltas, parse_quote_tick, parse_trade_tick, parse_ws_bar,
80 parse_ws_fill_report, parse_ws_order_status_report,
81 },
82 },
83};
84
85#[pymethods]
86#[pyo3_stub_gen::derive::gen_stub_pymethods]
87impl KrakenSpotWebSocketClient {
88 #[new]
90 #[pyo3(signature = (environment=None, private=false, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None, proxy_url=None))]
91 fn py_new(
92 environment: Option<KrakenEnvironment>,
93 private: bool,
94 base_url: Option<String>,
95 heartbeat_secs: Option<u64>,
96 api_key: Option<String>,
97 api_secret: Option<String>,
98 proxy_url: Option<String>,
99 ) -> Self {
100 let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
101
102 let (resolved_api_key, resolved_api_secret) =
103 crate::common::credential::KrakenCredential::resolve_spot(api_key, api_secret)
104 .map(|c| c.into_parts())
105 .map_or((None, None), |(k, s)| (Some(k), Some(s)));
106
107 let (ws_public_url, ws_private_url) = if private {
108 let private_url = base_url.unwrap_or_else(|| {
110 get_kraken_ws_private_url(KrakenProductType::Spot, env).to_string()
111 });
112 (None, Some(private_url))
113 } else {
114 (base_url, None)
115 };
116
117 let config = KrakenDataClientConfig {
118 environment: env,
119 ws_public_url,
120 ws_private_url,
121 heartbeat_interval_secs: heartbeat_secs
122 .unwrap_or(KrakenDataClientConfig::default().heartbeat_interval_secs),
123 api_key: resolved_api_key,
124 api_secret: resolved_api_secret,
125 proxy_url: proxy_url.clone(),
126 ..Default::default()
127 };
128
129 let token = CancellationToken::new();
130
131 Self::new(config, token, proxy_url)
132 }
133
134 #[getter]
136 #[pyo3(name = "url")]
137 #[must_use]
138 pub fn py_url(&self) -> &str {
139 self.url()
140 }
141
142 #[pyo3(name = "is_connected")]
144 fn py_is_connected(&self) -> bool {
145 self.is_connected()
146 }
147
148 #[pyo3(name = "is_active")]
150 fn py_is_active(&self) -> bool {
151 self.is_active()
152 }
153
154 #[pyo3(name = "is_closed")]
156 fn py_is_closed(&self) -> bool {
157 self.is_closed()
158 }
159
160 #[pyo3(name = "get_subscriptions")]
162 fn py_get_subscriptions(&self) -> Vec<String> {
163 self.get_subscriptions()
164 }
165
166 #[pyo3(name = "cancel_all_requests")]
168 fn py_cancel_all_requests(&self) {
169 self.cancel_all_requests();
170 }
171
172 #[pyo3(name = "connect")]
174 #[expect(clippy::needless_pass_by_value)]
175 fn py_connect<'py>(
176 &mut self,
177 py: Python<'py>,
178 loop_: Py<PyAny>,
179 instruments: Vec<Py<PyAny>>,
180 callback: Py<PyAny>,
181 ) -> PyResult<Bound<'py, PyAny>> {
182 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
183
184 let instruments_map = Arc::new(AtomicMap::<InstrumentId, InstrumentAny>::new());
185
186 for inst in instruments {
187 let inst_any = pyobject_to_instrument_any(py, inst)?;
188 instruments_map.insert(inst_any.id(), inst_any);
189 }
190
191 let account_id = self.account_id_shared().clone();
192 let truncated_id_map = self.truncated_id_map().clone();
193 let mut client = self.clone();
194
195 pyo3_async_runtimes::tokio::future_into_py(py, async move {
196 client.connect().await.map_err(to_pyruntime_err)?;
197
198 let stream = client.stream().map_err(to_pyruntime_err)?;
199 let clock = get_atomic_clock_realtime();
200 let book_sequence = Arc::new(AtomicU64::new(0));
201
202 get_runtime().spawn(async move {
203 tokio::pin!(stream);
204 let order_qty_cache: Arc<AtomicMap<String, f64>> =
205 Arc::new(AtomicMap::new());
206
207 while let Some(msg) = stream.next().await {
208 let ts_init = clock.get_time_ns();
209
210 match msg {
211 KrakenSpotWsMessage::Ticker(tickers) => {
212 for ticker in &tickers {
213 let instrument_id = InstrumentId::new(
214 Symbol::new(ticker.symbol.as_str()),
215 *KRAKEN_VENUE,
216 );
217 let instrument =
218 instruments_map.load().get(&instrument_id).cloned();
219
220 if let Some(ref inst) = instrument {
221 match parse_quote_tick(ticker, inst, ts_init) {
222 Ok(quote) => {
223 Python::attach(|py| {
224 let py_obj =
225 data_to_pycapsule(py, Data::Quote(quote));
226 call_python_threadsafe(
227 py, &call_soon, &callback, py_obj,
228 );
229 });
230 }
231 Err(e) => {
232 log::error!("Failed to parse quote tick: {e}");
233 }
234 }
235 }
236 }
237 }
238 KrakenSpotWsMessage::Trade(trades) => {
239 for trade in &trades {
240 let instrument_id = InstrumentId::new(
241 Symbol::new(trade.symbol.as_str()),
242 *KRAKEN_VENUE,
243 );
244 let instrument =
245 instruments_map.load().get(&instrument_id).cloned();
246
247 if let Some(ref inst) = instrument {
248 match parse_trade_tick(trade, inst, ts_init) {
249 Ok(tick) => {
250 Python::attach(|py| {
251 let py_obj =
252 data_to_pycapsule(py, Data::Trade(tick));
253 call_python_threadsafe(
254 py, &call_soon, &callback, py_obj,
255 );
256 });
257 }
258 Err(e) => {
259 log::error!("Failed to parse trade tick: {e}");
260 }
261 }
262 }
263 }
264 }
265 KrakenSpotWsMessage::Book {
266 data,
267 is_snapshot: _,
268 } => {
269 for book in &data {
270 let instrument_id = InstrumentId::new(
271 Symbol::new(book.symbol.as_str()),
272 *KRAKEN_VENUE,
273 );
274 let instrument =
275 instruments_map.load().get(&instrument_id).cloned();
276
277 if let Some(ref inst) = instrument {
278 let sequence = book_sequence.fetch_add(1, Ordering::Relaxed);
279 match parse_book_deltas(book, inst, sequence, ts_init) {
280 Ok(delta_vec) => {
281 if delta_vec.is_empty() {
282 continue;
283 }
284 let deltas = OrderBookDeltas::new(inst.id(), delta_vec);
285 Python::attach(|py| {
286 let py_obj = data_to_pycapsule(
287 py,
288 Data::Deltas(OrderBookDeltas_API::new(deltas)),
289 );
290 call_python_threadsafe(
291 py, &call_soon, &callback, py_obj,
292 );
293 });
294 }
295 Err(e) => {
296 log::error!("Failed to parse book deltas: {e}");
297 }
298 }
299 }
300 }
301 }
302 KrakenSpotWsMessage::Ohlc(ohlc_data) => {
303 for ohlc in &ohlc_data {
304 let instrument_id = InstrumentId::new(
305 Symbol::new(ohlc.symbol.as_str()),
306 *KRAKEN_VENUE,
307 );
308 let instrument =
309 instruments_map.load().get(&instrument_id).cloned();
310
311 if let Some(ref inst) = instrument {
312 match parse_ws_bar(ohlc, inst, ts_init) {
313 Ok(bar) => {
314 Python::attach(|py| {
315 let py_obj = data_to_pycapsule(py, Data::Bar(bar));
316 call_python_threadsafe(
317 py, &call_soon, &callback, py_obj,
318 );
319 });
320 }
321 Err(e) => {
322 log::error!("Failed to parse bar: {e}");
323 }
324 }
325 }
326 }
327 }
328 KrakenSpotWsMessage::Execution(executions) => {
329 let acct_id = account_id.read().ok().and_then(|g| *g);
330 let Some(acct_id) = acct_id else {
331 log::trace!(
332 "Execution message received but no account_id set (data-only client)"
333 );
334 continue;
335 };
336
337 for exec in &executions {
338 let symbol = match &exec.symbol {
339 Some(s) => s.as_str(),
340 None => {
341 log::debug!(
342 "Execution without symbol: exec_type={:?}, order_id={}",
343 exec.exec_type,
344 exec.order_id
345 );
346 continue;
347 }
348 };
349
350 let instrument_id = InstrumentId::new(
351 Symbol::new(symbol),
352 *KRAKEN_VENUE,
353 );
354 let instrument =
355 instruments_map.load().get(&instrument_id).cloned();
356
357 let Some(ref inst) = instrument else {
358 log::warn!("No instrument for symbol: {symbol}");
359 continue;
360 };
361
362 let cached_qty = exec.cl_ord_id.as_ref().and_then(|id| {
363 order_qty_cache.load().get(id).copied()
364 });
365
366 if let (Some(qty), Some(cl_ord_id)) =
367 (exec.order_qty, &exec.cl_ord_id)
368 {
369 order_qty_cache.insert(cl_ord_id.clone(), qty);
370 }
371
372 match parse_ws_order_status_report(
373 exec, inst, acct_id, cached_qty, ts_init,
374 ) {
375 Ok(mut report) => {
376 if let Some(ref cl_ord_id) = exec.cl_ord_id {
377 let full_id = truncated_id_map
378 .load()
379 .get(cl_ord_id)
380 .copied()
381 .unwrap_or_else(|| ClientOrderId::new(cl_ord_id));
382 report = report.with_client_order_id(full_id);
383 }
384 dispatch_order_status_report(
385 report, &call_soon, &callback,
386 );
387 }
388 Err(e) => {
389 log::error!("Failed to parse order status report: {e}");
390 }
391 }
392
393 if exec.exec_id.is_some() {
394 match parse_ws_fill_report(exec, inst, acct_id, ts_init) {
395 Ok(mut report) => {
396 if let Some(ref cl_ord_id) = exec.cl_ord_id {
397 let full_id = truncated_id_map
398 .load()
399 .get(cl_ord_id)
400 .copied()
401 .unwrap_or_else(|| {
402 ClientOrderId::new(cl_ord_id)
403 });
404 report.client_order_id = Some(full_id);
405 }
406 dispatch_fill_report(report, &call_soon, &callback);
407 }
408 Err(e) => {
409 log::error!("Failed to parse fill report: {e}");
410 }
411 }
412 }
413 }
414 }
415 KrakenSpotWsMessage::Reconnected => {
416 log::info!("WebSocket reconnected");
417 }
418 }
419 }
420 });
421
422 Ok(())
423 })
424 }
425
426 #[pyo3(name = "wait_until_active")]
428 fn py_wait_until_active<'py>(
429 &self,
430 py: Python<'py>,
431 timeout_secs: f64,
432 ) -> PyResult<Bound<'py, PyAny>> {
433 let client = self.clone();
434
435 pyo3_async_runtimes::tokio::future_into_py(py, async move {
436 client
437 .wait_until_active(timeout_secs)
438 .await
439 .map_err(to_pyruntime_err)?;
440 Ok(())
441 })
442 }
443
444 #[pyo3(name = "authenticate")]
446 fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
447 let client = self.clone();
448
449 pyo3_async_runtimes::tokio::future_into_py(py, async move {
450 client.authenticate().await.map_err(to_pyruntime_err)?;
451 Ok(())
452 })
453 }
454
455 #[pyo3(name = "is_authenticated")]
457 fn py_is_authenticated(&self) -> bool {
458 self.is_authenticated()
459 }
460
461 #[pyo3(name = "wait_until_authenticated")]
465 fn py_wait_until_authenticated<'py>(
466 &self,
467 py: Python<'py>,
468 timeout_secs: f64,
469 ) -> PyResult<Bound<'py, PyAny>> {
470 let client = self.clone();
471
472 pyo3_async_runtimes::tokio::future_into_py(py, async move {
473 client
474 .wait_until_authenticated(timeout_secs)
475 .await
476 .map_err(to_pyruntime_err)?;
477 Ok(())
478 })
479 }
480
481 #[pyo3(name = "disconnect")]
483 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
484 let mut client = self.clone();
485
486 pyo3_async_runtimes::tokio::future_into_py(py, async move {
487 client.disconnect().await.map_err(to_pyruntime_err)?;
488 Ok(())
489 })
490 }
491
492 #[pyo3(name = "send_ping")]
494 fn py_send_ping<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
495 let client = self.clone();
496
497 pyo3_async_runtimes::tokio::future_into_py(py, async move {
498 client.send_ping().await.map_err(to_pyruntime_err)?;
499 Ok(())
500 })
501 }
502
503 #[pyo3(name = "close")]
505 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
506 let mut client = self.clone();
507
508 pyo3_async_runtimes::tokio::future_into_py(py, async move {
509 client.close().await.map_err(to_pyruntime_err)?;
510 Ok(())
511 })
512 }
513
514 #[pyo3(name = "set_account_id")]
516 fn py_set_account_id(&self, account_id: AccountId) {
517 self.set_account_id(account_id);
518 }
519
520 #[pyo3(name = "cache_instrument")]
522 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
523 let inst_any = pyobject_to_instrument_any(py, instrument)?;
524 self.cache_instrument(inst_any);
525 Ok(())
526 }
527
528 #[pyo3(name = "cache_client_order")]
530 fn py_cache_client_order(
531 &self,
532 client_order_id: ClientOrderId,
533 venue_order_id: Option<VenueOrderId>,
534 instrument_id: InstrumentId,
535 trader_id: TraderId,
536 strategy_id: StrategyId,
537 ) {
538 self.cache_client_order(
539 client_order_id,
540 venue_order_id,
541 instrument_id,
542 trader_id,
543 strategy_id,
544 );
545 }
546
547 #[pyo3(name = "subscribe_book")]
549 fn py_subscribe_book<'py>(
550 &self,
551 py: Python<'py>,
552 instrument_id: InstrumentId,
553 depth: Option<u32>,
554 ) -> PyResult<Bound<'py, PyAny>> {
555 let client = self.clone();
556
557 pyo3_async_runtimes::tokio::future_into_py(py, async move {
558 client
559 .subscribe_book(instrument_id, depth)
560 .await
561 .map_err(to_pyruntime_err)?;
562 Ok(())
563 })
564 }
565
566 #[pyo3(name = "subscribe_quotes")]
571 fn py_subscribe_quotes<'py>(
572 &self,
573 py: Python<'py>,
574 instrument_id: InstrumentId,
575 ) -> PyResult<Bound<'py, PyAny>> {
576 let client = self.clone();
577
578 pyo3_async_runtimes::tokio::future_into_py(py, async move {
579 client
580 .subscribe_quotes(instrument_id)
581 .await
582 .map_err(to_pyruntime_err)?;
583 Ok(())
584 })
585 }
586
587 #[pyo3(name = "subscribe_trades")]
589 fn py_subscribe_trades<'py>(
590 &self,
591 py: Python<'py>,
592 instrument_id: InstrumentId,
593 ) -> PyResult<Bound<'py, PyAny>> {
594 let client = self.clone();
595
596 pyo3_async_runtimes::tokio::future_into_py(py, async move {
597 client
598 .subscribe_trades(instrument_id)
599 .await
600 .map_err(to_pyruntime_err)?;
601 Ok(())
602 })
603 }
604
605 #[pyo3(name = "subscribe_bars")]
607 fn py_subscribe_bars<'py>(
608 &self,
609 py: Python<'py>,
610 bar_type: BarType,
611 ) -> PyResult<Bound<'py, PyAny>> {
612 let client = self.clone();
613
614 pyo3_async_runtimes::tokio::future_into_py(py, async move {
615 client
616 .subscribe_bars(bar_type)
617 .await
618 .map_err(to_pyruntime_err)?;
619 Ok(())
620 })
621 }
622
623 #[pyo3(name = "subscribe_executions")]
627 #[pyo3(signature = (snap_orders=true, snap_trades=true))]
628 fn py_subscribe_executions<'py>(
629 &self,
630 py: Python<'py>,
631 snap_orders: bool,
632 snap_trades: bool,
633 ) -> PyResult<Bound<'py, PyAny>> {
634 let client = self.clone();
635
636 pyo3_async_runtimes::tokio::future_into_py(py, async move {
637 client
638 .subscribe_executions(snap_orders, snap_trades)
639 .await
640 .map_err(to_pyruntime_err)?;
641 Ok(())
642 })
643 }
644
645 #[pyo3(name = "unsubscribe_book")]
647 fn py_unsubscribe_book<'py>(
648 &self,
649 py: Python<'py>,
650 instrument_id: InstrumentId,
651 ) -> PyResult<Bound<'py, PyAny>> {
652 let client = self.clone();
653
654 pyo3_async_runtimes::tokio::future_into_py(py, async move {
655 client
656 .unsubscribe_book(instrument_id)
657 .await
658 .map_err(to_pyruntime_err)?;
659 Ok(())
660 })
661 }
662
663 #[pyo3(name = "unsubscribe_quotes")]
665 fn py_unsubscribe_quotes<'py>(
666 &self,
667 py: Python<'py>,
668 instrument_id: InstrumentId,
669 ) -> PyResult<Bound<'py, PyAny>> {
670 let client = self.clone();
671
672 pyo3_async_runtimes::tokio::future_into_py(py, async move {
673 client
674 .unsubscribe_quotes(instrument_id)
675 .await
676 .map_err(to_pyruntime_err)?;
677 Ok(())
678 })
679 }
680
681 #[pyo3(name = "unsubscribe_trades")]
683 fn py_unsubscribe_trades<'py>(
684 &self,
685 py: Python<'py>,
686 instrument_id: InstrumentId,
687 ) -> PyResult<Bound<'py, PyAny>> {
688 let client = self.clone();
689
690 pyo3_async_runtimes::tokio::future_into_py(py, async move {
691 client
692 .unsubscribe_trades(instrument_id)
693 .await
694 .map_err(to_pyruntime_err)?;
695 Ok(())
696 })
697 }
698
699 #[pyo3(name = "unsubscribe_bars")]
701 fn py_unsubscribe_bars<'py>(
702 &self,
703 py: Python<'py>,
704 bar_type: BarType,
705 ) -> PyResult<Bound<'py, PyAny>> {
706 let client = self.clone();
707
708 pyo3_async_runtimes::tokio::future_into_py(py, async move {
709 client
710 .unsubscribe_bars(bar_type)
711 .await
712 .map_err(to_pyruntime_err)?;
713 Ok(())
714 })
715 }
716}
717
718fn dispatch_order_status_report(
719 report: OrderStatusReport,
720 call_soon: &Py<PyAny>,
721 callback: &Py<PyAny>,
722) {
723 Python::attach(|py| match report.into_py_any(py) {
724 Ok(py_obj) => {
725 call_python_threadsafe(py, call_soon, callback, py_obj);
726 }
727 Err(e) => {
728 log::error!("Failed to convert OrderStatusReport to Python: {e}");
729 }
730 });
731}
732
733fn dispatch_fill_report(report: FillReport, call_soon: &Py<PyAny>, callback: &Py<PyAny>) {
734 Python::attach(|py| match report.into_py_any(py) {
735 Ok(py_obj) => {
736 call_python_threadsafe(py, call_soon, callback, py_obj);
737 }
738 Err(e) => {
739 log::error!("Failed to convert FillReport to Python: {e}");
740 }
741 });
742}