1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicU64, Ordering},
21};
22
23use ahash::AHashMap;
24use nautilus_common::live::get_runtime;
25use nautilus_core::{
26 AtomicMap, UnixNanos,
27 python::{call_python_threadsafe, to_pyruntime_err},
28 time::get_atomic_clock_realtime,
29};
30use nautilus_model::{
31 data::{Data, OrderBookDeltas, OrderBookDeltas_API, QuoteTick},
32 enums::{BookType, OrderSide, OrderStatus, OrderType, TimeInForce},
33 identifiers::{
34 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
35 },
36 instruments::{Instrument, InstrumentAny},
37 orderbook::OrderBook,
38 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
39 reports::{FillReport, OrderStatusReport},
40 types::Quantity,
41};
42use nautilus_network::websocket::{SubscriptionState, TransportBackend};
43use pyo3::{IntoPyObjectExt, prelude::*};
44
45use crate::{
46 common::{
47 consts::KRAKEN_VENUE,
48 credential::KrakenCredential,
49 enums::{KrakenEnvironment, KrakenProductType},
50 urls::get_kraken_ws_public_url,
51 },
52 websocket::futures::{
53 client::KrakenFuturesWebSocketClient,
54 messages::{
55 KrakenFuturesBookDelta, KrakenFuturesBookSnapshot, KrakenFuturesFillsDelta,
56 KrakenFuturesOpenOrdersCancel, KrakenFuturesOpenOrdersDelta, KrakenFuturesTickerData,
57 KrakenFuturesTradeData, KrakenFuturesWsMessage,
58 },
59 parse::{
60 parse_futures_ws_book_delta, parse_futures_ws_book_snapshot_deltas,
61 parse_futures_ws_fill_report, parse_futures_ws_funding_rate,
62 parse_futures_ws_index_price, parse_futures_ws_mark_price,
63 parse_futures_ws_order_status_report, parse_futures_ws_trade_tick,
64 },
65 },
66};
67
68#[pymethods]
69#[pyo3_stub_gen::derive::gen_stub_pymethods]
70impl KrakenFuturesWebSocketClient {
71 #[new]
73 #[pyo3(signature = (environment=None, base_url=None, heartbeat_secs=60, api_key=None, api_secret=None, proxy_url=None))]
74 fn py_new(
75 environment: Option<KrakenEnvironment>,
76 base_url: Option<String>,
77 heartbeat_secs: u64,
78 api_key: Option<String>,
79 api_secret: Option<String>,
80 proxy_url: Option<String>,
81 ) -> Self {
82 let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
83 let demo = env == KrakenEnvironment::Demo;
84 let url = base_url.unwrap_or_else(|| {
85 get_kraken_ws_public_url(KrakenProductType::Futures, env).to_string()
86 });
87 let credential = KrakenCredential::resolve_futures(api_key, api_secret, demo);
88
89 Self::with_credentials(
90 url,
91 heartbeat_secs,
92 credential,
93 TransportBackend::default(),
94 proxy_url,
95 )
96 }
97
98 #[getter]
100 #[pyo3(name = "has_credentials")]
101 #[must_use]
102 pub fn py_has_credentials(&self) -> bool {
103 self.has_credentials()
104 }
105
106 #[getter]
108 #[pyo3(name = "url")]
109 #[must_use]
110 pub fn py_url(&self) -> &str {
111 self.url()
112 }
113
114 #[pyo3(name = "is_closed")]
116 fn py_is_closed(&self) -> bool {
117 self.is_closed()
118 }
119
120 #[pyo3(name = "is_active")]
122 fn py_is_active(&self) -> bool {
123 self.is_active()
124 }
125
126 #[pyo3(name = "wait_until_active")]
128 fn py_wait_until_active<'py>(
129 &self,
130 py: Python<'py>,
131 timeout_secs: f64,
132 ) -> PyResult<Bound<'py, PyAny>> {
133 let client = self.clone();
134
135 pyo3_async_runtimes::tokio::future_into_py(py, async move {
136 client
137 .wait_until_active(timeout_secs)
138 .await
139 .map_err(to_pyruntime_err)?;
140 Ok(())
141 })
142 }
143
144 #[pyo3(name = "is_authenticated")]
146 fn py_is_authenticated(&self) -> bool {
147 self.is_authenticated()
148 }
149
150 #[pyo3(name = "wait_until_authenticated")]
154 fn py_wait_until_authenticated<'py>(
155 &self,
156 py: Python<'py>,
157 timeout_secs: f64,
158 ) -> PyResult<Bound<'py, PyAny>> {
159 let client = self.clone();
160
161 pyo3_async_runtimes::tokio::future_into_py(py, async move {
162 client
163 .wait_until_authenticated(timeout_secs)
164 .await
165 .map_err(to_pyruntime_err)?;
166 Ok(())
167 })
168 }
169
170 #[pyo3(name = "authenticate")]
176 fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
177 let client = self.clone();
178
179 pyo3_async_runtimes::tokio::future_into_py(py, async move {
180 client.authenticate().await.map_err(to_pyruntime_err)?;
181 Ok(())
182 })
183 }
184
185 #[pyo3(name = "connect")]
187 #[expect(clippy::needless_pass_by_value)]
188 fn py_connect<'py>(
189 &mut self,
190 py: Python<'py>,
191 loop_: Py<PyAny>,
192 instruments: Vec<Py<PyAny>>,
193 callback: Py<PyAny>,
194 ) -> PyResult<Bound<'py, PyAny>> {
195 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
196
197 for inst in instruments {
198 let inst_any = pyobject_to_instrument_any(py, inst)?;
199 self.cache_instrument(inst_any);
200 }
201
202 let instruments_map = self.instruments_shared().clone();
203 let subscriptions = self.subscriptions().clone();
204 let account_id = self.account_id_shared().clone();
205 let truncated_id_map = self.truncated_id_map().clone();
206 let order_instrument_map = self.order_instrument_map().clone();
207 let mut client = self.clone();
208
209 pyo3_async_runtimes::tokio::future_into_py(py, async move {
210 client.connect().await.map_err(to_pyruntime_err)?;
211
212 if let Some(mut rx) = client.take_output_rx() {
213 let clock = get_atomic_clock_realtime();
214 let book_sequence = Arc::new(AtomicU64::new(0));
215
216 get_runtime().spawn(async move {
217 let mut order_books: AHashMap<InstrumentId, OrderBook> = AHashMap::new();
218 let mut last_quotes: AHashMap<InstrumentId, QuoteTick> = AHashMap::new();
219 let venue_client_map: Arc<AtomicMap<String, ClientOrderId>> =
220 Arc::new(AtomicMap::new());
221 let venue_order_qty: Arc<AtomicMap<String, Quantity>> =
222 Arc::new(AtomicMap::new());
223
224 while let Some(msg) = rx.recv().await {
225 let ts_init = clock.get_time_ns();
226
227 match msg {
228 KrakenFuturesWsMessage::OpenOrdersDelta(delta) => {
229 handle_open_orders_delta(
230 &delta,
231 &instruments_map,
232 &account_id,
233 &truncated_id_map,
234 &order_instrument_map,
235 &venue_client_map,
236 &venue_order_qty,
237 ts_init,
238 &call_soon,
239 &callback,
240 );
241 }
242 KrakenFuturesWsMessage::OpenOrdersCancel(cancel) => {
243 handle_open_orders_cancel(
244 &cancel,
245 &account_id,
246 &truncated_id_map,
247 &order_instrument_map,
248 &venue_client_map,
249 &venue_order_qty,
250 ts_init,
251 &call_soon,
252 &callback,
253 );
254 }
255 KrakenFuturesWsMessage::FillsDelta(fills_delta) => {
256 handle_fills_delta(
257 &fills_delta,
258 &instruments_map,
259 &account_id,
260 &truncated_id_map,
261 ts_init,
262 &call_soon,
263 &callback,
264 );
265 }
266 KrakenFuturesWsMessage::Ticker(ref ticker) => {
267 handle_ticker(
268 ticker,
269 &instruments_map,
270 ts_init,
271 &call_soon,
272 &callback,
273 );
274 }
275 KrakenFuturesWsMessage::Trade(ref trade) => {
276 handle_trade(
277 trade,
278 &instruments_map,
279 ts_init,
280 &call_soon,
281 &callback,
282 );
283 }
284 KrakenFuturesWsMessage::BookSnapshot(ref snapshot) => {
285 handle_book_snapshot(
286 snapshot,
287 &instruments_map,
288 &subscriptions,
289 &mut order_books,
290 &mut last_quotes,
291 &book_sequence,
292 ts_init,
293 &call_soon,
294 &callback,
295 );
296 }
297 KrakenFuturesWsMessage::BookDelta(ref delta) => {
298 handle_book_delta(
299 delta,
300 &instruments_map,
301 &subscriptions,
302 &mut order_books,
303 &mut last_quotes,
304 &book_sequence,
305 ts_init,
306 &call_soon,
307 &callback,
308 );
309 }
310 KrakenFuturesWsMessage::Challenge(_)
311 | KrakenFuturesWsMessage::Reconnected => {}
312 }
313 }
314 });
315 }
316
317 Ok(())
318 })
319 }
320
321 #[pyo3(name = "set_account_id")]
323 fn py_set_account_id(&self, account_id: AccountId) {
324 self.set_account_id(account_id);
325 }
326
327 #[pyo3(name = "cache_instrument")]
329 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
330 let inst_any = pyobject_to_instrument_any(py, instrument)?;
331 self.cache_instrument(inst_any);
332 Ok(())
333 }
334
335 #[pyo3(name = "cache_instruments")]
337 fn py_cache_instruments(&self, py: Python, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
338 let mut inst_vec = Vec::with_capacity(instruments.len());
339 for inst in instruments {
340 inst_vec.push(pyobject_to_instrument_any(py, inst)?);
341 }
342 self.cache_instruments(&inst_vec);
343 Ok(())
344 }
345
346 #[pyo3(name = "cache_client_order")]
352 fn py_cache_client_order(
353 &self,
354 client_order_id: ClientOrderId,
355 venue_order_id: Option<VenueOrderId>,
356 instrument_id: InstrumentId,
357 trader_id: TraderId,
358 strategy_id: StrategyId,
359 ) {
360 self.cache_client_order(
361 client_order_id,
362 venue_order_id,
363 instrument_id,
364 trader_id,
365 strategy_id,
366 );
367 }
368
369 #[pyo3(name = "disconnect")]
371 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
372 let mut client = self.clone();
373
374 pyo3_async_runtimes::tokio::future_into_py(py, async move {
375 client.disconnect().await.map_err(to_pyruntime_err)?;
376 Ok(())
377 })
378 }
379
380 #[pyo3(name = "close")]
382 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
383 let mut client = self.clone();
384
385 pyo3_async_runtimes::tokio::future_into_py(py, async move {
386 client.close().await.map_err(to_pyruntime_err)?;
387 Ok(())
388 })
389 }
390
391 #[pyo3(name = "subscribe_book")]
396 #[pyo3(signature = (instrument_id, depth=None))]
397 fn py_subscribe_book<'py>(
398 &self,
399 py: Python<'py>,
400 instrument_id: InstrumentId,
401 depth: Option<u32>,
402 ) -> PyResult<Bound<'py, PyAny>> {
403 let client = self.clone();
404
405 pyo3_async_runtimes::tokio::future_into_py(py, async move {
406 client
407 .subscribe_book(instrument_id, depth)
408 .await
409 .map_err(to_pyruntime_err)?;
410 Ok(())
411 })
412 }
413
414 #[pyo3(name = "subscribe_quotes")]
418 fn py_subscribe_quotes<'py>(
419 &self,
420 py: Python<'py>,
421 instrument_id: InstrumentId,
422 ) -> PyResult<Bound<'py, PyAny>> {
423 let client = self.clone();
424
425 pyo3_async_runtimes::tokio::future_into_py(py, async move {
426 client
427 .subscribe_quotes(instrument_id)
428 .await
429 .map_err(to_pyruntime_err)?;
430 Ok(())
431 })
432 }
433
434 #[pyo3(name = "subscribe_trades")]
436 fn py_subscribe_trades<'py>(
437 &self,
438 py: Python<'py>,
439 instrument_id: InstrumentId,
440 ) -> PyResult<Bound<'py, PyAny>> {
441 let client = self.clone();
442
443 pyo3_async_runtimes::tokio::future_into_py(py, async move {
444 client
445 .subscribe_trades(instrument_id)
446 .await
447 .map_err(to_pyruntime_err)?;
448 Ok(())
449 })
450 }
451
452 #[pyo3(name = "subscribe_mark_price")]
454 fn py_subscribe_mark_price<'py>(
455 &self,
456 py: Python<'py>,
457 instrument_id: InstrumentId,
458 ) -> PyResult<Bound<'py, PyAny>> {
459 let client = self.clone();
460
461 pyo3_async_runtimes::tokio::future_into_py(py, async move {
462 client
463 .subscribe_mark_price(instrument_id)
464 .await
465 .map_err(to_pyruntime_err)?;
466 Ok(())
467 })
468 }
469
470 #[pyo3(name = "subscribe_index_price")]
472 fn py_subscribe_index_price<'py>(
473 &self,
474 py: Python<'py>,
475 instrument_id: InstrumentId,
476 ) -> PyResult<Bound<'py, PyAny>> {
477 let client = self.clone();
478
479 pyo3_async_runtimes::tokio::future_into_py(py, async move {
480 client
481 .subscribe_index_price(instrument_id)
482 .await
483 .map_err(to_pyruntime_err)?;
484 Ok(())
485 })
486 }
487
488 #[pyo3(name = "subscribe_funding_rate")]
490 fn py_subscribe_funding_rate<'py>(
491 &self,
492 py: Python<'py>,
493 instrument_id: InstrumentId,
494 ) -> PyResult<Bound<'py, PyAny>> {
495 let client = self.clone();
496
497 pyo3_async_runtimes::tokio::future_into_py(py, async move {
498 client
499 .subscribe_funding_rate(instrument_id)
500 .await
501 .map_err(to_pyruntime_err)?;
502 Ok(())
503 })
504 }
505
506 #[pyo3(name = "unsubscribe_book")]
508 fn py_unsubscribe_book<'py>(
509 &self,
510 py: Python<'py>,
511 instrument_id: InstrumentId,
512 ) -> PyResult<Bound<'py, PyAny>> {
513 let client = self.clone();
514
515 pyo3_async_runtimes::tokio::future_into_py(py, async move {
516 client
517 .unsubscribe_book(instrument_id)
518 .await
519 .map_err(to_pyruntime_err)?;
520 Ok(())
521 })
522 }
523
524 #[pyo3(name = "unsubscribe_quotes")]
526 fn py_unsubscribe_quotes<'py>(
527 &self,
528 py: Python<'py>,
529 instrument_id: InstrumentId,
530 ) -> PyResult<Bound<'py, PyAny>> {
531 let client = self.clone();
532
533 pyo3_async_runtimes::tokio::future_into_py(py, async move {
534 client
535 .unsubscribe_quotes(instrument_id)
536 .await
537 .map_err(to_pyruntime_err)?;
538 Ok(())
539 })
540 }
541
542 #[pyo3(name = "unsubscribe_trades")]
544 fn py_unsubscribe_trades<'py>(
545 &self,
546 py: Python<'py>,
547 instrument_id: InstrumentId,
548 ) -> PyResult<Bound<'py, PyAny>> {
549 let client = self.clone();
550
551 pyo3_async_runtimes::tokio::future_into_py(py, async move {
552 client
553 .unsubscribe_trades(instrument_id)
554 .await
555 .map_err(to_pyruntime_err)?;
556 Ok(())
557 })
558 }
559
560 #[pyo3(name = "unsubscribe_mark_price")]
562 fn py_unsubscribe_mark_price<'py>(
563 &self,
564 py: Python<'py>,
565 instrument_id: InstrumentId,
566 ) -> PyResult<Bound<'py, PyAny>> {
567 let client = self.clone();
568
569 pyo3_async_runtimes::tokio::future_into_py(py, async move {
570 client
571 .unsubscribe_mark_price(instrument_id)
572 .await
573 .map_err(to_pyruntime_err)?;
574 Ok(())
575 })
576 }
577
578 #[pyo3(name = "unsubscribe_index_price")]
580 fn py_unsubscribe_index_price<'py>(
581 &self,
582 py: Python<'py>,
583 instrument_id: InstrumentId,
584 ) -> PyResult<Bound<'py, PyAny>> {
585 let client = self.clone();
586
587 pyo3_async_runtimes::tokio::future_into_py(py, async move {
588 client
589 .unsubscribe_index_price(instrument_id)
590 .await
591 .map_err(to_pyruntime_err)?;
592 Ok(())
593 })
594 }
595
596 #[pyo3(name = "unsubscribe_funding_rate")]
598 fn py_unsubscribe_funding_rate<'py>(
599 &self,
600 py: Python<'py>,
601 instrument_id: InstrumentId,
602 ) -> PyResult<Bound<'py, PyAny>> {
603 let client = self.clone();
604
605 pyo3_async_runtimes::tokio::future_into_py(py, async move {
606 client
607 .unsubscribe_funding_rate(instrument_id)
608 .await
609 .map_err(to_pyruntime_err)?;
610 Ok(())
611 })
612 }
613
614 #[pyo3(name = "sign_challenge")]
618 fn py_sign_challenge(&self, challenge: &str) -> PyResult<String> {
619 self.sign_challenge(challenge).map_err(to_pyruntime_err)
620 }
621
622 #[pyo3(name = "authenticate_with_challenge")]
624 fn py_authenticate_with_challenge<'py>(
625 &self,
626 py: Python<'py>,
627 challenge: String,
628 ) -> PyResult<Bound<'py, PyAny>> {
629 let client = self.clone();
630
631 pyo3_async_runtimes::tokio::future_into_py(py, async move {
632 client
633 .authenticate_with_challenge(&challenge)
634 .await
635 .map_err(to_pyruntime_err)?;
636 Ok(())
637 })
638 }
639
640 #[pyo3(name = "set_auth_credentials")]
642 fn py_set_auth_credentials<'py>(
643 &self,
644 py: Python<'py>,
645 original_challenge: String,
646 signed_challenge: String,
647 ) -> PyResult<Bound<'py, PyAny>> {
648 let client = self.clone();
649
650 pyo3_async_runtimes::tokio::future_into_py(py, async move {
651 client
652 .set_auth_credentials(original_challenge, signed_challenge)
653 .await
654 .map_err(to_pyruntime_err)?;
655 Ok(())
656 })
657 }
658
659 #[pyo3(name = "subscribe_open_orders")]
661 fn py_subscribe_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
662 let client = self.clone();
663
664 pyo3_async_runtimes::tokio::future_into_py(py, async move {
665 client
666 .subscribe_open_orders()
667 .await
668 .map_err(to_pyruntime_err)?;
669 Ok(())
670 })
671 }
672
673 #[pyo3(name = "subscribe_fills")]
675 fn py_subscribe_fills<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
676 let client = self.clone();
677
678 pyo3_async_runtimes::tokio::future_into_py(py, async move {
679 client.subscribe_fills().await.map_err(to_pyruntime_err)?;
680 Ok(())
681 })
682 }
683
684 #[pyo3(name = "subscribe_executions")]
686 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
687 let client = self.clone();
688
689 pyo3_async_runtimes::tokio::future_into_py(py, async move {
690 client
691 .subscribe_executions()
692 .await
693 .map_err(to_pyruntime_err)?;
694 Ok(())
695 })
696 }
697}
698
699fn lookup_instrument(
700 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
701 product_id: &str,
702) -> Option<InstrumentAny> {
703 let instrument_id = InstrumentId::new(Symbol::new(product_id), *KRAKEN_VENUE);
704 instruments.load().get(&instrument_id).cloned()
705}
706
707fn resolve_client_order_id(
708 truncated: &str,
709 truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
710) -> ClientOrderId {
711 truncated_id_map
712 .load()
713 .get(truncated)
714 .copied()
715 .unwrap_or_else(|| ClientOrderId::new(truncated))
716}
717
718fn dispatch_report_to_python(
719 report: OrderStatusReport,
720 call_soon: &Py<PyAny>,
721 callback: &Py<PyAny>,
722) {
723 Python::attach(|py| match report.into_py_any(py) {
724 Ok(py_obj) => call_python_threadsafe(py, call_soon, callback, py_obj),
725 Err(e) => log::error!("Failed to convert OrderStatusReport to Python: {e}"),
726 });
727}
728
729fn dispatch_fill_to_python(report: FillReport, call_soon: &Py<PyAny>, callback: &Py<PyAny>) {
730 Python::attach(|py| match report.into_py_any(py) {
731 Ok(py_obj) => call_python_threadsafe(py, call_soon, callback, py_obj),
732 Err(e) => log::error!("Failed to convert FillReport to Python: {e}"),
733 });
734}
735
736#[expect(clippy::too_many_arguments)]
737fn handle_open_orders_delta(
738 delta: &KrakenFuturesOpenOrdersDelta,
739 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
740 account_id: &Arc<RwLock<Option<AccountId>>>,
741 truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
742 order_instrument_map: &Arc<AtomicMap<String, InstrumentId>>,
743 venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
744 venue_order_qty: &Arc<AtomicMap<String, Quantity>>,
745 ts_init: UnixNanos,
746 call_soon: &Py<PyAny>,
747 callback: &Py<PyAny>,
748) {
749 if delta.is_fill_driven_cancel() {
752 log::debug!(
753 "Skipping fill-driven open_orders delta: order_id={}, reason={:?}",
754 delta.order.order_id,
755 delta.reason,
756 );
757 return;
758 }
759
760 let product_id = delta.order.instrument.as_str();
761
762 let Some(instrument) = lookup_instrument(instruments, product_id) else {
763 log::warn!("No instrument for product_id: {product_id}");
764 return;
765 };
766
767 let Some(acct_id) = account_id.read().ok().and_then(|g| *g) else {
768 log::warn!("Account ID not set, cannot process order delta");
769 return;
770 };
771
772 order_instrument_map.insert(delta.order.order_id.clone(), instrument.id());
773
774 let qty = Quantity::new(delta.order.qty, instrument.size_precision());
775 venue_order_qty.insert(delta.order.order_id.clone(), qty);
776
777 match parse_futures_ws_order_status_report(
778 &delta.order,
779 delta.is_cancel,
780 delta.reason.as_deref(),
781 &instrument,
782 acct_id,
783 ts_init,
784 ) {
785 Ok(mut report) => {
786 if let Some(ref cl_ord_id) = delta.order.cli_ord_id {
787 let full_id = resolve_client_order_id(cl_ord_id, truncated_id_map);
788 report = report.with_client_order_id(full_id);
789
790 venue_client_map.insert(delta.order.order_id.clone(), full_id);
791 }
792 dispatch_report_to_python(report, call_soon, callback);
793 }
794 Err(e) => log::error!("Failed to parse futures order status report: {e}"),
795 }
796}
797
798#[expect(clippy::too_many_arguments)]
799fn handle_open_orders_cancel(
800 cancel: &KrakenFuturesOpenOrdersCancel,
801 account_id: &Arc<RwLock<Option<AccountId>>>,
802 truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
803 order_instrument_map: &Arc<AtomicMap<String, InstrumentId>>,
804 venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
805 venue_order_qty: &Arc<AtomicMap<String, Quantity>>,
806 ts_init: UnixNanos,
807 call_soon: &Py<PyAny>,
808 callback: &Py<PyAny>,
809) {
810 if let Some(ref reason) = cancel.reason
811 && (reason == "full_fill" || reason == "partial_fill")
812 {
813 log::debug!(
814 "Skipping fill-driven cancel: order_id={}, reason={reason}",
815 cancel.order_id,
816 );
817 return;
818 }
819
820 let Some(acct_id) = account_id.read().ok().and_then(|g| *g) else {
821 log::warn!("Account ID not set, cannot process order cancel");
822 return;
823 };
824
825 let venue_order_id = VenueOrderId::new(&cancel.order_id);
826
827 let instrument_id = order_instrument_map.load().get(&cancel.order_id).copied();
828
829 let Some(instrument_id) = instrument_id else {
830 log::warn!(
831 "Cannot resolve instrument for cancel: order_id={}, \
832 order not seen in previous delta",
833 cancel.order_id
834 );
835 return;
836 };
837
838 let client_order_id = cancel
839 .cli_ord_id
840 .as_ref()
841 .map(|id| resolve_client_order_id(id, truncated_id_map))
842 .or_else(|| venue_client_map.load().get(&cancel.order_id).copied());
843
844 let Some(quantity) = venue_order_qty.load().get(&cancel.order_id).copied() else {
845 log::warn!(
846 "Cannot resolve quantity for cancel: order_id={}, skipping",
847 cancel.order_id
848 );
849 return;
850 };
851
852 let report = OrderStatusReport::new(
853 acct_id,
854 instrument_id,
855 client_order_id,
856 venue_order_id,
857 OrderSide::NoOrderSide,
858 OrderType::Limit,
859 TimeInForce::Gtc,
860 OrderStatus::Canceled,
861 quantity,
862 Quantity::zero(0),
863 ts_init,
864 ts_init,
865 ts_init,
866 None,
867 );
868
869 let report = if let Some(ref reason) = cancel.reason
870 && !reason.is_empty()
871 {
872 report.with_cancel_reason(reason.clone())
873 } else {
874 report
875 };
876
877 dispatch_report_to_python(report, call_soon, callback);
878}
879
880fn handle_fills_delta(
881 fills_delta: &KrakenFuturesFillsDelta,
882 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
883 account_id: &Arc<RwLock<Option<AccountId>>>,
884 truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
885 ts_init: UnixNanos,
886 call_soon: &Py<PyAny>,
887 callback: &Py<PyAny>,
888) {
889 let Some(acct_id) = account_id.read().ok().and_then(|g| *g) else {
890 log::warn!("Account ID not set, cannot process fills");
891 return;
892 };
893
894 for fill in &fills_delta.fills {
895 let product_id = match &fill.instrument {
896 Some(id) => id.as_str(),
897 None => {
898 log::warn!("Fill missing instrument field: fill_id={}", fill.fill_id);
899 continue;
900 }
901 };
902
903 let Some(instrument) = lookup_instrument(instruments, product_id) else {
904 log::warn!("No instrument for product_id: {product_id}");
905 continue;
906 };
907
908 match parse_futures_ws_fill_report(fill, &instrument, acct_id, ts_init) {
909 Ok(mut report) => {
910 if let Some(ref cl_ord_id) = fill.cli_ord_id {
911 let full_id = resolve_client_order_id(cl_ord_id, truncated_id_map);
912 report.client_order_id = Some(full_id);
913 }
914 dispatch_fill_to_python(report, call_soon, callback);
915 }
916 Err(e) => log::error!("Failed to parse futures fill report: {e}"),
917 }
918 }
919}
920
921fn handle_ticker(
922 ticker: &KrakenFuturesTickerData,
923 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
924 ts_init: UnixNanos,
925 call_soon: &Py<PyAny>,
926 callback: &Py<PyAny>,
927) {
928 let Some(instrument) = lookup_instrument(instruments, ticker.product_id.as_str()) else {
929 return;
930 };
931
932 if let Some(mark_price) = parse_futures_ws_mark_price(ticker, &instrument, ts_init) {
933 Python::attach(|py| {
934 let py_obj = data_to_pycapsule(py, Data::MarkPriceUpdate(mark_price));
935 call_python_threadsafe(py, call_soon, callback, py_obj);
936 });
937 }
938
939 if let Some(index_price) = parse_futures_ws_index_price(ticker, &instrument, ts_init) {
940 Python::attach(|py| {
941 let py_obj = data_to_pycapsule(py, Data::IndexPriceUpdate(index_price));
942 call_python_threadsafe(py, call_soon, callback, py_obj);
943 });
944 }
945
946 if let Some(funding_rate) = parse_futures_ws_funding_rate(ticker, &instrument, ts_init) {
947 Python::attach(|py| match funding_rate.into_py_any(py) {
948 Ok(py_obj) => call_python_threadsafe(py, call_soon, callback, py_obj),
949 Err(e) => log::error!("Failed to convert FundingRateUpdate to Python: {e}"),
950 });
951 }
952}
953
954fn handle_trade(
955 trade: &KrakenFuturesTradeData,
956 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
957 ts_init: UnixNanos,
958 call_soon: &Py<PyAny>,
959 callback: &Py<PyAny>,
960) {
961 let Some(instrument) = lookup_instrument(instruments, trade.product_id.as_str()) else {
962 return;
963 };
964
965 match parse_futures_ws_trade_tick(trade, &instrument, ts_init) {
966 Ok(tick) => {
967 Python::attach(|py| {
968 let py_obj = data_to_pycapsule(py, Data::Trade(tick));
969 call_python_threadsafe(py, call_soon, callback, py_obj);
970 });
971 }
972 Err(e) => log::error!("Failed to parse futures trade tick: {e}"),
973 }
974}
975
976#[expect(clippy::too_many_arguments)]
977fn handle_book_snapshot(
978 snapshot: &KrakenFuturesBookSnapshot,
979 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
980 subscriptions: &SubscriptionState,
981 order_books: &mut AHashMap<InstrumentId, OrderBook>,
982 last_quotes: &mut AHashMap<InstrumentId, QuoteTick>,
983 book_sequence: &Arc<AtomicU64>,
984 ts_init: UnixNanos,
985 call_soon: &Py<PyAny>,
986 callback: &Py<PyAny>,
987) {
988 let Some(instrument) = lookup_instrument(instruments, snapshot.product_id.as_str()) else {
989 return;
990 };
991 let instrument_id = instrument.id();
992
993 let sequence = book_sequence.fetch_add(
994 (snapshot.bids.len() + snapshot.asks.len() + 1) as u64,
995 Ordering::Relaxed,
996 );
997
998 match parse_futures_ws_book_snapshot_deltas(snapshot, &instrument, sequence, ts_init) {
999 Ok(delta_vec) => {
1000 if delta_vec.is_empty() {
1001 return;
1002 }
1003 let deltas = OrderBookDeltas::new(instrument_id, delta_vec);
1004
1005 let quotes_key = format!("quotes:{}", snapshot.product_id);
1006 if subscriptions.get_reference_count("es_key) > 0 {
1007 let book = order_books
1008 .entry(instrument_id)
1009 .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
1010
1011 if let Err(e) = book.apply_deltas(&deltas) {
1012 log::error!("Failed to apply snapshot deltas to order book: {e}");
1013 } else {
1014 maybe_emit_quote(
1015 book,
1016 instrument_id,
1017 last_quotes,
1018 ts_init,
1019 call_soon,
1020 callback,
1021 );
1022 }
1023 }
1024
1025 let deltas_key = format!("deltas:{}", snapshot.product_id);
1026 if subscriptions.get_reference_count(&deltas_key) > 0 {
1027 Python::attach(|py| {
1028 let py_obj =
1029 data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(deltas)));
1030 call_python_threadsafe(py, call_soon, callback, py_obj);
1031 });
1032 }
1033 }
1034 Err(e) => log::error!("Failed to parse futures book snapshot: {e}"),
1035 }
1036}
1037
1038#[expect(clippy::too_many_arguments)]
1039fn handle_book_delta(
1040 delta: &KrakenFuturesBookDelta,
1041 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
1042 subscriptions: &SubscriptionState,
1043 order_books: &mut AHashMap<InstrumentId, OrderBook>,
1044 last_quotes: &mut AHashMap<InstrumentId, QuoteTick>,
1045 book_sequence: &Arc<AtomicU64>,
1046 ts_init: UnixNanos,
1047 call_soon: &Py<PyAny>,
1048 callback: &Py<PyAny>,
1049) {
1050 let Some(instrument) = lookup_instrument(instruments, delta.product_id.as_str()) else {
1051 return;
1052 };
1053 let instrument_id = instrument.id();
1054
1055 let sequence = book_sequence.fetch_add(1, Ordering::Relaxed);
1056
1057 match parse_futures_ws_book_delta(delta, &instrument, sequence, ts_init) {
1058 Ok(book_delta) => {
1059 let deltas = OrderBookDeltas::new(instrument_id, vec![book_delta]);
1060
1061 let quotes_key = format!("quotes:{}", delta.product_id);
1062 if subscriptions.get_reference_count("es_key) > 0
1063 && let Some(book) = order_books.get_mut(&instrument_id)
1064 {
1065 if let Err(e) = book.apply_deltas(&deltas) {
1066 log::error!("Failed to apply delta to order book: {e}");
1067 } else {
1068 maybe_emit_quote(
1069 book,
1070 instrument_id,
1071 last_quotes,
1072 ts_init,
1073 call_soon,
1074 callback,
1075 );
1076 }
1077 }
1078
1079 let deltas_key = format!("deltas:{}", delta.product_id);
1080 if subscriptions.get_reference_count(&deltas_key) > 0 {
1081 Python::attach(|py| {
1082 let py_obj =
1083 data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(deltas)));
1084 call_python_threadsafe(py, call_soon, callback, py_obj);
1085 });
1086 }
1087 }
1088 Err(e) => log::error!("Failed to parse futures book delta: {e}"),
1089 }
1090}
1091
1092fn maybe_emit_quote(
1093 book: &OrderBook,
1094 instrument_id: InstrumentId,
1095 last_quotes: &mut AHashMap<InstrumentId, QuoteTick>,
1096 ts_init: UnixNanos,
1097 call_soon: &Py<PyAny>,
1098 callback: &Py<PyAny>,
1099) {
1100 let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price()) else {
1101 return;
1102 };
1103 let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size()) else {
1104 return;
1105 };
1106
1107 let bid = bid_price.as_f64();
1108 let ask = ask_price.as_f64();
1109 if bid > 0.0 && (ask - bid) / bid > 0.25 {
1110 log::debug!("Filtered quote with wide spread: bid={bid}, ask={ask}");
1111 return;
1112 }
1113
1114 let quote = QuoteTick::new(
1115 instrument_id,
1116 bid_price,
1117 ask_price,
1118 bid_size,
1119 ask_size,
1120 ts_init,
1121 ts_init,
1122 );
1123
1124 if matches!(last_quotes.get(&instrument_id), Some(prev) if *prev == quote) {
1125 return;
1126 }
1127
1128 last_quotes.insert(instrument_id, quote);
1129
1130 Python::attach(|py| {
1131 let py_obj = data_to_pycapsule(py, Data::Quote(quote));
1132 call_python_threadsafe(py, call_soon, callback, py_obj);
1133 });
1134}