Skip to main content

nautilus_kraken/websocket/dispatch/
spot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket execution dispatch for the Kraken Spot v2 API.
17//!
18//! A single spot execution can carry both a status update (handled via the
19//! order event path) and a fill (when `exec_id` is present, handled via the
20//! fill path). Tracked orders emit typed events; external orders fall through
21//! to reports.
22
23use std::sync::Arc;
24
25use nautilus_core::{AtomicMap, UUID4, UnixNanos};
26use nautilus_live::ExecutionEventEmitter;
27use nautilus_model::{
28    enums::OrderStatus,
29    events::{
30        OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderTriggered, OrderUpdated,
31    },
32    identifiers::{AccountId, ClientOrderId, InstrumentId},
33    instruments::{Instrument, InstrumentAny},
34    reports::{FillReport, OrderStatusReport},
35    types::Quantity,
36};
37
38use super::{
39    OrderIdentity, WsDispatchState, ensure_accepted_emitted, fill_report_to_order_filled,
40    lookup_instrument, resolve_client_order_id,
41};
42use crate::websocket::spot_v2::{
43    enums::KrakenExecType,
44    messages::KrakenWsExecutionData,
45    parse::{parse_ws_fill_report, parse_ws_order_status_report},
46};
47
48/// Dispatches a Kraken Spot v2 execution message.
49#[expect(clippy::too_many_arguments)]
50pub fn execution(
51    exec: &KrakenWsExecutionData,
52    state: &WsDispatchState,
53    emitter: &ExecutionEventEmitter,
54    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
55    truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
56    order_qty_cache: &Arc<AtomicMap<String, f64>>,
57    account_id: AccountId,
58    ts_init: UnixNanos,
59) {
60    let symbol = match &exec.symbol {
61        Some(s) => s.as_str(),
62        None => {
63            log::debug!(
64                "Execution message without symbol: exec_type={:?}, order_id={}",
65                exec.exec_type,
66                exec.order_id
67            );
68            return;
69        }
70    };
71    let Some(instrument) = lookup_instrument(instruments, symbol) else {
72        log::warn!("No instrument for symbol: {symbol}");
73        return;
74    };
75
76    // Mirror the existing behaviour: cache the order quantity by truncated cli
77    // ord id so the parser can fall back to it for quote-quantity orders.
78    let cached_qty = exec
79        .cl_ord_id
80        .as_ref()
81        .and_then(|id| order_qty_cache.load().get(id).copied());
82    if let (Some(qty), Some(cl_ord_id)) = (exec.order_qty, &exec.cl_ord_id) {
83        order_qty_cache.insert(cl_ord_id.clone(), qty);
84    }
85
86    let resolved_id = exec
87        .cl_ord_id
88        .as_ref()
89        .map(|id| resolve_client_order_id(id, truncated_id_map));
90
91    // Stale-report suppression for previously-tracked orders that already
92    // reached the filled terminal state.
93    if let Some(cid) = resolved_id
94        && state.filled_orders.contains(&cid)
95    {
96        log::debug!(
97            "Skipping stale spot execution for filled order: cid={cid}, order_id={}",
98            exec.order_id,
99        );
100        return;
101    }
102
103    let identity = resolved_id.and_then(|cid| state.lookup_identity(&cid));
104
105    // Status update.
106    match parse_ws_order_status_report(exec, &instrument, account_id, cached_qty, ts_init) {
107        Ok(mut report) => {
108            if let Some(cid) = resolved_id {
109                report = report.with_client_order_id(cid);
110            }
111
112            if let (Some(client_order_id), Some(identity)) = (resolved_id, identity.as_ref()) {
113                status_tracked(
114                    &report,
115                    exec.exec_type,
116                    exec.exec_id.is_some(),
117                    client_order_id,
118                    identity,
119                    state,
120                    emitter,
121                    account_id,
122                    ts_init,
123                );
124            } else {
125                emitter.send_order_status_report(report);
126            }
127        }
128        Err(e) => log::error!("Failed to parse order status report: {e}"),
129    }
130
131    // Fill (when present).
132    if exec.exec_id.is_some() {
133        match parse_ws_fill_report(exec, &instrument, account_id, ts_init) {
134            Ok(mut report) => {
135                if let Some(cid) = resolved_id {
136                    report.client_order_id = Some(cid);
137                }
138
139                if let (Some(client_order_id), Some(identity)) = (resolved_id, identity.as_ref()) {
140                    fill_tracked(
141                        &report,
142                        client_order_id,
143                        identity,
144                        &instrument,
145                        state,
146                        emitter,
147                        account_id,
148                        ts_init,
149                    );
150                } else {
151                    if state.check_and_insert_trade(report.trade_id) {
152                        log::debug!(
153                            "Skipping duplicate external spot fill: trade_id={}",
154                            report.trade_id
155                        );
156                        return;
157                    }
158                    emitter.send_fill_report(report);
159                }
160            }
161            Err(e) => log::error!("Failed to parse fill report: {e}"),
162        }
163    }
164}
165
166#[expect(clippy::too_many_arguments)]
167fn status_tracked(
168    report: &OrderStatusReport,
169    exec_type: KrakenExecType,
170    has_fill: bool,
171    client_order_id: ClientOrderId,
172    identity: &OrderIdentity,
173    state: &WsDispatchState,
174    emitter: &ExecutionEventEmitter,
175    account_id: AccountId,
176    ts_init: UnixNanos,
177) {
178    let venue_order_id = report.venue_order_id;
179    let ts_event = report.ts_last;
180    let trader_id = emitter.trader_id();
181
182    // Amended (user modify) and Restated (engine adjustment) both surface
183    // post-modify state. Refresh tracked quantity (size may have changed) and
184    // emit OrderUpdated so the engine clears PendingUpdate.
185    if matches!(
186        exec_type,
187        KrakenExecType::Amended | KrakenExecType::Restated
188    ) && state.emitted_accepted.contains(&client_order_id)
189    {
190        state.update_identity_quantity(&client_order_id, report.quantity);
191        let updated = OrderUpdated::new(
192            trader_id,
193            identity.strategy_id,
194            identity.instrument_id,
195            client_order_id,
196            report.quantity,
197            UUID4::new(),
198            ts_event,
199            ts_init,
200            false,
201            Some(venue_order_id),
202            Some(account_id),
203            report.price,
204            report.trigger_price,
205            None,
206            false,
207        );
208        emitter.send_order_event(OrderEventAny::Updated(updated));
209        return;
210    }
211
212    match report.order_status {
213        OrderStatus::Accepted => {
214            if state.emitted_accepted.contains(&client_order_id) {
215                // Already accepted; this is a redundant New / Restated / Status
216                // exec. The strategy already saw OrderAccepted; nothing to emit.
217                return;
218            }
219            state.insert_accepted(client_order_id);
220            let accepted = OrderAccepted::new(
221                trader_id,
222                identity.strategy_id,
223                identity.instrument_id,
224                client_order_id,
225                venue_order_id,
226                account_id,
227                UUID4::new(),
228                ts_event,
229                ts_init,
230                false,
231            );
232            emitter.send_order_event(OrderEventAny::Accepted(accepted));
233        }
234        OrderStatus::Triggered => {
235            // Stop / take-profit transition. Synthesize Accepted first if the
236            // venue compressed placement and trigger into one message.
237            ensure_accepted_emitted(
238                client_order_id,
239                venue_order_id,
240                account_id,
241                identity,
242                state,
243                emitter,
244                ts_event,
245                ts_init,
246            );
247            let triggered = OrderTriggered::new(
248                trader_id,
249                identity.strategy_id,
250                identity.instrument_id,
251                client_order_id,
252                UUID4::new(),
253                ts_event,
254                ts_init,
255                false,
256                Some(venue_order_id),
257                Some(account_id),
258            );
259            emitter.send_order_event(OrderEventAny::Triggered(triggered));
260        }
261        OrderStatus::PartiallyFilled => {
262            // The fill itself is emitted from the trade-side of dispatch via
263            // fill_tracked; nothing to do here.
264        }
265        OrderStatus::Filled
266            // Terminal-fill marker. If the same execution carries fill data
267            // (`exec_id` is present) the fill side runs next and is
268            // responsible for cumulative tracking + cleanup; only do the
269            // cleanup here when this is a status-only Filled marker without
270            // an accompanying fill payload.
271            if !has_fill => {
272                state.insert_filled(client_order_id);
273                state.cleanup_terminal(&client_order_id);
274            }
275        OrderStatus::Canceled => {
276            ensure_accepted_emitted(
277                client_order_id,
278                venue_order_id,
279                account_id,
280                identity,
281                state,
282                emitter,
283                ts_event,
284                ts_init,
285            );
286            let canceled = OrderCanceled::new(
287                trader_id,
288                identity.strategy_id,
289                identity.instrument_id,
290                client_order_id,
291                UUID4::new(),
292                ts_event,
293                ts_init,
294                false,
295                Some(venue_order_id),
296                Some(account_id),
297            );
298            emitter.send_order_event(OrderEventAny::Canceled(canceled));
299            state.cleanup_terminal(&client_order_id);
300        }
301        OrderStatus::Expired => {
302            ensure_accepted_emitted(
303                client_order_id,
304                venue_order_id,
305                account_id,
306                identity,
307                state,
308                emitter,
309                ts_event,
310                ts_init,
311            );
312            let expired = OrderExpired::new(
313                trader_id,
314                identity.strategy_id,
315                identity.instrument_id,
316                client_order_id,
317                UUID4::new(),
318                ts_event,
319                ts_init,
320                false,
321                Some(venue_order_id),
322                Some(account_id),
323            );
324            emitter.send_order_event(OrderEventAny::Expired(expired));
325            state.cleanup_terminal(&client_order_id);
326        }
327        _ => {}
328    }
329}
330
331#[expect(clippy::too_many_arguments)]
332fn fill_tracked(
333    report: &FillReport,
334    client_order_id: ClientOrderId,
335    identity: &OrderIdentity,
336    instrument: &InstrumentAny,
337    state: &WsDispatchState,
338    emitter: &ExecutionEventEmitter,
339    account_id: AccountId,
340    ts_init: UnixNanos,
341) {
342    if state.check_and_insert_trade(report.trade_id) {
343        log::debug!(
344            "Skipping duplicate spot fill for {client_order_id}: trade_id={}",
345            report.trade_id
346        );
347        return;
348    }
349
350    ensure_accepted_emitted(
351        client_order_id,
352        report.venue_order_id,
353        account_id,
354        identity,
355        state,
356        emitter,
357        report.ts_event,
358        ts_init,
359    );
360
361    let filled = fill_report_to_order_filled(
362        report,
363        emitter.trader_id(),
364        identity,
365        instrument.quote_currency(),
366        client_order_id,
367    );
368    emitter.send_order_event(OrderEventAny::Filled(filled));
369
370    let previous = state
371        .previous_filled_qty(&client_order_id)
372        .unwrap_or_else(|| Quantity::zero(instrument.size_precision()));
373    let cumulative = previous + report.last_qty;
374    state.record_filled_qty(client_order_id, cumulative);
375
376    if cumulative >= identity.quantity {
377        state.insert_filled(client_order_id);
378        state.cleanup_terminal(&client_order_id);
379    }
380}
381
382/// Returns true when this spot execution carries a terminal status that
383/// should remove the order from dispatch state.
384#[must_use]
385pub fn is_terminal_exec_type(exec_type: KrakenExecType) -> bool {
386    matches!(
387        exec_type,
388        KrakenExecType::Filled | KrakenExecType::Canceled | KrakenExecType::Expired
389    )
390}
391
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Pins the terminal / non-terminal classification of execution types.
    #[rstest]
    // Non-terminal execution types.
    #[case::new(KrakenExecType::New, false)]
    #[case::trade(KrakenExecType::Trade, false)]
    #[case::pending_new(KrakenExecType::PendingNew, false)]
    // Terminal execution types.
    #[case::filled(KrakenExecType::Filled, true)]
    #[case::canceled(KrakenExecType::Canceled, true)]
    #[case::expired(KrakenExecType::Expired, true)]
    fn test_is_terminal_exec_type(#[case] variant: KrakenExecType, #[case] terminal: bool) {
        let actual = is_terminal_exec_type(variant);
        assert_eq!(actual, terminal);
    }
}