Skip to main content

nautilus_kraken/python/
websocket_futures.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Kraken Futures WebSocket client.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicU64, Ordering},
21};
22
23use ahash::AHashMap;
24use nautilus_common::live::get_runtime;
25use nautilus_core::{
26    AtomicMap, UnixNanos,
27    python::{call_python_threadsafe, to_pyruntime_err},
28    time::get_atomic_clock_realtime,
29};
30use nautilus_model::{
31    data::{Data, OrderBookDeltas, OrderBookDeltas_API, QuoteTick},
32    enums::{BookType, OrderSide, OrderStatus, OrderType, TimeInForce},
33    identifiers::{
34        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
35    },
36    instruments::{Instrument, InstrumentAny},
37    orderbook::OrderBook,
38    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
39    reports::{FillReport, OrderStatusReport},
40    types::Quantity,
41};
42use nautilus_network::websocket::{SubscriptionState, TransportBackend};
43use pyo3::{IntoPyObjectExt, prelude::*};
44
45use crate::{
46    common::{
47        consts::KRAKEN_VENUE,
48        credential::KrakenCredential,
49        enums::{KrakenEnvironment, KrakenProductType},
50        urls::get_kraken_ws_public_url,
51    },
52    websocket::futures::{
53        client::KrakenFuturesWebSocketClient,
54        messages::{
55            KrakenFuturesBookDelta, KrakenFuturesBookSnapshot, KrakenFuturesFillsDelta,
56            KrakenFuturesOpenOrdersCancel, KrakenFuturesOpenOrdersDelta, KrakenFuturesTickerData,
57            KrakenFuturesTradeData, KrakenFuturesWsMessage,
58        },
59        parse::{
60            parse_futures_ws_book_delta, parse_futures_ws_book_snapshot_deltas,
61            parse_futures_ws_fill_report, parse_futures_ws_funding_rate,
62            parse_futures_ws_index_price, parse_futures_ws_mark_price,
63            parse_futures_ws_order_status_report, parse_futures_ws_trade_tick,
64        },
65    },
66};
67
68#[pymethods]
69#[pyo3_stub_gen::derive::gen_stub_pymethods]
70impl KrakenFuturesWebSocketClient {
71    /// WebSocket client for the Kraken Futures v1 streaming API.
72    #[new]
73    #[pyo3(signature = (environment=None, base_url=None, heartbeat_secs=60, api_key=None, api_secret=None, proxy_url=None))]
74    fn py_new(
75        environment: Option<KrakenEnvironment>,
76        base_url: Option<String>,
77        heartbeat_secs: u64,
78        api_key: Option<String>,
79        api_secret: Option<String>,
80        proxy_url: Option<String>,
81    ) -> Self {
82        let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
83        let demo = env == KrakenEnvironment::Demo;
84        let url = base_url.unwrap_or_else(|| {
85            get_kraken_ws_public_url(KrakenProductType::Futures, env).to_string()
86        });
87        let credential = KrakenCredential::resolve_futures(api_key, api_secret, demo);
88
89        Self::with_credentials(
90            url,
91            heartbeat_secs,
92            credential,
93            TransportBackend::default(),
94            proxy_url,
95        )
96    }
97
98    /// Returns true if the client has API credentials set.
99    #[getter]
100    #[pyo3(name = "has_credentials")]
101    #[must_use]
102    pub fn py_has_credentials(&self) -> bool {
103        self.has_credentials()
104    }
105
106    /// Returns the WebSocket URL.
107    #[getter]
108    #[pyo3(name = "url")]
109    #[must_use]
110    pub fn py_url(&self) -> &str {
111        self.url()
112    }
113
114    /// Returns true if the connection is closed.
115    #[pyo3(name = "is_closed")]
116    fn py_is_closed(&self) -> bool {
117        self.is_closed()
118    }
119
120    /// Returns true if the connection is active.
121    #[pyo3(name = "is_active")]
122    fn py_is_active(&self) -> bool {
123        self.is_active()
124    }
125
126    /// Waits until the WebSocket connection is active or timeout.
127    #[pyo3(name = "wait_until_active")]
128    fn py_wait_until_active<'py>(
129        &self,
130        py: Python<'py>,
131        timeout_secs: f64,
132    ) -> PyResult<Bound<'py, PyAny>> {
133        let client = self.clone();
134
135        pyo3_async_runtimes::tokio::future_into_py(py, async move {
136            client
137                .wait_until_active(timeout_secs)
138                .await
139                .map_err(to_pyruntime_err)?;
140            Ok(())
141        })
142    }
143
144    /// Returns true if the WebSocket is authenticated for private feeds.
145    #[pyo3(name = "is_authenticated")]
146    fn py_is_authenticated(&self) -> bool {
147        self.is_authenticated()
148    }
149
150    /// Waits until the WebSocket is authenticated or the timeout elapses.
151    ///
152    /// Returns an error on timeout or explicit auth failure.
153    #[pyo3(name = "wait_until_authenticated")]
154    fn py_wait_until_authenticated<'py>(
155        &self,
156        py: Python<'py>,
157        timeout_secs: f64,
158    ) -> PyResult<Bound<'py, PyAny>> {
159        let client = self.clone();
160
161        pyo3_async_runtimes::tokio::future_into_py(py, async move {
162            client
163                .wait_until_authenticated(timeout_secs)
164                .await
165                .map_err(to_pyruntime_err)?;
166            Ok(())
167        })
168    }
169
170    /// Authenticates the WebSocket connection for private feeds.
171    ///
172    /// Sends a challenge request and waits for the handler to parse the response,
173    /// sign it, and mark the `AuthTracker` successful. Private subscriptions gate
174    /// on the stored challenge / signed-challenge pair.
175    #[pyo3(name = "authenticate")]
176    fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
177        let client = self.clone();
178
179        pyo3_async_runtimes::tokio::future_into_py(py, async move {
180            client.authenticate().await.map_err(to_pyruntime_err)?;
181            Ok(())
182        })
183    }
184
185    /// Connects to the WebSocket server.
186    #[pyo3(name = "connect")]
187    #[expect(clippy::needless_pass_by_value)]
188    fn py_connect<'py>(
189        &mut self,
190        py: Python<'py>,
191        loop_: Py<PyAny>,
192        instruments: Vec<Py<PyAny>>,
193        callback: Py<PyAny>,
194    ) -> PyResult<Bound<'py, PyAny>> {
195        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
196
197        for inst in instruments {
198            let inst_any = pyobject_to_instrument_any(py, inst)?;
199            self.cache_instrument(inst_any);
200        }
201
202        let instruments_map = self.instruments_shared().clone();
203        let subscriptions = self.subscriptions().clone();
204        let account_id = self.account_id_shared().clone();
205        let truncated_id_map = self.truncated_id_map().clone();
206        let order_instrument_map = self.order_instrument_map().clone();
207        let mut client = self.clone();
208
209        pyo3_async_runtimes::tokio::future_into_py(py, async move {
210            client.connect().await.map_err(to_pyruntime_err)?;
211
212            if let Some(mut rx) = client.take_output_rx() {
213                let clock = get_atomic_clock_realtime();
214                let book_sequence = Arc::new(AtomicU64::new(0));
215
216                get_runtime().spawn(async move {
217                    let mut order_books: AHashMap<InstrumentId, OrderBook> = AHashMap::new();
218                    let mut last_quotes: AHashMap<InstrumentId, QuoteTick> = AHashMap::new();
219                    let venue_client_map: Arc<AtomicMap<String, ClientOrderId>> =
220                        Arc::new(AtomicMap::new());
221                    let venue_order_qty: Arc<AtomicMap<String, Quantity>> =
222                        Arc::new(AtomicMap::new());
223
224                    while let Some(msg) = rx.recv().await {
225                        let ts_init = clock.get_time_ns();
226
227                        match msg {
228                            KrakenFuturesWsMessage::OpenOrdersDelta(delta) => {
229                                handle_open_orders_delta(
230                                    &delta,
231                                    &instruments_map,
232                                    &account_id,
233                                    &truncated_id_map,
234                                    &order_instrument_map,
235                                    &venue_client_map,
236                                    &venue_order_qty,
237                                    ts_init,
238                                    &call_soon,
239                                    &callback,
240                                );
241                            }
242                            KrakenFuturesWsMessage::OpenOrdersCancel(cancel) => {
243                                handle_open_orders_cancel(
244                                    &cancel,
245                                    &account_id,
246                                    &truncated_id_map,
247                                    &order_instrument_map,
248                                    &venue_client_map,
249                                    &venue_order_qty,
250                                    ts_init,
251                                    &call_soon,
252                                    &callback,
253                                );
254                            }
255                            KrakenFuturesWsMessage::FillsDelta(fills_delta) => {
256                                handle_fills_delta(
257                                    &fills_delta,
258                                    &instruments_map,
259                                    &account_id,
260                                    &truncated_id_map,
261                                    ts_init,
262                                    &call_soon,
263                                    &callback,
264                                );
265                            }
266                            KrakenFuturesWsMessage::Ticker(ref ticker) => {
267                                handle_ticker(
268                                    ticker,
269                                    &instruments_map,
270                                    ts_init,
271                                    &call_soon,
272                                    &callback,
273                                );
274                            }
275                            KrakenFuturesWsMessage::Trade(ref trade) => {
276                                handle_trade(
277                                    trade,
278                                    &instruments_map,
279                                    ts_init,
280                                    &call_soon,
281                                    &callback,
282                                );
283                            }
284                            KrakenFuturesWsMessage::BookSnapshot(ref snapshot) => {
285                                handle_book_snapshot(
286                                    snapshot,
287                                    &instruments_map,
288                                    &subscriptions,
289                                    &mut order_books,
290                                    &mut last_quotes,
291                                    &book_sequence,
292                                    ts_init,
293                                    &call_soon,
294                                    &callback,
295                                );
296                            }
297                            KrakenFuturesWsMessage::BookDelta(ref delta) => {
298                                handle_book_delta(
299                                    delta,
300                                    &instruments_map,
301                                    &subscriptions,
302                                    &mut order_books,
303                                    &mut last_quotes,
304                                    &book_sequence,
305                                    ts_init,
306                                    &call_soon,
307                                    &callback,
308                                );
309                            }
310                            KrakenFuturesWsMessage::Challenge(_)
311                            | KrakenFuturesWsMessage::Reconnected => {}
312                        }
313                    }
314                });
315            }
316
317            Ok(())
318        })
319    }
320
321    /// Sets the account ID for execution report parsing.
322    #[pyo3(name = "set_account_id")]
323    fn py_set_account_id(&self, account_id: AccountId) {
324        self.set_account_id(account_id);
325    }
326
327    /// Caches an instrument for execution report parsing.
328    #[pyo3(name = "cache_instrument")]
329    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
330        let inst_any = pyobject_to_instrument_any(py, instrument)?;
331        self.cache_instrument(inst_any);
332        Ok(())
333    }
334
335    /// Caches multiple instruments for execution report parsing.
336    #[pyo3(name = "cache_instruments")]
337    fn py_cache_instruments(&self, py: Python, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
338        let mut inst_vec = Vec::with_capacity(instruments.len());
339        for inst in instruments {
340            inst_vec.push(pyobject_to_instrument_any(py, inst)?);
341        }
342        self.cache_instruments(&inst_vec);
343        Ok(())
344    }
345
346    /// Caches a client order for truncated ID resolution and instrument lookup.
347    ///
348    /// Kraken Futures limits client order IDs to 18 characters, so orders with
349    /// longer IDs are truncated. This method stores the mapping from truncated
350    /// to full ID, and from venue order ID to instrument ID for cancel messages.
351    #[pyo3(name = "cache_client_order")]
352    fn py_cache_client_order(
353        &self,
354        client_order_id: ClientOrderId,
355        venue_order_id: Option<VenueOrderId>,
356        instrument_id: InstrumentId,
357        trader_id: TraderId,
358        strategy_id: StrategyId,
359    ) {
360        self.cache_client_order(
361            client_order_id,
362            venue_order_id,
363            instrument_id,
364            trader_id,
365            strategy_id,
366        );
367    }
368
369    /// Disconnects from the WebSocket server.
370    #[pyo3(name = "disconnect")]
371    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
372        let mut client = self.clone();
373
374        pyo3_async_runtimes::tokio::future_into_py(py, async move {
375            client.disconnect().await.map_err(to_pyruntime_err)?;
376            Ok(())
377        })
378    }
379
380    /// Closes the WebSocket connection.
381    #[pyo3(name = "close")]
382    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
383        let mut client = self.clone();
384
385        pyo3_async_runtimes::tokio::future_into_py(py, async move {
386            client.close().await.map_err(to_pyruntime_err)?;
387            Ok(())
388        })
389    }
390
391    /// Subscribes to order book updates for the given instrument.
392    ///
393    /// Note: The `depth` parameter is accepted for API compatibility with spot client but is
394    /// not used by Kraken Futures (full book is always returned).
395    #[pyo3(name = "subscribe_book")]
396    #[pyo3(signature = (instrument_id, depth=None))]
397    fn py_subscribe_book<'py>(
398        &self,
399        py: Python<'py>,
400        instrument_id: InstrumentId,
401        depth: Option<u32>,
402    ) -> PyResult<Bound<'py, PyAny>> {
403        let client = self.clone();
404
405        pyo3_async_runtimes::tokio::future_into_py(py, async move {
406            client
407                .subscribe_book(instrument_id, depth)
408                .await
409                .map_err(to_pyruntime_err)?;
410            Ok(())
411        })
412    }
413
414    /// Subscribes to quote updates for the given instrument.
415    ///
416    /// Uses the order book channel for low-latency top-of-book quotes.
417    #[pyo3(name = "subscribe_quotes")]
418    fn py_subscribe_quotes<'py>(
419        &self,
420        py: Python<'py>,
421        instrument_id: InstrumentId,
422    ) -> PyResult<Bound<'py, PyAny>> {
423        let client = self.clone();
424
425        pyo3_async_runtimes::tokio::future_into_py(py, async move {
426            client
427                .subscribe_quotes(instrument_id)
428                .await
429                .map_err(to_pyruntime_err)?;
430            Ok(())
431        })
432    }
433
434    /// Subscribes to trade updates for the given instrument.
435    #[pyo3(name = "subscribe_trades")]
436    fn py_subscribe_trades<'py>(
437        &self,
438        py: Python<'py>,
439        instrument_id: InstrumentId,
440    ) -> PyResult<Bound<'py, PyAny>> {
441        let client = self.clone();
442
443        pyo3_async_runtimes::tokio::future_into_py(py, async move {
444            client
445                .subscribe_trades(instrument_id)
446                .await
447                .map_err(to_pyruntime_err)?;
448            Ok(())
449        })
450    }
451
452    /// Subscribes to mark price updates for the given instrument.
453    #[pyo3(name = "subscribe_mark_price")]
454    fn py_subscribe_mark_price<'py>(
455        &self,
456        py: Python<'py>,
457        instrument_id: InstrumentId,
458    ) -> PyResult<Bound<'py, PyAny>> {
459        let client = self.clone();
460
461        pyo3_async_runtimes::tokio::future_into_py(py, async move {
462            client
463                .subscribe_mark_price(instrument_id)
464                .await
465                .map_err(to_pyruntime_err)?;
466            Ok(())
467        })
468    }
469
470    /// Subscribes to index price updates for the given instrument.
471    #[pyo3(name = "subscribe_index_price")]
472    fn py_subscribe_index_price<'py>(
473        &self,
474        py: Python<'py>,
475        instrument_id: InstrumentId,
476    ) -> PyResult<Bound<'py, PyAny>> {
477        let client = self.clone();
478
479        pyo3_async_runtimes::tokio::future_into_py(py, async move {
480            client
481                .subscribe_index_price(instrument_id)
482                .await
483                .map_err(to_pyruntime_err)?;
484            Ok(())
485        })
486    }
487
488    /// Subscribes to funding rate updates for the given instrument.
489    #[pyo3(name = "subscribe_funding_rate")]
490    fn py_subscribe_funding_rate<'py>(
491        &self,
492        py: Python<'py>,
493        instrument_id: InstrumentId,
494    ) -> PyResult<Bound<'py, PyAny>> {
495        let client = self.clone();
496
497        pyo3_async_runtimes::tokio::future_into_py(py, async move {
498            client
499                .subscribe_funding_rate(instrument_id)
500                .await
501                .map_err(to_pyruntime_err)?;
502            Ok(())
503        })
504    }
505
506    /// Unsubscribes from order book updates for the given instrument.
507    #[pyo3(name = "unsubscribe_book")]
508    fn py_unsubscribe_book<'py>(
509        &self,
510        py: Python<'py>,
511        instrument_id: InstrumentId,
512    ) -> PyResult<Bound<'py, PyAny>> {
513        let client = self.clone();
514
515        pyo3_async_runtimes::tokio::future_into_py(py, async move {
516            client
517                .unsubscribe_book(instrument_id)
518                .await
519                .map_err(to_pyruntime_err)?;
520            Ok(())
521        })
522    }
523
524    /// Unsubscribes from quote updates for the given instrument.
525    #[pyo3(name = "unsubscribe_quotes")]
526    fn py_unsubscribe_quotes<'py>(
527        &self,
528        py: Python<'py>,
529        instrument_id: InstrumentId,
530    ) -> PyResult<Bound<'py, PyAny>> {
531        let client = self.clone();
532
533        pyo3_async_runtimes::tokio::future_into_py(py, async move {
534            client
535                .unsubscribe_quotes(instrument_id)
536                .await
537                .map_err(to_pyruntime_err)?;
538            Ok(())
539        })
540    }
541
542    /// Unsubscribes from trade updates for the given instrument.
543    #[pyo3(name = "unsubscribe_trades")]
544    fn py_unsubscribe_trades<'py>(
545        &self,
546        py: Python<'py>,
547        instrument_id: InstrumentId,
548    ) -> PyResult<Bound<'py, PyAny>> {
549        let client = self.clone();
550
551        pyo3_async_runtimes::tokio::future_into_py(py, async move {
552            client
553                .unsubscribe_trades(instrument_id)
554                .await
555                .map_err(to_pyruntime_err)?;
556            Ok(())
557        })
558    }
559
560    /// Unsubscribes from mark price updates for the given instrument.
561    #[pyo3(name = "unsubscribe_mark_price")]
562    fn py_unsubscribe_mark_price<'py>(
563        &self,
564        py: Python<'py>,
565        instrument_id: InstrumentId,
566    ) -> PyResult<Bound<'py, PyAny>> {
567        let client = self.clone();
568
569        pyo3_async_runtimes::tokio::future_into_py(py, async move {
570            client
571                .unsubscribe_mark_price(instrument_id)
572                .await
573                .map_err(to_pyruntime_err)?;
574            Ok(())
575        })
576    }
577
578    /// Unsubscribes from index price updates for the given instrument.
579    #[pyo3(name = "unsubscribe_index_price")]
580    fn py_unsubscribe_index_price<'py>(
581        &self,
582        py: Python<'py>,
583        instrument_id: InstrumentId,
584    ) -> PyResult<Bound<'py, PyAny>> {
585        let client = self.clone();
586
587        pyo3_async_runtimes::tokio::future_into_py(py, async move {
588            client
589                .unsubscribe_index_price(instrument_id)
590                .await
591                .map_err(to_pyruntime_err)?;
592            Ok(())
593        })
594    }
595
596    /// Unsubscribes from funding rate updates for the given instrument.
597    #[pyo3(name = "unsubscribe_funding_rate")]
598    fn py_unsubscribe_funding_rate<'py>(
599        &self,
600        py: Python<'py>,
601        instrument_id: InstrumentId,
602    ) -> PyResult<Bound<'py, PyAny>> {
603        let client = self.clone();
604
605        pyo3_async_runtimes::tokio::future_into_py(py, async move {
606            client
607                .unsubscribe_funding_rate(instrument_id)
608                .await
609                .map_err(to_pyruntime_err)?;
610            Ok(())
611        })
612    }
613
614    /// Sign a challenge with the API credentials.
615    ///
616    /// Returns the signed challenge on success.
617    #[pyo3(name = "sign_challenge")]
618    fn py_sign_challenge(&self, challenge: &str) -> PyResult<String> {
619        self.sign_challenge(challenge).map_err(to_pyruntime_err)
620    }
621
622    /// Complete authentication with a received challenge.
623    #[pyo3(name = "authenticate_with_challenge")]
624    fn py_authenticate_with_challenge<'py>(
625        &self,
626        py: Python<'py>,
627        challenge: String,
628    ) -> PyResult<Bound<'py, PyAny>> {
629        let client = self.clone();
630
631        pyo3_async_runtimes::tokio::future_into_py(py, async move {
632            client
633                .authenticate_with_challenge(&challenge)
634                .await
635                .map_err(to_pyruntime_err)?;
636            Ok(())
637        })
638    }
639
640    /// Set authentication credentials directly (for when challenge is obtained externally).
641    #[pyo3(name = "set_auth_credentials")]
642    fn py_set_auth_credentials<'py>(
643        &self,
644        py: Python<'py>,
645        original_challenge: String,
646        signed_challenge: String,
647    ) -> PyResult<Bound<'py, PyAny>> {
648        let client = self.clone();
649
650        pyo3_async_runtimes::tokio::future_into_py(py, async move {
651            client
652                .set_auth_credentials(original_challenge, signed_challenge)
653                .await
654                .map_err(to_pyruntime_err)?;
655            Ok(())
656        })
657    }
658
659    /// Subscribes to open orders feed (private, requires authentication).
660    #[pyo3(name = "subscribe_open_orders")]
661    fn py_subscribe_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
662        let client = self.clone();
663
664        pyo3_async_runtimes::tokio::future_into_py(py, async move {
665            client
666                .subscribe_open_orders()
667                .await
668                .map_err(to_pyruntime_err)?;
669            Ok(())
670        })
671    }
672
673    /// Subscribes to fills feed (private, requires authentication).
674    #[pyo3(name = "subscribe_fills")]
675    fn py_subscribe_fills<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
676        let client = self.clone();
677
678        pyo3_async_runtimes::tokio::future_into_py(py, async move {
679            client.subscribe_fills().await.map_err(to_pyruntime_err)?;
680            Ok(())
681        })
682    }
683
684    /// Subscribes to both open orders and fills (convenience method).
685    #[pyo3(name = "subscribe_executions")]
686    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
687        let client = self.clone();
688
689        pyo3_async_runtimes::tokio::future_into_py(py, async move {
690            client
691                .subscribe_executions()
692                .await
693                .map_err(to_pyruntime_err)?;
694            Ok(())
695        })
696    }
697}
698
699fn lookup_instrument(
700    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
701    product_id: &str,
702) -> Option<InstrumentAny> {
703    let instrument_id = InstrumentId::new(Symbol::new(product_id), *KRAKEN_VENUE);
704    instruments.load().get(&instrument_id).cloned()
705}
706
707fn resolve_client_order_id(
708    truncated: &str,
709    truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
710) -> ClientOrderId {
711    truncated_id_map
712        .load()
713        .get(truncated)
714        .copied()
715        .unwrap_or_else(|| ClientOrderId::new(truncated))
716}
717
718fn dispatch_report_to_python(
719    report: OrderStatusReport,
720    call_soon: &Py<PyAny>,
721    callback: &Py<PyAny>,
722) {
723    Python::attach(|py| match report.into_py_any(py) {
724        Ok(py_obj) => call_python_threadsafe(py, call_soon, callback, py_obj),
725        Err(e) => log::error!("Failed to convert OrderStatusReport to Python: {e}"),
726    });
727}
728
729fn dispatch_fill_to_python(report: FillReport, call_soon: &Py<PyAny>, callback: &Py<PyAny>) {
730    Python::attach(|py| match report.into_py_any(py) {
731        Ok(py_obj) => call_python_threadsafe(py, call_soon, callback, py_obj),
732        Err(e) => log::error!("Failed to convert FillReport to Python: {e}"),
733    });
734}
735
736#[expect(clippy::too_many_arguments)]
737fn handle_open_orders_delta(
738    delta: &KrakenFuturesOpenOrdersDelta,
739    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
740    account_id: &Arc<RwLock<Option<AccountId>>>,
741    truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
742    order_instrument_map: &Arc<AtomicMap<String, InstrumentId>>,
743    venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
744    venue_order_qty: &Arc<AtomicMap<String, Quantity>>,
745    ts_init: UnixNanos,
746    call_soon: &Py<PyAny>,
747    callback: &Py<PyAny>,
748) {
749    // The fills delta carries the real fill; skip the cancel-shaped delta
750    // Kraken emits when an order leaves the book because it filled.
751    if delta.is_fill_driven_cancel() {
752        log::debug!(
753            "Skipping fill-driven open_orders delta: order_id={}, reason={:?}",
754            delta.order.order_id,
755            delta.reason,
756        );
757        return;
758    }
759
760    let product_id = delta.order.instrument.as_str();
761
762    let Some(instrument) = lookup_instrument(instruments, product_id) else {
763        log::warn!("No instrument for product_id: {product_id}");
764        return;
765    };
766
767    let Some(acct_id) = account_id.read().ok().and_then(|g| *g) else {
768        log::warn!("Account ID not set, cannot process order delta");
769        return;
770    };
771
772    order_instrument_map.insert(delta.order.order_id.clone(), instrument.id());
773
774    let qty = Quantity::new(delta.order.qty, instrument.size_precision());
775    venue_order_qty.insert(delta.order.order_id.clone(), qty);
776
777    match parse_futures_ws_order_status_report(
778        &delta.order,
779        delta.is_cancel,
780        delta.reason.as_deref(),
781        &instrument,
782        acct_id,
783        ts_init,
784    ) {
785        Ok(mut report) => {
786            if let Some(ref cl_ord_id) = delta.order.cli_ord_id {
787                let full_id = resolve_client_order_id(cl_ord_id, truncated_id_map);
788                report = report.with_client_order_id(full_id);
789
790                venue_client_map.insert(delta.order.order_id.clone(), full_id);
791            }
792            dispatch_report_to_python(report, call_soon, callback);
793        }
794        Err(e) => log::error!("Failed to parse futures order status report: {e}"),
795    }
796}
797
798#[expect(clippy::too_many_arguments)]
799fn handle_open_orders_cancel(
800    cancel: &KrakenFuturesOpenOrdersCancel,
801    account_id: &Arc<RwLock<Option<AccountId>>>,
802    truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
803    order_instrument_map: &Arc<AtomicMap<String, InstrumentId>>,
804    venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
805    venue_order_qty: &Arc<AtomicMap<String, Quantity>>,
806    ts_init: UnixNanos,
807    call_soon: &Py<PyAny>,
808    callback: &Py<PyAny>,
809) {
810    if let Some(ref reason) = cancel.reason
811        && (reason == "full_fill" || reason == "partial_fill")
812    {
813        log::debug!(
814            "Skipping fill-driven cancel: order_id={}, reason={reason}",
815            cancel.order_id,
816        );
817        return;
818    }
819
820    let Some(acct_id) = account_id.read().ok().and_then(|g| *g) else {
821        log::warn!("Account ID not set, cannot process order cancel");
822        return;
823    };
824
825    let venue_order_id = VenueOrderId::new(&cancel.order_id);
826
827    let instrument_id = order_instrument_map.load().get(&cancel.order_id).copied();
828
829    let Some(instrument_id) = instrument_id else {
830        log::warn!(
831            "Cannot resolve instrument for cancel: order_id={}, \
832             order not seen in previous delta",
833            cancel.order_id
834        );
835        return;
836    };
837
838    let client_order_id = cancel
839        .cli_ord_id
840        .as_ref()
841        .map(|id| resolve_client_order_id(id, truncated_id_map))
842        .or_else(|| venue_client_map.load().get(&cancel.order_id).copied());
843
844    let Some(quantity) = venue_order_qty.load().get(&cancel.order_id).copied() else {
845        log::warn!(
846            "Cannot resolve quantity for cancel: order_id={}, skipping",
847            cancel.order_id
848        );
849        return;
850    };
851
852    let report = OrderStatusReport::new(
853        acct_id,
854        instrument_id,
855        client_order_id,
856        venue_order_id,
857        OrderSide::NoOrderSide,
858        OrderType::Limit,
859        TimeInForce::Gtc,
860        OrderStatus::Canceled,
861        quantity,
862        Quantity::zero(0),
863        ts_init,
864        ts_init,
865        ts_init,
866        None,
867    );
868
869    let report = if let Some(ref reason) = cancel.reason
870        && !reason.is_empty()
871    {
872        report.with_cancel_reason(reason.clone())
873    } else {
874        report
875    };
876
877    dispatch_report_to_python(report, call_soon, callback);
878}
879
880fn handle_fills_delta(
881    fills_delta: &KrakenFuturesFillsDelta,
882    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
883    account_id: &Arc<RwLock<Option<AccountId>>>,
884    truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
885    ts_init: UnixNanos,
886    call_soon: &Py<PyAny>,
887    callback: &Py<PyAny>,
888) {
889    let Some(acct_id) = account_id.read().ok().and_then(|g| *g) else {
890        log::warn!("Account ID not set, cannot process fills");
891        return;
892    };
893
894    for fill in &fills_delta.fills {
895        let product_id = match &fill.instrument {
896            Some(id) => id.as_str(),
897            None => {
898                log::warn!("Fill missing instrument field: fill_id={}", fill.fill_id);
899                continue;
900            }
901        };
902
903        let Some(instrument) = lookup_instrument(instruments, product_id) else {
904            log::warn!("No instrument for product_id: {product_id}");
905            continue;
906        };
907
908        match parse_futures_ws_fill_report(fill, &instrument, acct_id, ts_init) {
909            Ok(mut report) => {
910                if let Some(ref cl_ord_id) = fill.cli_ord_id {
911                    let full_id = resolve_client_order_id(cl_ord_id, truncated_id_map);
912                    report.client_order_id = Some(full_id);
913                }
914                dispatch_fill_to_python(report, call_soon, callback);
915            }
916            Err(e) => log::error!("Failed to parse futures fill report: {e}"),
917        }
918    }
919}
920
921fn handle_ticker(
922    ticker: &KrakenFuturesTickerData,
923    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
924    ts_init: UnixNanos,
925    call_soon: &Py<PyAny>,
926    callback: &Py<PyAny>,
927) {
928    let Some(instrument) = lookup_instrument(instruments, ticker.product_id.as_str()) else {
929        return;
930    };
931
932    if let Some(mark_price) = parse_futures_ws_mark_price(ticker, &instrument, ts_init) {
933        Python::attach(|py| {
934            let py_obj = data_to_pycapsule(py, Data::MarkPriceUpdate(mark_price));
935            call_python_threadsafe(py, call_soon, callback, py_obj);
936        });
937    }
938
939    if let Some(index_price) = parse_futures_ws_index_price(ticker, &instrument, ts_init) {
940        Python::attach(|py| {
941            let py_obj = data_to_pycapsule(py, Data::IndexPriceUpdate(index_price));
942            call_python_threadsafe(py, call_soon, callback, py_obj);
943        });
944    }
945
946    if let Some(funding_rate) = parse_futures_ws_funding_rate(ticker, &instrument, ts_init) {
947        Python::attach(|py| match funding_rate.into_py_any(py) {
948            Ok(py_obj) => call_python_threadsafe(py, call_soon, callback, py_obj),
949            Err(e) => log::error!("Failed to convert FundingRateUpdate to Python: {e}"),
950        });
951    }
952}
953
954fn handle_trade(
955    trade: &KrakenFuturesTradeData,
956    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
957    ts_init: UnixNanos,
958    call_soon: &Py<PyAny>,
959    callback: &Py<PyAny>,
960) {
961    let Some(instrument) = lookup_instrument(instruments, trade.product_id.as_str()) else {
962        return;
963    };
964
965    match parse_futures_ws_trade_tick(trade, &instrument, ts_init) {
966        Ok(tick) => {
967            Python::attach(|py| {
968                let py_obj = data_to_pycapsule(py, Data::Trade(tick));
969                call_python_threadsafe(py, call_soon, callback, py_obj);
970            });
971        }
972        Err(e) => log::error!("Failed to parse futures trade tick: {e}"),
973    }
974}
975
976#[expect(clippy::too_many_arguments)]
977fn handle_book_snapshot(
978    snapshot: &KrakenFuturesBookSnapshot,
979    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
980    subscriptions: &SubscriptionState,
981    order_books: &mut AHashMap<InstrumentId, OrderBook>,
982    last_quotes: &mut AHashMap<InstrumentId, QuoteTick>,
983    book_sequence: &Arc<AtomicU64>,
984    ts_init: UnixNanos,
985    call_soon: &Py<PyAny>,
986    callback: &Py<PyAny>,
987) {
988    let Some(instrument) = lookup_instrument(instruments, snapshot.product_id.as_str()) else {
989        return;
990    };
991    let instrument_id = instrument.id();
992
993    let sequence = book_sequence.fetch_add(
994        (snapshot.bids.len() + snapshot.asks.len() + 1) as u64,
995        Ordering::Relaxed,
996    );
997
998    match parse_futures_ws_book_snapshot_deltas(snapshot, &instrument, sequence, ts_init) {
999        Ok(delta_vec) => {
1000            if delta_vec.is_empty() {
1001                return;
1002            }
1003            let deltas = OrderBookDeltas::new(instrument_id, delta_vec);
1004
1005            let quotes_key = format!("quotes:{}", snapshot.product_id);
1006            if subscriptions.get_reference_count(&quotes_key) > 0 {
1007                let book = order_books
1008                    .entry(instrument_id)
1009                    .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
1010
1011                if let Err(e) = book.apply_deltas(&deltas) {
1012                    log::error!("Failed to apply snapshot deltas to order book: {e}");
1013                } else {
1014                    maybe_emit_quote(
1015                        book,
1016                        instrument_id,
1017                        last_quotes,
1018                        ts_init,
1019                        call_soon,
1020                        callback,
1021                    );
1022                }
1023            }
1024
1025            let deltas_key = format!("deltas:{}", snapshot.product_id);
1026            if subscriptions.get_reference_count(&deltas_key) > 0 {
1027                Python::attach(|py| {
1028                    let py_obj =
1029                        data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(deltas)));
1030                    call_python_threadsafe(py, call_soon, callback, py_obj);
1031                });
1032            }
1033        }
1034        Err(e) => log::error!("Failed to parse futures book snapshot: {e}"),
1035    }
1036}
1037
1038#[expect(clippy::too_many_arguments)]
1039fn handle_book_delta(
1040    delta: &KrakenFuturesBookDelta,
1041    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
1042    subscriptions: &SubscriptionState,
1043    order_books: &mut AHashMap<InstrumentId, OrderBook>,
1044    last_quotes: &mut AHashMap<InstrumentId, QuoteTick>,
1045    book_sequence: &Arc<AtomicU64>,
1046    ts_init: UnixNanos,
1047    call_soon: &Py<PyAny>,
1048    callback: &Py<PyAny>,
1049) {
1050    let Some(instrument) = lookup_instrument(instruments, delta.product_id.as_str()) else {
1051        return;
1052    };
1053    let instrument_id = instrument.id();
1054
1055    let sequence = book_sequence.fetch_add(1, Ordering::Relaxed);
1056
1057    match parse_futures_ws_book_delta(delta, &instrument, sequence, ts_init) {
1058        Ok(book_delta) => {
1059            let deltas = OrderBookDeltas::new(instrument_id, vec![book_delta]);
1060
1061            let quotes_key = format!("quotes:{}", delta.product_id);
1062            if subscriptions.get_reference_count(&quotes_key) > 0
1063                && let Some(book) = order_books.get_mut(&instrument_id)
1064            {
1065                if let Err(e) = book.apply_deltas(&deltas) {
1066                    log::error!("Failed to apply delta to order book: {e}");
1067                } else {
1068                    maybe_emit_quote(
1069                        book,
1070                        instrument_id,
1071                        last_quotes,
1072                        ts_init,
1073                        call_soon,
1074                        callback,
1075                    );
1076                }
1077            }
1078
1079            let deltas_key = format!("deltas:{}", delta.product_id);
1080            if subscriptions.get_reference_count(&deltas_key) > 0 {
1081                Python::attach(|py| {
1082                    let py_obj =
1083                        data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(deltas)));
1084                    call_python_threadsafe(py, call_soon, callback, py_obj);
1085                });
1086            }
1087        }
1088        Err(e) => log::error!("Failed to parse futures book delta: {e}"),
1089    }
1090}
1091
1092fn maybe_emit_quote(
1093    book: &OrderBook,
1094    instrument_id: InstrumentId,
1095    last_quotes: &mut AHashMap<InstrumentId, QuoteTick>,
1096    ts_init: UnixNanos,
1097    call_soon: &Py<PyAny>,
1098    callback: &Py<PyAny>,
1099) {
1100    let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price()) else {
1101        return;
1102    };
1103    let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size()) else {
1104        return;
1105    };
1106
1107    let bid = bid_price.as_f64();
1108    let ask = ask_price.as_f64();
1109    if bid > 0.0 && (ask - bid) / bid > 0.25 {
1110        log::debug!("Filtered quote with wide spread: bid={bid}, ask={ask}");
1111        return;
1112    }
1113
1114    let quote = QuoteTick::new(
1115        instrument_id,
1116        bid_price,
1117        ask_price,
1118        bid_size,
1119        ask_size,
1120        ts_init,
1121        ts_init,
1122    );
1123
1124    if matches!(last_quotes.get(&instrument_id), Some(prev) if *prev == quote) {
1125        return;
1126    }
1127
1128    last_quotes.insert(instrument_id, quote);
1129
1130    Python::attach(|py| {
1131        let py_obj = data_to_pycapsule(py, Data::Quote(quote));
1132        call_python_threadsafe(py, call_soon, callback, py_obj);
1133    });
1134}