Skip to main content

nautilus_polymarket/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Polymarket adapter.
17
18pub mod order_builder;
19pub(crate) mod order_fill_tracker;
20pub mod parse;
21pub(crate) mod reconciliation;
22pub(crate) mod submitter;
23pub(crate) mod types;
24
25use std::{
26    sync::{
27        Arc, Mutex,
28        atomic::{AtomicBool, Ordering},
29    },
30    time::{Duration, Instant},
31};
32
33use ahash::AHashSet;
34use anyhow::Context;
35use async_trait::async_trait;
36use nautilus_common::{
37    cache::fifo::FifoCacheMap,
38    clients::ExecutionClient,
39    live::{runner::get_exec_event_sender, runtime::get_runtime},
40    messages::execution::{
41        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
42        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
43        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
44    },
45};
46use nautilus_core::{
47    MUTEX_POISONED, UUID4, UnixNanos,
48    collections::AtomicMap,
49    time::{AtomicTime, get_atomic_clock_realtime},
50};
51use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
52use nautilus_model::{
53    accounts::AccountAny,
54    enums::{AccountType, LiquiditySide, OmsType, OrderSide, OrderStatus, OrderType, TimeInForce},
55    events::{OrderEventAny, OrderUpdated},
56    identifiers::{
57        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
58    },
59    instruments::{Instrument, InstrumentAny},
60    orders::{Order, OrderAny},
61    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
62    types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
63};
64use nautilus_network::retry::RetryConfig;
65use rust_decimal::Decimal;
66use tokio::task::JoinHandle;
67use ustr::Ustr;
68
69use self::{
70    order_builder::PolymarketOrderBuilder,
71    order_fill_tracker::OrderFillTrackerMap,
72    parse::{
73        compute_commission, instrument_fee_exponent, instrument_taker_fee, parse_balance_allowance,
74        parse_order_status_report,
75    },
76    reconciliation::{
77        FillContext, apply_fill_filters, build_fill_reports_from_trades, build_position_reports,
78    },
79    submitter::{MarketBuyFeeContext, OrderSubmitter},
80    types::{BatchLimitOrderContext, CancelOutcome, LimitOrderSubmitRequest},
81};
82use crate::{
83    common::{
84        consts::{BATCH_ORDER_LIMIT, POLYMARKET_VENUE},
85        credential::Secrets,
86        enums::SignatureType,
87    },
88    config::PolymarketExecClientConfig,
89    http::{
90        clob::PolymarketClobHttpClient,
91        data_api::PolymarketDataApiHttpClient,
92        query::{CancelResponse, GetBalanceAllowanceParams, GetTradesParams, OrderResponse},
93    },
94    signing::eip712::OrderSigner,
95    websocket::{
96        client::PolymarketWebSocketClient,
97        dispatch::{WsDispatchContext, WsDispatchState, dispatch_user_message},
98        messages::PolymarketWsMessage,
99    },
100};
101
/// Live execution client for the Polymarket prediction market.
///
/// Bundles the HTTP and WebSocket clients, the order submitter, and the shared
/// state handed to background tasks (spawned order submissions and the
/// user-channel WebSocket handler).
#[derive(Debug)]
pub struct PolymarketExecutionClient {
    /// Shared execution client core (client/account identifiers, cache access, lifecycle flags).
    core: ExecutionClientCore,
    /// Realtime atomic clock used to timestamp emitted events.
    clock: &'static AtomicTime,
    /// Adapter configuration supplied at construction.
    config: PolymarketExecClientConfig,
    /// Emits order and account events; its sender is attached in `start`.
    emitter: ExecutionEventEmitter,
    /// CLOB HTTP client (also cloned into the order submitter and background tasks).
    http_client: PolymarketClobHttpClient,
    /// Data API HTTP client.
    data_api_client: PolymarketDataApiHttpClient,
    /// Builds, signs and submits orders through the CLOB client using the configured retry policy.
    submitter: OrderSubmitter,
    /// User-channel WebSocket client.
    ws_client: PolymarketWebSocketClient,
    /// Resolved credentials (private key, API credential, signer address, optional funder).
    secrets: Secrets,
    /// Handles of background tasks started via `spawn_task`; aborted on `stop`.
    pending_tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Set to `true` by `stop` and reset to `false` by `start`.
    stopping: Arc<AtomicBool>,
    /// Handle of the WebSocket message handler task, if one is running.
    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
    /// Instruments keyed by raw symbol, shared with the WebSocket handler.
    shared_token_instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// `neg_risk` flag per instrument ID, populated from binary option instrument info.
    neg_risk_index: Arc<AtomicMap<InstrumentId, bool>>,
    /// Fill tracking state shared between order submission and WebSocket dispatch.
    fill_tracker: Arc<OrderFillTrackerMap>,
    /// Fill reports buffered per venue order ID.
    // NOTE(review): the `1_000` const parameter presumably bounds the FIFO capacity - confirm.
    pending_fills: Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>>,
    /// Order status reports buffered per venue order ID (same FIFO cache type as `pending_fills`).
    pending_order_reports: Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>>,
    /// Client order IDs with a pending cancel; passed to order response handling.
    // NOTE(review): presumably cancels requested before the venue order ID was known - confirm.
    pending_cancels: Arc<Mutex<AHashSet<ClientOrderId>>>,
}
124
125impl PolymarketExecutionClient {
126    /// Creates a new [`PolymarketExecutionClient`].
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if credentials cannot be resolved or clients fail to construct.
131    pub fn new(
132        core: ExecutionClientCore,
133        config: PolymarketExecClientConfig,
134    ) -> anyhow::Result<Self> {
135        let secrets = Secrets::resolve(
136            config.private_key.as_deref(),
137            config.api_key.clone(),
138            config.api_secret.clone(),
139            config.passphrase.clone(),
140            config.funder.clone(),
141        )
142        .context("failed to resolve Polymarket credentials")?;
143
144        let http_client = PolymarketClobHttpClient::new(
145            secrets.credential.clone(),
146            secrets.address.clone(),
147            config.base_url_http.clone(),
148            config.http_timeout_secs,
149        )
150        .map_err(|e| anyhow::anyhow!("{e}"))
151        .context("failed to create CLOB HTTP client")?;
152
153        let data_api_client =
154            PolymarketDataApiHttpClient::new(Some(config.data_api_url()), config.http_timeout_secs)
155                .map_err(|e| anyhow::anyhow!("{e}"))
156                .context("failed to create Data API HTTP client")?;
157
158        let order_signer =
159            OrderSigner::new(&secrets.private_key).context("failed to create order signer")?;
160
161        let signer_address = secrets.address.clone();
162        let maker_address = secrets
163            .funder
164            .clone()
165            .unwrap_or_else(|| signer_address.clone());
166        let order_builder = Arc::new(PolymarketOrderBuilder::new(
167            order_signer,
168            signer_address,
169            maker_address,
170            config.signature_type,
171        ));
172
173        let retry_config = RetryConfig {
174            max_retries: config.max_retries,
175            initial_delay_ms: config.retry_delay_initial_ms,
176            max_delay_ms: config.retry_delay_max_ms,
177            backoff_factor: 2.0,
178            jitter_ms: 1_000,
179            operation_timeout_ms: Some(config.http_timeout_secs * 1_000),
180            immediate_first: false,
181            max_elapsed_ms: Some(180_000),
182        };
183        let submitter = OrderSubmitter::new(http_client.clone(), order_builder, retry_config);
184
185        let ws_client = PolymarketWebSocketClient::new_user(
186            config.base_url_ws.clone(),
187            secrets.credential.clone(),
188            config.transport_backend,
189        );
190
191        let clock = get_atomic_clock_realtime();
192        let pusd = get_pusd_currency();
193        let emitter = ExecutionEventEmitter::new(
194            clock,
195            core.trader_id,
196            core.account_id,
197            AccountType::Cash,
198            Some(pusd),
199        );
200
201        Ok(Self {
202            core,
203            clock,
204            config,
205            emitter,
206            http_client,
207            data_api_client,
208            submitter,
209            ws_client,
210            secrets,
211            pending_tasks: Arc::new(Mutex::new(Vec::new())),
212            stopping: Arc::new(AtomicBool::new(false)),
213            ws_stream_handle: Mutex::new(None),
214            shared_token_instruments: Arc::new(AtomicMap::new()),
215            neg_risk_index: Arc::new(AtomicMap::new()),
216            fill_tracker: Arc::new(OrderFillTrackerMap::new()),
217            pending_fills: Arc::new(Mutex::new(FifoCacheMap::default())),
218            pending_order_reports: Arc::new(Mutex::new(FifoCacheMap::default())),
219            pending_cancels: Arc::new(Mutex::new(AHashSet::new())),
220        })
221    }
222
223    fn spawn_task<F>(&self, description: &'static str, fut: F)
224    where
225        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
226    {
227        let runtime = get_runtime();
228        let handle = runtime.spawn(async move {
229            if let Err(e) = fut.await {
230                log::warn!("{description} failed: {e:?}");
231            }
232        });
233
234        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
235        tasks.retain(|handle| !handle.is_finished());
236        tasks.push(handle);
237    }
238
239    fn abort_pending_tasks(&self) {
240        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
241        for handle in tasks.drain(..) {
242            handle.abort();
243        }
244    }
245
246    async fn refresh_account_state(&self) -> anyhow::Result<()> {
247        fetch_and_emit_account_state(
248            &self.http_client,
249            &self.emitter,
250            self.clock,
251            self.config.signature_type,
252        )
253        .await
254    }
255
256    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
257        let account_id = self.core.account_id;
258
259        if self.core.cache().account(&account_id).is_some() {
260            log::info!("Account {account_id} registered");
261            return Ok(());
262        }
263
264        let start = Instant::now();
265        let timeout = Duration::from_secs_f64(timeout_secs);
266        let interval = Duration::from_millis(10);
267
268        loop {
269            tokio::time::sleep(interval).await;
270
271            if self.core.cache().account(&account_id).is_some() {
272                log::info!("Account {account_id} registered");
273                return Ok(());
274            }
275
276            if start.elapsed() >= timeout {
277                anyhow::bail!(
278                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
279                );
280            }
281        }
282    }
283
284    async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
285        self.ws_client
286            .connect()
287            .await
288            .context("failed to connect user WebSocket")?;
289
290        self.ws_client
291            .subscribe_user()
292            .await
293            .context("failed to subscribe to user channel")?;
294
295        let mut rx = self
296            .ws_client
297            .take_message_receiver()
298            .ok_or_else(|| anyhow::anyhow!("WebSocket message receiver not available"))?;
299
300        let emitter = self.emitter.clone();
301        let token_instruments = self.shared_token_instruments.clone();
302        let account_id = self.core.account_id;
303        let http_client = self.http_client.clone();
304        let clock = self.clock;
305        let signature_type = self.config.signature_type;
306        let user_address = self
307            .secrets
308            .funder
309            .clone()
310            .unwrap_or_else(|| self.secrets.address.clone());
311        let user_api_key = self.secrets.credential.api_key().to_string();
312
313        let fill_tracker = self.fill_tracker.clone();
314        let pending_fills = self.pending_fills.clone();
315        let pending_order_reports = self.pending_order_reports.clone();
316
317        let handle = get_runtime().spawn(async move {
318            let mut state = WsDispatchState::default();
319            let ctx = WsDispatchContext {
320                token_instruments: &token_instruments,
321                fill_tracker: &fill_tracker,
322                pending_fills: &pending_fills,
323                pending_order_reports: &pending_order_reports,
324                emitter: &emitter,
325                account_id,
326                clock,
327                user_address: &user_address,
328                user_api_key: &user_api_key,
329            };
330
331            loop {
332                match rx.recv().await {
333                    Some(PolymarketWsMessage::User(user_msg)) => {
334                        if let Some(_refresh) =
335                            dispatch_user_message(&user_msg, &ctx, &mut state)
336                        {
337                            let http = http_client.clone();
338                            let emit = emitter.clone();
339
340                            get_runtime().spawn(async move {
341                                match fetch_and_emit_account_state(
342                                    &http, &emit, clock, signature_type,
343                                )
344                                .await
345                                {
346                                    Ok(()) => log::info!(
347                                        "Account state refreshed after finalized trade for {account_id}"
348                                    ),
349                                    Err(e) => log::warn!(
350                                        "Failed to refresh account after finalized trade: {e}"
351                                    ),
352                                }
353                            });
354                        }
355                    }
356                    Some(PolymarketWsMessage::Market(_)) => {}
357                    Some(PolymarketWsMessage::Reconnected) => {
358                        log::info!("User WebSocket reconnected");
359                    }
360                    None => {
361                        log::debug!("User WebSocket stream ended");
362                        break;
363                    }
364                }
365            }
366
367            log::debug!("User WebSocket handler task completed");
368        });
369
370        *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
371        Ok(())
372    }
373
374    fn get_neg_risk(&self, instrument_id: &InstrumentId) -> bool {
375        self.neg_risk_index
376            .get_cloned(instrument_id)
377            .unwrap_or(false)
378    }
379
380    fn load_instruments_from_cache(&self) {
381        let cache = self.core.cache();
382        let instruments: Vec<InstrumentAny> = cache
383            .instruments(&self.core.venue, None)
384            .into_iter()
385            .cloned()
386            .collect();
387        drop(cache);
388
389        // Populate shared AtomicMap for WS handler and reconciliation
390        for inst in &instruments {
391            self.shared_token_instruments
392                .insert(Ustr::from(inst.raw_symbol().as_str()), inst.clone());
393        }
394
395        // Build neg_risk_index
396        for inst in &instruments {
397            if let InstrumentAny::BinaryOption(bo) = inst {
398                let neg_risk = bo
399                    .info
400                    .as_ref()
401                    .and_then(|i| i.get_bool("neg_risk"))
402                    .unwrap_or(false);
403                self.neg_risk_index.insert(bo.id, neg_risk);
404            }
405        }
406
407        log::info!("Loaded {} instruments from cache", instruments.len());
408    }
409
410    fn submit_limit_order(&self, order: OrderAny) {
411        if let Err(reason) = PolymarketOrderBuilder::validate_limit_order(&order) {
412            self.emitter.emit_order_denied(&order, &reason);
413            return;
414        }
415
416        let instrument = match self.resolve_instrument(&order) {
417            Some(i) => i,
418            None => return,
419        };
420
421        let neg_risk = self.get_neg_risk(&order.instrument_id());
422        let token_id = instrument.raw_symbol().to_string();
423        let tick_decimals = instrument.price_precision() as u32;
424        let price = order.price().unwrap(); // validated above
425        let quantity = order.quantity();
426        let tif = order.time_in_force();
427        let post_only = order.is_post_only();
428        let side = order.order_side();
429        let expire_time = order.expire_time();
430
431        self.emitter.emit_order_submitted(&order);
432
433        let submitter = self.submitter.clone();
434        let emitter = self.emitter.clone();
435        let clock = self.clock;
436        let fill_tracker = self.fill_tracker.clone();
437        let pending_fills = self.pending_fills.clone();
438        let pending_order_reports = self.pending_order_reports.clone();
439        let pending_cancels = self.pending_cancels.clone();
440        let account_id = self.core.account_id;
441        let size_precision = instrument.size_precision();
442        let price_precision = instrument.price_precision();
443
444        self.spawn_task("submit_limit_order", async move {
445            match submitter
446                .submit_limit_order(
447                    &token_id,
448                    side,
449                    price,
450                    quantity,
451                    tif,
452                    post_only,
453                    neg_risk,
454                    expire_time,
455                    tick_decimals,
456                )
457                .await
458            {
459                Ok(response) => {
460                    if let Some((order_id_str, venue_order_id)) = handle_order_response(
461                        Ok(response),
462                        &order,
463                        &emitter,
464                        clock,
465                        &fill_tracker,
466                        &pending_fills,
467                        &pending_order_reports,
468                        &pending_cancels,
469                        account_id,
470                        size_precision,
471                        price_precision,
472                    ) {
473                        execute_deferred_cancel(
474                            &submitter,
475                            &order,
476                            &order_id_str,
477                            venue_order_id,
478                            &emitter,
479                            clock,
480                        )
481                        .await;
482                    }
483                }
484                Err(e) => {
485                    let ts_now = clock.get_time_ns();
486                    emitter.emit_order_rejected(&order, &format!("{e}"), ts_now, false);
487                }
488            }
489            Ok(())
490        });
491    }
492
493    fn submit_market_order(&self, order: OrderAny) {
494        if let Err(reason) = PolymarketOrderBuilder::validate_market_order(&order) {
495            self.emitter.emit_order_denied(&order, &reason);
496            return;
497        }
498
499        let instrument = match self.resolve_instrument(&order) {
500            Some(i) => i,
501            None => return,
502        };
503
504        let neg_risk = self.get_neg_risk(&order.instrument_id());
505        let token_id = instrument.raw_symbol().to_string();
506        let tick_decimals = instrument.price_precision() as u32;
507        let side = order.order_side();
508        let amount = order.quantity();
509        let is_quote_qty = order.is_quote_quantity();
510
511        // Quote-quantity BUYs are sized in pUSD; the venue computes taker
512        // fees against `amount + fees`, so we shrink the spend to fit the
513        // user's collateral balance before signing. SELL orders are sized
514        // in shares and skip this step.
515        let needs_fee_adjustment = side == OrderSide::Buy && is_quote_qty;
516        let fee_rate = if needs_fee_adjustment {
517            instrument_taker_fee(&instrument)
518        } else {
519            Decimal::ZERO
520        };
521        let fee_exponent = if needs_fee_adjustment {
522            instrument_fee_exponent(&instrument)
523        } else {
524            1.0
525        };
526
527        let submitter = self.submitter.clone();
528        let http_client = self.http_client.clone();
529        let signature_type = self.config.signature_type;
530        let emitter = self.emitter.clone();
531        let clock = self.clock;
532        let fill_tracker = self.fill_tracker.clone();
533        let pending_fills = self.pending_fills.clone();
534        let pending_order_reports = self.pending_order_reports.clone();
535        let pending_cancels = self.pending_cancels.clone();
536        let account_id = self.core.account_id;
537        let size_precision = instrument.size_precision();
538        let price_precision = instrument.price_precision();
539
540        self.spawn_task("submit_market_order", async move {
541            let fee_context = if needs_fee_adjustment {
542                match fetch_collateral_balance_pusd(&http_client, signature_type).await {
543                    Ok(balance) => Some(MarketBuyFeeContext {
544                        user_pusd_balance: balance,
545                        fee_rate,
546                        fee_exponent,
547                        builder_taker_fee_rate: Decimal::ZERO,
548                    }),
549                    Err(e) => {
550                        emitter.emit_order_rejected(
551                            &order,
552                            &format!("Failed to fetch pUSD balance for fee adjustment: {e}"),
553                            clock.get_time_ns(),
554                            false,
555                        );
556                        return Ok(());
557                    }
558                }
559            } else {
560                None
561            };
562
563            match submitter
564                .submit_market_order(
565                    &token_id,
566                    side,
567                    amount,
568                    neg_risk,
569                    tick_decimals,
570                    fee_context,
571                )
572                .await
573            {
574                Ok((response, expected_base_qty)) => {
575                    let mut order = order;
576                    emitter.emit_order_submitted(&order);
577
578                    // Convert quote quantity to base only after successful submission
579                    if response.success
580                        && is_quote_qty
581                        && side == OrderSide::Buy
582                        && !expected_base_qty.is_zero()
583                        && let Ok(base_qty) =
584                            Quantity::from_decimal_dp(expected_base_qty, size_precision)
585                    {
586                        log::info!(
587                            "Converted {} quote quantity {} to base quantity {} \
588                             (from signed taker_amount)",
589                            order.instrument_id(),
590                            amount,
591                            base_qty,
592                        );
593
594                        let ts_now = clock.get_time_ns();
595                        let updated = OrderUpdated::new(
596                            order.trader_id(),
597                            order.strategy_id(),
598                            order.instrument_id(),
599                            order.client_order_id(),
600                            base_qty,
601                            UUID4::new(),
602                            ts_now,
603                            ts_now,
604                            false,
605                            order.venue_order_id(),
606                            order.account_id(),
607                            order.price(),
608                            None,
609                            None,
610                            false, // is_quote_quantity
611                        );
612
613                        let event = OrderEventAny::Updated(updated);
614                        emitter.send_order_event(event.clone());
615
616                        if let Err(e) = order.apply(event) {
617                            log::error!("Failed to apply quote-to-base OrderUpdated: {e}");
618                        }
619                    }
620
621                    let fok_order_id = response
622                        .order_id
623                        .as_ref()
624                        .filter(|_| response.success)
625                        .cloned();
626
627                    if let Some((order_id_str, venue_order_id)) = handle_order_response(
628                        Ok(response),
629                        &order,
630                        &emitter,
631                        clock,
632                        &fill_tracker,
633                        &pending_fills,
634                        &pending_order_reports,
635                        &pending_cancels,
636                        account_id,
637                        size_precision,
638                        price_precision,
639                    ) {
640                        execute_deferred_cancel(
641                            &submitter,
642                            &order,
643                            &order_id_str,
644                            venue_order_id,
645                            &emitter,
646                            clock,
647                        )
648                        .await;
649                    }
650
651                    if let Some(order_id) = fok_order_id {
652                        check_fok_status(
653                            &submitter,
654                            &order_id,
655                            &fill_tracker,
656                            &emitter,
657                            account_id,
658                            order.instrument_id(),
659                            order.order_side(),
660                            size_precision,
661                            price_precision,
662                            clock,
663                        )
664                        .await;
665                    }
666                }
667                Err(e) => {
668                    let ts_now = clock.get_time_ns();
669                    emitter.emit_order_rejected(&order, &format!("{e}"), ts_now, false);
670                }
671            }
672            Ok(())
673        });
674    }
675
676    fn resolve_instrument(&self, order: &OrderAny) -> Option<InstrumentAny> {
677        let instrument = self
678            .core
679            .cache()
680            .instrument(&order.instrument_id())
681            .cloned();
682
683        match instrument {
684            Some(i) => Some(i),
685            None => {
686                self.emitter.emit_order_denied(
687                    order,
688                    &format!("Instrument not found: {}", order.instrument_id()),
689                );
690                None
691            }
692        }
693    }
694
695    fn fill_context(&self) -> FillContext<'_> {
696        let user_address = self
697            .secrets
698            .funder
699            .as_deref()
700            .unwrap_or(&self.secrets.address);
701        FillContext {
702            account_id: self.core.account_id,
703            user_address,
704            api_key: self.secrets.credential.api_key().as_str(),
705            pusd: get_pusd_currency(),
706            clock: self.clock,
707        }
708    }
709}
710
711#[async_trait(?Send)]
712impl ExecutionClient for PolymarketExecutionClient {
    /// Returns whether the client core reports a connected state.
    fn is_connected(&self) -> bool {
        self.core.is_connected()
    }

    /// Returns the client identifier held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }

    /// Returns the account identifier held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }

    /// Returns the Polymarket venue constant.
    fn venue(&self) -> Venue {
        *POLYMARKET_VENUE
    }

    /// Returns the order management system type, always [`OmsType::Netting`].
    fn oms_type(&self) -> OmsType {
        OmsType::Netting
    }

    /// Returns a clone of this client's account from the cache, if registered.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.cache().account(&self.core.account_id).cloned()
    }
