Skip to main content

nautilus_kraken/websocket/dispatch/
futures.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket execution dispatch for the Kraken Futures API.
17//!
18//! Routes `OpenOrdersDelta`, `OpenOrdersCancel`, and `FillsDelta` messages to
19//! typed order events (for tracked orders) or status / fill reports (for
20//! external orders) under the two-tier dispatch contract.
21
22use std::sync::Arc;
23
24use nautilus_core::{AtomicMap, UUID4, UnixNanos};
25use nautilus_live::ExecutionEventEmitter;
26use nautilus_model::{
27    enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
28    events::{OrderCanceled, OrderEventAny, OrderUpdated},
29    identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
30    instruments::{Instrument, InstrumentAny},
31    reports::OrderStatusReport,
32    types::{Price, Quantity},
33};
34
35use super::{
36    DeltaSnapshot, OrderIdentity, WsDispatchState, ensure_accepted_emitted,
37    fill_report_to_order_filled, lookup_instrument, resolve_client_order_id,
38};
39use crate::websocket::futures::{
40    messages::{
41        KrakenFuturesFill, KrakenFuturesFillsDelta, KrakenFuturesOpenOrdersCancel,
42        KrakenFuturesOpenOrdersDelta,
43    },
44    parse::{parse_futures_ws_fill_report, parse_futures_ws_order_status_report},
45};
46
47/// Dispatches a Kraken Futures `OpenOrdersDelta` message.
48///
49/// Fill-driven cancel deltas (`is_cancel=true` with reason `full_fill` /
50/// `partial_fill`) are skipped — the corresponding `FillsDelta` carries the
51/// real fill, so emitting a synthetic Canceled here would race with the
52/// genuine `OrderFilled`.
53#[expect(clippy::too_many_arguments)]
54pub fn open_orders_delta(
55    delta: &KrakenFuturesOpenOrdersDelta,
56    state: &WsDispatchState,
57    emitter: &ExecutionEventEmitter,
58    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
59    truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
60    order_instrument_map: &Arc<AtomicMap<String, InstrumentId>>,
61    venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
62    venue_order_qty: &Arc<AtomicMap<String, Quantity>>,
63    account_id: AccountId,
64    ts_init: UnixNanos,
65) {
66    if delta.is_fill_driven_cancel() {
67        log::debug!(
68            "Skipping fill-driven open_orders delta: order_id={}, reason={:?}",
69            delta.order.order_id,
70            delta.reason,
71        );
72        return;
73    }
74
75    let product_id = delta.order.instrument.as_str();
76    let Some(instrument) = lookup_instrument(instruments, product_id) else {
77        log::warn!("No instrument for product_id: {product_id}");
78        return;
79    };
80
81    // Cache instrument and qty by venue order id so cancel-only messages
82    // (which arrive without the order body) can be reconstructed for the
83    // external fallback path.
84    order_instrument_map.insert(delta.order.order_id.clone(), instrument.id());
85    let qty = Quantity::new(delta.order.qty, instrument.size_precision());
86    venue_order_qty.insert(delta.order.order_id.clone(), qty);
87
88    let resolved_id = delta
89        .order
90        .cli_ord_id
91        .as_ref()
92        .map(|id| resolve_client_order_id(id, truncated_id_map));
93
94    // Stale-report suppression: an order that already reached the filled
95    // terminal state should not produce more events even if a late delta
96    // arrives. `filled_orders` persists past `cleanup_terminal` precisely
97    // for this check.
98    if let Some(cid) = resolved_id
99        && state.filled_orders.contains(&cid)
100    {
101        log::debug!(
102            "Skipping stale open_orders delta for filled order: cid={cid}, order_id={}",
103            delta.order.order_id,
104        );
105        return;
106    }
107
108    if let Some(client_order_id) = resolved_id {
109        venue_client_map.insert(delta.order.order_id.clone(), client_order_id);
110
111        if let Some(identity) = state.lookup_identity(&client_order_id) {
112            delta_tracked(
113                delta,
114                client_order_id,
115                &identity,
116                &instrument,
117                state,
118                emitter,
119                account_id,
120                ts_init,
121            );
122            return;
123        }
124    }
125
126    // External / untracked: fall back to a status report.
127    match parse_futures_ws_order_status_report(
128        &delta.order,
129        delta.is_cancel,
130        delta.reason.as_deref(),
131        &instrument,
132        account_id,
133        ts_init,
134    ) {
135        Ok(mut report) => {
136            if let Some(cid) = resolved_id {
137                report = report.with_client_order_id(cid);
138            }
139            emitter.send_order_status_report(report);
140        }
141        Err(e) => log::error!("Failed to parse futures order status report: {e}"),
142    }
143}
144
145#[expect(clippy::too_many_arguments)]
146fn delta_tracked(
147    delta: &KrakenFuturesOpenOrdersDelta,
148    client_order_id: ClientOrderId,
149    identity: &OrderIdentity,
150    instrument: &InstrumentAny,
151    state: &WsDispatchState,
152    emitter: &ExecutionEventEmitter,
153    account_id: AccountId,
154    ts_init: UnixNanos,
155) {
156    let venue_order_id = VenueOrderId::new(&delta.order.order_id);
157    let ts_event = millis_to_nanos(delta.order.last_update_time);
158    let new_filled = Quantity::new(delta.order.filled, instrument.size_precision());
159
160    if delta.is_cancel {
161        ensure_accepted_emitted(
162            client_order_id,
163            venue_order_id,
164            account_id,
165            identity,
166            state,
167            emitter,
168            ts_event,
169            ts_init,
170        );
171        let canceled = OrderCanceled::new(
172            emitter.trader_id(),
173            identity.strategy_id,
174            identity.instrument_id,
175            client_order_id,
176            UUID4::new(),
177            ts_event,
178            ts_init,
179            false,
180            Some(venue_order_id),
181            Some(account_id),
182        );
183        emitter.send_order_event(OrderEventAny::Canceled(canceled));
184        state.cleanup_terminal(&client_order_id);
185        return;
186    }
187
188    let already_accepted = state.emitted_accepted.contains(&client_order_id);
189    ensure_accepted_emitted(
190        client_order_id,
191        venue_order_id,
192        account_id,
193        identity,
194        state,
195        emitter,
196        ts_event,
197        ts_init,
198    );
199
200    let qty = Quantity::new(delta.order.qty, instrument.size_precision());
201    let snapshot = DeltaSnapshot::new(
202        qty,
203        new_filled,
204        delta.order.limit_price,
205        delta.order.stop_price,
206    );
207
208    if !already_accepted {
209        // First delta seen for this order: the placement Accepted is enough.
210        state.record_delta_snapshot(client_order_id, snapshot);
211        return;
212    }
213
214    // Follow-up delta. The two emission-relevant signals are independent:
215    //   * filled increased       -> partial-fill notification (FillsDelta has it,
216    //                               nothing to emit from here)
217    //   * non-fill field changed -> modify acknowledgement (emit OrderUpdated)
218    // Both can be true simultaneously when a user amends a partially filled
219    // order, so check the modify branch regardless of fill movement.
220    let previous = state.previous_delta_snapshot(&client_order_id);
221    state.record_delta_snapshot(client_order_id, snapshot);
222
223    let non_fill_changed = previous.is_some_and(|prev| !snapshot.non_fill_fields_match(&prev));
224    if !non_fill_changed {
225        return;
226    }
227
228    // Modify ack: refresh tracked quantity (size may have changed) and emit
229    // OrderUpdated so the engine clears PendingUpdate.
230    state.update_identity_quantity(&client_order_id, qty);
231    let updated = OrderUpdated::new(
232        emitter.trader_id(),
233        identity.strategy_id,
234        identity.instrument_id,
235        client_order_id,
236        qty,
237        UUID4::new(),
238        ts_event,
239        ts_init,
240        false,
241        Some(venue_order_id),
242        Some(account_id),
243        delta
244            .order
245            .limit_price
246            .map(|p| Price::new(p, instrument.price_precision())),
247        delta
248            .order
249            .stop_price
250            .map(|p| Price::new(p, instrument.price_precision())),
251        None,
252        false,
253    );
254    emitter.send_order_event(OrderEventAny::Updated(updated));
255}
256
257/// Dispatches a Kraken Futures `OpenOrdersCancel` (cancel-only) message.
258#[expect(clippy::too_many_arguments)]
259pub fn open_orders_cancel(
260    cancel: &KrakenFuturesOpenOrdersCancel,
261    state: &WsDispatchState,
262    emitter: &ExecutionEventEmitter,
263    truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
264    order_instrument_map: &Arc<AtomicMap<String, InstrumentId>>,
265    venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
266    venue_order_qty: &Arc<AtomicMap<String, Quantity>>,
267    account_id: AccountId,
268    ts_init: UnixNanos,
269) {
270    // Skip fill-driven removals (the FillsDelta carries the real fill).
271    if let Some(ref reason) = cancel.reason
272        && (reason == "full_fill" || reason == "partial_fill")
273    {
274        log::debug!(
275            "Skipping fill-driven cancel: order_id={}, reason={reason}",
276            cancel.order_id,
277        );
278        return;
279    }
280
281    let venue_order_id = VenueOrderId::new(&cancel.order_id);
282    let resolved_id = cancel
283        .cli_ord_id
284        .as_ref()
285        .map(|id| resolve_client_order_id(id, truncated_id_map))
286        .or_else(|| venue_client_map.load().get(&cancel.order_id).copied());
287
288    if let Some(client_order_id) = resolved_id
289        && let Some(identity) = state.lookup_identity(&client_order_id)
290    {
291        let ts_event = ts_init;
292        ensure_accepted_emitted(
293            client_order_id,
294            venue_order_id,
295            account_id,
296            &identity,
297            state,
298            emitter,
299            ts_event,
300            ts_init,
301        );
302        let canceled = OrderCanceled::new(
303            emitter.trader_id(),
304            identity.strategy_id,
305            identity.instrument_id,
306            client_order_id,
307            UUID4::new(),
308            ts_event,
309            ts_init,
310            false,
311            Some(venue_order_id),
312            Some(account_id),
313        );
314        emitter.send_order_event(OrderEventAny::Canceled(canceled));
315        state.cleanup_terminal(&client_order_id);
316        return;
317    }
318
319    // External fallback: build a status report from the side caches.
320    let Some(instrument_id) = order_instrument_map.load().get(&cancel.order_id).copied() else {
321        log::warn!(
322            "Cannot resolve instrument for cancel: order_id={}, \
323             order not seen in previous delta",
324            cancel.order_id
325        );
326        return;
327    };
328
329    let Some(quantity) = venue_order_qty.load().get(&cancel.order_id).copied() else {
330        log::warn!(
331            "Cannot resolve quantity for cancel: order_id={}, skipping",
332            cancel.order_id
333        );
334        return;
335    };
336
337    let report = OrderStatusReport::new(
338        account_id,
339        instrument_id,
340        resolved_id,
341        venue_order_id,
342        OrderSide::NoOrderSide,
343        OrderType::Limit,
344        TimeInForce::Gtc,
345        OrderStatus::Canceled,
346        quantity,
347        Quantity::zero(0),
348        ts_init,
349        ts_init,
350        ts_init,
351        None,
352    );
353    let report = if let Some(ref reason) = cancel.reason
354        && !reason.is_empty()
355    {
356        report.with_cancel_reason(reason.clone())
357    } else {
358        report
359    };
360    emitter.send_order_status_report(report);
361}
362
363/// Dispatches a Kraken Futures `FillsDelta` message.
364#[expect(clippy::too_many_arguments)]
365pub fn fills_delta(
366    fills_delta: &KrakenFuturesFillsDelta,
367    state: &WsDispatchState,
368    emitter: &ExecutionEventEmitter,
369    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
370    truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
371    venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
372    account_id: AccountId,
373    ts_init: UnixNanos,
374) {
375    for fill in &fills_delta.fills {
376        single_fill(
377            fill,
378            state,
379            emitter,
380            instruments,
381            truncated_id_map,
382            venue_client_map,
383            account_id,
384            ts_init,
385        );
386    }
387}
388
389#[expect(clippy::too_many_arguments)]
390fn single_fill(
391    fill: &KrakenFuturesFill,
392    state: &WsDispatchState,
393    emitter: &ExecutionEventEmitter,
394    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
395    truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
396    venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
397    account_id: AccountId,
398    ts_init: UnixNanos,
399) {
400    let product_id = match &fill.instrument {
401        Some(id) => id.as_str(),
402        None => {
403            log::warn!("Fill missing instrument field: fill_id={}", fill.fill_id);
404            return;
405        }
406    };
407
408    let Some(instrument) = lookup_instrument(instruments, product_id) else {
409        log::warn!("No instrument for product_id: {product_id}");
410        return;
411    };
412
413    let mut report = match parse_futures_ws_fill_report(fill, &instrument, account_id, ts_init) {
414        Ok(report) => report,
415        Err(e) => {
416            log::error!("Failed to parse futures fill report: {e}");
417            return;
418        }
419    };
420
421    let resolved_id = fill
422        .cli_ord_id
423        .as_deref()
424        .filter(|s| !s.is_empty())
425        .map(|id| resolve_client_order_id(id, truncated_id_map))
426        .or_else(|| venue_client_map.load().get(&fill.order_id).copied());
427
428    if let Some(cid) = resolved_id
429        && state.filled_orders.contains(&cid)
430    {
431        log::debug!(
432            "Skipping stale fill for filled order: cid={cid}, order_id={}",
433            fill.order_id,
434        );
435        return;
436    }
437
438    if let Some(client_order_id) = resolved_id {
439        report.client_order_id = Some(client_order_id);
440
441        if let Some(identity) = state.lookup_identity(&client_order_id) {
442            if state.check_and_insert_trade(report.trade_id) {
443                log::debug!(
444                    "Skipping duplicate fill for {client_order_id}: trade_id={}",
445                    report.trade_id
446                );
447                return;
448            }
449            ensure_accepted_emitted(
450                client_order_id,
451                report.venue_order_id,
452                account_id,
453                &identity,
454                state,
455                emitter,
456                report.ts_event,
457                ts_init,
458            );
459            let filled = fill_report_to_order_filled(
460                &report,
461                emitter.trader_id(),
462                &identity,
463                instrument.quote_currency(),
464                client_order_id,
465            );
466            emitter.send_order_event(OrderEventAny::Filled(filled));
467
468            // Update cumulative filled and cleanup on terminal fill.
469            let previous = state
470                .previous_filled_qty(&client_order_id)
471                .unwrap_or_else(|| Quantity::zero(instrument.size_precision()));
472            let cumulative = previous + report.last_qty;
473            state.record_filled_qty(client_order_id, cumulative);
474
475            if cumulative >= identity.quantity {
476                state.insert_filled(client_order_id);
477                state.cleanup_terminal(&client_order_id);
478            }
479            return;
480        }
481    }
482
483    // External fallback.
484    if state.check_and_insert_trade(report.trade_id) {
485        log::debug!(
486            "Skipping duplicate external fill: trade_id={}",
487            report.trade_id
488        );
489        return;
490    }
491    emitter.send_fill_report(report);
492}
493
494#[inline]
495fn millis_to_nanos(millis: i64) -> UnixNanos {
496    UnixNanos::from((millis as u64) * 1_000_000)
497}