1use std::sync::Arc;
24
25use nautilus_core::{AtomicMap, UUID4, UnixNanos};
26use nautilus_live::ExecutionEventEmitter;
27use nautilus_model::{
28 enums::OrderStatus,
29 events::{
30 OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderTriggered, OrderUpdated,
31 },
32 identifiers::{AccountId, ClientOrderId, InstrumentId},
33 instruments::{Instrument, InstrumentAny},
34 reports::{FillReport, OrderStatusReport},
35 types::Quantity,
36};
37
38use super::{
39 OrderIdentity, WsDispatchState, ensure_accepted_emitted, fill_report_to_order_filled,
40 lookup_instrument, resolve_client_order_id,
41};
42use crate::websocket::spot_v2::{
43 enums::KrakenExecType,
44 messages::KrakenWsExecutionData,
45 parse::{parse_ws_fill_report, parse_ws_order_status_report},
46};
47
48#[expect(clippy::too_many_arguments)]
50pub fn execution(
51 exec: &KrakenWsExecutionData,
52 state: &WsDispatchState,
53 emitter: &ExecutionEventEmitter,
54 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
55 truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
56 order_qty_cache: &Arc<AtomicMap<String, f64>>,
57 account_id: AccountId,
58 ts_init: UnixNanos,
59) {
60 let symbol = match &exec.symbol {
61 Some(s) => s.as_str(),
62 None => {
63 log::debug!(
64 "Execution message without symbol: exec_type={:?}, order_id={}",
65 exec.exec_type,
66 exec.order_id
67 );
68 return;
69 }
70 };
71 let Some(instrument) = lookup_instrument(instruments, symbol) else {
72 log::warn!("No instrument for symbol: {symbol}");
73 return;
74 };
75
76 let cached_qty = exec
79 .cl_ord_id
80 .as_ref()
81 .and_then(|id| order_qty_cache.load().get(id).copied());
82 if let (Some(qty), Some(cl_ord_id)) = (exec.order_qty, &exec.cl_ord_id) {
83 order_qty_cache.insert(cl_ord_id.clone(), qty);
84 }
85
86 let resolved_id = exec
87 .cl_ord_id
88 .as_ref()
89 .map(|id| resolve_client_order_id(id, truncated_id_map));
90
91 if let Some(cid) = resolved_id
94 && state.filled_orders.contains(&cid)
95 {
96 log::debug!(
97 "Skipping stale spot execution for filled order: cid={cid}, order_id={}",
98 exec.order_id,
99 );
100 return;
101 }
102
103 let identity = resolved_id.and_then(|cid| state.lookup_identity(&cid));
104
105 match parse_ws_order_status_report(exec, &instrument, account_id, cached_qty, ts_init) {
107 Ok(mut report) => {
108 if let Some(cid) = resolved_id {
109 report = report.with_client_order_id(cid);
110 }
111
112 if let (Some(client_order_id), Some(identity)) = (resolved_id, identity.as_ref()) {
113 status_tracked(
114 &report,
115 exec.exec_type,
116 exec.exec_id.is_some(),
117 client_order_id,
118 identity,
119 state,
120 emitter,
121 account_id,
122 ts_init,
123 );
124 } else {
125 emitter.send_order_status_report(report);
126 }
127 }
128 Err(e) => log::error!("Failed to parse order status report: {e}"),
129 }
130
131 if exec.exec_id.is_some() {
133 match parse_ws_fill_report(exec, &instrument, account_id, ts_init) {
134 Ok(mut report) => {
135 if let Some(cid) = resolved_id {
136 report.client_order_id = Some(cid);
137 }
138
139 if let (Some(client_order_id), Some(identity)) = (resolved_id, identity.as_ref()) {
140 fill_tracked(
141 &report,
142 client_order_id,
143 identity,
144 &instrument,
145 state,
146 emitter,
147 account_id,
148 ts_init,
149 );
150 } else {
151 if state.check_and_insert_trade(report.trade_id) {
152 log::debug!(
153 "Skipping duplicate external spot fill: trade_id={}",
154 report.trade_id
155 );
156 return;
157 }
158 emitter.send_fill_report(report);
159 }
160 }
161 Err(e) => log::error!("Failed to parse fill report: {e}"),
162 }
163 }
164}
165
166#[expect(clippy::too_many_arguments)]
167fn status_tracked(
168 report: &OrderStatusReport,
169 exec_type: KrakenExecType,
170 has_fill: bool,
171 client_order_id: ClientOrderId,
172 identity: &OrderIdentity,
173 state: &WsDispatchState,
174 emitter: &ExecutionEventEmitter,
175 account_id: AccountId,
176 ts_init: UnixNanos,
177) {
178 let venue_order_id = report.venue_order_id;
179 let ts_event = report.ts_last;
180 let trader_id = emitter.trader_id();
181
182 if matches!(
186 exec_type,
187 KrakenExecType::Amended | KrakenExecType::Restated
188 ) && state.emitted_accepted.contains(&client_order_id)
189 {
190 state.update_identity_quantity(&client_order_id, report.quantity);
191 let updated = OrderUpdated::new(
192 trader_id,
193 identity.strategy_id,
194 identity.instrument_id,
195 client_order_id,
196 report.quantity,
197 UUID4::new(),
198 ts_event,
199 ts_init,
200 false,
201 Some(venue_order_id),
202 Some(account_id),
203 report.price,
204 report.trigger_price,
205 None,
206 false,
207 );
208 emitter.send_order_event(OrderEventAny::Updated(updated));
209 return;
210 }
211
212 match report.order_status {
213 OrderStatus::Accepted => {
214 if state.emitted_accepted.contains(&client_order_id) {
215 return;
218 }
219 state.insert_accepted(client_order_id);
220 let accepted = OrderAccepted::new(
221 trader_id,
222 identity.strategy_id,
223 identity.instrument_id,
224 client_order_id,
225 venue_order_id,
226 account_id,
227 UUID4::new(),
228 ts_event,
229 ts_init,
230 false,
231 );
232 emitter.send_order_event(OrderEventAny::Accepted(accepted));
233 }
234 OrderStatus::Triggered => {
235 ensure_accepted_emitted(
238 client_order_id,
239 venue_order_id,
240 account_id,
241 identity,
242 state,
243 emitter,
244 ts_event,
245 ts_init,
246 );
247 let triggered = OrderTriggered::new(
248 trader_id,
249 identity.strategy_id,
250 identity.instrument_id,
251 client_order_id,
252 UUID4::new(),
253 ts_event,
254 ts_init,
255 false,
256 Some(venue_order_id),
257 Some(account_id),
258 );
259 emitter.send_order_event(OrderEventAny::Triggered(triggered));
260 }
261 OrderStatus::PartiallyFilled => {
262 }
265 OrderStatus::Filled
266 if !has_fill => {
272 state.insert_filled(client_order_id);
273 state.cleanup_terminal(&client_order_id);
274 }
275 OrderStatus::Canceled => {
276 ensure_accepted_emitted(
277 client_order_id,
278 venue_order_id,
279 account_id,
280 identity,
281 state,
282 emitter,
283 ts_event,
284 ts_init,
285 );
286 let canceled = OrderCanceled::new(
287 trader_id,
288 identity.strategy_id,
289 identity.instrument_id,
290 client_order_id,
291 UUID4::new(),
292 ts_event,
293 ts_init,
294 false,
295 Some(venue_order_id),
296 Some(account_id),
297 );
298 emitter.send_order_event(OrderEventAny::Canceled(canceled));
299 state.cleanup_terminal(&client_order_id);
300 }
301 OrderStatus::Expired => {
302 ensure_accepted_emitted(
303 client_order_id,
304 venue_order_id,
305 account_id,
306 identity,
307 state,
308 emitter,
309 ts_event,
310 ts_init,
311 );
312 let expired = OrderExpired::new(
313 trader_id,
314 identity.strategy_id,
315 identity.instrument_id,
316 client_order_id,
317 UUID4::new(),
318 ts_event,
319 ts_init,
320 false,
321 Some(venue_order_id),
322 Some(account_id),
323 );
324 emitter.send_order_event(OrderEventAny::Expired(expired));
325 state.cleanup_terminal(&client_order_id);
326 }
327 _ => {}
328 }
329}
330
331#[expect(clippy::too_many_arguments)]
332fn fill_tracked(
333 report: &FillReport,
334 client_order_id: ClientOrderId,
335 identity: &OrderIdentity,
336 instrument: &InstrumentAny,
337 state: &WsDispatchState,
338 emitter: &ExecutionEventEmitter,
339 account_id: AccountId,
340 ts_init: UnixNanos,
341) {
342 if state.check_and_insert_trade(report.trade_id) {
343 log::debug!(
344 "Skipping duplicate spot fill for {client_order_id}: trade_id={}",
345 report.trade_id
346 );
347 return;
348 }
349
350 ensure_accepted_emitted(
351 client_order_id,
352 report.venue_order_id,
353 account_id,
354 identity,
355 state,
356 emitter,
357 report.ts_event,
358 ts_init,
359 );
360
361 let filled = fill_report_to_order_filled(
362 report,
363 emitter.trader_id(),
364 identity,
365 instrument.quote_currency(),
366 client_order_id,
367 );
368 emitter.send_order_event(OrderEventAny::Filled(filled));
369
370 let previous = state
371 .previous_filled_qty(&client_order_id)
372 .unwrap_or_else(|| Quantity::zero(instrument.size_precision()));
373 let cumulative = previous + report.last_qty;
374 state.record_filled_qty(client_order_id, cumulative);
375
376 if cumulative >= identity.quantity {
377 state.insert_filled(client_order_id);
378 state.cleanup_terminal(&client_order_id);
379 }
380}
381
382#[must_use]
385pub fn is_terminal_exec_type(exec_type: KrakenExecType) -> bool {
386 matches!(
387 exec_type,
388 KrakenExecType::Filled | KrakenExecType::Canceled | KrakenExecType::Expired
389 )
390}
391
392#[cfg(test)]
393mod tests {
394 use rstest::rstest;
395
396 use super::*;
397
398 #[rstest]
399 #[case::filled(KrakenExecType::Filled, true)]
400 #[case::canceled(KrakenExecType::Canceled, true)]
401 #[case::expired(KrakenExecType::Expired, true)]
402 #[case::new(KrakenExecType::New, false)]
403 #[case::trade(KrakenExecType::Trade, false)]
404 #[case::pending_new(KrakenExecType::PendingNew, false)]
405 fn test_is_terminal_exec_type(#[case] exec_type: KrakenExecType, #[case] expected: bool) {
406 assert_eq!(is_terminal_exec_type(exec_type), expected);
407 }
408}