736
    /// Emits an account state event built from the given balances and margins.
    ///
    /// All arguments are forwarded unchanged to the event emitter. As written
    /// this method always returns `Ok(())`.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.emitter
            .emit_account_state(balances, margins, reported, ts_event);
        Ok(())
    }
748
749    fn start(&mut self) -> anyhow::Result<()> {
750        if self.core.is_started() {
751            return Ok(());
752        }
753
754        self.stopping.store(false, Ordering::Release);
755        let sender = get_exec_event_sender();
756        self.emitter.set_sender(sender);
757        self.core.set_started();
758
759        log::info!(
760            "Started: client_id={}, account_id={}",
761            self.core.client_id,
762            self.core.account_id,
763        );
764
765        Ok(())
766    }
767
768    fn stop(&mut self) -> anyhow::Result<()> {
769        if self.core.is_stopped() {
770            return Ok(());
771        }
772
773        log::info!("Stopping Polymarket execution client");
774
775        // Block new background work from being spawned before we drain.
776        self.stopping.store(true, Ordering::Release);
777
778        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
779            handle.abort();
780        }
781
782        self.abort_pending_tasks();
783        self.ws_client.abort();
784
785        self.core.set_disconnected();
786        self.core.set_stopped();
787
788        log::info!("Polymarket execution client stopped");
789        Ok(())
790    }
791
792    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
793        let order = self
794            .core
795            .cache()
796            .order(&cmd.client_order_id)
797            .cloned()
798            .ok_or_else(|| {
799                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
800            })?;
801
802        if order.is_closed() {
803            log::warn!("Cannot submit closed order {}", order.client_order_id());
804            return Ok(());
805        }
806
807        match order.order_type() {
808            OrderType::Limit => self.submit_limit_order(order),
809            OrderType::Market => self.submit_market_order(order),
810            _ => {
811                self.emitter.emit_order_denied(
812                    &order,
813                    &format!(
814                        "Unsupported order type for Polymarket: {:?}",
815                        order.order_type()
816                    ),
817                );
818            }
819        }
820        Ok(())
821    }
822
823    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
824        let mut batch_orders = Vec::with_capacity(cmd.order_inits.len());
825
826        for order_init in &cmd.order_inits {
827            let Some(order) = self
828                .core
829                .cache()
830                .order(&order_init.client_order_id)
831                .cloned()
832            else {
833                log::warn!(
834                    "Order not found in cache for {}",
835                    order_init.client_order_id
836                );
837                continue;
838            };
839
840            if order.is_closed() {
841                log::warn!("Cannot submit closed order {}", order.client_order_id());
842                continue;
843            }
844
845            // Market orders cannot go through the /orders batch endpoint; route them
846            // through the single-order path which synthesizes a crossing limit order.
847            match order.order_type() {
848                OrderType::Limit => {}
849                OrderType::Market => {
850                    self.submit_market_order(order);
851                    continue;
852                }
853                other => {
854                    self.emitter.emit_order_denied(
855                        &order,
856                        &format!("Unsupported order type for Polymarket: {other:?}"),
857                    );
858                    continue;
859                }
860            }
861
862            if let Err(reason) = PolymarketOrderBuilder::validate_limit_order(&order) {
863                self.emitter.emit_order_denied(&order, &reason);
864                continue;
865            }
866
867            let instrument = match self.resolve_instrument(&order) {
868                Some(i) => i,
869                None => continue,
870            };
871
872            let price = order
873                .price()
874                .expect("validated limit order must have a price");
875            batch_orders.push(BatchLimitOrderContext {
876                request: LimitOrderSubmitRequest {
877                    token_id: instrument.raw_symbol().to_string(),
878                    side: order.order_side(),
879                    price,
880                    quantity: order.quantity(),
881                    time_in_force: order.time_in_force(),
882                    post_only: order.is_post_only(),
883                    neg_risk: self.get_neg_risk(&order.instrument_id()),
884                    expire_time: order.expire_time(),
885                    tick_decimals: instrument.price_precision() as u32,
886                },
887                size_precision: instrument.size_precision(),
888                price_precision: instrument.price_precision(),
889                order,
890            });
891        }
892
893        if batch_orders.is_empty() {
894            return Ok(());
895        }
896
897        if batch_orders.len() == 1 {
898            // Route through the single-order path to preserve retry semantics;
899            // the batch endpoint deliberately disables retry due to missing idempotency keys.
900            let batch_order = batch_orders.pop().expect("len checked");
901            self.submit_limit_order(batch_order.order);
902            return Ok(());
903        }
904
905        let submitter = self.submitter.clone();
906        let emitter = self.emitter.clone();
907        let clock = self.clock;
908        let fill_tracker = self.fill_tracker.clone();
909        let pending_fills = self.pending_fills.clone();
910        let pending_order_reports = self.pending_order_reports.clone();
911        let pending_cancels = self.pending_cancels.clone();
912        let pending_tasks = self.pending_tasks.clone();
913        let stopping = self.stopping.clone();
914        let account_id = self.core.account_id;
915
916        self.spawn_task("submit_order_list", async move {
917            for batch_order in &batch_orders {
918                emitter.emit_order_submitted(&batch_order.order);
919            }
920
921            let requests: Vec<LimitOrderSubmitRequest> =
922                batch_orders.iter().map(|bo| bo.request.clone()).collect();
923            let prepare_results = submitter.prepare_limit_order_submissions(&requests).await;
924
925            let mut prepared_orders = Vec::with_capacity(batch_orders.len());
926            let mut submissions = Vec::with_capacity(batch_orders.len());
927
928            for (batch_order, result) in batch_orders.into_iter().zip(prepare_results) {
929                match result {
930                    Ok(submission) => {
931                        prepared_orders.push(batch_order);
932                        submissions.push(submission);
933                    }
934                    Err(e) => {
935                        reject_submit_order(
936                            &batch_order.order,
937                            &format!("{e}"),
938                            &emitter,
939                            clock,
940                            &pending_cancels,
941                        );
942                    }
943                }
944            }
945
946            if submissions.is_empty() {
947                return Ok(());
948            }
949
950            // Chunk into venue-sized batches; POST /orders caps at BATCH_ORDER_LIMIT orders.
951            // A remainder chunk of size 1 goes through the single-order path so it keeps
952            // the same retry semantics as a list of length 1.
953            let total = submissions.len();
954            let mut offset = 0;
955            while offset < total {
956                let end = (offset + BATCH_ORDER_LIMIT).min(total);
957                let mut submissions_chunk = submissions[offset..end].to_vec();
958                let mut orders_chunk = prepared_orders[offset..end].to_vec();
959
960                if submissions_chunk.len() == 1 {
961                    let submission = submissions_chunk.pop().expect("len 1");
962                    let batch_order = orders_chunk.pop().expect("len 1");
963                    handle_single_order_response(
964                        submitter.post_limit_order_submission(submission).await,
965                        batch_order,
966                        &submitter,
967                        &emitter,
968                        clock,
969                        &fill_tracker,
970                        &pending_fills,
971                        &pending_order_reports,
972                        &pending_cancels,
973                        account_id,
974                    )
975                    .await;
976                } else {
977                    match submitter
978                        .post_limit_order_submissions(submissions_chunk)
979                        .await
980                    {
981                        Ok(responses) => {
982                            handle_batch_order_responses(
983                                responses,
984                                orders_chunk,
985                                &submitter,
986                                &emitter,
987                                clock,
988                                &fill_tracker,
989                                &pending_fills,
990                                &pending_order_reports,
991                                &pending_cancels,
992                                &pending_tasks,
993                                &stopping,
994                                account_id,
995                            )
996                            .await;
997                        }
998                        Err(e) => {
999                            for batch_order in orders_chunk {
1000                                reject_submit_order(
1001                                    &batch_order.order,
1002                                    &format!("{e}"),
1003                                    &emitter,
1004                                    clock,
1005                                    &pending_cancels,
1006                                );
1007                            }
1008                        }
1009                    }
1010                }
1011
1012                offset = end;
1013            }
1014
1015            Ok(())
1016        });
1017
1018        Ok(())
1019    }
1020
1021    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1022        let order = self.core.cache().order(&cmd.client_order_id).cloned();
1023        if let Some(order) = order {
1024            let venue_order_id = order.venue_order_id();
1025            let ts_now = self.clock.get_time_ns();
1026            self.emitter.emit_order_modify_rejected(
1027                &order,
1028                venue_order_id,
1029                "Order modification not supported on Polymarket",
1030                ts_now,
1031            );
1032        }
1033        Ok(())
1034    }
1035
1036    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1037        let order = self.core.cache().order(&cmd.client_order_id).cloned();
1038        let order_ref = match &order {
1039            Some(o) => o,
1040            None => {
1041                log::warn!(
1042                    "Order not found in cache for cancel: {}",
1043                    cmd.client_order_id
1044                );
1045                return Ok(());
1046            }
1047        };
1048
1049        if !order_ref.is_open() {
1050            log::warn!(
1051                "Cannot cancel order that is not open: {}",
1052                cmd.client_order_id
1053            );
1054            let ts_now = self.clock.get_time_ns();
1055            self.emitter.emit_order_cancel_rejected(
1056                order_ref,
1057                order_ref.venue_order_id(),
1058                &format!("Order is not open (status: {:?})", order_ref.status()),
1059                ts_now,
1060            );
1061            return Ok(());
1062        }
1063
1064        let venue_order_id = match order_ref.venue_order_id() {
1065            Some(id) => id,
1066            None => {
1067                // Check cache index: submit may have cached it before OrderAccepted was applied
1068                match self
1069                    .core
1070                    .cache()
1071                    .venue_order_id(&cmd.client_order_id)
1072                    .copied()
1073                {
1074                    Some(id) => id,
1075                    None => {
1076                        log::info!(
1077                            "Cancel for {} deferred, venue_order_id not yet available",
1078                            cmd.client_order_id
1079                        );
1080                        self.pending_cancels
1081                            .lock()
1082                            .expect(MUTEX_POISONED)
1083                            .insert(cmd.client_order_id);
1084                        return Ok(());
1085                    }
1086                }
1087            }
1088        };
1089
1090        let order_id_str = venue_order_id.to_string();
1091        let submitter = self.submitter.clone();
1092        let emitter = self.emitter.clone();
1093        let clock = self.clock;
1094        let order_clone = order.unwrap();
1095
1096        self.spawn_task("cancel_order", async move {
1097            match submitter.cancel_order(&order_id_str).await {
1098                Ok(response) => {
1099                    process_cancel_result(
1100                        &response,
1101                        &order_id_str,
1102                        &order_clone,
1103                        venue_order_id,
1104                        &emitter,
1105                        clock,
1106                    );
1107                }
1108                Err(e) => {
1109                    let ts_now = clock.get_time_ns();
1110                    emitter.emit_order_cancel_rejected(
1111                        &order_clone,
1112                        Some(venue_order_id),
1113                        &format!("HTTP request failed: {e}"),
1114                        ts_now,
1115                    );
1116                }
1117            }
1118            Ok(())
1119        });
1120
1121        Ok(())
1122    }
1123
1124    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1125        let cache = self.core.cache();
1126        let open_orders = cache.orders_open(
1127            Some(&self.core.venue),
1128            Some(&cmd.instrument_id),
1129            Some(&cmd.strategy_id),
1130            None,
1131            Some(cmd.order_side),
1132        );
1133
1134        if open_orders.is_empty() {
1135            log::debug!("No open orders to cancel for {}", cmd.instrument_id);
1136            return Ok(());
1137        }
1138
1139        let venue_order_ids: Vec<String> = open_orders
1140            .iter()
1141            .filter_map(|o| o.venue_order_id().map(|v| v.to_string()))
1142            .collect();
1143
1144        if venue_order_ids.is_empty() {
1145            log::warn!("No venue order IDs found for cancel all");
1146            return Ok(());
1147        }
1148
1149        let submitter = self.submitter.clone();
1150        let emitter = self.emitter.clone();
1151        let clock = self.clock;
1152        let orders: Vec<OrderAny> = open_orders.into_iter().cloned().collect();
1153
1154        self.spawn_task("cancel_all_orders", async move {
1155            let order_id_refs: Vec<&str> = venue_order_ids.iter().map(String::as_str).collect();
1156            let response = submitter
1157                .cancel_orders(&order_id_refs)
1158                .await
1159                .context("failed to cancel all orders")?;
1160
1161            for order in &orders {
1162                if let Some(vid) = order.venue_order_id() {
1163                    let vid_str = vid.to_string();
1164                    process_cancel_result(&response, &vid_str, order, vid, &emitter, clock);
1165                }
1166            }
1167
1168            log::info!("Canceled {} orders", response.canceled.len());
1169            Ok(())
1170        });
1171
1172        Ok(())
1173    }
1174
1175    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1176        if cmd.cancels.is_empty() {
1177            return Ok(());
1178        }
1179
1180        let mut venue_to_order: Vec<(String, OrderAny)> = Vec::new();
1181
1182        for c in &cmd.cancels {
1183            if let Some(order) = self.core.cache().order(&c.client_order_id)
1184                && let Some(vid) = order.venue_order_id()
1185            {
1186                venue_to_order.push((vid.to_string(), order.clone()));
1187            }
1188        }
1189
1190        if venue_to_order.is_empty() {
1191            log::warn!("No venue order IDs found for batch cancel");
1192            return Ok(());
1193        }
1194
1195        let order_ids: Vec<String> = venue_to_order.iter().map(|(id, _)| id.clone()).collect();
1196        let submitter = self.submitter.clone();
1197        let emitter = self.emitter.clone();
1198        let clock = self.clock;
1199
1200        self.spawn_task("batch_cancel_orders", async move {
1201            let order_id_refs: Vec<&str> = order_ids.iter().map(String::as_str).collect();
1202            let response = submitter
1203                .cancel_orders(&order_id_refs)
1204                .await
1205                .context("failed to batch cancel orders")?;
1206
1207            for (venue_id_str, order) in &venue_to_order {
1208                let vid = VenueOrderId::from(venue_id_str.as_str());
1209                process_cancel_result(&response, venue_id_str, order, vid, &emitter, clock);
1210            }
1211
1212            log::info!("Batch canceled {} orders", response.canceled.len());
1213            Ok(())
1214        });
1215
1216        Ok(())
1217    }
1218
1219    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1220        let http_client = self.http_client.clone();
1221        let emitter = self.emitter.clone();
1222        let clock = self.clock;
1223        let signature_type = self.config.signature_type;
1224
1225        self.spawn_task("query_account", async move {
1226            fetch_and_emit_account_state(&http_client, &emitter, clock, signature_type).await
1227        });
1228        Ok(())
1229    }
1230
1231    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1232        log::debug!("Querying order: client_order_id={}", cmd.client_order_id);
1233
1234        let venue_order_id = match &cmd.venue_order_id {
1235            Some(id) => id.to_string(),
1236            None => {
1237                log::warn!("query_order requires venue_order_id for Polymarket");
1238                return Ok(());
1239            }
1240        };
1241
1242        let instrument_id = cmd.instrument_id;
1243        let client_order_id = cmd.client_order_id;
1244        let account_id = self.core.account_id;
1245        let cache = self.core.cache();
1246
1247        let (price_prec, size_prec) = match cache.instrument(&instrument_id) {
1248            Some(i) => (i.price_precision(), i.size_precision()),
1249            None => (4, 6),
1250        };
1251
1252        let http_client = self.http_client.clone();
1253        let emitter = self.emitter.clone();
1254        let clock = self.clock;
1255
1256        self.spawn_task("query_order", async move {
1257            match http_client.get_order_optional(&venue_order_id).await {
1258                Ok(Some(order)) => {
1259                    let report = parse_order_status_report(
1260                        &order,
1261                        instrument_id,
1262                        account_id,
1263                        Some(client_order_id),
1264                        price_prec,
1265                        size_prec,
1266                        clock.get_time_ns(),
1267                    );
1268                    emitter.send_order_status_report(report);
1269                }
1270                Ok(None) => {
1271                    log::warn!("Order {venue_order_id} not found (empty response)");
1272                }
1273                Err(e) => {
1274                    log::warn!("Failed to query order {venue_order_id}: {e}");
1275                }
1276            }
1277            Ok(())
1278        });
1279
1280        Ok(())
1281    }
1282
1283    fn register_external_order(
1284        &self,
1285        _client_order_id: ClientOrderId,
1286        _venue_order_id: VenueOrderId,
1287        _instrument_id: InstrumentId,
1288        _strategy_id: StrategyId,
1289        _ts_init: UnixNanos,
1290    ) {
1291    }
1292
1293    fn on_instrument(&mut self, instrument: InstrumentAny) {
1294        let token_id = Ustr::from(instrument.raw_symbol().as_str());
1295        if let InstrumentAny::BinaryOption(bo) = &instrument {
1296            let neg_risk = bo
1297                .info
1298                .as_ref()
1299                .and_then(|i| i.get_bool("neg_risk"))
1300                .unwrap_or(false);
1301            self.neg_risk_index.insert(bo.id, neg_risk);
1302        }
1303        self.shared_token_instruments.insert(token_id, instrument);
1304    }
1305
1306    fn calculate_commission(
1307        &self,
1308        instrument: &InstrumentAny,
1309        last_qty: Quantity,
1310        last_px: Price,
1311        liquidity_side: LiquiditySide,
1312    ) -> Option<Money> {
1313        let fee_rate = instrument_taker_fee(instrument);
1314        let commission = compute_commission(
1315            fee_rate,
1316            last_qty.as_decimal(),
1317            last_px.as_decimal(),
1318            liquidity_side,
1319        );
1320
1321        Some(Money::new(commission, instrument.quote_currency()))
1322    }
1323
1324    async fn connect(&mut self) -> anyhow::Result<()> {
1325        if self.core.is_connected() {
1326            return Ok(());
1327        }
1328
1329        log::info!("Connecting Polymarket execution client");
1330
1331        self.stopping.store(false, Ordering::Release);
1332
1333        // Read instruments from global cache (populated by data client)
1334        self.load_instruments_from_cache();
1335        self.core.set_instruments_initialized();
1336
1337        self.start_ws_stream().await?;
1338
1339        let post_ws = async {
1340            self.refresh_account_state().await?;
1341            self.await_account_registered(30.0).await?;
1342            Ok::<(), anyhow::Error>(())
1343        };
1344
1345        if let Err(e) = post_ws.await {
1346            log::warn!("Connect failed after WS started, tearing down: {e}");
1347            self.stopping.store(true, Ordering::Release);
1348            let _ = self.ws_client.disconnect().await;
1349            self.abort_pending_tasks();
1350            return Err(e);
1351        }
1352
1353        self.core.set_connected();
1354
1355        log::info!("Connected: client_id={}", self.core.client_id);
1356        Ok(())
1357    }
1358
1359    async fn disconnect(&mut self) -> anyhow::Result<()> {
1360        if self.core.is_disconnected() {
1361            return Ok(());
1362        }
1363
1364        log::info!("Disconnecting Polymarket execution client");
1365
1366        // Block new background work from being spawned before we drain.
1367        self.stopping.store(true, Ordering::Release);
1368
1369        self.ws_client.disconnect().await?;
1370
1371        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
1372            handle.abort();
1373        }
1374
1375        self.abort_pending_tasks();
1376        self.core.set_disconnected();
1377
1378        log::info!("Disconnected: client_id={}", self.core.client_id);
1379        Ok(())
1380    }
1381
1382    async fn generate_order_status_report(
1383        &self,
1384        cmd: &GenerateOrderStatusReport,
1385    ) -> anyhow::Result<Option<OrderStatusReport>> {
1386        let venue_order_id = match &cmd.venue_order_id {
1387            Some(id) => id.to_string(),
1388            None => {
1389                log::warn!("generate_order_status_report requires venue_order_id");
1390                return Ok(None);
1391            }
1392        };
1393
1394        let instrument_id = match cmd.instrument_id {
1395            Some(id) => id,
1396            None => {
1397                log::warn!("generate_order_status_report requires instrument_id");
1398                return Ok(None);
1399            }
1400        };
1401
1402        let order = match self
1403            .http_client
1404            .get_order_optional(&venue_order_id)
1405            .await
1406            .context("failed to fetch order")?
1407        {
1408            Some(o) => o,
1409            None => {
1410                log::info!("Order {venue_order_id} not found (empty response)");
1411                return Ok(None);
1412            }
1413        };
1414
1415        let instrument = self.core.cache().instrument(&instrument_id).cloned();
1416        let (price_prec, size_prec) = match &instrument {
1417            Some(i) => (i.price_precision(), i.size_precision()),
1418            None => (4, 6),
1419        };
1420
1421        let report = parse_order_status_report(
1422            &order,
1423            instrument_id,
1424            self.core.account_id,
1425            cmd.client_order_id,
1426            price_prec,
1427            size_prec,
1428            self.clock.get_time_ns(),
1429        );
1430
1431        Ok(Some(report))
1432    }
1433
1434    async fn generate_order_status_reports(
1435        &self,
1436        cmd: &GenerateOrderStatusReports,
1437    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1438        let params = crate::http::query::GetOrdersParams::default();
1439        let orders = self
1440            .http_client
1441            .get_orders(params)
1442            .await
1443            .context("failed to fetch orders")?;
1444
1445        let (reports, _) = reconciliation::build_order_reports_from_orders(
1446            &orders,
1447            &self.shared_token_instruments,
1448            self.core.account_id,
1449            cmd.instrument_id,
1450            self.clock.get_time_ns(),
1451        );
1452
1453        let reports = if cmd.open_only {
1454            reports
1455                .into_iter()
1456                .filter(|r| r.order_status.is_open())
1457                .collect()
1458        } else {
1459            reports
1460        };
1461
1462        log::info!("Generated {} order status reports", reports.len());
1463        Ok(reports)
1464    }
1465
1466    async fn generate_fill_reports(
1467        &self,
1468        cmd: GenerateFillReports,
1469    ) -> anyhow::Result<Vec<FillReport>> {
1470        let trades = self
1471            .http_client
1472            .get_trades(GetTradesParams::default())
1473            .await
1474            .context("failed to fetch trades")?;
1475
1476        let ctx = self.fill_context();
1477        let (reports, _) = build_fill_reports_from_trades(
1478            &trades,
1479            &ctx,
1480            &self.shared_token_instruments,
1481            cmd.instrument_id,
1482            self.clock.get_time_ns(),
1483        );
1484
1485        let reports = apply_fill_filters(reports, cmd.venue_order_id, cmd.start, cmd.end);
1486
1487        log::info!("Generated {} fill reports", reports.len());
1488        Ok(reports)
1489    }
1490
1491    async fn generate_position_status_reports(
1492        &self,
1493        cmd: &GeneratePositionStatusReports,
1494    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1495        let ctx = self.fill_context();
1496        let positions = self
1497            .data_api_client
1498            .get_positions(ctx.user_address)
1499            .await
1500            .context("failed to fetch positions from Data API")?;
1501
1502        let ts_now = self.clock.get_time_ns();
1503        let mut reports = build_position_reports(&positions, self.core.account_id, ts_now);
1504
1505        if let Some(ref filter_id) = cmd.instrument_id {
1506            reports.retain(|r| &r.instrument_id == filter_id);
1507        }
1508
1509        log::info!("Generated {} position status reports", reports.len());
1510        Ok(reports)
1511    }
1512
1513    async fn generate_mass_status(
1514        &self,
1515        lookback_mins: Option<u64>,
1516    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1517        let ctx = self.fill_context();
1518        reconciliation::generate_mass_status(
1519            &self.http_client,
1520            &self.data_api_client,
1521            &self.shared_token_instruments,
1522            &ctx,
1523            self.core.client_id,
1524            self.core.venue,
1525            lookback_mins,
1526        )
1527        .await
1528    }
1529}
1530
1531fn process_cancel_result(
1532    response: &CancelResponse,
1533    venue_order_id_str: &str,
1534    order: &OrderAny,
1535    venue_order_id: VenueOrderId,
1536    emitter: &ExecutionEventEmitter,
1537    clock: &'static AtomicTime,
1538) {
1539    if let Some(reason_opt) = response.not_canceled.get(venue_order_id_str) {
1540        let reason = reason_opt.as_deref().unwrap_or("unknown reason");
1541        match CancelOutcome::classify(reason) {
1542            CancelOutcome::AlreadyDone => {
1543                log::info!(
1544                    "Cancel rejected for {}: {reason} - awaiting WS for terminal state",
1545                    order.client_order_id()
1546                );
1547            }
1548            CancelOutcome::Rejected(msg) => {
1549                let ts_now = clock.get_time_ns();
1550                emitter.emit_order_cancel_rejected(order, Some(venue_order_id), &msg, ts_now);
1551            }
1552        }
1553    }
1554}
1555
1556#[expect(clippy::too_many_arguments)]
1557async fn handle_batch_order_responses(
1558    responses: Vec<OrderResponse>,
1559    batch_orders: Vec<BatchLimitOrderContext>,
1560    submitter: &OrderSubmitter,
1561    emitter: &ExecutionEventEmitter,
1562    clock: &'static AtomicTime,
1563    fill_tracker: &Arc<OrderFillTrackerMap>,
1564    pending_fills: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>>,
1565    pending_order_reports: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>>,
1566    pending_cancels: &Arc<Mutex<AHashSet<ClientOrderId>>>,
1567    pending_tasks: &Arc<Mutex<Vec<JoinHandle<()>>>>,
1568    stopping: &Arc<AtomicBool>,
1569    account_id: AccountId,
1570) {
1571    let response_len = responses.len();
1572    let order_len = batch_orders.len();
1573
1574    if response_len != order_len {
1575        log::warn!(
1576            "Batch submit response length ({response_len}) does not match order count ({order_len})"
1577        );
1578    }
1579
1580    // Polymarket batch responses do not include a client-side correlation key.
1581    // We map entries by submission order and rely on the API preserving array order.
1582    // Reference: https://docs.polymarket.com/#create-and-place-multiple-orders
1583    let mut deferred = Vec::new();
1584
1585    for (batch_order, response) in batch_orders.iter().zip(responses) {
1586        if let Some((order_id_str, venue_order_id)) = handle_order_response(
1587            Ok(response),
1588            &batch_order.order,
1589            emitter,
1590            clock,
1591            fill_tracker,
1592            pending_fills,
1593            pending_order_reports,
1594            pending_cancels,
1595            account_id,
1596            batch_order.size_precision,
1597            batch_order.price_precision,
1598        ) {
1599            deferred.push((batch_order.order.clone(), order_id_str, venue_order_id));
1600        }
1601    }
1602
1603    if order_len > response_len {
1604        for batch_order in batch_orders.iter().skip(response_len) {
1605            reject_submit_order(
1606                &batch_order.order,
1607                "Order not included in API response",
1608                emitter,
1609                clock,
1610                pending_cancels,
1611            );
1612        }
1613    }
1614
1615    // Spawn deferred cancels as independent tasks so retrying cancels cannot stall
1616    // terminal-event emission or delay posting subsequent chunks. Handles are tracked
1617    // in pending_tasks so client shutdown aborts them like any other background work.
1618    // Holding the pending_tasks lock across the spawn loop (and the stopping check)
1619    // closes the race with stop(): abort_pending_tasks() blocks on the same lock,
1620    // so either all new handles are enqueued before the drain runs, or stopping has
1621    // already been observed and no new handles are spawned.
1622    if !deferred.is_empty() {
1623        let mut tasks = pending_tasks.lock().expect(MUTEX_POISONED);
1624
1625        if stopping.load(Ordering::Acquire) {
1626            return;
1627        }
1628        tasks.retain(|handle| !handle.is_finished());
1629
1630        for (order, order_id_str, venue_order_id) in deferred {
1631            let submitter = submitter.clone();
1632            let emitter = emitter.clone();
1633
1634            let handle = get_runtime().spawn(async move {
1635                execute_deferred_cancel(
1636                    &submitter,
1637                    &order,
1638                    &order_id_str,
1639                    venue_order_id,
1640                    &emitter,
1641                    clock,
1642                )
1643                .await;
1644            });
1645            tasks.push(handle);
1646        }
1647    }
1648}
1649
1650fn reject_submit_order(
1651    order: &OrderAny,
1652    reason: &str,
1653    emitter: &ExecutionEventEmitter,
1654    clock: &'static AtomicTime,
1655    pending_cancels: &Arc<Mutex<AHashSet<ClientOrderId>>>,
1656) {
1657    let ts_now = clock.get_time_ns();
1658    emitter.emit_order_rejected(order, reason, ts_now, false);
1659    pending_cancels
1660        .lock()
1661        .expect(MUTEX_POISONED)
1662        .remove(&order.client_order_id());
1663}
1664
1665#[expect(clippy::too_many_arguments)]
1666async fn handle_single_order_response(
1667    result: anyhow::Result<OrderResponse>,
1668    batch_order: BatchLimitOrderContext,
1669    submitter: &OrderSubmitter,
1670    emitter: &ExecutionEventEmitter,
1671    clock: &'static AtomicTime,
1672    fill_tracker: &Arc<OrderFillTrackerMap>,
1673    pending_fills: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>>,
1674    pending_order_reports: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>>,
1675    pending_cancels: &Arc<Mutex<AHashSet<ClientOrderId>>>,
1676    account_id: AccountId,
1677) {
1678    match result {
1679        Ok(response) => {
1680            if let Some((order_id_str, venue_order_id)) = handle_order_response(
1681                Ok(response),
1682                &batch_order.order,
1683                emitter,
1684                clock,
1685                fill_tracker,
1686                pending_fills,
1687                pending_order_reports,
1688                pending_cancels,
1689                account_id,
1690                batch_order.size_precision,
1691                batch_order.price_precision,
1692            ) {
1693                execute_deferred_cancel(
1694                    submitter,
1695                    &batch_order.order,
1696                    &order_id_str,
1697                    venue_order_id,
1698                    emitter,
1699                    clock,
1700                )
1701                .await;
1702            }
1703        }
1704        Err(e) => {
1705            reject_submit_order(
1706                &batch_order.order,
1707                &format!("{e}"),
1708                emitter,
1709                clock,
1710                pending_cancels,
1711            );
1712        }
1713    }
1714}
1715
1716#[expect(clippy::too_many_arguments)]
1717fn handle_order_response(
1718    result: crate::http::error::Result<OrderResponse>,
1719    order: &OrderAny,
1720    emitter: &ExecutionEventEmitter,
1721    clock: &'static AtomicTime,
1722    fill_tracker: &Arc<OrderFillTrackerMap>,
1723    pending_fills: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>>,
1724    pending_order_reports: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>>,
1725    pending_cancels: &Arc<Mutex<AHashSet<ClientOrderId>>>,
1726    account_id: AccountId,
1727    size_precision: u8,
1728    price_precision: u8,
1729) -> Option<(String, VenueOrderId)> {
1730    match result {
1731        Ok(response) => {
1732            if response.success {
1733                if let Some(order_id) = response.order_id {
1734                    let venue_order_id = VenueOrderId::from(order_id.as_str());
1735                    let ts_now = clock.get_time_ns();
1736                    emitter.emit_order_accepted(order, venue_order_id, ts_now);
1737
1738                    // Register order in fill tracker for dust detection
1739                    fill_tracker.register(
1740                        venue_order_id,
1741                        order.quantity(),
1742                        order.order_side(),
1743                        order.instrument_id(),
1744                        size_precision,
1745                        price_precision,
1746                    );
1747
1748                    // Drain any fills buffered during the HTTP round-trip,
1749                    // snapping dust fills and recording in tracker
1750                    if let Some(buffered) = pending_fills
1751                        .lock()
1752                        .expect(MUTEX_POISONED)
1753                        .remove(&venue_order_id)
1754                    {
1755                        for mut fill in buffered {
1756                            fill.last_qty =
1757                                fill_tracker.snap_fill_qty(&venue_order_id, fill.last_qty);
1758                            fill_tracker.record_fill(
1759                                &venue_order_id,
1760                                fill.last_qty.as_f64(),
1761                                fill.last_px.as_f64(),
1762                                fill.ts_event,
1763                            );
1764                            emitter.send_fill_report(fill);
1765                        }
1766                    }
1767
1768                    // Drain any order reports buffered during the HTTP round-trip
1769                    if let Some(buffered) = pending_order_reports
1770                        .lock()
1771                        .expect(MUTEX_POISONED)
1772                        .remove(&venue_order_id)
1773                    {
1774                        let mut has_filled = false;
1775
1776                        for report in &buffered {
1777                            if report.order_status == OrderStatus::Filled {
1778                                has_filled = true;
1779                            }
1780                        }
1781
1782                        // Cap filled_qty to tracked fills to prevent
1783                        // duplicate inferred fills from the race with trades
1784                        let tracked_filled = fill_tracker
1785                            .get_cumulative_filled(&venue_order_id)
1786                            .unwrap_or(0.0);
1787                        let tracked_qty = Quantity::new(tracked_filled, size_precision);
1788
1789                        for mut report in buffered {
1790                            if report.filled_qty > tracked_qty {
1791                                log::debug!(
1792                                    "Capping buffered filled_qty for {venue_order_id} \
1793                                     from {} to {} (awaiting trade messages)",
1794                                    report.filled_qty,
1795                                    tracked_qty,
1796                                );
1797                                report.filled_qty = tracked_qty;
1798                            }
1799                            emitter.send_order_status_report(report);
1800                        }
1801
1802                        // If a MATCHED (Filled) status was buffered, check for dust residual
1803                        if has_filled {
1804                            let fallback_px = order.price().map_or(0.0, |p| p.as_f64());
1805                            if let Some(dust_fill) = fill_tracker.check_dust_and_build_fill(
1806                                &venue_order_id,
1807                                account_id,
1808                                &order_id,
1809                                fallback_px,
1810                                get_pusd_currency(),
1811                                ts_now,
1812                                ts_now,
1813                            ) {
1814                                emitter.send_fill_report(dust_fill);
1815                            }
1816                        }
1817                    }
1818
1819                    // Check if cancel was requested during the HTTP round-trip
1820                    if pending_cancels
1821                        .lock()
1822                        .expect(MUTEX_POISONED)
1823                        .remove(&order.client_order_id())
1824                    {
1825                        log::info!(
1826                            "Order {} has pending cancel, issuing deferred cancel for {}",
1827                            order.client_order_id(),
1828                            venue_order_id
1829                        );
1830                        return Some((order_id, venue_order_id));
1831                    }
1832                } else {
1833                    log::warn!(
1834                        "Order accepted but no order_id returned for {}",
1835                        order.client_order_id()
1836                    );
1837                }
1838            } else {
1839                let reason = response
1840                    .error_msg
1841                    .unwrap_or_else(|| "unknown error".to_string());
1842                let ts_now = clock.get_time_ns();
1843                emitter.emit_order_rejected(order, &reason, ts_now, false);
1844                pending_cancels
1845                    .lock()
1846                    .expect(MUTEX_POISONED)
1847                    .remove(&order.client_order_id());
1848            }
1849        }
1850        Err(e) => {
1851            let ts_now = clock.get_time_ns();
1852            emitter.emit_order_rejected(order, &format!("HTTP request failed: {e}"), ts_now, false);
1853            pending_cancels
1854                .lock()
1855                .expect(MUTEX_POISONED)
1856                .remove(&order.client_order_id());
1857        }
1858    }
1859    None
1860}
1861
1862async fn execute_deferred_cancel(
1863    submitter: &OrderSubmitter,
1864    order: &OrderAny,
1865    order_id_str: &str,
1866    venue_order_id: VenueOrderId,
1867    emitter: &ExecutionEventEmitter,
1868    clock: &'static AtomicTime,
1869) {
1870    match submitter.cancel_order(order_id_str).await {
1871        Ok(response) => {
1872            process_cancel_result(
1873                &response,
1874                order_id_str,
1875                order,
1876                venue_order_id,
1877                emitter,
1878                clock,
1879            );
1880        }
1881        Err(e) => {
1882            let ts_now = clock.get_time_ns();
1883            emitter.emit_order_cancel_rejected(
1884                order,
1885                Some(venue_order_id),
1886                &format!("Deferred cancel failed: {e}"),
1887                ts_now,
1888            );
1889        }
1890    }
1891}
1892
1893/// Deferred FOK status check.
1894///
1895/// Waits 5 seconds then queries the CLOB REST API for the order status.
1896/// If the order has reached a terminal state that the WS stream missed
1897/// (e.g. UNMATCHED for an unfilled FOK), emits an order status report
1898/// so the engine can reconcile it.
1899#[expect(clippy::too_many_arguments)]
1900async fn check_fok_status(
1901    submitter: &OrderSubmitter,
1902    order_id: &str,
1903    fill_tracker: &Arc<OrderFillTrackerMap>,
1904    emitter: &ExecutionEventEmitter,
1905    account_id: AccountId,
1906    instrument_id: InstrumentId,
1907    order_side: OrderSide,
1908    size_precision: u8,
1909    price_precision: u8,
1910    clock: &'static AtomicTime,
1911) {
1912    const FOK_CHECK_DELAY: Duration = Duration::from_secs(5);
1913
1914    tokio::time::sleep(FOK_CHECK_DELAY).await;
1915
1916    let venue_order_id = VenueOrderId::from(order_id);
1917    if fill_tracker.has_fills_or_settled(&venue_order_id) {
1918        return;
1919    }
1920
1921    log::info!("FOK order {order_id} unresolved after 5s, checking REST status");
1922
1923    let venue_order = match submitter.get_order(order_id).await {
1924        Ok(Some(o)) => o,
1925        Ok(None) => {
1926            log::info!("FOK order {order_id} not found (empty response), WS will reconcile");
1927            return;
1928        }
1929        Err(e) => {
1930            log::warn!("FOK status check failed for {order_id}: {e}");
1931            return;
1932        }
1933    };
1934
1935    let order_status = OrderStatus::from(venue_order.status);
1936
1937    if !matches!(
1938        order_status,
1939        OrderStatus::Rejected | OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Filled
1940    ) {
1941        return;
1942    }
1943
1944    let quantity = Quantity::new(
1945        venue_order
1946            .original_size
1947            .to_string()
1948            .parse::<f64>()
1949            .unwrap_or(0.0),
1950        size_precision,
1951    );
1952    let filled_qty = Quantity::new(
1953        venue_order
1954            .size_matched
1955            .to_string()
1956            .parse::<f64>()
1957            .unwrap_or(0.0),
1958        size_precision,
1959    );
1960    let price = Price::new(
1961        venue_order.price.to_string().parse::<f64>().unwrap_or(0.0),
1962        price_precision,
1963    );
1964
1965    let ts_now = clock.get_time_ns();
1966    let mut report = OrderStatusReport::new(
1967        account_id,
1968        instrument_id,
1969        None,
1970        venue_order_id,
1971        order_side,
1972        OrderType::Limit,
1973        TimeInForce::Ioc,
1974        order_status,
1975        quantity,
1976        filled_qty,
1977        ts_now,
1978        ts_now,
1979        ts_now,
1980        None,
1981    );
1982    report.price = Some(price);
1983
1984    log::info!("FOK order {order_id} resolved via REST as {order_status:?}");
1985
1986    emitter.send_order_status_report(report);
1987}
1988
1989pub fn get_pusd_currency() -> Currency {
1990    Currency::pUSD()
1991}
1992
1993async fn fetch_and_emit_account_state(
1994    http_client: &PolymarketClobHttpClient,
1995    emitter: &ExecutionEventEmitter,
1996    clock: &'static AtomicTime,
1997    signature_type: SignatureType,
1998) -> anyhow::Result<()> {
1999    use anyhow::Context;
2000
2001    let params = GetBalanceAllowanceParams {
2002        asset_type: Some(crate::http::query::AssetType::Collateral),
2003        signature_type: Some(signature_type),
2004        ..Default::default()
2005    };
2006
2007    let balance_allowance = http_client
2008        .get_balance_allowance(params)
2009        .await
2010        .context("failed to fetch balance allowance")?;
2011
2012    let pusd = get_pusd_currency();
2013    let account_balance = parse_balance_allowance(balance_allowance.balance, pusd)
2014        .context("failed to parse balance allowance")?;
2015
2016    let ts_event = clock.get_time_ns();
2017    log::info!(
2018        "Account state updated: balance={} pUSD",
2019        account_balance.total
2020    );
2021    emitter.emit_account_state(vec![account_balance], vec![], true, ts_event);
2022    Ok(())
2023}
2024
2025/// Fetches the user's pUSD collateral balance as a `Decimal`. Mirrors
2026/// [`fetch_and_emit_account_state`] but returns the value directly so the
2027/// market-BUY fee-adjustment path can size against a fresh balance.
2028async fn fetch_collateral_balance_pusd(
2029    http_client: &PolymarketClobHttpClient,
2030    signature_type: SignatureType,
2031) -> anyhow::Result<Decimal> {
2032    use anyhow::Context;
2033
2034    let params = GetBalanceAllowanceParams {
2035        asset_type: Some(crate::http::query::AssetType::Collateral),
2036        signature_type: Some(signature_type),
2037        ..Default::default()
2038    };
2039
2040    let balance_allowance = http_client
2041        .get_balance_allowance(params)
2042        .await
2043        .context("failed to fetch balance allowance")?;
2044
2045    // The API returns balances as integer micro-pUSD (e.g. `20000000` = 20 pUSD).
2046    let usdc_scale = Decimal::from(1_000_000u32);
2047    Ok(balance_allowance.balance / usdc_scale)
2048}