1use std::sync::Mutex;
25
26use nautilus_common::cache::fifo::{FifoCache, FifoCacheMap};
27use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos, collections::AtomicMap, time::AtomicTime};
28use nautilus_live::ExecutionEventEmitter;
29use nautilus_model::{
30 enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, TimeInForce},
31 identifiers::{AccountId, VenueOrderId},
32 instruments::{Instrument, InstrumentAny},
33 reports::{FillReport, OrderStatusReport},
34 types::{Money, Price, Quantity},
35};
36use rust_decimal::Decimal;
37use ustr::Ustr;
38
39use super::{
40 messages::{PolymarketUserOrder, PolymarketUserTrade, UserWsMessage},
41 parse::parse_timestamp_ms,
42};
43use crate::{
44 common::enums::{PolymarketLiquiditySide, PolymarketOrderStatus},
45 execution::{
46 order_fill_tracker::OrderFillTrackerMap,
47 parse::{
48 build_maker_fill_report, compute_commission, determine_order_side,
49 instrument_taker_fee, make_composite_trade_id, parse_liquidity_side,
50 },
51 },
52};
53
/// Data-less signal returned by the dispatch functions in this module after
/// handling a trade whose status reports `is_finalized()`.
///
/// NOTE(review): the name suggests the receiver should refresh account state
/// when it gets one; confirm against the call site.
#[derive(Debug)]
pub(crate) struct AccountRefreshRequest;
57
/// Mutable state the dispatcher carries from one user-channel message to the next.
#[derive(Debug, Default)]
pub(crate) struct WsDispatchState {
    /// Dedup keys of the form `"{trade.id}-{trade.taker_order_id}"` for trades
    /// that were already dispatched; a trade whose key is present is skipped.
    pub processed_fills: FifoCache<String, 10_000>,
    /// Order status reports whose status is `Canceled`, keyed by venue order ID,
    /// kept so the cancel can be emitted again after a later fill for that order.
    terminal_cancel_reports: FifoCacheMap<VenueOrderId, OrderStatusReport, 10_000>,
}
67
/// Borrowed collaborators and identity values needed to dispatch one user-channel message.
#[derive(Debug)]
pub(crate) struct WsDispatchContext<'a> {
    /// Instruments looked up by the `asset_id` carried on orders, trades and maker orders.
    pub token_instruments: &'a AtomicMap<Ustr, InstrumentAny>,
    /// Per-order fill tracking; an order counts as accepted when the tracker contains it.
    pub fill_tracker: &'a OrderFillTrackerMap,
    /// Fill reports held back, per venue order ID, for orders the tracker does not contain.
    pub pending_fills: &'a Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>,
    /// Order status reports held back, per venue order ID, for orders the tracker does not contain.
    pub pending_order_reports: &'a Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>,
    /// Sink for order status and fill reports of accepted orders.
    pub emitter: &'a ExecutionEventEmitter,
    /// Account ID stamped on every report built by the dispatcher.
    pub account_id: AccountId,
    /// Clock supplying `ts_init`, and `ts_event` when the venue timestamp fails to parse.
    pub clock: &'static AtomicTime,
    /// Compared against `maker_address` to pick out the user's maker orders in a trade.
    pub user_address: &'a str,
    /// Compared against `owner` to pick out the user's maker orders in a trade.
    pub user_api_key: &'a str,
}
81
82pub(crate) fn dispatch_user_message(
84 message: &UserWsMessage,
85 ctx: &WsDispatchContext<'_>,
86 state: &mut WsDispatchState,
87) -> Option<AccountRefreshRequest> {
88 match message {
89 UserWsMessage::Order(order) => {
90 dispatch_order_update(order, ctx, state);
91 None
92 }
93 UserWsMessage::Trade(trade) => dispatch_trade_update(trade, ctx, state),
94 }
95}
96
97fn dispatch_order_update(
98 order: &PolymarketUserOrder,
99 ctx: &WsDispatchContext<'_>,
100 state: &mut WsDispatchState,
101) {
102 let instrument = match ctx.token_instruments.get_cloned(&order.asset_id) {
103 Some(i) => i,
104 None => {
105 log::warn!("Unknown asset_id in order update: {}", order.asset_id);
106 return;
107 }
108 };
109
110 let ts_event = parse_timestamp_ms(&order.timestamp).unwrap_or_else(|_| ctx.clock.get_time_ns());
111 let venue_order_id = VenueOrderId::from(order.id.as_str());
112
113 let ts_init = ctx.clock.get_time_ns();
114 let mut report =
115 build_ws_order_status_report(order, &instrument, ctx.account_id, ts_event, ts_init);
116 let is_accepted = ctx.fill_tracker.contains(&venue_order_id);
117
118 if let Some(tracked_filled) = ctx.fill_tracker.get_cumulative_filled(&venue_order_id) {
121 let tracked_qty = Quantity::new(tracked_filled, instrument.size_precision());
122 if report.filled_qty > tracked_qty {
123 log::debug!(
124 "Capping filled_qty for {venue_order_id} from {} to {} (awaiting trade messages)",
125 report.filled_qty,
126 tracked_qty,
127 );
128 report.filled_qty = tracked_qty;
129 }
130 }
131
132 if report.order_status == OrderStatus::Canceled {
136 state
137 .terminal_cancel_reports
138 .insert(venue_order_id, report.clone());
139 }
140
141 emit_or_buffer_order_report(
142 report,
143 venue_order_id,
144 is_accepted,
145 ctx.emitter,
146 ctx.pending_order_reports,
147 );
148
149 if order.status == PolymarketOrderStatus::Matched {
151 let price = Price::new(
152 order.price.parse::<f64>().unwrap_or(0.0),
153 instrument.price_precision(),
154 );
155
156 if let Some(dust_fill) = ctx.fill_tracker.check_dust_and_build_fill(
157 &venue_order_id,
158 ctx.account_id,
159 &order.id,
160 price.as_f64(),
161 crate::execution::get_pusd_currency(),
162 ts_event,
163 ts_init,
164 ) {
165 emit_or_buffer_fill_report(
166 dust_fill,
167 venue_order_id,
168 is_accepted,
169 ctx.emitter,
170 ctx.pending_fills,
171 );
172 }
173 }
174}
175
176fn dispatch_trade_update(
177 trade: &PolymarketUserTrade,
178 ctx: &WsDispatchContext<'_>,
179 state: &mut WsDispatchState,
180) -> Option<AccountRefreshRequest> {
181 if !trade.status.is_finalized()
182 && !matches!(
183 trade.status,
184 crate::common::enums::PolymarketTradeStatus::Matched
185 )
186 {
187 log::debug!(
188 "Skipping trade with status {:?}: {}",
189 trade.status,
190 trade.id
191 );
192 return None;
193 }
194
195 let dedup_key = format!("{}-{}", trade.id, trade.taker_order_id);
196 let is_duplicate = state.processed_fills.contains(&dedup_key);
197
198 let needs_refresh = trade.status.is_finalized();
199
200 if is_duplicate {
201 log::debug!("Duplicate fill skipped: {dedup_key}");
202 return if needs_refresh {
203 Some(AccountRefreshRequest)
204 } else {
205 None
206 };
207 }
208 state.processed_fills.add(dedup_key);
209
210 let is_maker = trade.trader_side == PolymarketLiquiditySide::Maker;
211 let liquidity_side = parse_liquidity_side(trade.trader_side);
212 let ts_event = parse_timestamp_ms(&trade.timestamp).unwrap_or_else(|_| ctx.clock.get_time_ns());
213 let ts_init = ctx.clock.get_time_ns();
214
215 if is_maker {
216 dispatch_maker_fills(trade, ctx, state, liquidity_side, ts_event, ts_init);
217 } else {
218 dispatch_taker_fill(trade, ctx, state, liquidity_side, ts_event, ts_init);
219 }
220
221 if needs_refresh {
222 Some(AccountRefreshRequest)
223 } else {
224 None
225 }
226}
227
228fn dispatch_maker_fills(
229 trade: &PolymarketUserTrade,
230 ctx: &WsDispatchContext<'_>,
231 state: &WsDispatchState,
232 liquidity_side: LiquiditySide,
233 ts_event: UnixNanos,
234 ts_init: UnixNanos,
235) {
236 let user_orders: Vec<_> = trade
237 .maker_orders
238 .iter()
239 .filter(|mo| mo.maker_address == ctx.user_address || mo.owner == ctx.user_api_key)
240 .collect();
241
242 if user_orders.is_empty() {
243 log::warn!("No matching maker orders for user in trade: {}", trade.id);
244 return;
245 }
246
247 for mo in user_orders {
248 let asset_id = Ustr::from(mo.asset_id.as_str());
249 let instrument = match ctx.token_instruments.get_cloned(&asset_id) {
250 Some(i) => i,
251 None => {
252 log::warn!("Unknown asset_id in maker order: {asset_id}");
253 continue;
254 }
255 };
256 let mut report = build_maker_fill_report(
257 mo,
258 &trade.id,
259 trade.trader_side,
260 trade.side,
261 trade.asset_id.as_str(),
262 ctx.account_id,
263 instrument.id(),
264 instrument.price_precision(),
265 instrument.size_precision(),
266 crate::execution::get_pusd_currency(),
267 liquidity_side,
268 ts_event,
269 ts_init,
270 );
271 let maker_venue_order_id = report.venue_order_id;
272 report.last_qty = ctx
273 .fill_tracker
274 .snap_fill_qty(&maker_venue_order_id, report.last_qty);
275 let is_accepted = ctx.fill_tracker.contains(&maker_venue_order_id);
276
277 if is_accepted {
278 ctx.fill_tracker.record_fill(
279 &maker_venue_order_id,
280 report.last_qty.as_f64(),
281 report.last_px.as_f64(),
282 report.ts_event,
283 );
284 }
285
286 emit_or_buffer_fill_report(
287 report,
288 maker_venue_order_id,
289 is_accepted,
290 ctx.emitter,
291 ctx.pending_fills,
292 );
293
294 if is_accepted {
295 reemit_terminal_cancel(maker_venue_order_id, state, ctx.fill_tracker, ctx.emitter);
296 }
297 }
298}
299
300fn dispatch_taker_fill(
301 trade: &PolymarketUserTrade,
302 ctx: &WsDispatchContext<'_>,
303 state: &WsDispatchState,
304 liquidity_side: LiquiditySide,
305 ts_event: UnixNanos,
306 ts_init: UnixNanos,
307) {
308 let instrument = match ctx.token_instruments.get_cloned(&trade.asset_id) {
309 Some(i) => i,
310 None => {
311 log::warn!("Unknown asset_id in trade: {}", trade.asset_id);
312 return;
313 }
314 };
315
316 let venue_order_id = VenueOrderId::from(trade.taker_order_id.as_str());
317
318 let mut report = build_ws_taker_fill_report(
319 trade,
320 &instrument,
321 ctx.account_id,
322 liquidity_side,
323 ts_event,
324 ts_init,
325 );
326 report.last_qty = ctx
327 .fill_tracker
328 .snap_fill_qty(&venue_order_id, report.last_qty);
329
330 let is_accepted = ctx.fill_tracker.contains(&venue_order_id);
331
332 if is_accepted {
333 ctx.fill_tracker.record_fill(
334 &venue_order_id,
335 report.last_qty.as_f64(),
336 report.last_px.as_f64(),
337 report.ts_event,
338 );
339 }
340
341 emit_or_buffer_fill_report(
342 report,
343 venue_order_id,
344 is_accepted,
345 ctx.emitter,
346 ctx.pending_fills,
347 );
348
349 if is_accepted {
350 reemit_terminal_cancel(venue_order_id, state, ctx.fill_tracker, ctx.emitter);
351 }
352}
353
354fn reemit_terminal_cancel(
364 venue_order_id: VenueOrderId,
365 state: &WsDispatchState,
366 fill_tracker: &OrderFillTrackerMap,
367 emitter: &ExecutionEventEmitter,
368) {
369 if fill_tracker.is_fully_filled(&venue_order_id) {
370 return;
371 }
372
373 if let Some(cancel_report) = state.terminal_cancel_reports.get(&venue_order_id) {
374 log::debug!(
375 "Re-emitting cancel report for {venue_order_id} after fill to restore terminal state"
376 );
377 emitter.send_order_status_report(cancel_report.clone());
378 }
379}
380
381fn build_ws_order_status_report(
382 order: &PolymarketUserOrder,
383 instrument: &InstrumentAny,
384 account_id: AccountId,
385 ts_event: UnixNanos,
386 ts_init: UnixNanos,
387) -> OrderStatusReport {
388 let venue_order_id = VenueOrderId::from(order.id.as_str());
389 let order_status =
390 crate::execution::parse::resolve_order_status(order.status, order.event_type);
391 let order_side = OrderSide::from(order.side);
392 let time_in_force = TimeInForce::from(order.order_type);
393 let quantity = Quantity::new(
394 order.original_size.parse::<f64>().unwrap_or(0.0),
395 instrument.size_precision(),
396 );
397 let filled_qty = Quantity::new(
398 order.size_matched.parse::<f64>().unwrap_or(0.0),
399 instrument.size_precision(),
400 );
401 let price = Price::new(
402 order.price.parse::<f64>().unwrap_or(0.0),
403 instrument.price_precision(),
404 );
405
406 let mut report = OrderStatusReport::new(
407 account_id,
408 instrument.id(),
409 None,
410 venue_order_id,
411 order_side,
412 OrderType::Limit,
413 time_in_force,
414 order_status,
415 quantity,
416 filled_qty,
417 ts_event,
418 ts_event,
419 ts_init,
420 None,
421 );
422 report.price = Some(price);
423 report
424}
425
426fn build_ws_taker_fill_report(
427 trade: &PolymarketUserTrade,
428 instrument: &InstrumentAny,
429 account_id: AccountId,
430 liquidity_side: LiquiditySide,
431 ts_event: UnixNanos,
432 ts_init: UnixNanos,
433) -> FillReport {
434 let venue_order_id = VenueOrderId::from(trade.taker_order_id.as_str());
435 let trade_id = make_composite_trade_id(&trade.id, &trade.taker_order_id);
436 let order_side = determine_order_side(
437 trade.trader_side,
438 trade.side,
439 trade.asset_id.as_str(),
440 trade.asset_id.as_str(),
441 );
442
443 let last_qty = Quantity::new(
444 trade.size.parse::<f64>().unwrap_or(0.0),
445 instrument.size_precision(),
446 );
447 let last_px = Price::new(
448 trade.price.parse::<f64>().unwrap_or(0.0),
449 instrument.price_precision(),
450 );
451
452 let fee_rate = instrument_taker_fee(instrument);
453 let size: Decimal = trade.size.parse().unwrap_or_default();
454 let price_dec: Decimal = trade.price.parse().unwrap_or_default();
455 let commission_value = compute_commission(fee_rate, size, price_dec, liquidity_side);
456 let pusd = crate::execution::get_pusd_currency();
457
458 FillReport {
459 account_id,
460 instrument_id: instrument.id(),
461 venue_order_id,
462 trade_id,
463 order_side,
464 last_qty,
465 last_px,
466 commission: Money::new(commission_value, pusd),
467 liquidity_side,
468 avg_px: None,
469 report_id: UUID4::new(),
470 ts_event,
471 ts_init,
472 client_order_id: None,
473 venue_position_id: None,
474 }
475}
476
477fn emit_or_buffer_order_report(
478 report: OrderStatusReport,
479 venue_order_id: VenueOrderId,
480 is_accepted: bool,
481 emitter: &ExecutionEventEmitter,
482 pending: &Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>,
483) {
484 if is_accepted {
485 emitter.send_order_status_report(report);
486 } else {
487 let mut guard = pending.lock().expect(MUTEX_POISONED);
488 if let Some(reports) = guard.get_mut(&venue_order_id) {
489 reports.push(report);
490 } else {
491 guard.insert(venue_order_id, vec![report]);
492 }
493 }
494}
495
496fn emit_or_buffer_fill_report(
497 report: FillReport,
498 venue_order_id: VenueOrderId,
499 is_accepted: bool,
500 emitter: &ExecutionEventEmitter,
501 pending: &Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>,
502) {
503 if is_accepted {
504 emitter.send_fill_report(report);
505 } else {
506 let mut guard = pending.lock().expect(MUTEX_POISONED);
507 if let Some(fills) = guard.get_mut(&venue_order_id) {
508 fills.push(report);
509 } else {
510 guard.insert(venue_order_id, vec![report]);
511 }
512 }
513}
514
515#[cfg(test)]
516mod tests {
517 use nautilus_common::messages::{ExecutionEvent, ExecutionReport};
518 use nautilus_core::time::AtomicTime;
519 use nautilus_model::{
520 enums::{AccountType, OrderStatus},
521 identifiers::TraderId,
522 types::Currency,
523 };
524 use rstest::rstest;
525
526 use super::*;
527 use crate::http::{
528 models::GammaMarket,
529 parse::{create_instrument_from_def, parse_gamma_market},
530 };
531
532 fn load<T: serde::de::DeserializeOwned>(filename: &str) -> T {
533 let path = format!("test_data/{filename}");
534 let content = std::fs::read_to_string(path).expect("Failed to read test data");
535 serde_json::from_str(&content).expect("Failed to parse test data")
536 }
537
538 fn test_instrument() -> InstrumentAny {
539 let market: GammaMarket = load("gamma_market.json");
540 let defs = parse_gamma_market(&market).unwrap();
541 create_instrument_from_def(&defs[0], UnixNanos::from(1_000_000_000u64)).unwrap()
542 }
543
544 fn test_emitter() -> ExecutionEventEmitter {
545 ExecutionEventEmitter::new(
546 nautilus_core::time::get_atomic_clock_realtime(),
547 TraderId::from("TESTER-001"),
548 AccountId::from("POLY-001"),
549 AccountType::Cash,
550 Some(Currency::pUSD()),
551 )
552 }
553
554 #[rstest]
555 fn test_build_ws_order_status_report() {
556 let order: PolymarketUserOrder = load("ws_user_order_placement.json");
557 let instrument = test_instrument();
558 let ts_event = UnixNanos::from(1_000_000_000u64);
559 let ts_init = UnixNanos::from(2_000_000_000u64);
560
561 let report = build_ws_order_status_report(
562 &order,
563 &instrument,
564 AccountId::from("POLY-001"),
565 ts_event,
566 ts_init,
567 );
568
569 assert_eq!(report.order_side, OrderSide::Buy);
570 assert_eq!(report.order_type, OrderType::Limit);
571 assert!(report.price.is_some());
572 assert_eq!(report.ts_accepted, ts_event);
573 assert_eq!(report.ts_init, ts_init);
574 }
575
576 #[rstest]
577 fn test_build_ws_order_status_report_venue_cancel_maps_to_canceled() {
578 let order: PolymarketUserOrder = load("ws_user_order_venue_cancel.json");
579 let instrument = test_instrument();
580 let ts_event = UnixNanos::from(1_000_000_000u64);
581 let ts_init = UnixNanos::from(2_000_000_000u64);
582
583 let report = build_ws_order_status_report(
584 &order,
585 &instrument,
586 AccountId::from("POLY-001"),
587 ts_event,
588 ts_init,
589 );
590
591 assert_eq!(report.order_status, OrderStatus::Canceled);
592 }
593
594 #[rstest]
595 fn test_build_ws_taker_fill_report() {
596 let trade: PolymarketUserTrade = load("ws_user_trade.json");
597 let instrument = test_instrument();
598 let ts_event = UnixNanos::from(1_000_000_000u64);
599 let ts_init = UnixNanos::from(2_000_000_000u64);
600
601 let report = build_ws_taker_fill_report(
602 &trade,
603 &instrument,
604 AccountId::from("POLY-001"),
605 LiquiditySide::Taker,
606 ts_event,
607 ts_init,
608 );
609
610 assert_eq!(report.order_side, OrderSide::Buy);
611 assert_eq!(report.liquidity_side, LiquiditySide::Taker);
612 assert_eq!(report.ts_event, ts_event);
613 assert_eq!(report.ts_init, ts_init);
614 }
615
616 #[rstest]
617 fn test_dispatch_order_message_buffers_when_not_accepted() {
618 let order: PolymarketUserOrder = load("ws_user_order_placement.json");
619 let instrument = test_instrument();
620
621 let token_instruments = AtomicMap::new();
622 token_instruments.insert(order.asset_id, instrument);
623
624 let fill_tracker = OrderFillTrackerMap::new();
625 let pending_fills = Mutex::new(FifoCacheMap::default());
626 let pending_order_reports = Mutex::new(FifoCacheMap::default());
627 let emitter = test_emitter();
628
629 let ctx = WsDispatchContext {
630 token_instruments: &token_instruments,
631 fill_tracker: &fill_tracker,
632 pending_fills: &pending_fills,
633 pending_order_reports: &pending_order_reports,
634 emitter: &emitter,
635 account_id: AccountId::from("POLY-001"),
636 clock: nautilus_core::time::get_atomic_clock_realtime(),
637 user_address: "0xtest",
638 user_api_key: "test-key",
639 };
640 let mut state = WsDispatchState::default();
641
642 let result = dispatch_user_message(&UserWsMessage::Order(order.clone()), &ctx, &mut state);
643 assert!(result.is_none());
644
645 let guard = pending_order_reports.lock().unwrap();
647 let venue_order_id = VenueOrderId::from(order.id.as_str());
648 assert!(guard.get(&venue_order_id).is_some());
649 }
650
651 #[rstest]
652 fn test_dispatch_trade_dedup() {
653 let trade: PolymarketUserTrade = load("ws_user_trade.json");
654 let instrument = test_instrument();
655
656 let token_instruments = AtomicMap::new();
657 token_instruments.insert(trade.asset_id, instrument);
658
659 let fill_tracker = OrderFillTrackerMap::new();
660 let pending_fills = Mutex::new(FifoCacheMap::default());
661 let pending_order_reports = Mutex::new(FifoCacheMap::default());
662 let emitter = test_emitter();
663
664 let ctx = WsDispatchContext {
665 token_instruments: &token_instruments,
666 fill_tracker: &fill_tracker,
667 pending_fills: &pending_fills,
668 pending_order_reports: &pending_order_reports,
669 emitter: &emitter,
670 account_id: AccountId::from("POLY-001"),
671 clock: nautilus_core::time::get_atomic_clock_realtime(),
672 user_address: "0xtest",
673 user_api_key: "test-key",
674 };
675 let mut state = WsDispatchState::default();
676
677 let _ = dispatch_user_message(&UserWsMessage::Trade(trade.clone()), &ctx, &mut state);
679 let fills_count = {
680 let guard = pending_fills.lock().unwrap();
681 let venue_order_id = VenueOrderId::from(trade.taker_order_id.as_str());
682 guard.get(&venue_order_id).map_or(0, |v| v.len())
683 };
684 assert_eq!(fills_count, 1);
685
686 let _ = dispatch_user_message(&UserWsMessage::Trade(trade.clone()), &ctx, &mut state);
688 let fills_count_after = {
689 let guard = pending_fills.lock().unwrap();
690 let venue_order_id = VenueOrderId::from(trade.taker_order_id.as_str());
691 guard.get(&venue_order_id).map_or(0, |v| v.len())
692 };
693 assert_eq!(fills_count_after, 1);
694 }
695
696 #[rstest]
697 fn test_dispatch_order_matched_caps_filled_qty_when_no_trades_tracked() {
698 let order: PolymarketUserOrder = load("ws_user_order_matched.json");
699 let instrument = test_instrument();
700
701 let token_instruments = AtomicMap::new();
702 token_instruments.insert(order.asset_id, instrument.clone());
703
704 let fill_tracker = OrderFillTrackerMap::new();
705 let venue_order_id = VenueOrderId::from(order.id.as_str());
706
707 fill_tracker.register(
709 venue_order_id,
710 Quantity::from("100"),
711 OrderSide::Buy,
712 instrument.id(),
713 instrument.size_precision(),
714 instrument.price_precision(),
715 );
716
717 let pending_fills = Mutex::new(FifoCacheMap::default());
718 let pending_order_reports = Mutex::new(FifoCacheMap::default());
719 let mut emitter = test_emitter();
720 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
721 emitter.set_sender(sender);
722
723 let ctx = WsDispatchContext {
724 token_instruments: &token_instruments,
725 fill_tracker: &fill_tracker,
726 pending_fills: &pending_fills,
727 pending_order_reports: &pending_order_reports,
728 emitter: &emitter,
729 account_id: AccountId::from("POLY-001"),
730 clock: nautilus_core::time::get_atomic_clock_realtime(),
731 user_address: "0xtest",
732 user_api_key: "test-key",
733 };
734 let mut state = WsDispatchState::default();
735
736 dispatch_user_message(&UserWsMessage::Order(order), &ctx, &mut state);
737
738 let event = receiver.try_recv().expect("Expected report");
739 match event {
740 ExecutionEvent::Report(report) => match report {
741 ExecutionReport::Order(order_report) => {
742 assert_eq!(order_report.filled_qty, Quantity::from("0"));
743 }
744 other => panic!("Expected order report, was {other:?}"),
745 },
746 other => panic!("Expected report event, was {other:?}"),
747 }
748 }
749
750 #[rstest]
751 fn test_dispatch_order_matched_uses_tracked_fills_for_filled_qty() {
752 let order: PolymarketUserOrder = load("ws_user_order_matched.json");
753 let instrument = test_instrument();
754
755 let token_instruments = AtomicMap::new();
756 token_instruments.insert(order.asset_id, instrument.clone());
757
758 let fill_tracker = OrderFillTrackerMap::new();
759 let venue_order_id = VenueOrderId::from(order.id.as_str());
760
761 fill_tracker.register(
763 venue_order_id,
764 Quantity::from("100"),
765 OrderSide::Buy,
766 instrument.id(),
767 instrument.size_precision(),
768 instrument.price_precision(),
769 );
770 fill_tracker.record_fill(&venue_order_id, 50.0, 0.5, UnixNanos::from(1_000u64));
771
772 let pending_fills = Mutex::new(FifoCacheMap::default());
773 let pending_order_reports = Mutex::new(FifoCacheMap::default());
774 let mut emitter = test_emitter();
775 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
776 emitter.set_sender(sender);
777
778 let ctx = WsDispatchContext {
779 token_instruments: &token_instruments,
780 fill_tracker: &fill_tracker,
781 pending_fills: &pending_fills,
782 pending_order_reports: &pending_order_reports,
783 emitter: &emitter,
784 account_id: AccountId::from("POLY-001"),
785 clock: nautilus_core::time::get_atomic_clock_realtime(),
786 user_address: "0xtest",
787 user_api_key: "test-key",
788 };
789 let mut state = WsDispatchState::default();
790
791 dispatch_user_message(&UserWsMessage::Order(order), &ctx, &mut state);
792
793 let event = receiver.try_recv().expect("Expected report");
794 match event {
795 ExecutionEvent::Report(report) => match report {
796 ExecutionReport::Order(order_report) => {
797 assert_eq!(order_report.filled_qty, Quantity::from("50"));
798 }
799 other => panic!("Expected order report, was {other:?}"),
800 },
801 other => panic!("Expected report event, was {other:?}"),
802 }
803 }
804
805 #[rstest]
806 fn test_dispatch_order_matched_dust_fill_uses_local_ts_init() {
807 let order: PolymarketUserOrder = load("ws_user_order_matched.json");
808 let instrument = test_instrument();
809
810 let token_instruments = AtomicMap::new();
811 token_instruments.insert(order.asset_id, instrument.clone());
812
813 let fill_tracker = OrderFillTrackerMap::new();
814 let venue_order_id = VenueOrderId::from(order.id.as_str());
815 fill_tracker.register(
816 venue_order_id,
817 Quantity::from("100"),
818 OrderSide::Buy,
819 instrument.id(),
820 instrument.size_precision(),
821 instrument.price_precision(),
822 );
823 fill_tracker.record_fill(&venue_order_id, 99.995, 0.5, UnixNanos::from(1_000u64));
824
825 let pending_fills = Mutex::new(FifoCacheMap::default());
826 let pending_order_reports = Mutex::new(FifoCacheMap::default());
827 let mut emitter = test_emitter();
828 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
829 emitter.set_sender(sender);
830
831 let clock = Box::leak(Box::new(AtomicTime::new(
832 false,
833 UnixNanos::from(2_000_000_000u64),
834 )));
835
836 let ctx = WsDispatchContext {
837 token_instruments: &token_instruments,
838 fill_tracker: &fill_tracker,
839 pending_fills: &pending_fills,
840 pending_order_reports: &pending_order_reports,
841 emitter: &emitter,
842 account_id: AccountId::from("POLY-001"),
843 clock,
844 user_address: "0xtest",
845 user_api_key: "test-key",
846 };
847 let mut state = WsDispatchState::default();
848
849 dispatch_user_message(&UserWsMessage::Order(order), &ctx, &mut state);
850
851 let first = receiver.try_recv().expect("Expected order report");
852 let second = receiver.try_recv().expect("Expected dust fill report");
853
854 match first {
855 ExecutionEvent::Report(ExecutionReport::Order(_)) => {}
856 other => panic!("Expected order report, was {other:?}"),
857 }
858
859 match second {
860 ExecutionEvent::Report(ExecutionReport::Fill(fill_report)) => {
861 assert_eq!(
862 fill_report.ts_event,
863 UnixNanos::from(1_703_875_201_000_000_000u64)
864 );
865 assert_eq!(fill_report.ts_init, UnixNanos::from(2_000_000_000u64));
866 }
867 other => panic!("Expected fill report, was {other:?}"),
868 }
869 }
870
871 #[rstest]
872 fn test_cancel_reemitted_after_fill_for_canceled_order() {
873 let cancel_order: PolymarketUserOrder = load("ws_user_order_cancellation.json");
874 let trade: PolymarketUserTrade = load("ws_user_trade.json");
875 let instrument = test_instrument();
876
877 let token_instruments = AtomicMap::new();
878 token_instruments.insert(cancel_order.asset_id, instrument.clone());
879
880 let fill_tracker = OrderFillTrackerMap::new();
881 let venue_order_id = VenueOrderId::from(cancel_order.id.as_str());
882
883 fill_tracker.register(
885 venue_order_id,
886 Quantity::from("100"),
887 OrderSide::Buy,
888 instrument.id(),
889 instrument.size_precision(),
890 instrument.price_precision(),
891 );
892
893 let pending_fills = Mutex::new(FifoCacheMap::default());
894 let pending_order_reports = Mutex::new(FifoCacheMap::default());
895 let mut emitter = test_emitter();
896 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
897 emitter.set_sender(sender);
898
899 let ctx = WsDispatchContext {
900 token_instruments: &token_instruments,
901 fill_tracker: &fill_tracker,
902 pending_fills: &pending_fills,
903 pending_order_reports: &pending_order_reports,
904 emitter: &emitter,
905 account_id: AccountId::from("POLY-001"),
906 clock: nautilus_core::time::get_atomic_clock_realtime(),
907 user_address: "0xtest",
908 user_api_key: "test-key",
909 };
910 let mut state = WsDispatchState::default();
911
912 dispatch_user_message(&UserWsMessage::Order(cancel_order), &ctx, &mut state);
914 let cancel_event = receiver.try_recv().expect("Expected cancel report");
915 match &cancel_event {
916 ExecutionEvent::Report(ExecutionReport::Order(r)) => {
917 assert_eq!(r.order_status, OrderStatus::Canceled);
918 }
919 other => panic!("Expected order report, was {other:?}"),
920 }
921
922 dispatch_user_message(&UserWsMessage::Trade(trade), &ctx, &mut state);
924
925 let fill_event = receiver.try_recv().expect("Expected fill report");
927 match &fill_event {
928 ExecutionEvent::Report(ExecutionReport::Fill(f)) => {
929 assert_eq!(f.venue_order_id, venue_order_id);
930 }
931 other => panic!("Expected fill report, was {other:?}"),
932 }
933
934 let reemitted_cancel = receiver
935 .try_recv()
936 .expect("Expected re-emitted cancel report");
937
938 match &reemitted_cancel {
939 ExecutionEvent::Report(ExecutionReport::Order(r)) => {
940 assert_eq!(r.order_status, OrderStatus::Canceled);
941 assert_eq!(r.venue_order_id, venue_order_id);
942 }
943 other => panic!("Expected order cancel report, was {other:?}"),
944 }
945 }
946
947 #[rstest]
948 fn test_cancel_not_reemitted_when_fill_completes_order() {
949 let cancel_order: PolymarketUserOrder = load("ws_user_order_cancellation.json");
950 let trade: PolymarketUserTrade = load("ws_user_trade.json");
951 let instrument = test_instrument();
952
953 let token_instruments = AtomicMap::new();
954 token_instruments.insert(cancel_order.asset_id, instrument.clone());
955
956 let fill_tracker = OrderFillTrackerMap::new();
957 let venue_order_id = VenueOrderId::from(cancel_order.id.as_str());
958
959 fill_tracker.register(
961 venue_order_id,
962 Quantity::from("25"),
963 OrderSide::Buy,
964 instrument.id(),
965 instrument.size_precision(),
966 instrument.price_precision(),
967 );
968
969 let pending_fills = Mutex::new(FifoCacheMap::default());
970 let pending_order_reports = Mutex::new(FifoCacheMap::default());
971 let mut emitter = test_emitter();
972 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
973 emitter.set_sender(sender);
974
975 let ctx = WsDispatchContext {
976 token_instruments: &token_instruments,
977 fill_tracker: &fill_tracker,
978 pending_fills: &pending_fills,
979 pending_order_reports: &pending_order_reports,
980 emitter: &emitter,
981 account_id: AccountId::from("POLY-001"),
982 clock: nautilus_core::time::get_atomic_clock_realtime(),
983 user_address: "0xtest",
984 user_api_key: "test-key",
985 };
986 let mut state = WsDispatchState::default();
987
988 dispatch_user_message(&UserWsMessage::Order(cancel_order), &ctx, &mut state);
990 let _cancel = receiver.try_recv().expect("Expected cancel report");
991
992 dispatch_user_message(&UserWsMessage::Trade(trade), &ctx, &mut state);
993 let _fill = receiver.try_recv().expect("Expected fill report");
994
995 assert!(
997 receiver.try_recv().is_err(),
998 "Should not re-emit cancel when fill completes the order"
999 );
1000 }
1001
1002 #[rstest]
1003 fn test_cancel_saved_before_acceptance() {
1004 let cancel_order: PolymarketUserOrder = load("ws_user_order_cancellation.json");
1005 let instrument = test_instrument();
1006
1007 let token_instruments = AtomicMap::new();
1008 token_instruments.insert(cancel_order.asset_id, instrument);
1009
1010 let fill_tracker = OrderFillTrackerMap::new();
1012 let venue_order_id = VenueOrderId::from(cancel_order.id.as_str());
1013
1014 let pending_fills = Mutex::new(FifoCacheMap::default());
1015 let pending_order_reports = Mutex::new(FifoCacheMap::default());
1016 let emitter = test_emitter();
1017
1018 let ctx = WsDispatchContext {
1019 token_instruments: &token_instruments,
1020 fill_tracker: &fill_tracker,
1021 pending_fills: &pending_fills,
1022 pending_order_reports: &pending_order_reports,
1023 emitter: &emitter,
1024 account_id: AccountId::from("POLY-001"),
1025 clock: nautilus_core::time::get_atomic_clock_realtime(),
1026 user_address: "0xtest",
1027 user_api_key: "test-key",
1028 };
1029 let mut state = WsDispatchState::default();
1030
1031 dispatch_user_message(&UserWsMessage::Order(cancel_order), &ctx, &mut state);
1033
1034 let guard = pending_order_reports.lock().unwrap();
1036 assert!(guard.get(&venue_order_id).is_some());
1037 drop(guard);
1038
1039 assert!(state.terminal_cancel_reports.get(&venue_order_id).is_some());
1040 }
1041
1042 #[rstest]
1054 fn test_issue_3797_interleaved_cancel_fill_sequence() {
1055 use crate::common::{
1056 enums::{
1057 PolymarketEventType, PolymarketLiquiditySide, PolymarketOrderSide,
1058 PolymarketOrderStatus, PolymarketOrderType, PolymarketOutcome,
1059 PolymarketTradeStatus,
1060 },
1061 models::PolymarketMakerOrder,
1062 };
1063
1064 let instrument = test_instrument();
1065 let asset_id = instrument.id().symbol.inner();
1066
1067 let order_id =
1068 "0xe743f6c823ecdfa9ddaaf08673b2441d15a38d89e14dcb25b3b70c284be4f6ad".to_string();
1069 let venue_order_id = VenueOrderId::from(order_id.as_str());
1070
1071 let token_instruments = AtomicMap::new();
1072 token_instruments.insert(asset_id, instrument.clone());
1073
1074 let fill_tracker = OrderFillTrackerMap::new();
1075 fill_tracker.register(
1076 venue_order_id,
1077 Quantity::from("20"),
1078 OrderSide::Buy,
1079 instrument.id(),
1080 instrument.size_precision(),
1081 instrument.price_precision(),
1082 );
1083
1084 let pending_fills = Mutex::new(FifoCacheMap::default());
1085 let pending_order_reports = Mutex::new(FifoCacheMap::default());
1086 let mut emitter = test_emitter();
1087 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
1088 emitter.set_sender(sender);
1089
1090 let ctx = WsDispatchContext {
1091 token_instruments: &token_instruments,
1092 fill_tracker: &fill_tracker,
1093 pending_fills: &pending_fills,
1094 pending_order_reports: &pending_order_reports,
1095 emitter: &emitter,
1096 account_id: AccountId::from("POLY-001"),
1097 clock: nautilus_core::time::get_atomic_clock_realtime(),
1098 user_address: "0xabc",
1099 user_api_key: "xxx",
1100 };
1101 let mut state = WsDispatchState::default();
1102
1103 let make_order =
1105 |size_matched: &str, ts: &str, event_type: PolymarketEventType| PolymarketUserOrder {
1106 asset_id,
1107 associate_trades: None,
1108 created_at: "1775074735".to_string(),
1109 expiration: Some("0".to_string()),
1110 id: order_id.clone(),
1111 maker_address: Ustr::from("0xabc"),
1112 market: Ustr::from("0x4134"),
1113 order_owner: Ustr::from("xxx"),
1114 order_type: PolymarketOrderType::GTC,
1115 original_size: "20".to_string(),
1116 outcome: PolymarketOutcome::yes(),
1117 owner: Ustr::from("xxx"),
1118 price: "0.18".to_string(),
1119 side: PolymarketOrderSide::Buy,
1120 size_matched: size_matched.to_string(),
1121 status: PolymarketOrderStatus::Canceled,
1122 timestamp: ts.to_string(),
1123 event_type,
1124 };
1125
1126 let make_trade = |trade_id: &str, matched_amount: f64, ts: &str| PolymarketUserTrade {
1128 asset_id,
1129 bucket_index: 0,
1130 fee_rate_bps: "1000".to_string(),
1131 id: trade_id.to_string(),
1132 last_update: "1775074738".to_string(),
1133 maker_address: Ustr::from("0xother"),
1134 maker_orders: vec![PolymarketMakerOrder {
1135 asset_id,
1136 maker_address: "0xabc".to_string(),
1137 matched_amount: Decimal::from_f64_retain(matched_amount).unwrap_or(Decimal::ZERO),
1138 order_id: order_id.clone(),
1139 outcome: PolymarketOutcome::yes(),
1140 owner: "xxx".to_string(),
1141 price: Decimal::from_f64_retain(0.18).unwrap_or(Decimal::ZERO),
1142 side: None,
1143 }],
1144 market: Ustr::from("0x4134"),
1145 match_time: "1775074735".to_string(),
1146 outcome: PolymarketOutcome::yes(),
1147 owner: Ustr::from("other-owner"),
1148 price: "0.82".to_string(),
1149 side: PolymarketOrderSide::Buy,
1150 size: "1.219511".to_string(),
1151 status: PolymarketTradeStatus::Matched,
1152 taker_order_id: "0xtaker01".to_string(),
1153 timestamp: ts.to_string(),
1154 trade_owner: Ustr::from("other-owner"),
1155 trader_side: PolymarketLiquiditySide::Maker,
1156 event_type: PolymarketEventType::Trade,
1157 };
1158
1159 let msg_a = make_order("0", "1775074738031", PolymarketEventType::Cancellation);
1161 dispatch_user_message(&UserWsMessage::Order(msg_a), &ctx, &mut state);
1162
1163 let evt = receiver.try_recv().expect("(A) cancel report");
1164 match &evt {
1165 ExecutionEvent::Report(ExecutionReport::Order(r)) => {
1166 assert_eq!(r.order_status, OrderStatus::Canceled);
1167 assert_eq!(r.filled_qty, Quantity::from("0"));
1168 }
1169 other => panic!("(A) expected order report, was {other:?}"),
1170 }
1171
1172 let msg_b = make_trade("trade-b", 1.219511, "1775074738032");
1174 dispatch_user_message(&UserWsMessage::Trade(msg_b), &ctx, &mut state);
1175
1176 let evt = receiver.try_recv().expect("(B) fill report");
1177 match &evt {
1178 ExecutionEvent::Report(ExecutionReport::Fill(f)) => {
1179 assert_eq!(f.venue_order_id, venue_order_id);
1180 }
1181 other => panic!("(B) expected fill report, was {other:?}"),
1182 }
1183 let evt = receiver.try_recv().expect("(B) re-emitted cancel");
1185 match &evt {
1186 ExecutionEvent::Report(ExecutionReport::Order(r)) => {
1187 assert_eq!(r.order_status, OrderStatus::Canceled);
1188 }
1189 other => panic!("(B) expected re-emitted cancel, was {other:?}"),
1190 }
1191
1192 let msg_c = make_order("1.219511", "1775074738034", PolymarketEventType::Update);
1194 dispatch_user_message(&UserWsMessage::Order(msg_c), &ctx, &mut state);
1195
1196 let evt = receiver.try_recv().expect("(C) cancel report");
1197 match &evt {
1198 ExecutionEvent::Report(ExecutionReport::Order(r)) => {
1199 assert_eq!(r.order_status, OrderStatus::Canceled);
1200 }
1201 other => panic!("(C) expected order report, was {other:?}"),
1202 }
1203
1204 let msg_d = make_order("2.560972", "1775074738038", PolymarketEventType::Update);
1206 dispatch_user_message(&UserWsMessage::Order(msg_d), &ctx, &mut state);
1207
1208 let evt = receiver.try_recv().expect("(D) cancel report");
1209 match &evt {
1210 ExecutionEvent::Report(ExecutionReport::Order(r)) => {
1211 assert_eq!(r.order_status, OrderStatus::Canceled);
1212 assert_eq!(r.filled_qty, Quantity::new(1.219511, 6));
1214 }
1215 other => panic!("(D) expected order report, was {other:?}"),
1216 }
1217
1218 let msg_e = make_trade("trade-e", 1.341461, "1775074738036");
1220 dispatch_user_message(&UserWsMessage::Trade(msg_e), &ctx, &mut state);
1221
1222 let evt = receiver.try_recv().expect("(E) fill report");
1223 match &evt {
1224 ExecutionEvent::Report(ExecutionReport::Fill(f)) => {
1225 assert_eq!(f.venue_order_id, venue_order_id);
1226 }
1227 other => panic!("(E) expected fill report, was {other:?}"),
1228 }
1229
1230 let evt = receiver.try_recv().expect("(E) re-emitted cancel");
1232 match &evt {
1233 ExecutionEvent::Report(ExecutionReport::Order(r)) => {
1234 assert_eq!(r.order_status, OrderStatus::Canceled);
1235 assert_eq!(r.venue_order_id, venue_order_id);
1236 }
1237 other => panic!("(E) expected re-emitted cancel, was {other:?}"),
1238 }
1239
1240 assert!(
1242 receiver.try_recv().is_err(),
1243 "No further events expected after the sequence"
1244 );
1245 }
1246
1247 #[rstest]
1248 fn test_dispatch_taker_fill_snaps_overfill_to_submitted_qty() {
1249 use crate::common::enums::{
1254 PolymarketEventType, PolymarketOrderSide, PolymarketOutcome, PolymarketTradeStatus,
1255 };
1256
1257 let instrument = test_instrument();
1258 let asset_id = instrument.id().symbol.inner();
1259 let token_instruments = AtomicMap::new();
1260 token_instruments.insert(asset_id, instrument.clone());
1261
1262 let fill_tracker = OrderFillTrackerMap::new();
1263 let venue_order_id = VenueOrderId::from("0xtaker-overfill");
1264 let submitted = Quantity::new(714.285710, instrument.size_precision());
1266 fill_tracker.register(
1267 venue_order_id,
1268 submitted,
1269 OrderSide::Buy,
1270 instrument.id(),
1271 instrument.size_precision(),
1272 instrument.price_precision(),
1273 );
1274
1275 let pending_fills = Mutex::new(FifoCacheMap::default());
1276 let pending_order_reports = Mutex::new(FifoCacheMap::default());
1277 let mut emitter = test_emitter();
1278 let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
1279 emitter.set_sender(sender);
1280
1281 let ctx = WsDispatchContext {
1282 token_instruments: &token_instruments,
1283 fill_tracker: &fill_tracker,
1284 pending_fills: &pending_fills,
1285 pending_order_reports: &pending_order_reports,
1286 emitter: &emitter,
1287 account_id: AccountId::from("POLY-001"),
1288 clock: nautilus_core::time::get_atomic_clock_realtime(),
1289 user_address: "0xtest",
1290 user_api_key: "test-key",
1291 };
1292 let mut state = WsDispatchState::default();
1293
1294 let trade = PolymarketUserTrade {
1295 asset_id,
1296 bucket_index: 0,
1297 fee_rate_bps: "0".to_string(),
1298 id: "trade-overfill".to_string(),
1299 last_update: "1700000001".to_string(),
1300 maker_address: Ustr::from("0xmaker"),
1301 maker_orders: vec![],
1302 market: Ustr::from("0xmarket"),
1303 match_time: "1700000000".to_string(),
1304 outcome: PolymarketOutcome::yes(),
1305 owner: Ustr::from("00000000-0000-0000-0000-000000000001"),
1306 price: "0.014".to_string(),
1307 side: PolymarketOrderSide::Buy,
1308 size: "714.285714".to_string(),
1311 status: PolymarketTradeStatus::Matched,
1312 taker_order_id: venue_order_id.as_str().to_string(),
1313 timestamp: "1700000000000".to_string(),
1314 trade_owner: Ustr::from("00000000-0000-0000-0000-000000000001"),
1315 trader_side: PolymarketLiquiditySide::Taker,
1316 event_type: PolymarketEventType::Trade,
1317 };
1318
1319 dispatch_user_message(&UserWsMessage::Trade(trade), &ctx, &mut state);
1320
1321 let cumulative = fill_tracker
1325 .get_cumulative_filled(&venue_order_id)
1326 .expect("order must be registered");
1327 let expected_snapped = submitted.as_f64();
1328 let drift = (cumulative - expected_snapped).abs();
1329 assert!(
1330 drift < 1e-9,
1331 "cumulative_filled {cumulative} must be snapped to submitted {expected_snapped}",
1332 );
1333
1334 let event = receiver.try_recv().expect("expected a fill report");
1337 match event {
1338 ExecutionEvent::Report(ExecutionReport::Fill(report)) => {
1339 assert_eq!(
1340 report.last_qty, submitted,
1341 "fill report qty must be snapped to submitted",
1342 );
1343 assert_eq!(report.venue_order_id, venue_order_id);
1344 }
1345 other => panic!("expected fill report, was {other:?}"),
1346 }
1347 }
1348}