1use std::sync::Arc;
23
24use nautilus_core::{AtomicMap, UUID4, UnixNanos};
25use nautilus_live::ExecutionEventEmitter;
26use nautilus_model::{
27 enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
28 events::{OrderCanceled, OrderEventAny, OrderUpdated},
29 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
30 instruments::{Instrument, InstrumentAny},
31 reports::OrderStatusReport,
32 types::{Price, Quantity},
33};
34
35use super::{
36 DeltaSnapshot, OrderIdentity, WsDispatchState, ensure_accepted_emitted,
37 fill_report_to_order_filled, lookup_instrument, resolve_client_order_id,
38};
39use crate::websocket::futures::{
40 messages::{
41 KrakenFuturesFill, KrakenFuturesFillsDelta, KrakenFuturesOpenOrdersCancel,
42 KrakenFuturesOpenOrdersDelta,
43 },
44 parse::{parse_futures_ws_fill_report, parse_futures_ws_order_status_report},
45};
46
47#[expect(clippy::too_many_arguments)]
54pub fn open_orders_delta(
55 delta: &KrakenFuturesOpenOrdersDelta,
56 state: &WsDispatchState,
57 emitter: &ExecutionEventEmitter,
58 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
59 truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
60 order_instrument_map: &Arc<AtomicMap<String, InstrumentId>>,
61 venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
62 venue_order_qty: &Arc<AtomicMap<String, Quantity>>,
63 account_id: AccountId,
64 ts_init: UnixNanos,
65) {
66 if delta.is_fill_driven_cancel() {
67 log::debug!(
68 "Skipping fill-driven open_orders delta: order_id={}, reason={:?}",
69 delta.order.order_id,
70 delta.reason,
71 );
72 return;
73 }
74
75 let product_id = delta.order.instrument.as_str();
76 let Some(instrument) = lookup_instrument(instruments, product_id) else {
77 log::warn!("No instrument for product_id: {product_id}");
78 return;
79 };
80
81 order_instrument_map.insert(delta.order.order_id.clone(), instrument.id());
85 let qty = Quantity::new(delta.order.qty, instrument.size_precision());
86 venue_order_qty.insert(delta.order.order_id.clone(), qty);
87
88 let resolved_id = delta
89 .order
90 .cli_ord_id
91 .as_ref()
92 .map(|id| resolve_client_order_id(id, truncated_id_map));
93
94 if let Some(cid) = resolved_id
99 && state.filled_orders.contains(&cid)
100 {
101 log::debug!(
102 "Skipping stale open_orders delta for filled order: cid={cid}, order_id={}",
103 delta.order.order_id,
104 );
105 return;
106 }
107
108 if let Some(client_order_id) = resolved_id {
109 venue_client_map.insert(delta.order.order_id.clone(), client_order_id);
110
111 if let Some(identity) = state.lookup_identity(&client_order_id) {
112 delta_tracked(
113 delta,
114 client_order_id,
115 &identity,
116 &instrument,
117 state,
118 emitter,
119 account_id,
120 ts_init,
121 );
122 return;
123 }
124 }
125
126 match parse_futures_ws_order_status_report(
128 &delta.order,
129 delta.is_cancel,
130 delta.reason.as_deref(),
131 &instrument,
132 account_id,
133 ts_init,
134 ) {
135 Ok(mut report) => {
136 if let Some(cid) = resolved_id {
137 report = report.with_client_order_id(cid);
138 }
139 emitter.send_order_status_report(report);
140 }
141 Err(e) => log::error!("Failed to parse futures order status report: {e}"),
142 }
143}
144
145#[expect(clippy::too_many_arguments)]
146fn delta_tracked(
147 delta: &KrakenFuturesOpenOrdersDelta,
148 client_order_id: ClientOrderId,
149 identity: &OrderIdentity,
150 instrument: &InstrumentAny,
151 state: &WsDispatchState,
152 emitter: &ExecutionEventEmitter,
153 account_id: AccountId,
154 ts_init: UnixNanos,
155) {
156 let venue_order_id = VenueOrderId::new(&delta.order.order_id);
157 let ts_event = millis_to_nanos(delta.order.last_update_time);
158 let new_filled = Quantity::new(delta.order.filled, instrument.size_precision());
159
160 if delta.is_cancel {
161 ensure_accepted_emitted(
162 client_order_id,
163 venue_order_id,
164 account_id,
165 identity,
166 state,
167 emitter,
168 ts_event,
169 ts_init,
170 );
171 let canceled = OrderCanceled::new(
172 emitter.trader_id(),
173 identity.strategy_id,
174 identity.instrument_id,
175 client_order_id,
176 UUID4::new(),
177 ts_event,
178 ts_init,
179 false,
180 Some(venue_order_id),
181 Some(account_id),
182 );
183 emitter.send_order_event(OrderEventAny::Canceled(canceled));
184 state.cleanup_terminal(&client_order_id);
185 return;
186 }
187
188 let already_accepted = state.emitted_accepted.contains(&client_order_id);
189 ensure_accepted_emitted(
190 client_order_id,
191 venue_order_id,
192 account_id,
193 identity,
194 state,
195 emitter,
196 ts_event,
197 ts_init,
198 );
199
200 let qty = Quantity::new(delta.order.qty, instrument.size_precision());
201 let snapshot = DeltaSnapshot::new(
202 qty,
203 new_filled,
204 delta.order.limit_price,
205 delta.order.stop_price,
206 );
207
208 if !already_accepted {
209 state.record_delta_snapshot(client_order_id, snapshot);
211 return;
212 }
213
214 let previous = state.previous_delta_snapshot(&client_order_id);
221 state.record_delta_snapshot(client_order_id, snapshot);
222
223 let non_fill_changed = previous.is_some_and(|prev| !snapshot.non_fill_fields_match(&prev));
224 if !non_fill_changed {
225 return;
226 }
227
228 state.update_identity_quantity(&client_order_id, qty);
231 let updated = OrderUpdated::new(
232 emitter.trader_id(),
233 identity.strategy_id,
234 identity.instrument_id,
235 client_order_id,
236 qty,
237 UUID4::new(),
238 ts_event,
239 ts_init,
240 false,
241 Some(venue_order_id),
242 Some(account_id),
243 delta
244 .order
245 .limit_price
246 .map(|p| Price::new(p, instrument.price_precision())),
247 delta
248 .order
249 .stop_price
250 .map(|p| Price::new(p, instrument.price_precision())),
251 None,
252 false,
253 );
254 emitter.send_order_event(OrderEventAny::Updated(updated));
255}
256
257#[expect(clippy::too_many_arguments)]
259pub fn open_orders_cancel(
260 cancel: &KrakenFuturesOpenOrdersCancel,
261 state: &WsDispatchState,
262 emitter: &ExecutionEventEmitter,
263 truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
264 order_instrument_map: &Arc<AtomicMap<String, InstrumentId>>,
265 venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
266 venue_order_qty: &Arc<AtomicMap<String, Quantity>>,
267 account_id: AccountId,
268 ts_init: UnixNanos,
269) {
270 if let Some(ref reason) = cancel.reason
272 && (reason == "full_fill" || reason == "partial_fill")
273 {
274 log::debug!(
275 "Skipping fill-driven cancel: order_id={}, reason={reason}",
276 cancel.order_id,
277 );
278 return;
279 }
280
281 let venue_order_id = VenueOrderId::new(&cancel.order_id);
282 let resolved_id = cancel
283 .cli_ord_id
284 .as_ref()
285 .map(|id| resolve_client_order_id(id, truncated_id_map))
286 .or_else(|| venue_client_map.load().get(&cancel.order_id).copied());
287
288 if let Some(client_order_id) = resolved_id
289 && let Some(identity) = state.lookup_identity(&client_order_id)
290 {
291 let ts_event = ts_init;
292 ensure_accepted_emitted(
293 client_order_id,
294 venue_order_id,
295 account_id,
296 &identity,
297 state,
298 emitter,
299 ts_event,
300 ts_init,
301 );
302 let canceled = OrderCanceled::new(
303 emitter.trader_id(),
304 identity.strategy_id,
305 identity.instrument_id,
306 client_order_id,
307 UUID4::new(),
308 ts_event,
309 ts_init,
310 false,
311 Some(venue_order_id),
312 Some(account_id),
313 );
314 emitter.send_order_event(OrderEventAny::Canceled(canceled));
315 state.cleanup_terminal(&client_order_id);
316 return;
317 }
318
319 let Some(instrument_id) = order_instrument_map.load().get(&cancel.order_id).copied() else {
321 log::warn!(
322 "Cannot resolve instrument for cancel: order_id={}, \
323 order not seen in previous delta",
324 cancel.order_id
325 );
326 return;
327 };
328
329 let Some(quantity) = venue_order_qty.load().get(&cancel.order_id).copied() else {
330 log::warn!(
331 "Cannot resolve quantity for cancel: order_id={}, skipping",
332 cancel.order_id
333 );
334 return;
335 };
336
337 let report = OrderStatusReport::new(
338 account_id,
339 instrument_id,
340 resolved_id,
341 venue_order_id,
342 OrderSide::NoOrderSide,
343 OrderType::Limit,
344 TimeInForce::Gtc,
345 OrderStatus::Canceled,
346 quantity,
347 Quantity::zero(0),
348 ts_init,
349 ts_init,
350 ts_init,
351 None,
352 );
353 let report = if let Some(ref reason) = cancel.reason
354 && !reason.is_empty()
355 {
356 report.with_cancel_reason(reason.clone())
357 } else {
358 report
359 };
360 emitter.send_order_status_report(report);
361}
362
363#[expect(clippy::too_many_arguments)]
365pub fn fills_delta(
366 fills_delta: &KrakenFuturesFillsDelta,
367 state: &WsDispatchState,
368 emitter: &ExecutionEventEmitter,
369 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
370 truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
371 venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
372 account_id: AccountId,
373 ts_init: UnixNanos,
374) {
375 for fill in &fills_delta.fills {
376 single_fill(
377 fill,
378 state,
379 emitter,
380 instruments,
381 truncated_id_map,
382 venue_client_map,
383 account_id,
384 ts_init,
385 );
386 }
387}
388
389#[expect(clippy::too_many_arguments)]
390fn single_fill(
391 fill: &KrakenFuturesFill,
392 state: &WsDispatchState,
393 emitter: &ExecutionEventEmitter,
394 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
395 truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
396 venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
397 account_id: AccountId,
398 ts_init: UnixNanos,
399) {
400 let product_id = match &fill.instrument {
401 Some(id) => id.as_str(),
402 None => {
403 log::warn!("Fill missing instrument field: fill_id={}", fill.fill_id);
404 return;
405 }
406 };
407
408 let Some(instrument) = lookup_instrument(instruments, product_id) else {
409 log::warn!("No instrument for product_id: {product_id}");
410 return;
411 };
412
413 let mut report = match parse_futures_ws_fill_report(fill, &instrument, account_id, ts_init) {
414 Ok(report) => report,
415 Err(e) => {
416 log::error!("Failed to parse futures fill report: {e}");
417 return;
418 }
419 };
420
421 let resolved_id = fill
422 .cli_ord_id
423 .as_deref()
424 .filter(|s| !s.is_empty())
425 .map(|id| resolve_client_order_id(id, truncated_id_map))
426 .or_else(|| venue_client_map.load().get(&fill.order_id).copied());
427
428 if let Some(cid) = resolved_id
429 && state.filled_orders.contains(&cid)
430 {
431 log::debug!(
432 "Skipping stale fill for filled order: cid={cid}, order_id={}",
433 fill.order_id,
434 );
435 return;
436 }
437
438 if let Some(client_order_id) = resolved_id {
439 report.client_order_id = Some(client_order_id);
440
441 if let Some(identity) = state.lookup_identity(&client_order_id) {
442 if state.check_and_insert_trade(report.trade_id) {
443 log::debug!(
444 "Skipping duplicate fill for {client_order_id}: trade_id={}",
445 report.trade_id
446 );
447 return;
448 }
449 ensure_accepted_emitted(
450 client_order_id,
451 report.venue_order_id,
452 account_id,
453 &identity,
454 state,
455 emitter,
456 report.ts_event,
457 ts_init,
458 );
459 let filled = fill_report_to_order_filled(
460 &report,
461 emitter.trader_id(),
462 &identity,
463 instrument.quote_currency(),
464 client_order_id,
465 );
466 emitter.send_order_event(OrderEventAny::Filled(filled));
467
468 let previous = state
470 .previous_filled_qty(&client_order_id)
471 .unwrap_or_else(|| Quantity::zero(instrument.size_precision()));
472 let cumulative = previous + report.last_qty;
473 state.record_filled_qty(client_order_id, cumulative);
474
475 if cumulative >= identity.quantity {
476 state.insert_filled(client_order_id);
477 state.cleanup_terminal(&client_order_id);
478 }
479 return;
480 }
481 }
482
483 if state.check_and_insert_trade(report.trade_id) {
485 log::debug!(
486 "Skipping duplicate external fill: trade_id={}",
487 report.trade_id
488 );
489 return;
490 }
491 emitter.send_fill_report(report);
492}
493
494#[inline]
495fn millis_to_nanos(millis: i64) -> UnixNanos {
496 UnixNanos::from((millis as u64) * 1_000_000)
497}