Skip to main content

nautilus_polymarket/websocket/
dispatch.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message dispatch for the Polymarket execution client.
17//!
18//! Routes user-channel WS messages (order updates and trades) into Nautilus
19//! order status reports and fill reports. Reports are emitted immediately if
20//! the order is already accepted, otherwise buffered until acceptance.
21//! Trade fills are deduped via a FIFO cache. Maker and taker fills are
22//! handled separately to account for multi-leg maker order matching.
23
24use std::sync::Mutex;
25
26use nautilus_common::cache::fifo::{FifoCache, FifoCacheMap};
27use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos, collections::AtomicMap, time::AtomicTime};
28use nautilus_live::ExecutionEventEmitter;
29use nautilus_model::{
30    enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, TimeInForce},
31    identifiers::{AccountId, VenueOrderId},
32    instruments::{Instrument, InstrumentAny},
33    reports::{FillReport, OrderStatusReport},
34    types::{Money, Price, Quantity},
35};
36use rust_decimal::Decimal;
37use ustr::Ustr;
38
39use super::{
40    messages::{PolymarketUserOrder, PolymarketUserTrade, UserWsMessage},
41    parse::parse_timestamp_ms,
42};
43use crate::{
44    common::enums::{PolymarketLiquiditySide, PolymarketOrderStatus},
45    execution::{
46        order_fill_tracker::OrderFillTrackerMap,
47        parse::{
48            build_maker_fill_report, compute_commission, determine_order_side,
49            instrument_taker_fee, make_composite_trade_id, parse_liquidity_side,
50        },
51    },
52};
53
/// Marker returned by the dispatcher when a finalized trade was seen, telling the
/// caller that an asynchronous account refresh should be scheduled.
///
/// The type is zero-sized and carries no data. It derives the cheap value traits
/// so that callers and tests can copy it and compare results directly, for
/// example `assert_eq!(result, Some(AccountRefreshRequest))`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct AccountRefreshRequest;
57
58/// Mutable state owned by the WS message loop (not shared via Arc).
59#[derive(Debug, Default)]
60pub(crate) struct WsDispatchState {
61    pub processed_fills: FifoCache<String, 10_000>,
62    /// Cancel reports saved for orders known to be terminal at the venue.
63    /// Re-emitted after a fill to restore terminal state when fills race
64    /// ahead of (or arrive after) cancel messages.
65    terminal_cancel_reports: FifoCacheMap<VenueOrderId, OrderStatusReport, 10_000>,
66}
67
68/// Immutable context borrowed from the async block's owned values.
69#[derive(Debug)]
70pub(crate) struct WsDispatchContext<'a> {
71    pub token_instruments: &'a AtomicMap<Ustr, InstrumentAny>,
72    pub fill_tracker: &'a OrderFillTrackerMap,
73    pub pending_fills: &'a Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>,
74    pub pending_order_reports: &'a Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>,
75    pub emitter: &'a ExecutionEventEmitter,
76    pub account_id: AccountId,
77    pub clock: &'static AtomicTime,
78    pub user_address: &'a str,
79    pub user_api_key: &'a str,
80}
81
82/// Top-level router: synchronous, returns signal for async account refresh.
83pub(crate) fn dispatch_user_message(
84    message: &UserWsMessage,
85    ctx: &WsDispatchContext<'_>,
86    state: &mut WsDispatchState,
87) -> Option<AccountRefreshRequest> {
88    match message {
89        UserWsMessage::Order(order) => {
90            dispatch_order_update(order, ctx, state);
91            None
92        }
93        UserWsMessage::Trade(trade) => dispatch_trade_update(trade, ctx, state),
94    }
95}
96
97fn dispatch_order_update(
98    order: &PolymarketUserOrder,
99    ctx: &WsDispatchContext<'_>,
100    state: &mut WsDispatchState,
101) {
102    let instrument = match ctx.token_instruments.get_cloned(&order.asset_id) {
103        Some(i) => i,
104        None => {
105            log::warn!("Unknown asset_id in order update: {}", order.asset_id);
106            return;
107        }
108    };
109
110    let ts_event = parse_timestamp_ms(&order.timestamp).unwrap_or_else(|_| ctx.clock.get_time_ns());
111    let venue_order_id = VenueOrderId::from(order.id.as_str());
112
113    let ts_init = ctx.clock.get_time_ns();
114    let mut report =
115        build_ws_order_status_report(order, &instrument, ctx.account_id, ts_event, ts_init);
116    let is_accepted = ctx.fill_tracker.contains(&venue_order_id);
117
118    // Order updates can race ahead of trade messages, so cap filled_qty
119    // to what the fill tracker has recorded to prevent duplicate inferred fills
120    if let Some(tracked_filled) = ctx.fill_tracker.get_cumulative_filled(&venue_order_id) {
121        let tracked_qty = Quantity::new(tracked_filled, instrument.size_precision());
122        if report.filled_qty > tracked_qty {
123            log::debug!(
124                "Capping filled_qty for {venue_order_id} from {} to {} (awaiting trade messages)",
125                report.filled_qty,
126                tracked_qty,
127            );
128            report.filled_qty = tracked_qty;
129        }
130    }
131
132    // Track cancel reports so we can re-emit them after late-arriving fills.
133    // Saved regardless of acceptance state so that cancels arriving during
134    // the HTTP round-trip are available once the order is later accepted.
135    if report.order_status == OrderStatus::Canceled {
136        state
137            .terminal_cancel_reports
138            .insert(venue_order_id, report.clone());
139    }
140
141    emit_or_buffer_order_report(
142        report,
143        venue_order_id,
144        is_accepted,
145        ctx.emitter,
146        ctx.pending_order_reports,
147    );
148
149    // MATCHED convergence: check for dust residual
150    if order.status == PolymarketOrderStatus::Matched {
151        let price = Price::new(
152            order.price.parse::<f64>().unwrap_or(0.0),
153            instrument.price_precision(),
154        );
155
156        if let Some(dust_fill) = ctx.fill_tracker.check_dust_and_build_fill(
157            &venue_order_id,
158            ctx.account_id,
159            &order.id,
160            price.as_f64(),
161            crate::execution::get_pusd_currency(),
162            ts_event,
163            ts_init,
164        ) {
165            emit_or_buffer_fill_report(
166                dust_fill,
167                venue_order_id,
168                is_accepted,
169                ctx.emitter,
170                ctx.pending_fills,
171            );
172        }
173    }
174}
175
176fn dispatch_trade_update(
177    trade: &PolymarketUserTrade,
178    ctx: &WsDispatchContext<'_>,
179    state: &mut WsDispatchState,
180) -> Option<AccountRefreshRequest> {
181    if !trade.status.is_finalized()
182        && !matches!(
183            trade.status,
184            crate::common::enums::PolymarketTradeStatus::Matched
185        )
186    {
187        log::debug!(
188            "Skipping trade with status {:?}: {}",
189            trade.status,
190            trade.id
191        );
192        return None;
193    }
194
195    let dedup_key = format!("{}-{}", trade.id, trade.taker_order_id);
196    let is_duplicate = state.processed_fills.contains(&dedup_key);
197
198    let needs_refresh = trade.status.is_finalized();
199
200    if is_duplicate {
201        log::debug!("Duplicate fill skipped: {dedup_key}");
202        return if needs_refresh {
203            Some(AccountRefreshRequest)
204        } else {
205            None
206        };
207    }
208    state.processed_fills.add(dedup_key);
209
210    let is_maker = trade.trader_side == PolymarketLiquiditySide::Maker;
211    let liquidity_side = parse_liquidity_side(trade.trader_side);
212    let ts_event = parse_timestamp_ms(&trade.timestamp).unwrap_or_else(|_| ctx.clock.get_time_ns());
213    let ts_init = ctx.clock.get_time_ns();
214
215    if is_maker {
216        dispatch_maker_fills(trade, ctx, state, liquidity_side, ts_event, ts_init);
217    } else {
218        dispatch_taker_fill(trade, ctx, state, liquidity_side, ts_event, ts_init);
219    }
220
221    if needs_refresh {
222        Some(AccountRefreshRequest)
223    } else {
224        None
225    }
226}
227
228fn dispatch_maker_fills(
229    trade: &PolymarketUserTrade,
230    ctx: &WsDispatchContext<'_>,
231    state: &WsDispatchState,
232    liquidity_side: LiquiditySide,
233    ts_event: UnixNanos,
234    ts_init: UnixNanos,
235) {
236    let user_orders: Vec<_> = trade
237        .maker_orders
238        .iter()
239        .filter(|mo| mo.maker_address == ctx.user_address || mo.owner == ctx.user_api_key)
240        .collect();
241
242    if user_orders.is_empty() {
243        log::warn!("No matching maker orders for user in trade: {}", trade.id);
244        return;
245    }
246
247    for mo in user_orders {
248        let asset_id = Ustr::from(mo.asset_id.as_str());
249        let instrument = match ctx.token_instruments.get_cloned(&asset_id) {
250            Some(i) => i,
251            None => {
252                log::warn!("Unknown asset_id in maker order: {asset_id}");
253                continue;
254            }
255        };
256        let mut report = build_maker_fill_report(
257            mo,
258            &trade.id,
259            trade.trader_side,
260            trade.side,
261            trade.asset_id.as_str(),
262            ctx.account_id,
263            instrument.id(),
264            instrument.price_precision(),
265            instrument.size_precision(),
266            crate::execution::get_pusd_currency(),
267            liquidity_side,
268            ts_event,
269            ts_init,
270        );
271        let maker_venue_order_id = report.venue_order_id;
272        report.last_qty = ctx
273            .fill_tracker
274            .snap_fill_qty(&maker_venue_order_id, report.last_qty);
275        let is_accepted = ctx.fill_tracker.contains(&maker_venue_order_id);
276
277        if is_accepted {
278            ctx.fill_tracker.record_fill(
279                &maker_venue_order_id,
280                report.last_qty.as_f64(),
281                report.last_px.as_f64(),
282                report.ts_event,
283            );
284        }
285
286        emit_or_buffer_fill_report(
287            report,
288            maker_venue_order_id,
289            is_accepted,
290            ctx.emitter,
291            ctx.pending_fills,
292        );
293
294        if is_accepted {
295            reemit_terminal_cancel(maker_venue_order_id, state, ctx.fill_tracker, ctx.emitter);
296        }
297    }
298}
299
300fn dispatch_taker_fill(
301    trade: &PolymarketUserTrade,
302    ctx: &WsDispatchContext<'_>,
303    state: &WsDispatchState,
304    liquidity_side: LiquiditySide,
305    ts_event: UnixNanos,
306    ts_init: UnixNanos,
307) {
308    let instrument = match ctx.token_instruments.get_cloned(&trade.asset_id) {
309        Some(i) => i,
310        None => {
311            log::warn!("Unknown asset_id in trade: {}", trade.asset_id);
312            return;
313        }
314    };
315
316    let venue_order_id = VenueOrderId::from(trade.taker_order_id.as_str());
317
318    let mut report = build_ws_taker_fill_report(
319        trade,
320        &instrument,
321        ctx.account_id,
322        liquidity_side,
323        ts_event,
324        ts_init,
325    );
326    report.last_qty = ctx
327        .fill_tracker
328        .snap_fill_qty(&venue_order_id, report.last_qty);
329
330    let is_accepted = ctx.fill_tracker.contains(&venue_order_id);
331
332    if is_accepted {
333        ctx.fill_tracker.record_fill(
334            &venue_order_id,
335            report.last_qty.as_f64(),
336            report.last_px.as_f64(),
337            report.ts_event,
338        );
339    }
340
341    emit_or_buffer_fill_report(
342        report,
343        venue_order_id,
344        is_accepted,
345        ctx.emitter,
346        ctx.pending_fills,
347    );
348
349    if is_accepted {
350        reemit_terminal_cancel(venue_order_id, state, ctx.fill_tracker, ctx.emitter);
351    }
352}
353
354/// Re-emits a saved cancel report after a fill to restore terminal state.
355///
356/// When fills race ahead of (or arrive after) cancel messages, the order can
357/// get stuck in `PartiallyFilled`. This re-emission ensures the execution
358/// engine transitions the order back to `Canceled`.
359///
360/// Skips re-emission when the fill tracker shows the order is fully filled,
361/// because `Filled` is already terminal and a spurious cancel would fail
362/// the `Filled -> Canceled` state transition.
363fn reemit_terminal_cancel(
364    venue_order_id: VenueOrderId,
365    state: &WsDispatchState,
366    fill_tracker: &OrderFillTrackerMap,
367    emitter: &ExecutionEventEmitter,
368) {
369    if fill_tracker.is_fully_filled(&venue_order_id) {
370        return;
371    }
372
373    if let Some(cancel_report) = state.terminal_cancel_reports.get(&venue_order_id) {
374        log::debug!(
375            "Re-emitting cancel report for {venue_order_id} after fill to restore terminal state"
376        );
377        emitter.send_order_status_report(cancel_report.clone());
378    }
379}
380
381fn build_ws_order_status_report(
382    order: &PolymarketUserOrder,
383    instrument: &InstrumentAny,
384    account_id: AccountId,
385    ts_event: UnixNanos,
386    ts_init: UnixNanos,
387) -> OrderStatusReport {
388    let venue_order_id = VenueOrderId::from(order.id.as_str());
389    let order_status =
390        crate::execution::parse::resolve_order_status(order.status, order.event_type);
391    let order_side = OrderSide::from(order.side);
392    let time_in_force = TimeInForce::from(order.order_type);
393    let quantity = Quantity::new(
394        order.original_size.parse::<f64>().unwrap_or(0.0),
395        instrument.size_precision(),
396    );
397    let filled_qty = Quantity::new(
398        order.size_matched.parse::<f64>().unwrap_or(0.0),
399        instrument.size_precision(),
400    );
401    let price = Price::new(
402        order.price.parse::<f64>().unwrap_or(0.0),
403        instrument.price_precision(),
404    );
405
406    let mut report = OrderStatusReport::new(
407        account_id,
408        instrument.id(),
409        None,
410        venue_order_id,
411        order_side,
412        OrderType::Limit,
413        time_in_force,
414        order_status,
415        quantity,
416        filled_qty,
417        ts_event,
418        ts_event,
419        ts_init,
420        None,
421    );
422    report.price = Some(price);
423    report
424}
425
426fn build_ws_taker_fill_report(
427    trade: &PolymarketUserTrade,
428    instrument: &InstrumentAny,
429    account_id: AccountId,
430    liquidity_side: LiquiditySide,
431    ts_event: UnixNanos,
432    ts_init: UnixNanos,
433) -> FillReport {
434    let venue_order_id = VenueOrderId::from(trade.taker_order_id.as_str());
435    let trade_id = make_composite_trade_id(&trade.id, &trade.taker_order_id);
436    let order_side = determine_order_side(
437        trade.trader_side,
438        trade.side,
439        trade.asset_id.as_str(),
440        trade.asset_id.as_str(),
441    );
442
443    let last_qty = Quantity::new(
444        trade.size.parse::<f64>().unwrap_or(0.0),
445        instrument.size_precision(),
446    );
447    let last_px = Price::new(
448        trade.price.parse::<f64>().unwrap_or(0.0),
449        instrument.price_precision(),
450    );
451
452    let fee_rate = instrument_taker_fee(instrument);
453    let size: Decimal = trade.size.parse().unwrap_or_default();
454    let price_dec: Decimal = trade.price.parse().unwrap_or_default();
455    let commission_value = compute_commission(fee_rate, size, price_dec, liquidity_side);
456    let pusd = crate::execution::get_pusd_currency();
457
458    FillReport {
459        account_id,
460        instrument_id: instrument.id(),
461        venue_order_id,
462        trade_id,
463        order_side,
464        last_qty,
465        last_px,
466        commission: Money::new(commission_value, pusd),
467        liquidity_side,
468        avg_px: None,
469        report_id: UUID4::new(),
470        ts_event,
471        ts_init,
472        client_order_id: None,
473        venue_position_id: None,
474    }
475}
476
477fn emit_or_buffer_order_report(
478    report: OrderStatusReport,
479    venue_order_id: VenueOrderId,
480    is_accepted: bool,
481    emitter: &ExecutionEventEmitter,
482    pending: &Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>,
483) {
484    if is_accepted {
485        emitter.send_order_status_report(report);
486    } else {
487        let mut guard = pending.lock().expect(MUTEX_POISONED);
488        if let Some(reports) = guard.get_mut(&venue_order_id) {
489            reports.push(report);
490        } else {
491            guard.insert(venue_order_id, vec![report]);
492        }
493    }
494}
495
496fn emit_or_buffer_fill_report(
497    report: FillReport,
498    venue_order_id: VenueOrderId,
499    is_accepted: bool,
500    emitter: &ExecutionEventEmitter,
501    pending: &Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>,
502) {
503    if is_accepted {
504        emitter.send_fill_report(report);
505    } else {
506        let mut guard = pending.lock().expect(MUTEX_POISONED);
507        if let Some(fills) = guard.get_mut(&venue_order_id) {
508            fills.push(report);
509        } else {
510            guard.insert(venue_order_id, vec![report]);
511        }
512    }
513}
514
515#[cfg(test)]
516mod tests {
517    use nautilus_common::messages::{ExecutionEvent, ExecutionReport};
518    use nautilus_core::time::AtomicTime;
519    use nautilus_model::{
520        enums::{AccountType, OrderStatus},
521        identifiers::TraderId,
522        types::Currency,
523    };
524    use rstest::rstest;
525
526    use super::*;
527    use crate::http::{
528        models::GammaMarket,
529        parse::{create_instrument_from_def, parse_gamma_market},
530    };
531
532    fn load<T: serde::de::DeserializeOwned>(filename: &str) -> T {
533        let path = format!("test_data/{filename}");
534        let content = std::fs::read_to_string(path).expect("Failed to read test data");
535        serde_json::from_str(&content).expect("Failed to parse test data")
536    }
537
538    fn test_instrument() -> InstrumentAny {
539        let market: GammaMarket = load("gamma_market.json");
540        let defs = parse_gamma_market(&market).unwrap();
541        create_instrument_from_def(&defs[0], UnixNanos::from(1_000_000_000u64)).unwrap()
542    }
543
544    fn test_emitter() -> ExecutionEventEmitter {
545        ExecutionEventEmitter::new(
546            nautilus_core::time::get_atomic_clock_realtime(),
547            TraderId::from("TESTER-001"),
548            AccountId::from("POLY-001"),
549            AccountType::Cash,
550            Some(Currency::pUSD()),
551        )
552    }
553
554    #[rstest]
555    fn test_build_ws_order_status_report() {
556        let order: PolymarketUserOrder = load("ws_user_order_placement.json");
557        let instrument = test_instrument();
558        let ts_event = UnixNanos::from(1_000_000_000u64);
559        let ts_init = UnixNanos::from(2_000_000_000u64);
560
561        let report = build_ws_order_status_report(
562            &order,
563            &instrument,
564            AccountId::from("POLY-001"),
565            ts_event,
566            ts_init,
567        );
568
569        assert_eq!(report.order_side, OrderSide::Buy);
570        assert_eq!(report.order_type, OrderType::Limit);
571        assert!(report.price.is_some());
572        assert_eq!(report.ts_accepted, ts_event);
573        assert_eq!(report.ts_init, ts_init);
574    }
575
576    #[rstest]
577    fn test_build_ws_order_status_report_venue_cancel_maps_to_canceled() {
578        let order: PolymarketUserOrder = load("ws_user_order_venue_cancel.json");
579        let instrument = test_instrument();
580        let ts_event = UnixNanos::from(1_000_000_000u64);
581        let ts_init = UnixNanos::from(2_000_000_000u64);
582
583        let report = build_ws_order_status_report(
584            &order,
585            &instrument,
586            AccountId::from("POLY-001"),
587            ts_event,
588            ts_init,
589        );
590
591        assert_eq!(report.order_status, OrderStatus::Canceled);
592    }
593
594    #[rstest]
595    fn test_build_ws_taker_fill_report() {
596        let trade: PolymarketUserTrade = load("ws_user_trade.json");
597        let instrument = test_instrument();
598        let ts_event = UnixNanos::from(1_000_000_000u64);
599        let ts_init = UnixNanos::from(2_000_000_000u64);
600
601        let report = build_ws_taker_fill_report(
602            &trade,
603            &instrument,
604            AccountId::from("POLY-001"),
605            LiquiditySide::Taker,
606            ts_event,
607            ts_init,
608        );
609
610        assert_eq!(report.order_side, OrderSide::Buy);
611        assert_eq!(report.liquidity_side, LiquiditySide::Taker);
612        assert_eq!(report.ts_event, ts_event);
613        assert_eq!(report.ts_init, ts_init);
614    }
615
616    #[rstest]
617    fn test_dispatch_order_message_buffers_when_not_accepted() {
618        let order: PolymarketUserOrder = load("ws_user_order_placement.json");
619        let instrument = test_instrument();
620
621        let token_instruments = AtomicMap::new();
622        token_instruments.insert(order.asset_id, instrument);
623
624        let fill_tracker = OrderFillTrackerMap::new();
625        let pending_fills = Mutex::new(FifoCacheMap::default());
626        let pending_order_reports = Mutex::new(FifoCacheMap::default());
627        let emitter = test_emitter();
628
629        let ctx = WsDispatchContext {
630            token_instruments: &token_instruments,
631            fill_tracker: &fill_tracker,
632            pending_fills: &pending_fills,
633            pending_order_reports: &pending_order_reports,
634            emitter: &emitter,
635            account_id: AccountId::from("POLY-001"),
636            clock: nautilus_core::time::get_atomic_clock_realtime(),
637            user_address: "0xtest",
638            user_api_key: "test-key",
639        };
640        let mut state = WsDispatchState::default();
641
642        let result = dispatch_user_message(&UserWsMessage::Order(order.clone()), &ctx, &mut state);
643        assert!(result.is_none());
644
645        // Order not registered in fill_tracker, so should be buffered
646        let guard = pending_order_reports.lock().unwrap();
647        let venue_order_id = VenueOrderId::from(order.id.as_str());
648        assert!(guard.get(&venue_order_id).is_some());
649    }
650
651    #[rstest]
652    fn test_dispatch_trade_dedup() {
653        let trade: PolymarketUserTrade = load("ws_user_trade.json");
654        let instrument = test_instrument();
655
656        let token_instruments = AtomicMap::new();
657        token_instruments.insert(trade.asset_id, instrument);
658
659        let fill_tracker = OrderFillTrackerMap::new();
660        let pending_fills = Mutex::new(FifoCacheMap::default());
661        let pending_order_reports = Mutex::new(FifoCacheMap::default());
662        let emitter = test_emitter();
663
664        let ctx = WsDispatchContext {
665            token_instruments: &token_instruments,
666            fill_tracker: &fill_tracker,
667            pending_fills: &pending_fills,
668            pending_order_reports: &pending_order_reports,
669            emitter: &emitter,
670            account_id: AccountId::from("POLY-001"),
671            clock: nautilus_core::time::get_atomic_clock_realtime(),
672            user_address: "0xtest",
673            user_api_key: "test-key",
674        };
675        let mut state = WsDispatchState::default();
676
677        // First dispatch processes the trade
678        let _ = dispatch_user_message(&UserWsMessage::Trade(trade.clone()), &ctx, &mut state);
679        let fills_count = {
680            let guard = pending_fills.lock().unwrap();
681            let venue_order_id = VenueOrderId::from(trade.taker_order_id.as_str());
682            guard.get(&venue_order_id).map_or(0, |v| v.len())
683        };
684        assert_eq!(fills_count, 1);
685
686        // Second dispatch should be deduped, no additional fill
687        let _ = dispatch_user_message(&UserWsMessage::Trade(trade.clone()), &ctx, &mut state);
688        let fills_count_after = {
689            let guard = pending_fills.lock().unwrap();
690            let venue_order_id = VenueOrderId::from(trade.taker_order_id.as_str());
691            guard.get(&venue_order_id).map_or(0, |v| v.len())
692        };
693        assert_eq!(fills_count_after, 1);
694    }
695
696    #[rstest]
697    fn test_dispatch_order_matched_caps_filled_qty_when_no_trades_tracked() {
698        let order: PolymarketUserOrder = load("ws_user_order_matched.json");
699        let instrument = test_instrument();
700
701        let token_instruments = AtomicMap::new();
702        token_instruments.insert(order.asset_id, instrument.clone());
703
704        let fill_tracker = OrderFillTrackerMap::new();
705        let venue_order_id = VenueOrderId::from(order.id.as_str());
706
707        // Register order so it is "accepted" but with no fills tracked
708        fill_tracker.register(
709            venue_order_id,
710            Quantity::from("100"),
711            OrderSide::Buy,
712            instrument.id(),
713            instrument.size_precision(),
714            instrument.price_precision(),
715        );
716
717        let pending_fills = Mutex::new(FifoCacheMap::default());
718        let pending_order_reports = Mutex::new(FifoCacheMap::default());
719        let mut emitter = test_emitter();
720        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
721        emitter.set_sender(sender);
722
723        let ctx = WsDispatchContext {
724            token_instruments: &token_instruments,
725            fill_tracker: &fill_tracker,
726            pending_fills: &pending_fills,
727            pending_order_reports: &pending_order_reports,
728            emitter: &emitter,
729            account_id: AccountId::from("POLY-001"),
730            clock: nautilus_core::time::get_atomic_clock_realtime(),
731            user_address: "0xtest",
732            user_api_key: "test-key",
733        };
734        let mut state = WsDispatchState::default();
735
736        dispatch_user_message(&UserWsMessage::Order(order), &ctx, &mut state);
737
738        let event = receiver.try_recv().expect("Expected report");
739        match event {
740            ExecutionEvent::Report(report) => match report {
741                ExecutionReport::Order(order_report) => {
742                    assert_eq!(order_report.filled_qty, Quantity::from("0"));
743                }
744                other => panic!("Expected order report, was {other:?}"),
745            },
746            other => panic!("Expected report event, was {other:?}"),
747        }
748    }
749
750    #[rstest]
751    fn test_dispatch_order_matched_uses_tracked_fills_for_filled_qty() {
752        let order: PolymarketUserOrder = load("ws_user_order_matched.json");
753        let instrument = test_instrument();
754
755        let token_instruments = AtomicMap::new();
756        token_instruments.insert(order.asset_id, instrument.clone());
757
758        let fill_tracker = OrderFillTrackerMap::new();
759        let venue_order_id = VenueOrderId::from(order.id.as_str());
760
761        // Register and record a partial fill (50 of 100)
762        fill_tracker.register(
763            venue_order_id,
764            Quantity::from("100"),
765            OrderSide::Buy,
766            instrument.id(),
767            instrument.size_precision(),
768            instrument.price_precision(),
769        );
770        fill_tracker.record_fill(&venue_order_id, 50.0, 0.5, UnixNanos::from(1_000u64));
771
772        let pending_fills = Mutex::new(FifoCacheMap::default());
773        let pending_order_reports = Mutex::new(FifoCacheMap::default());
774        let mut emitter = test_emitter();
775        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
776        emitter.set_sender(sender);
777
778        let ctx = WsDispatchContext {
779            token_instruments: &token_instruments,
780            fill_tracker: &fill_tracker,
781            pending_fills: &pending_fills,
782            pending_order_reports: &pending_order_reports,
783            emitter: &emitter,
784            account_id: AccountId::from("POLY-001"),
785            clock: nautilus_core::time::get_atomic_clock_realtime(),
786            user_address: "0xtest",
787            user_api_key: "test-key",
788        };
789        let mut state = WsDispatchState::default();
790
791        dispatch_user_message(&UserWsMessage::Order(order), &ctx, &mut state);
792
793        let event = receiver.try_recv().expect("Expected report");
794        match event {
795            ExecutionEvent::Report(report) => match report {
796                ExecutionReport::Order(order_report) => {
797                    assert_eq!(order_report.filled_qty, Quantity::from("50"));
798                }
799                other => panic!("Expected order report, was {other:?}"),
800            },
801            other => panic!("Expected report event, was {other:?}"),
802        }
803    }
804
805    #[rstest]
806    fn test_dispatch_order_matched_dust_fill_uses_local_ts_init() {
807        let order: PolymarketUserOrder = load("ws_user_order_matched.json");
808        let instrument = test_instrument();
809
810        let token_instruments = AtomicMap::new();
811        token_instruments.insert(order.asset_id, instrument.clone());
812
813        let fill_tracker = OrderFillTrackerMap::new();
814        let venue_order_id = VenueOrderId::from(order.id.as_str());
815        fill_tracker.register(
816            venue_order_id,
817            Quantity::from("100"),
818            OrderSide::Buy,
819            instrument.id(),
820            instrument.size_precision(),
821            instrument.price_precision(),
822        );
823        fill_tracker.record_fill(&venue_order_id, 99.995, 0.5, UnixNanos::from(1_000u64));
824
825        let pending_fills = Mutex::new(FifoCacheMap::default());
826        let pending_order_reports = Mutex::new(FifoCacheMap::default());
827        let mut emitter = test_emitter();
828        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
829        emitter.set_sender(sender);
830
831        let clock = Box::leak(Box::new(AtomicTime::new(
832            false,
833            UnixNanos::from(2_000_000_000u64),
834        )));
835
836        let ctx = WsDispatchContext {
837            token_instruments: &token_instruments,
838            fill_tracker: &fill_tracker,
839            pending_fills: &pending_fills,
840            pending_order_reports: &pending_order_reports,
841            emitter: &emitter,
842            account_id: AccountId::from("POLY-001"),
843            clock,
844            user_address: "0xtest",
845            user_api_key: "test-key",
846        };
847        let mut state = WsDispatchState::default();
848
849        dispatch_user_message(&UserWsMessage::Order(order), &ctx, &mut state);
850
851        let first = receiver.try_recv().expect("Expected order report");
852        let second = receiver.try_recv().expect("Expected dust fill report");
853
854        match first {
855            ExecutionEvent::Report(ExecutionReport::Order(_)) => {}
856            other => panic!("Expected order report, was {other:?}"),
857        }
858
859        match second {
860            ExecutionEvent::Report(ExecutionReport::Fill(fill_report)) => {
861                assert_eq!(
862                    fill_report.ts_event,
863                    UnixNanos::from(1_703_875_201_000_000_000u64)
864                );
865                assert_eq!(fill_report.ts_init, UnixNanos::from(2_000_000_000u64));
866            }
867            other => panic!("Expected fill report, was {other:?}"),
868        }
869    }
870
871    #[rstest]
872    fn test_cancel_reemitted_after_fill_for_canceled_order() {
873        let cancel_order: PolymarketUserOrder = load("ws_user_order_cancellation.json");
874        let trade: PolymarketUserTrade = load("ws_user_trade.json");
875        let instrument = test_instrument();
876
877        let token_instruments = AtomicMap::new();
878        token_instruments.insert(cancel_order.asset_id, instrument.clone());
879
880        let fill_tracker = OrderFillTrackerMap::new();
881        let venue_order_id = VenueOrderId::from(cancel_order.id.as_str());
882
883        // Register order as accepted with original qty=100
884        fill_tracker.register(
885            venue_order_id,
886            Quantity::from("100"),
887            OrderSide::Buy,
888            instrument.id(),
889            instrument.size_precision(),
890            instrument.price_precision(),
891        );
892
893        let pending_fills = Mutex::new(FifoCacheMap::default());
894        let pending_order_reports = Mutex::new(FifoCacheMap::default());
895        let mut emitter = test_emitter();
896        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
897        emitter.set_sender(sender);
898
899        let ctx = WsDispatchContext {
900            token_instruments: &token_instruments,
901            fill_tracker: &fill_tracker,
902            pending_fills: &pending_fills,
903            pending_order_reports: &pending_order_reports,
904            emitter: &emitter,
905            account_id: AccountId::from("POLY-001"),
906            clock: nautilus_core::time::get_atomic_clock_realtime(),
907            user_address: "0xtest",
908            user_api_key: "test-key",
909        };
910        let mut state = WsDispatchState::default();
911
912        // Step 1: Dispatch cancel (simulates message A from the bug)
913        dispatch_user_message(&UserWsMessage::Order(cancel_order), &ctx, &mut state);
914        let cancel_event = receiver.try_recv().expect("Expected cancel report");
915        match &cancel_event {
916            ExecutionEvent::Report(ExecutionReport::Order(r)) => {
917                assert_eq!(r.order_status, OrderStatus::Canceled);
918            }
919            other => panic!("Expected order report, was {other:?}"),
920        }
921
922        // Step 2: Dispatch trade fill (simulates trade arriving after cancel)
923        dispatch_user_message(&UserWsMessage::Trade(trade), &ctx, &mut state);
924
925        // Should get: fill report, then re-emitted cancel report
926        let fill_event = receiver.try_recv().expect("Expected fill report");
927        match &fill_event {
928            ExecutionEvent::Report(ExecutionReport::Fill(f)) => {
929                assert_eq!(f.venue_order_id, venue_order_id);
930            }
931            other => panic!("Expected fill report, was {other:?}"),
932        }
933
934        let reemitted_cancel = receiver
935            .try_recv()
936            .expect("Expected re-emitted cancel report");
937
938        match &reemitted_cancel {
939            ExecutionEvent::Report(ExecutionReport::Order(r)) => {
940                assert_eq!(r.order_status, OrderStatus::Canceled);
941                assert_eq!(r.venue_order_id, venue_order_id);
942            }
943            other => panic!("Expected order cancel report, was {other:?}"),
944        }
945    }
946
947    #[rstest]
948    fn test_cancel_not_reemitted_when_fill_completes_order() {
949        let cancel_order: PolymarketUserOrder = load("ws_user_order_cancellation.json");
950        let trade: PolymarketUserTrade = load("ws_user_trade.json");
951        let instrument = test_instrument();
952
953        let token_instruments = AtomicMap::new();
954        token_instruments.insert(cancel_order.asset_id, instrument.clone());
955
956        let fill_tracker = OrderFillTrackerMap::new();
957        let venue_order_id = VenueOrderId::from(cancel_order.id.as_str());
958
959        // Register with qty=25 matching the trade size so the fill completes the order
960        fill_tracker.register(
961            venue_order_id,
962            Quantity::from("25"),
963            OrderSide::Buy,
964            instrument.id(),
965            instrument.size_precision(),
966            instrument.price_precision(),
967        );
968
969        let pending_fills = Mutex::new(FifoCacheMap::default());
970        let pending_order_reports = Mutex::new(FifoCacheMap::default());
971        let mut emitter = test_emitter();
972        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
973        emitter.set_sender(sender);
974
975        let ctx = WsDispatchContext {
976            token_instruments: &token_instruments,
977            fill_tracker: &fill_tracker,
978            pending_fills: &pending_fills,
979            pending_order_reports: &pending_order_reports,
980            emitter: &emitter,
981            account_id: AccountId::from("POLY-001"),
982            clock: nautilus_core::time::get_atomic_clock_realtime(),
983            user_address: "0xtest",
984            user_api_key: "test-key",
985        };
986        let mut state = WsDispatchState::default();
987
988        // Cancel then fill that completes the order
989        dispatch_user_message(&UserWsMessage::Order(cancel_order), &ctx, &mut state);
990        let _cancel = receiver.try_recv().expect("Expected cancel report");
991
992        dispatch_user_message(&UserWsMessage::Trade(trade), &ctx, &mut state);
993        let _fill = receiver.try_recv().expect("Expected fill report");
994
995        // Channel should be empty: no re-emitted cancel for a fully-filled order
996        assert!(
997            receiver.try_recv().is_err(),
998            "Should not re-emit cancel when fill completes the order"
999        );
1000    }
1001
1002    #[rstest]
1003    fn test_cancel_saved_before_acceptance() {
1004        let cancel_order: PolymarketUserOrder = load("ws_user_order_cancellation.json");
1005        let instrument = test_instrument();
1006
1007        let token_instruments = AtomicMap::new();
1008        token_instruments.insert(cancel_order.asset_id, instrument);
1009
1010        // Fill tracker has NO registration (simulates HTTP still in-flight)
1011        let fill_tracker = OrderFillTrackerMap::new();
1012        let venue_order_id = VenueOrderId::from(cancel_order.id.as_str());
1013
1014        let pending_fills = Mutex::new(FifoCacheMap::default());
1015        let pending_order_reports = Mutex::new(FifoCacheMap::default());
1016        let emitter = test_emitter();
1017
1018        let ctx = WsDispatchContext {
1019            token_instruments: &token_instruments,
1020            fill_tracker: &fill_tracker,
1021            pending_fills: &pending_fills,
1022            pending_order_reports: &pending_order_reports,
1023            emitter: &emitter,
1024            account_id: AccountId::from("POLY-001"),
1025            clock: nautilus_core::time::get_atomic_clock_realtime(),
1026            user_address: "0xtest",
1027            user_api_key: "test-key",
1028        };
1029        let mut state = WsDispatchState::default();
1030
1031        // Dispatch cancel while order is not yet accepted
1032        dispatch_user_message(&UserWsMessage::Order(cancel_order), &ctx, &mut state);
1033
1034        // Cancel should be buffered (not emitted) AND saved to terminal_cancel_reports
1035        let guard = pending_order_reports.lock().unwrap();
1036        assert!(guard.get(&venue_order_id).is_some());
1037        drop(guard);
1038
1039        assert!(state.terminal_cancel_reports.get(&venue_order_id).is_some());
1040    }
1041
1042    /// Replays the exact 5-message WS sequence from issue #3797.
1043    ///
1044    /// Messages in arrival order:
1045    ///   (A) Order Canceled, size_matched=0
1046    ///   (B) Trade fill 1.219511 (maker side)
1047    ///   (C) Order Canceled, size_matched=1.219511
1048    ///   (D) Order Canceled, size_matched=2.560972 (capped to tracked)
1049    ///   (E) Trade fill 1.341461 (maker side)
1050    ///
1051    /// Without the fix, the order ends in PartiallyFilled after (E).
1052    /// With the fix, a re-emitted cancel after (E) restores Canceled.
1053    #[rstest]
1054    fn test_issue_3797_interleaved_cancel_fill_sequence() {
1055        use crate::common::{
1056            enums::{
1057                PolymarketEventType, PolymarketLiquiditySide, PolymarketOrderSide,
1058                PolymarketOrderStatus, PolymarketOrderType, PolymarketOutcome,
1059                PolymarketTradeStatus,
1060            },
1061            models::PolymarketMakerOrder,
1062        };
1063
1064        let instrument = test_instrument();
1065        let asset_id = instrument.id().symbol.inner();
1066
1067        let order_id =
1068            "0xe743f6c823ecdfa9ddaaf08673b2441d15a38d89e14dcb25b3b70c284be4f6ad".to_string();
1069        let venue_order_id = VenueOrderId::from(order_id.as_str());
1070
1071        let token_instruments = AtomicMap::new();
1072        token_instruments.insert(asset_id, instrument.clone());
1073
1074        let fill_tracker = OrderFillTrackerMap::new();
1075        fill_tracker.register(
1076            venue_order_id,
1077            Quantity::from("20"),
1078            OrderSide::Buy,
1079            instrument.id(),
1080            instrument.size_precision(),
1081            instrument.price_precision(),
1082        );
1083
1084        let pending_fills = Mutex::new(FifoCacheMap::default());
1085        let pending_order_reports = Mutex::new(FifoCacheMap::default());
1086        let mut emitter = test_emitter();
1087        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
1088        emitter.set_sender(sender);
1089
1090        let ctx = WsDispatchContext {
1091            token_instruments: &token_instruments,
1092            fill_tracker: &fill_tracker,
1093            pending_fills: &pending_fills,
1094            pending_order_reports: &pending_order_reports,
1095            emitter: &emitter,
1096            account_id: AccountId::from("POLY-001"),
1097            clock: nautilus_core::time::get_atomic_clock_realtime(),
1098            user_address: "0xabc",
1099            user_api_key: "xxx",
1100        };
1101        let mut state = WsDispatchState::default();
1102
1103        // Helper to build order updates
1104        let make_order =
1105            |size_matched: &str, ts: &str, event_type: PolymarketEventType| PolymarketUserOrder {
1106                asset_id,
1107                associate_trades: None,
1108                created_at: "1775074735".to_string(),
1109                expiration: Some("0".to_string()),
1110                id: order_id.clone(),
1111                maker_address: Ustr::from("0xabc"),
1112                market: Ustr::from("0x4134"),
1113                order_owner: Ustr::from("xxx"),
1114                order_type: PolymarketOrderType::GTC,
1115                original_size: "20".to_string(),
1116                outcome: PolymarketOutcome::yes(),
1117                owner: Ustr::from("xxx"),
1118                price: "0.18".to_string(),
1119                side: PolymarketOrderSide::Buy,
1120                size_matched: size_matched.to_string(),
1121                status: PolymarketOrderStatus::Canceled,
1122                timestamp: ts.to_string(),
1123                event_type,
1124            };
1125
1126        // Helper to build maker trades
1127        let make_trade = |trade_id: &str, matched_amount: f64, ts: &str| PolymarketUserTrade {
1128            asset_id,
1129            bucket_index: 0,
1130            fee_rate_bps: "1000".to_string(),
1131            id: trade_id.to_string(),
1132            last_update: "1775074738".to_string(),
1133            maker_address: Ustr::from("0xother"),
1134            maker_orders: vec![PolymarketMakerOrder {
1135                asset_id,
1136                maker_address: "0xabc".to_string(),
1137                matched_amount: Decimal::from_f64_retain(matched_amount).unwrap_or(Decimal::ZERO),
1138                order_id: order_id.clone(),
1139                outcome: PolymarketOutcome::yes(),
1140                owner: "xxx".to_string(),
1141                price: Decimal::from_f64_retain(0.18).unwrap_or(Decimal::ZERO),
1142                side: None,
1143            }],
1144            market: Ustr::from("0x4134"),
1145            match_time: "1775074735".to_string(),
1146            outcome: PolymarketOutcome::yes(),
1147            owner: Ustr::from("other-owner"),
1148            price: "0.82".to_string(),
1149            side: PolymarketOrderSide::Buy,
1150            size: "1.219511".to_string(),
1151            status: PolymarketTradeStatus::Matched,
1152            taker_order_id: "0xtaker01".to_string(),
1153            timestamp: ts.to_string(),
1154            trade_owner: Ustr::from("other-owner"),
1155            trader_side: PolymarketLiquiditySide::Maker,
1156            event_type: PolymarketEventType::Trade,
1157        };
1158
1159        // (A) Cancel with size_matched=0
1160        let msg_a = make_order("0", "1775074738031", PolymarketEventType::Cancellation);
1161        dispatch_user_message(&UserWsMessage::Order(msg_a), &ctx, &mut state);
1162
1163        let evt = receiver.try_recv().expect("(A) cancel report");
1164        match &evt {
1165            ExecutionEvent::Report(ExecutionReport::Order(r)) => {
1166                assert_eq!(r.order_status, OrderStatus::Canceled);
1167                assert_eq!(r.filled_qty, Quantity::from("0"));
1168            }
1169            other => panic!("(A) expected order report, was {other:?}"),
1170        }
1171
1172        // (B) Trade fill 1.219511
1173        let msg_b = make_trade("trade-b", 1.219511, "1775074738032");
1174        dispatch_user_message(&UserWsMessage::Trade(msg_b), &ctx, &mut state);
1175
1176        let evt = receiver.try_recv().expect("(B) fill report");
1177        match &evt {
1178            ExecutionEvent::Report(ExecutionReport::Fill(f)) => {
1179                assert_eq!(f.venue_order_id, venue_order_id);
1180            }
1181            other => panic!("(B) expected fill report, was {other:?}"),
1182        }
1183        // Re-emitted cancel after fill (B)
1184        let evt = receiver.try_recv().expect("(B) re-emitted cancel");
1185        match &evt {
1186            ExecutionEvent::Report(ExecutionReport::Order(r)) => {
1187                assert_eq!(r.order_status, OrderStatus::Canceled);
1188            }
1189            other => panic!("(B) expected re-emitted cancel, was {other:?}"),
1190        }
1191
1192        // (C) Cancel with size_matched=1.219511
1193        let msg_c = make_order("1.219511", "1775074738034", PolymarketEventType::Update);
1194        dispatch_user_message(&UserWsMessage::Order(msg_c), &ctx, &mut state);
1195
1196        let evt = receiver.try_recv().expect("(C) cancel report");
1197        match &evt {
1198            ExecutionEvent::Report(ExecutionReport::Order(r)) => {
1199                assert_eq!(r.order_status, OrderStatus::Canceled);
1200            }
1201            other => panic!("(C) expected order report, was {other:?}"),
1202        }
1203
1204        // (D) Cancel with size_matched=2.560972 (capped to tracked 1.219511)
1205        let msg_d = make_order("2.560972", "1775074738038", PolymarketEventType::Update);
1206        dispatch_user_message(&UserWsMessage::Order(msg_d), &ctx, &mut state);
1207
1208        let evt = receiver.try_recv().expect("(D) cancel report");
1209        match &evt {
1210            ExecutionEvent::Report(ExecutionReport::Order(r)) => {
1211                assert_eq!(r.order_status, OrderStatus::Canceled);
1212                // filled_qty capped to tracked amount
1213                assert_eq!(r.filled_qty, Quantity::new(1.219511, 6));
1214            }
1215            other => panic!("(D) expected order report, was {other:?}"),
1216        }
1217
1218        // (E) Trade fill 1.341461
1219        let msg_e = make_trade("trade-e", 1.341461, "1775074738036");
1220        dispatch_user_message(&UserWsMessage::Trade(msg_e), &ctx, &mut state);
1221
1222        let evt = receiver.try_recv().expect("(E) fill report");
1223        match &evt {
1224            ExecutionEvent::Report(ExecutionReport::Fill(f)) => {
1225                assert_eq!(f.venue_order_id, venue_order_id);
1226            }
1227            other => panic!("(E) expected fill report, was {other:?}"),
1228        }
1229
1230        // The fix: re-emitted cancel after (E) restores terminal state
1231        let evt = receiver.try_recv().expect("(E) re-emitted cancel");
1232        match &evt {
1233            ExecutionEvent::Report(ExecutionReport::Order(r)) => {
1234                assert_eq!(r.order_status, OrderStatus::Canceled);
1235                assert_eq!(r.venue_order_id, venue_order_id);
1236            }
1237            other => panic!("(E) expected re-emitted cancel, was {other:?}"),
1238        }
1239
1240        // No more events
1241        assert!(
1242            receiver.try_recv().is_err(),
1243            "No further events expected after the sequence"
1244        );
1245    }
1246
1247    #[rstest]
1248    fn test_dispatch_taker_fill_snaps_overfill_to_submitted_qty() {
1249        // Reproduces the V2 market-BUY scenario that motivated the dust-snap
1250        // fix: SDK truncates the registered qty to USDC scale, but the
1251        // on-chain fill comes back at full precision and exceeds submitted
1252        // by microshares. Without the snap the engine rejects as overfill.
1253        use crate::common::enums::{
1254            PolymarketEventType, PolymarketOrderSide, PolymarketOutcome, PolymarketTradeStatus,
1255        };
1256
1257        let instrument = test_instrument();
1258        let asset_id = instrument.id().symbol.inner();
1259        let token_instruments = AtomicMap::new();
1260        token_instruments.insert(asset_id, instrument.clone());
1261
1262        let fill_tracker = OrderFillTrackerMap::new();
1263        let venue_order_id = VenueOrderId::from("0xtaker-overfill");
1264        // Submitted qty truncated to USDC scale.
1265        let submitted = Quantity::new(714.285710, instrument.size_precision());
1266        fill_tracker.register(
1267            venue_order_id,
1268            submitted,
1269            OrderSide::Buy,
1270            instrument.id(),
1271            instrument.size_precision(),
1272            instrument.price_precision(),
1273        );
1274
1275        let pending_fills = Mutex::new(FifoCacheMap::default());
1276        let pending_order_reports = Mutex::new(FifoCacheMap::default());
1277        let mut emitter = test_emitter();
1278        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
1279        emitter.set_sender(sender);
1280
1281        let ctx = WsDispatchContext {
1282            token_instruments: &token_instruments,
1283            fill_tracker: &fill_tracker,
1284            pending_fills: &pending_fills,
1285            pending_order_reports: &pending_order_reports,
1286            emitter: &emitter,
1287            account_id: AccountId::from("POLY-001"),
1288            clock: nautilus_core::time::get_atomic_clock_realtime(),
1289            user_address: "0xtest",
1290            user_api_key: "test-key",
1291        };
1292        let mut state = WsDispatchState::default();
1293
1294        let trade = PolymarketUserTrade {
1295            asset_id,
1296            bucket_index: 0,
1297            fee_rate_bps: "0".to_string(),
1298            id: "trade-overfill".to_string(),
1299            last_update: "1700000001".to_string(),
1300            maker_address: Ustr::from("0xmaker"),
1301            maker_orders: vec![],
1302            market: Ustr::from("0xmarket"),
1303            match_time: "1700000000".to_string(),
1304            outcome: PolymarketOutcome::yes(),
1305            owner: Ustr::from("00000000-0000-0000-0000-000000000001"),
1306            price: "0.014".to_string(),
1307            side: PolymarketOrderSide::Buy,
1308            // Fill exceeds submitted_qty by 4 ulps at size_precision=6,
1309            // matching the production drift observed during smoke tests.
1310            size: "714.285714".to_string(),
1311            status: PolymarketTradeStatus::Matched,
1312            taker_order_id: venue_order_id.as_str().to_string(),
1313            timestamp: "1700000000000".to_string(),
1314            trade_owner: Ustr::from("00000000-0000-0000-0000-000000000001"),
1315            trader_side: PolymarketLiquiditySide::Taker,
1316            event_type: PolymarketEventType::Trade,
1317        };
1318
1319        dispatch_user_message(&UserWsMessage::Trade(trade), &ctx, &mut state);
1320
1321        // The dispatcher must record the snapped quantity in the tracker so
1322        // any subsequent ORDER MATCHED with size_matched > submitted_qty is
1323        // capped to it. record_fill happens before the FillReport is sent.
1324        let cumulative = fill_tracker
1325            .get_cumulative_filled(&venue_order_id)
1326            .expect("order must be registered");
1327        let expected_snapped = submitted.as_f64();
1328        let drift = (cumulative - expected_snapped).abs();
1329        assert!(
1330            drift < 1e-9,
1331            "cumulative_filled {cumulative} must be snapped to submitted {expected_snapped}",
1332        );
1333
1334        // The emitted FillReport must carry the snapped qty so the engine
1335        // does not reject it as an overfill.
1336        let event = receiver.try_recv().expect("expected a fill report");
1337        match event {
1338            ExecutionEvent::Report(ExecutionReport::Fill(report)) => {
1339                assert_eq!(
1340                    report.last_qty, submitted,
1341                    "fill report qty must be snapped to submitted",
1342                );
1343                assert_eq!(report.venue_order_id, venue_order_id);
1344            }
1345            other => panic!("expected fill report, was {other:?}"),
1346        }
1347    }
1348}