1pub mod order_builder;
19pub(crate) mod order_fill_tracker;
20pub mod parse;
21pub(crate) mod reconciliation;
22pub(crate) mod submitter;
23pub(crate) mod types;
24
25use std::{
26 sync::{
27 Arc, Mutex,
28 atomic::{AtomicBool, Ordering},
29 },
30 time::{Duration, Instant},
31};
32
33use ahash::AHashSet;
34use anyhow::Context;
35use async_trait::async_trait;
36use nautilus_common::{
37 cache::fifo::FifoCacheMap,
38 clients::ExecutionClient,
39 live::{runner::get_exec_event_sender, runtime::get_runtime},
40 messages::execution::{
41 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
42 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
43 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
44 },
45};
46use nautilus_core::{
47 MUTEX_POISONED, UUID4, UnixNanos,
48 collections::AtomicMap,
49 time::{AtomicTime, get_atomic_clock_realtime},
50};
51use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
52use nautilus_model::{
53 accounts::AccountAny,
54 enums::{AccountType, LiquiditySide, OmsType, OrderSide, OrderStatus, OrderType, TimeInForce},
55 events::{OrderEventAny, OrderUpdated},
56 identifiers::{
57 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
58 },
59 instruments::{Instrument, InstrumentAny},
60 orders::{Order, OrderAny},
61 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
62 types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
63};
64use nautilus_network::retry::RetryConfig;
65use rust_decimal::Decimal;
66use tokio::task::JoinHandle;
67use ustr::Ustr;
68
69use self::{
70 order_builder::PolymarketOrderBuilder,
71 order_fill_tracker::OrderFillTrackerMap,
72 parse::{
73 compute_commission, instrument_fee_exponent, instrument_taker_fee, parse_balance_allowance,
74 parse_order_status_report,
75 },
76 reconciliation::{
77 FillContext, apply_fill_filters, build_fill_reports_from_trades, build_position_reports,
78 },
79 submitter::{MarketBuyFeeContext, OrderSubmitter},
80 types::{BatchLimitOrderContext, CancelOutcome, LimitOrderSubmitRequest},
81};
82use crate::{
83 common::{
84 consts::{BATCH_ORDER_LIMIT, POLYMARKET_VENUE},
85 credential::Secrets,
86 enums::SignatureType,
87 },
88 config::PolymarketExecClientConfig,
89 http::{
90 clob::PolymarketClobHttpClient,
91 data_api::PolymarketDataApiHttpClient,
92 query::{CancelResponse, GetBalanceAllowanceParams, GetTradesParams, OrderResponse},
93 },
94 signing::eip712::OrderSigner,
95 websocket::{
96 client::PolymarketWebSocketClient,
97 dispatch::{WsDispatchContext, WsDispatchState, dispatch_user_message},
98 messages::PolymarketWsMessage,
99 },
100};
101
102#[derive(Debug)]
104pub struct PolymarketExecutionClient {
105 core: ExecutionClientCore,
106 clock: &'static AtomicTime,
107 config: PolymarketExecClientConfig,
108 emitter: ExecutionEventEmitter,
109 http_client: PolymarketClobHttpClient,
110 data_api_client: PolymarketDataApiHttpClient,
111 submitter: OrderSubmitter,
112 ws_client: PolymarketWebSocketClient,
113 secrets: Secrets,
114 pending_tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
115 stopping: Arc<AtomicBool>,
116 ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
117 shared_token_instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
118 neg_risk_index: Arc<AtomicMap<InstrumentId, bool>>,
119 fill_tracker: Arc<OrderFillTrackerMap>,
120 pending_fills: Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>>,
121 pending_order_reports: Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>>,
122 pending_cancels: Arc<Mutex<AHashSet<ClientOrderId>>>,
123}
124
125impl PolymarketExecutionClient {
126 pub fn new(
132 core: ExecutionClientCore,
133 config: PolymarketExecClientConfig,
134 ) -> anyhow::Result<Self> {
135 let secrets = Secrets::resolve(
136 config.private_key.as_deref(),
137 config.api_key.clone(),
138 config.api_secret.clone(),
139 config.passphrase.clone(),
140 config.funder.clone(),
141 )
142 .context("failed to resolve Polymarket credentials")?;
143
144 let http_client = PolymarketClobHttpClient::new(
145 secrets.credential.clone(),
146 secrets.address.clone(),
147 config.base_url_http.clone(),
148 config.http_timeout_secs,
149 )
150 .map_err(|e| anyhow::anyhow!("{e}"))
151 .context("failed to create CLOB HTTP client")?;
152
153 let data_api_client =
154 PolymarketDataApiHttpClient::new(Some(config.data_api_url()), config.http_timeout_secs)
155 .map_err(|e| anyhow::anyhow!("{e}"))
156 .context("failed to create Data API HTTP client")?;
157
158 let order_signer =
159 OrderSigner::new(&secrets.private_key).context("failed to create order signer")?;
160
161 let signer_address = secrets.address.clone();
162 let maker_address = secrets
163 .funder
164 .clone()
165 .unwrap_or_else(|| signer_address.clone());
166 let order_builder = Arc::new(PolymarketOrderBuilder::new(
167 order_signer,
168 signer_address,
169 maker_address,
170 config.signature_type,
171 ));
172
173 let retry_config = RetryConfig {
174 max_retries: config.max_retries,
175 initial_delay_ms: config.retry_delay_initial_ms,
176 max_delay_ms: config.retry_delay_max_ms,
177 backoff_factor: 2.0,
178 jitter_ms: 1_000,
179 operation_timeout_ms: Some(config.http_timeout_secs * 1_000),
180 immediate_first: false,
181 max_elapsed_ms: Some(180_000),
182 };
183 let submitter = OrderSubmitter::new(http_client.clone(), order_builder, retry_config);
184
185 let ws_client = PolymarketWebSocketClient::new_user(
186 config.base_url_ws.clone(),
187 secrets.credential.clone(),
188 config.transport_backend,
189 );
190
191 let clock = get_atomic_clock_realtime();
192 let pusd = get_pusd_currency();
193 let emitter = ExecutionEventEmitter::new(
194 clock,
195 core.trader_id,
196 core.account_id,
197 AccountType::Cash,
198 Some(pusd),
199 );
200
201 Ok(Self {
202 core,
203 clock,
204 config,
205 emitter,
206 http_client,
207 data_api_client,
208 submitter,
209 ws_client,
210 secrets,
211 pending_tasks: Arc::new(Mutex::new(Vec::new())),
212 stopping: Arc::new(AtomicBool::new(false)),
213 ws_stream_handle: Mutex::new(None),
214 shared_token_instruments: Arc::new(AtomicMap::new()),
215 neg_risk_index: Arc::new(AtomicMap::new()),
216 fill_tracker: Arc::new(OrderFillTrackerMap::new()),
217 pending_fills: Arc::new(Mutex::new(FifoCacheMap::default())),
218 pending_order_reports: Arc::new(Mutex::new(FifoCacheMap::default())),
219 pending_cancels: Arc::new(Mutex::new(AHashSet::new())),
220 })
221 }
222
223 fn spawn_task<F>(&self, description: &'static str, fut: F)
224 where
225 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
226 {
227 let runtime = get_runtime();
228 let handle = runtime.spawn(async move {
229 if let Err(e) = fut.await {
230 log::warn!("{description} failed: {e:?}");
231 }
232 });
233
234 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
235 tasks.retain(|handle| !handle.is_finished());
236 tasks.push(handle);
237 }
238
239 fn abort_pending_tasks(&self) {
240 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
241 for handle in tasks.drain(..) {
242 handle.abort();
243 }
244 }
245
246 async fn refresh_account_state(&self) -> anyhow::Result<()> {
247 fetch_and_emit_account_state(
248 &self.http_client,
249 &self.emitter,
250 self.clock,
251 self.config.signature_type,
252 )
253 .await
254 }
255
256 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
257 let account_id = self.core.account_id;
258
259 if self.core.cache().account(&account_id).is_some() {
260 log::info!("Account {account_id} registered");
261 return Ok(());
262 }
263
264 let start = Instant::now();
265 let timeout = Duration::from_secs_f64(timeout_secs);
266 let interval = Duration::from_millis(10);
267
268 loop {
269 tokio::time::sleep(interval).await;
270
271 if self.core.cache().account(&account_id).is_some() {
272 log::info!("Account {account_id} registered");
273 return Ok(());
274 }
275
276 if start.elapsed() >= timeout {
277 anyhow::bail!(
278 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
279 );
280 }
281 }
282 }
283
284 async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
285 self.ws_client
286 .connect()
287 .await
288 .context("failed to connect user WebSocket")?;
289
290 self.ws_client
291 .subscribe_user()
292 .await
293 .context("failed to subscribe to user channel")?;
294
295 let mut rx = self
296 .ws_client
297 .take_message_receiver()
298 .ok_or_else(|| anyhow::anyhow!("WebSocket message receiver not available"))?;
299
300 let emitter = self.emitter.clone();
301 let token_instruments = self.shared_token_instruments.clone();
302 let account_id = self.core.account_id;
303 let http_client = self.http_client.clone();
304 let clock = self.clock;
305 let signature_type = self.config.signature_type;
306 let user_address = self
307 .secrets
308 .funder
309 .clone()
310 .unwrap_or_else(|| self.secrets.address.clone());
311 let user_api_key = self.secrets.credential.api_key().to_string();
312
313 let fill_tracker = self.fill_tracker.clone();
314 let pending_fills = self.pending_fills.clone();
315 let pending_order_reports = self.pending_order_reports.clone();
316
317 let handle = get_runtime().spawn(async move {
318 let mut state = WsDispatchState::default();
319 let ctx = WsDispatchContext {
320 token_instruments: &token_instruments,
321 fill_tracker: &fill_tracker,
322 pending_fills: &pending_fills,
323 pending_order_reports: &pending_order_reports,
324 emitter: &emitter,
325 account_id,
326 clock,
327 user_address: &user_address,
328 user_api_key: &user_api_key,
329 };
330
331 loop {
332 match rx.recv().await {
333 Some(PolymarketWsMessage::User(user_msg)) => {
334 if let Some(_refresh) =
335 dispatch_user_message(&user_msg, &ctx, &mut state)
336 {
337 let http = http_client.clone();
338 let emit = emitter.clone();
339
340 get_runtime().spawn(async move {
341 match fetch_and_emit_account_state(
342 &http, &emit, clock, signature_type,
343 )
344 .await
345 {
346 Ok(()) => log::info!(
347 "Account state refreshed after finalized trade for {account_id}"
348 ),
349 Err(e) => log::warn!(
350 "Failed to refresh account after finalized trade: {e}"
351 ),
352 }
353 });
354 }
355 }
356 Some(PolymarketWsMessage::Market(_)) => {}
357 Some(PolymarketWsMessage::Reconnected) => {
358 log::info!("User WebSocket reconnected");
359 }
360 None => {
361 log::debug!("User WebSocket stream ended");
362 break;
363 }
364 }
365 }
366
367 log::debug!("User WebSocket handler task completed");
368 });
369
370 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
371 Ok(())
372 }
373
374 fn get_neg_risk(&self, instrument_id: &InstrumentId) -> bool {
375 self.neg_risk_index
376 .get_cloned(instrument_id)
377 .unwrap_or(false)
378 }
379
380 fn load_instruments_from_cache(&self) {
381 let cache = self.core.cache();
382 let instruments: Vec<InstrumentAny> = cache
383 .instruments(&self.core.venue, None)
384 .into_iter()
385 .cloned()
386 .collect();
387 drop(cache);
388
389 for inst in &instruments {
391 self.shared_token_instruments
392 .insert(Ustr::from(inst.raw_symbol().as_str()), inst.clone());
393 }
394
395 for inst in &instruments {
397 if let InstrumentAny::BinaryOption(bo) = inst {
398 let neg_risk = bo
399 .info
400 .as_ref()
401 .and_then(|i| i.get_bool("neg_risk"))
402 .unwrap_or(false);
403 self.neg_risk_index.insert(bo.id, neg_risk);
404 }
405 }
406
407 log::info!("Loaded {} instruments from cache", instruments.len());
408 }
409
410 fn submit_limit_order(&self, order: OrderAny) {
411 if let Err(reason) = PolymarketOrderBuilder::validate_limit_order(&order) {
412 self.emitter.emit_order_denied(&order, &reason);
413 return;
414 }
415
416 let instrument = match self.resolve_instrument(&order) {
417 Some(i) => i,
418 None => return,
419 };
420
421 let neg_risk = self.get_neg_risk(&order.instrument_id());
422 let token_id = instrument.raw_symbol().to_string();
423 let tick_decimals = instrument.price_precision() as u32;
424 let price = order.price().unwrap(); let quantity = order.quantity();
426 let tif = order.time_in_force();
427 let post_only = order.is_post_only();
428 let side = order.order_side();
429 let expire_time = order.expire_time();
430
431 self.emitter.emit_order_submitted(&order);
432
433 let submitter = self.submitter.clone();
434 let emitter = self.emitter.clone();
435 let clock = self.clock;
436 let fill_tracker = self.fill_tracker.clone();
437 let pending_fills = self.pending_fills.clone();
438 let pending_order_reports = self.pending_order_reports.clone();
439 let pending_cancels = self.pending_cancels.clone();
440 let account_id = self.core.account_id;
441 let size_precision = instrument.size_precision();
442 let price_precision = instrument.price_precision();
443
444 self.spawn_task("submit_limit_order", async move {
445 match submitter
446 .submit_limit_order(
447 &token_id,
448 side,
449 price,
450 quantity,
451 tif,
452 post_only,
453 neg_risk,
454 expire_time,
455 tick_decimals,
456 )
457 .await
458 {
459 Ok(response) => {
460 if let Some((order_id_str, venue_order_id)) = handle_order_response(
461 Ok(response),
462 &order,
463 &emitter,
464 clock,
465 &fill_tracker,
466 &pending_fills,
467 &pending_order_reports,
468 &pending_cancels,
469 account_id,
470 size_precision,
471 price_precision,
472 ) {
473 execute_deferred_cancel(
474 &submitter,
475 &order,
476 &order_id_str,
477 venue_order_id,
478 &emitter,
479 clock,
480 )
481 .await;
482 }
483 }
484 Err(e) => {
485 let ts_now = clock.get_time_ns();
486 emitter.emit_order_rejected(&order, &format!("{e}"), ts_now, false);
487 }
488 }
489 Ok(())
490 });
491 }
492
493 fn submit_market_order(&self, order: OrderAny) {
494 if let Err(reason) = PolymarketOrderBuilder::validate_market_order(&order) {
495 self.emitter.emit_order_denied(&order, &reason);
496 return;
497 }
498
499 let instrument = match self.resolve_instrument(&order) {
500 Some(i) => i,
501 None => return,
502 };
503
504 let neg_risk = self.get_neg_risk(&order.instrument_id());
505 let token_id = instrument.raw_symbol().to_string();
506 let tick_decimals = instrument.price_precision() as u32;
507 let side = order.order_side();
508 let amount = order.quantity();
509 let is_quote_qty = order.is_quote_quantity();
510
511 let needs_fee_adjustment = side == OrderSide::Buy && is_quote_qty;
516 let fee_rate = if needs_fee_adjustment {
517 instrument_taker_fee(&instrument)
518 } else {
519 Decimal::ZERO
520 };
521 let fee_exponent = if needs_fee_adjustment {
522 instrument_fee_exponent(&instrument)
523 } else {
524 1.0
525 };
526
527 let submitter = self.submitter.clone();
528 let http_client = self.http_client.clone();
529 let signature_type = self.config.signature_type;
530 let emitter = self.emitter.clone();
531 let clock = self.clock;
532 let fill_tracker = self.fill_tracker.clone();
533 let pending_fills = self.pending_fills.clone();
534 let pending_order_reports = self.pending_order_reports.clone();
535 let pending_cancels = self.pending_cancels.clone();
536 let account_id = self.core.account_id;
537 let size_precision = instrument.size_precision();
538 let price_precision = instrument.price_precision();
539
540 self.spawn_task("submit_market_order", async move {
541 let fee_context = if needs_fee_adjustment {
542 match fetch_collateral_balance_pusd(&http_client, signature_type).await {
543 Ok(balance) => Some(MarketBuyFeeContext {
544 user_pusd_balance: balance,
545 fee_rate,
546 fee_exponent,
547 builder_taker_fee_rate: Decimal::ZERO,
548 }),
549 Err(e) => {
550 emitter.emit_order_rejected(
551 &order,
552 &format!("Failed to fetch pUSD balance for fee adjustment: {e}"),
553 clock.get_time_ns(),
554 false,
555 );
556 return Ok(());
557 }
558 }
559 } else {
560 None
561 };
562
563 match submitter
564 .submit_market_order(
565 &token_id,
566 side,
567 amount,
568 neg_risk,
569 tick_decimals,
570 fee_context,
571 )
572 .await
573 {
574 Ok((response, expected_base_qty)) => {
575 let mut order = order;
576 emitter.emit_order_submitted(&order);
577
578 if response.success
580 && is_quote_qty
581 && side == OrderSide::Buy
582 && !expected_base_qty.is_zero()
583 && let Ok(base_qty) =
584 Quantity::from_decimal_dp(expected_base_qty, size_precision)
585 {
586 log::info!(
587 "Converted {} quote quantity {} to base quantity {} \
588 (from signed taker_amount)",
589 order.instrument_id(),
590 amount,
591 base_qty,
592 );
593
594 let ts_now = clock.get_time_ns();
595 let updated = OrderUpdated::new(
596 order.trader_id(),
597 order.strategy_id(),
598 order.instrument_id(),
599 order.client_order_id(),
600 base_qty,
601 UUID4::new(),
602 ts_now,
603 ts_now,
604 false,
605 order.venue_order_id(),
606 order.account_id(),
607 order.price(),
608 None,
609 None,
610 false, );
612
613 let event = OrderEventAny::Updated(updated);
614 emitter.send_order_event(event.clone());
615
616 if let Err(e) = order.apply(event) {
617 log::error!("Failed to apply quote-to-base OrderUpdated: {e}");
618 }
619 }
620
621 let fok_order_id = response
622 .order_id
623 .as_ref()
624 .filter(|_| response.success)
625 .cloned();
626
627 if let Some((order_id_str, venue_order_id)) = handle_order_response(
628 Ok(response),
629 &order,
630 &emitter,
631 clock,
632 &fill_tracker,
633 &pending_fills,
634 &pending_order_reports,
635 &pending_cancels,
636 account_id,
637 size_precision,
638 price_precision,
639 ) {
640 execute_deferred_cancel(
641 &submitter,
642 &order,
643 &order_id_str,
644 venue_order_id,
645 &emitter,
646 clock,
647 )
648 .await;
649 }
650
651 if let Some(order_id) = fok_order_id {
652 check_fok_status(
653 &submitter,
654 &order_id,
655 &fill_tracker,
656 &emitter,
657 account_id,
658 order.instrument_id(),
659 order.order_side(),
660 size_precision,
661 price_precision,
662 clock,
663 )
664 .await;
665 }
666 }
667 Err(e) => {
668 let ts_now = clock.get_time_ns();
669 emitter.emit_order_rejected(&order, &format!("{e}"), ts_now, false);
670 }
671 }
672 Ok(())
673 });
674 }
675
676 fn resolve_instrument(&self, order: &OrderAny) -> Option<InstrumentAny> {
677 let instrument = self
678 .core
679 .cache()
680 .instrument(&order.instrument_id())
681 .cloned();
682
683 match instrument {
684 Some(i) => Some(i),
685 None => {
686 self.emitter.emit_order_denied(
687 order,
688 &format!("Instrument not found: {}", order.instrument_id()),
689 );
690 None
691 }
692 }
693 }
694
695 fn fill_context(&self) -> FillContext<'_> {
696 let user_address = self
697 .secrets
698 .funder
699 .as_deref()
700 .unwrap_or(&self.secrets.address);
701 FillContext {
702 account_id: self.core.account_id,
703 user_address,
704 api_key: self.secrets.credential.api_key().as_str(),
705 pusd: get_pusd_currency(),
706 clock: self.clock,
707 }
708 }
709}
710
711#[async_trait(?Send)]
712impl ExecutionClient for PolymarketExecutionClient {
713 fn is_connected(&self) -> bool {
714 self.core.is_connected()
715 }
716
717 fn client_id(&self) -> ClientId {
718 self.core.client_id
719 }
720
721 fn account_id(&self) -> AccountId {
722 self.core.account_id
723 }
724
725 fn venue(&self) -> Venue {
726 *POLYMARKET_VENUE
727 }
728
729 fn oms_type(&self) -> OmsType {
730 OmsType::Netting
731 }
732
733 fn get_account(&self) -> Option<AccountAny> {
734 self.core.cache().account(&self.core.account_id).cloned()
735 }
736
737 fn generate_account_state(
738 &self,
739 balances: Vec<AccountBalance>,
740 margins: Vec<MarginBalance>,
741 reported: bool,
742 ts_event: UnixNanos,
743 ) -> anyhow::Result<()> {
744 self.emitter
745 .emit_account_state(balances, margins, reported, ts_event);
746 Ok(())
747 }
748
749 fn start(&mut self) -> anyhow::Result<()> {
750 if self.core.is_started() {
751 return Ok(());
752 }
753
754 self.stopping.store(false, Ordering::Release);
755 let sender = get_exec_event_sender();
756 self.emitter.set_sender(sender);
757 self.core.set_started();
758
759 log::info!(
760 "Started: client_id={}, account_id={}",
761 self.core.client_id,
762 self.core.account_id,
763 );
764
765 Ok(())
766 }
767
768 fn stop(&mut self) -> anyhow::Result<()> {
769 if self.core.is_stopped() {
770 return Ok(());
771 }
772
773 log::info!("Stopping Polymarket execution client");
774
775 self.stopping.store(true, Ordering::Release);
777
778 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
779 handle.abort();
780 }
781
782 self.abort_pending_tasks();
783 self.ws_client.abort();
784
785 self.core.set_disconnected();
786 self.core.set_stopped();
787
788 log::info!("Polymarket execution client stopped");
789 Ok(())
790 }
791
792 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
793 let order = self
794 .core
795 .cache()
796 .order(&cmd.client_order_id)
797 .cloned()
798 .ok_or_else(|| {
799 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
800 })?;
801
802 if order.is_closed() {
803 log::warn!("Cannot submit closed order {}", order.client_order_id());
804 return Ok(());
805 }
806
807 match order.order_type() {
808 OrderType::Limit => self.submit_limit_order(order),
809 OrderType::Market => self.submit_market_order(order),
810 _ => {
811 self.emitter.emit_order_denied(
812 &order,
813 &format!(
814 "Unsupported order type for Polymarket: {:?}",
815 order.order_type()
816 ),
817 );
818 }
819 }
820 Ok(())
821 }
822
823 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
824 let mut batch_orders = Vec::with_capacity(cmd.order_inits.len());
825
826 for order_init in &cmd.order_inits {
827 let Some(order) = self
828 .core
829 .cache()
830 .order(&order_init.client_order_id)
831 .cloned()
832 else {
833 log::warn!(
834 "Order not found in cache for {}",
835 order_init.client_order_id
836 );
837 continue;
838 };
839
840 if order.is_closed() {
841 log::warn!("Cannot submit closed order {}", order.client_order_id());
842 continue;
843 }
844
845 match order.order_type() {
848 OrderType::Limit => {}
849 OrderType::Market => {
850 self.submit_market_order(order);
851 continue;
852 }
853 other => {
854 self.emitter.emit_order_denied(
855 &order,
856 &format!("Unsupported order type for Polymarket: {other:?}"),
857 );
858 continue;
859 }
860 }
861
862 if let Err(reason) = PolymarketOrderBuilder::validate_limit_order(&order) {
863 self.emitter.emit_order_denied(&order, &reason);
864 continue;
865 }
866
867 let instrument = match self.resolve_instrument(&order) {
868 Some(i) => i,
869 None => continue,
870 };
871
872 let price = order
873 .price()
874 .expect("validated limit order must have a price");
875 batch_orders.push(BatchLimitOrderContext {
876 request: LimitOrderSubmitRequest {
877 token_id: instrument.raw_symbol().to_string(),
878 side: order.order_side(),
879 price,
880 quantity: order.quantity(),
881 time_in_force: order.time_in_force(),
882 post_only: order.is_post_only(),
883 neg_risk: self.get_neg_risk(&order.instrument_id()),
884 expire_time: order.expire_time(),
885 tick_decimals: instrument.price_precision() as u32,
886 },
887 size_precision: instrument.size_precision(),
888 price_precision: instrument.price_precision(),
889 order,
890 });
891 }
892
893 if batch_orders.is_empty() {
894 return Ok(());
895 }
896
897 if batch_orders.len() == 1 {
898 let batch_order = batch_orders.pop().expect("len checked");
901 self.submit_limit_order(batch_order.order);
902 return Ok(());
903 }
904
905 let submitter = self.submitter.clone();
906 let emitter = self.emitter.clone();
907 let clock = self.clock;
908 let fill_tracker = self.fill_tracker.clone();
909 let pending_fills = self.pending_fills.clone();
910 let pending_order_reports = self.pending_order_reports.clone();
911 let pending_cancels = self.pending_cancels.clone();
912 let pending_tasks = self.pending_tasks.clone();
913 let stopping = self.stopping.clone();
914 let account_id = self.core.account_id;
915
916 self.spawn_task("submit_order_list", async move {
917 for batch_order in &batch_orders {
918 emitter.emit_order_submitted(&batch_order.order);
919 }
920
921 let requests: Vec<LimitOrderSubmitRequest> =
922 batch_orders.iter().map(|bo| bo.request.clone()).collect();
923 let prepare_results = submitter.prepare_limit_order_submissions(&requests).await;
924
925 let mut prepared_orders = Vec::with_capacity(batch_orders.len());
926 let mut submissions = Vec::with_capacity(batch_orders.len());
927
928 for (batch_order, result) in batch_orders.into_iter().zip(prepare_results) {
929 match result {
930 Ok(submission) => {
931 prepared_orders.push(batch_order);
932 submissions.push(submission);
933 }
934 Err(e) => {
935 reject_submit_order(
936 &batch_order.order,
937 &format!("{e}"),
938 &emitter,
939 clock,
940 &pending_cancels,
941 );
942 }
943 }
944 }
945
946 if submissions.is_empty() {
947 return Ok(());
948 }
949
950 let total = submissions.len();
954 let mut offset = 0;
955 while offset < total {
956 let end = (offset + BATCH_ORDER_LIMIT).min(total);
957 let mut submissions_chunk = submissions[offset..end].to_vec();
958 let mut orders_chunk = prepared_orders[offset..end].to_vec();
959
960 if submissions_chunk.len() == 1 {
961 let submission = submissions_chunk.pop().expect("len 1");
962 let batch_order = orders_chunk.pop().expect("len 1");
963 handle_single_order_response(
964 submitter.post_limit_order_submission(submission).await,
965 batch_order,
966 &submitter,
967 &emitter,
968 clock,
969 &fill_tracker,
970 &pending_fills,
971 &pending_order_reports,
972 &pending_cancels,
973 account_id,
974 )
975 .await;
976 } else {
977 match submitter
978 .post_limit_order_submissions(submissions_chunk)
979 .await
980 {
981 Ok(responses) => {
982 handle_batch_order_responses(
983 responses,
984 orders_chunk,
985 &submitter,
986 &emitter,
987 clock,
988 &fill_tracker,
989 &pending_fills,
990 &pending_order_reports,
991 &pending_cancels,
992 &pending_tasks,
993 &stopping,
994 account_id,
995 )
996 .await;
997 }
998 Err(e) => {
999 for batch_order in orders_chunk {
1000 reject_submit_order(
1001 &batch_order.order,
1002 &format!("{e}"),
1003 &emitter,
1004 clock,
1005 &pending_cancels,
1006 );
1007 }
1008 }
1009 }
1010 }
1011
1012 offset = end;
1013 }
1014
1015 Ok(())
1016 });
1017
1018 Ok(())
1019 }
1020
1021 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1022 let order = self.core.cache().order(&cmd.client_order_id).cloned();
1023 if let Some(order) = order {
1024 let venue_order_id = order.venue_order_id();
1025 let ts_now = self.clock.get_time_ns();
1026 self.emitter.emit_order_modify_rejected(
1027 &order,
1028 venue_order_id,
1029 "Order modification not supported on Polymarket",
1030 ts_now,
1031 );
1032 }
1033 Ok(())
1034 }
1035
1036 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1037 let order = self.core.cache().order(&cmd.client_order_id).cloned();
1038 let order_ref = match &order {
1039 Some(o) => o,
1040 None => {
1041 log::warn!(
1042 "Order not found in cache for cancel: {}",
1043 cmd.client_order_id
1044 );
1045 return Ok(());
1046 }
1047 };
1048
1049 if !order_ref.is_open() {
1050 log::warn!(
1051 "Cannot cancel order that is not open: {}",
1052 cmd.client_order_id
1053 );
1054 let ts_now = self.clock.get_time_ns();
1055 self.emitter.emit_order_cancel_rejected(
1056 order_ref,
1057 order_ref.venue_order_id(),
1058 &format!("Order is not open (status: {:?})", order_ref.status()),
1059 ts_now,
1060 );
1061 return Ok(());
1062 }
1063
1064 let venue_order_id = match order_ref.venue_order_id() {
1065 Some(id) => id,
1066 None => {
1067 match self
1069 .core
1070 .cache()
1071 .venue_order_id(&cmd.client_order_id)
1072 .copied()
1073 {
1074 Some(id) => id,
1075 None => {
1076 log::info!(
1077 "Cancel for {} deferred, venue_order_id not yet available",
1078 cmd.client_order_id
1079 );
1080 self.pending_cancels
1081 .lock()
1082 .expect(MUTEX_POISONED)
1083 .insert(cmd.client_order_id);
1084 return Ok(());
1085 }
1086 }
1087 }
1088 };
1089
1090 let order_id_str = venue_order_id.to_string();
1091 let submitter = self.submitter.clone();
1092 let emitter = self.emitter.clone();
1093 let clock = self.clock;
1094 let order_clone = order.unwrap();
1095
1096 self.spawn_task("cancel_order", async move {
1097 match submitter.cancel_order(&order_id_str).await {
1098 Ok(response) => {
1099 process_cancel_result(
1100 &response,
1101 &order_id_str,
1102 &order_clone,
1103 venue_order_id,
1104 &emitter,
1105 clock,
1106 );
1107 }
1108 Err(e) => {
1109 let ts_now = clock.get_time_ns();
1110 emitter.emit_order_cancel_rejected(
1111 &order_clone,
1112 Some(venue_order_id),
1113 &format!("HTTP request failed: {e}"),
1114 ts_now,
1115 );
1116 }
1117 }
1118 Ok(())
1119 });
1120
1121 Ok(())
1122 }
1123
1124 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1125 let cache = self.core.cache();
1126 let open_orders = cache.orders_open(
1127 Some(&self.core.venue),
1128 Some(&cmd.instrument_id),
1129 Some(&cmd.strategy_id),
1130 None,
1131 Some(cmd.order_side),
1132 );
1133
1134 if open_orders.is_empty() {
1135 log::debug!("No open orders to cancel for {}", cmd.instrument_id);
1136 return Ok(());
1137 }
1138
1139 let venue_order_ids: Vec<String> = open_orders
1140 .iter()
1141 .filter_map(|o| o.venue_order_id().map(|v| v.to_string()))
1142 .collect();
1143
1144 if venue_order_ids.is_empty() {
1145 log::warn!("No venue order IDs found for cancel all");
1146 return Ok(());
1147 }
1148
1149 let submitter = self.submitter.clone();
1150 let emitter = self.emitter.clone();
1151 let clock = self.clock;
1152 let orders: Vec<OrderAny> = open_orders.into_iter().cloned().collect();
1153
1154 self.spawn_task("cancel_all_orders", async move {
1155 let order_id_refs: Vec<&str> = venue_order_ids.iter().map(String::as_str).collect();
1156 let response = submitter
1157 .cancel_orders(&order_id_refs)
1158 .await
1159 .context("failed to cancel all orders")?;
1160
1161 for order in &orders {
1162 if let Some(vid) = order.venue_order_id() {
1163 let vid_str = vid.to_string();
1164 process_cancel_result(&response, &vid_str, order, vid, &emitter, clock);
1165 }
1166 }
1167
1168 log::info!("Canceled {} orders", response.canceled.len());
1169 Ok(())
1170 });
1171
1172 Ok(())
1173 }
1174
1175 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1176 if cmd.cancels.is_empty() {
1177 return Ok(());
1178 }
1179
1180 let mut venue_to_order: Vec<(String, OrderAny)> = Vec::new();
1181
1182 for c in &cmd.cancels {
1183 if let Some(order) = self.core.cache().order(&c.client_order_id)
1184 && let Some(vid) = order.venue_order_id()
1185 {
1186 venue_to_order.push((vid.to_string(), order.clone()));
1187 }
1188 }
1189
1190 if venue_to_order.is_empty() {
1191 log::warn!("No venue order IDs found for batch cancel");
1192 return Ok(());
1193 }
1194
1195 let order_ids: Vec<String> = venue_to_order.iter().map(|(id, _)| id.clone()).collect();
1196 let submitter = self.submitter.clone();
1197 let emitter = self.emitter.clone();
1198 let clock = self.clock;
1199
1200 self.spawn_task("batch_cancel_orders", async move {
1201 let order_id_refs: Vec<&str> = order_ids.iter().map(String::as_str).collect();
1202 let response = submitter
1203 .cancel_orders(&order_id_refs)
1204 .await
1205 .context("failed to batch cancel orders")?;
1206
1207 for (venue_id_str, order) in &venue_to_order {
1208 let vid = VenueOrderId::from(venue_id_str.as_str());
1209 process_cancel_result(&response, venue_id_str, order, vid, &emitter, clock);
1210 }
1211
1212 log::info!("Batch canceled {} orders", response.canceled.len());
1213 Ok(())
1214 });
1215
1216 Ok(())
1217 }
1218
1219 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1220 let http_client = self.http_client.clone();
1221 let emitter = self.emitter.clone();
1222 let clock = self.clock;
1223 let signature_type = self.config.signature_type;
1224
1225 self.spawn_task("query_account", async move {
1226 fetch_and_emit_account_state(&http_client, &emitter, clock, signature_type).await
1227 });
1228 Ok(())
1229 }
1230
1231 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1232 log::debug!("Querying order: client_order_id={}", cmd.client_order_id);
1233
1234 let venue_order_id = match &cmd.venue_order_id {
1235 Some(id) => id.to_string(),
1236 None => {
1237 log::warn!("query_order requires venue_order_id for Polymarket");
1238 return Ok(());
1239 }
1240 };
1241
1242 let instrument_id = cmd.instrument_id;
1243 let client_order_id = cmd.client_order_id;
1244 let account_id = self.core.account_id;
1245 let cache = self.core.cache();
1246
1247 let (price_prec, size_prec) = match cache.instrument(&instrument_id) {
1248 Some(i) => (i.price_precision(), i.size_precision()),
1249 None => (4, 6),
1250 };
1251
1252 let http_client = self.http_client.clone();
1253 let emitter = self.emitter.clone();
1254 let clock = self.clock;
1255
1256 self.spawn_task("query_order", async move {
1257 match http_client.get_order_optional(&venue_order_id).await {
1258 Ok(Some(order)) => {
1259 let report = parse_order_status_report(
1260 &order,
1261 instrument_id,
1262 account_id,
1263 Some(client_order_id),
1264 price_prec,
1265 size_prec,
1266 clock.get_time_ns(),
1267 );
1268 emitter.send_order_status_report(report);
1269 }
1270 Ok(None) => {
1271 log::warn!("Order {venue_order_id} not found (empty response)");
1272 }
1273 Err(e) => {
1274 log::warn!("Failed to query order {venue_order_id}: {e}");
1275 }
1276 }
1277 Ok(())
1278 });
1279
1280 Ok(())
1281 }
1282
1283 fn register_external_order(
1284 &self,
1285 _client_order_id: ClientOrderId,
1286 _venue_order_id: VenueOrderId,
1287 _instrument_id: InstrumentId,
1288 _strategy_id: StrategyId,
1289 _ts_init: UnixNanos,
1290 ) {
1291 }
1292
1293 fn on_instrument(&mut self, instrument: InstrumentAny) {
1294 let token_id = Ustr::from(instrument.raw_symbol().as_str());
1295 if let InstrumentAny::BinaryOption(bo) = &instrument {
1296 let neg_risk = bo
1297 .info
1298 .as_ref()
1299 .and_then(|i| i.get_bool("neg_risk"))
1300 .unwrap_or(false);
1301 self.neg_risk_index.insert(bo.id, neg_risk);
1302 }
1303 self.shared_token_instruments.insert(token_id, instrument);
1304 }
1305
1306 fn calculate_commission(
1307 &self,
1308 instrument: &InstrumentAny,
1309 last_qty: Quantity,
1310 last_px: Price,
1311 liquidity_side: LiquiditySide,
1312 ) -> Option<Money> {
1313 let fee_rate = instrument_taker_fee(instrument);
1314 let commission = compute_commission(
1315 fee_rate,
1316 last_qty.as_decimal(),
1317 last_px.as_decimal(),
1318 liquidity_side,
1319 );
1320
1321 Some(Money::new(commission, instrument.quote_currency()))
1322 }
1323
1324 async fn connect(&mut self) -> anyhow::Result<()> {
1325 if self.core.is_connected() {
1326 return Ok(());
1327 }
1328
1329 log::info!("Connecting Polymarket execution client");
1330
1331 self.stopping.store(false, Ordering::Release);
1332
1333 self.load_instruments_from_cache();
1335 self.core.set_instruments_initialized();
1336
1337 self.start_ws_stream().await?;
1338
1339 let post_ws = async {
1340 self.refresh_account_state().await?;
1341 self.await_account_registered(30.0).await?;
1342 Ok::<(), anyhow::Error>(())
1343 };
1344
1345 if let Err(e) = post_ws.await {
1346 log::warn!("Connect failed after WS started, tearing down: {e}");
1347 self.stopping.store(true, Ordering::Release);
1348 let _ = self.ws_client.disconnect().await;
1349 self.abort_pending_tasks();
1350 return Err(e);
1351 }
1352
1353 self.core.set_connected();
1354
1355 log::info!("Connected: client_id={}", self.core.client_id);
1356 Ok(())
1357 }
1358
1359 async fn disconnect(&mut self) -> anyhow::Result<()> {
1360 if self.core.is_disconnected() {
1361 return Ok(());
1362 }
1363
1364 log::info!("Disconnecting Polymarket execution client");
1365
1366 self.stopping.store(true, Ordering::Release);
1368
1369 self.ws_client.disconnect().await?;
1370
1371 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
1372 handle.abort();
1373 }
1374
1375 self.abort_pending_tasks();
1376 self.core.set_disconnected();
1377
1378 log::info!("Disconnected: client_id={}", self.core.client_id);
1379 Ok(())
1380 }
1381
1382 async fn generate_order_status_report(
1383 &self,
1384 cmd: &GenerateOrderStatusReport,
1385 ) -> anyhow::Result<Option<OrderStatusReport>> {
1386 let venue_order_id = match &cmd.venue_order_id {
1387 Some(id) => id.to_string(),
1388 None => {
1389 log::warn!("generate_order_status_report requires venue_order_id");
1390 return Ok(None);
1391 }
1392 };
1393
1394 let instrument_id = match cmd.instrument_id {
1395 Some(id) => id,
1396 None => {
1397 log::warn!("generate_order_status_report requires instrument_id");
1398 return Ok(None);
1399 }
1400 };
1401
1402 let order = match self
1403 .http_client
1404 .get_order_optional(&venue_order_id)
1405 .await
1406 .context("failed to fetch order")?
1407 {
1408 Some(o) => o,
1409 None => {
1410 log::info!("Order {venue_order_id} not found (empty response)");
1411 return Ok(None);
1412 }
1413 };
1414
1415 let instrument = self.core.cache().instrument(&instrument_id).cloned();
1416 let (price_prec, size_prec) = match &instrument {
1417 Some(i) => (i.price_precision(), i.size_precision()),
1418 None => (4, 6),
1419 };
1420
1421 let report = parse_order_status_report(
1422 &order,
1423 instrument_id,
1424 self.core.account_id,
1425 cmd.client_order_id,
1426 price_prec,
1427 size_prec,
1428 self.clock.get_time_ns(),
1429 );
1430
1431 Ok(Some(report))
1432 }
1433
1434 async fn generate_order_status_reports(
1435 &self,
1436 cmd: &GenerateOrderStatusReports,
1437 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1438 let params = crate::http::query::GetOrdersParams::default();
1439 let orders = self
1440 .http_client
1441 .get_orders(params)
1442 .await
1443 .context("failed to fetch orders")?;
1444
1445 let (reports, _) = reconciliation::build_order_reports_from_orders(
1446 &orders,
1447 &self.shared_token_instruments,
1448 self.core.account_id,
1449 cmd.instrument_id,
1450 self.clock.get_time_ns(),
1451 );
1452
1453 let reports = if cmd.open_only {
1454 reports
1455 .into_iter()
1456 .filter(|r| r.order_status.is_open())
1457 .collect()
1458 } else {
1459 reports
1460 };
1461
1462 log::info!("Generated {} order status reports", reports.len());
1463 Ok(reports)
1464 }
1465
1466 async fn generate_fill_reports(
1467 &self,
1468 cmd: GenerateFillReports,
1469 ) -> anyhow::Result<Vec<FillReport>> {
1470 let trades = self
1471 .http_client
1472 .get_trades(GetTradesParams::default())
1473 .await
1474 .context("failed to fetch trades")?;
1475
1476 let ctx = self.fill_context();
1477 let (reports, _) = build_fill_reports_from_trades(
1478 &trades,
1479 &ctx,
1480 &self.shared_token_instruments,
1481 cmd.instrument_id,
1482 self.clock.get_time_ns(),
1483 );
1484
1485 let reports = apply_fill_filters(reports, cmd.venue_order_id, cmd.start, cmd.end);
1486
1487 log::info!("Generated {} fill reports", reports.len());
1488 Ok(reports)
1489 }
1490
1491 async fn generate_position_status_reports(
1492 &self,
1493 cmd: &GeneratePositionStatusReports,
1494 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1495 let ctx = self.fill_context();
1496 let positions = self
1497 .data_api_client
1498 .get_positions(ctx.user_address)
1499 .await
1500 .context("failed to fetch positions from Data API")?;
1501
1502 let ts_now = self.clock.get_time_ns();
1503 let mut reports = build_position_reports(&positions, self.core.account_id, ts_now);
1504
1505 if let Some(ref filter_id) = cmd.instrument_id {
1506 reports.retain(|r| &r.instrument_id == filter_id);
1507 }
1508
1509 log::info!("Generated {} position status reports", reports.len());
1510 Ok(reports)
1511 }
1512
1513 async fn generate_mass_status(
1514 &self,
1515 lookback_mins: Option<u64>,
1516 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1517 let ctx = self.fill_context();
1518 reconciliation::generate_mass_status(
1519 &self.http_client,
1520 &self.data_api_client,
1521 &self.shared_token_instruments,
1522 &ctx,
1523 self.core.client_id,
1524 self.core.venue,
1525 lookback_mins,
1526 )
1527 .await
1528 }
1529}
1530
1531fn process_cancel_result(
1532 response: &CancelResponse,
1533 venue_order_id_str: &str,
1534 order: &OrderAny,
1535 venue_order_id: VenueOrderId,
1536 emitter: &ExecutionEventEmitter,
1537 clock: &'static AtomicTime,
1538) {
1539 if let Some(reason_opt) = response.not_canceled.get(venue_order_id_str) {
1540 let reason = reason_opt.as_deref().unwrap_or("unknown reason");
1541 match CancelOutcome::classify(reason) {
1542 CancelOutcome::AlreadyDone => {
1543 log::info!(
1544 "Cancel rejected for {}: {reason} - awaiting WS for terminal state",
1545 order.client_order_id()
1546 );
1547 }
1548 CancelOutcome::Rejected(msg) => {
1549 let ts_now = clock.get_time_ns();
1550 emitter.emit_order_cancel_rejected(order, Some(venue_order_id), &msg, ts_now);
1551 }
1552 }
1553 }
1554}
1555
1556#[expect(clippy::too_many_arguments)]
1557async fn handle_batch_order_responses(
1558 responses: Vec<OrderResponse>,
1559 batch_orders: Vec<BatchLimitOrderContext>,
1560 submitter: &OrderSubmitter,
1561 emitter: &ExecutionEventEmitter,
1562 clock: &'static AtomicTime,
1563 fill_tracker: &Arc<OrderFillTrackerMap>,
1564 pending_fills: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>>,
1565 pending_order_reports: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>>,
1566 pending_cancels: &Arc<Mutex<AHashSet<ClientOrderId>>>,
1567 pending_tasks: &Arc<Mutex<Vec<JoinHandle<()>>>>,
1568 stopping: &Arc<AtomicBool>,
1569 account_id: AccountId,
1570) {
1571 let response_len = responses.len();
1572 let order_len = batch_orders.len();
1573
1574 if response_len != order_len {
1575 log::warn!(
1576 "Batch submit response length ({response_len}) does not match order count ({order_len})"
1577 );
1578 }
1579
1580 let mut deferred = Vec::new();
1584
1585 for (batch_order, response) in batch_orders.iter().zip(responses) {
1586 if let Some((order_id_str, venue_order_id)) = handle_order_response(
1587 Ok(response),
1588 &batch_order.order,
1589 emitter,
1590 clock,
1591 fill_tracker,
1592 pending_fills,
1593 pending_order_reports,
1594 pending_cancels,
1595 account_id,
1596 batch_order.size_precision,
1597 batch_order.price_precision,
1598 ) {
1599 deferred.push((batch_order.order.clone(), order_id_str, venue_order_id));
1600 }
1601 }
1602
1603 if order_len > response_len {
1604 for batch_order in batch_orders.iter().skip(response_len) {
1605 reject_submit_order(
1606 &batch_order.order,
1607 "Order not included in API response",
1608 emitter,
1609 clock,
1610 pending_cancels,
1611 );
1612 }
1613 }
1614
1615 if !deferred.is_empty() {
1623 let mut tasks = pending_tasks.lock().expect(MUTEX_POISONED);
1624
1625 if stopping.load(Ordering::Acquire) {
1626 return;
1627 }
1628 tasks.retain(|handle| !handle.is_finished());
1629
1630 for (order, order_id_str, venue_order_id) in deferred {
1631 let submitter = submitter.clone();
1632 let emitter = emitter.clone();
1633
1634 let handle = get_runtime().spawn(async move {
1635 execute_deferred_cancel(
1636 &submitter,
1637 &order,
1638 &order_id_str,
1639 venue_order_id,
1640 &emitter,
1641 clock,
1642 )
1643 .await;
1644 });
1645 tasks.push(handle);
1646 }
1647 }
1648}
1649
1650fn reject_submit_order(
1651 order: &OrderAny,
1652 reason: &str,
1653 emitter: &ExecutionEventEmitter,
1654 clock: &'static AtomicTime,
1655 pending_cancels: &Arc<Mutex<AHashSet<ClientOrderId>>>,
1656) {
1657 let ts_now = clock.get_time_ns();
1658 emitter.emit_order_rejected(order, reason, ts_now, false);
1659 pending_cancels
1660 .lock()
1661 .expect(MUTEX_POISONED)
1662 .remove(&order.client_order_id());
1663}
1664
1665#[expect(clippy::too_many_arguments)]
1666async fn handle_single_order_response(
1667 result: anyhow::Result<OrderResponse>,
1668 batch_order: BatchLimitOrderContext,
1669 submitter: &OrderSubmitter,
1670 emitter: &ExecutionEventEmitter,
1671 clock: &'static AtomicTime,
1672 fill_tracker: &Arc<OrderFillTrackerMap>,
1673 pending_fills: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>>,
1674 pending_order_reports: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>>,
1675 pending_cancels: &Arc<Mutex<AHashSet<ClientOrderId>>>,
1676 account_id: AccountId,
1677) {
1678 match result {
1679 Ok(response) => {
1680 if let Some((order_id_str, venue_order_id)) = handle_order_response(
1681 Ok(response),
1682 &batch_order.order,
1683 emitter,
1684 clock,
1685 fill_tracker,
1686 pending_fills,
1687 pending_order_reports,
1688 pending_cancels,
1689 account_id,
1690 batch_order.size_precision,
1691 batch_order.price_precision,
1692 ) {
1693 execute_deferred_cancel(
1694 submitter,
1695 &batch_order.order,
1696 &order_id_str,
1697 venue_order_id,
1698 emitter,
1699 clock,
1700 )
1701 .await;
1702 }
1703 }
1704 Err(e) => {
1705 reject_submit_order(
1706 &batch_order.order,
1707 &format!("{e}"),
1708 emitter,
1709 clock,
1710 pending_cancels,
1711 );
1712 }
1713 }
1714}
1715
1716#[expect(clippy::too_many_arguments)]
1717fn handle_order_response(
1718 result: crate::http::error::Result<OrderResponse>,
1719 order: &OrderAny,
1720 emitter: &ExecutionEventEmitter,
1721 clock: &'static AtomicTime,
1722 fill_tracker: &Arc<OrderFillTrackerMap>,
1723 pending_fills: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<FillReport>, 1_000>>>,
1724 pending_order_reports: &Arc<Mutex<FifoCacheMap<VenueOrderId, Vec<OrderStatusReport>, 1_000>>>,
1725 pending_cancels: &Arc<Mutex<AHashSet<ClientOrderId>>>,
1726 account_id: AccountId,
1727 size_precision: u8,
1728 price_precision: u8,
1729) -> Option<(String, VenueOrderId)> {
1730 match result {
1731 Ok(response) => {
1732 if response.success {
1733 if let Some(order_id) = response.order_id {
1734 let venue_order_id = VenueOrderId::from(order_id.as_str());
1735 let ts_now = clock.get_time_ns();
1736 emitter.emit_order_accepted(order, venue_order_id, ts_now);
1737
1738 fill_tracker.register(
1740 venue_order_id,
1741 order.quantity(),
1742 order.order_side(),
1743 order.instrument_id(),
1744 size_precision,
1745 price_precision,
1746 );
1747
1748 if let Some(buffered) = pending_fills
1751 .lock()
1752 .expect(MUTEX_POISONED)
1753 .remove(&venue_order_id)
1754 {
1755 for mut fill in buffered {
1756 fill.last_qty =
1757 fill_tracker.snap_fill_qty(&venue_order_id, fill.last_qty);
1758 fill_tracker.record_fill(
1759 &venue_order_id,
1760 fill.last_qty.as_f64(),
1761 fill.last_px.as_f64(),
1762 fill.ts_event,
1763 );
1764 emitter.send_fill_report(fill);
1765 }
1766 }
1767
1768 if let Some(buffered) = pending_order_reports
1770 .lock()
1771 .expect(MUTEX_POISONED)
1772 .remove(&venue_order_id)
1773 {
1774 let mut has_filled = false;
1775
1776 for report in &buffered {
1777 if report.order_status == OrderStatus::Filled {
1778 has_filled = true;
1779 }
1780 }
1781
1782 let tracked_filled = fill_tracker
1785 .get_cumulative_filled(&venue_order_id)
1786 .unwrap_or(0.0);
1787 let tracked_qty = Quantity::new(tracked_filled, size_precision);
1788
1789 for mut report in buffered {
1790 if report.filled_qty > tracked_qty {
1791 log::debug!(
1792 "Capping buffered filled_qty for {venue_order_id} \
1793 from {} to {} (awaiting trade messages)",
1794 report.filled_qty,
1795 tracked_qty,
1796 );
1797 report.filled_qty = tracked_qty;
1798 }
1799 emitter.send_order_status_report(report);
1800 }
1801
1802 if has_filled {
1804 let fallback_px = order.price().map_or(0.0, |p| p.as_f64());
1805 if let Some(dust_fill) = fill_tracker.check_dust_and_build_fill(
1806 &venue_order_id,
1807 account_id,
1808 &order_id,
1809 fallback_px,
1810 get_pusd_currency(),
1811 ts_now,
1812 ts_now,
1813 ) {
1814 emitter.send_fill_report(dust_fill);
1815 }
1816 }
1817 }
1818
1819 if pending_cancels
1821 .lock()
1822 .expect(MUTEX_POISONED)
1823 .remove(&order.client_order_id())
1824 {
1825 log::info!(
1826 "Order {} has pending cancel, issuing deferred cancel for {}",
1827 order.client_order_id(),
1828 venue_order_id
1829 );
1830 return Some((order_id, venue_order_id));
1831 }
1832 } else {
1833 log::warn!(
1834 "Order accepted but no order_id returned for {}",
1835 order.client_order_id()
1836 );
1837 }
1838 } else {
1839 let reason = response
1840 .error_msg
1841 .unwrap_or_else(|| "unknown error".to_string());
1842 let ts_now = clock.get_time_ns();
1843 emitter.emit_order_rejected(order, &reason, ts_now, false);
1844 pending_cancels
1845 .lock()
1846 .expect(MUTEX_POISONED)
1847 .remove(&order.client_order_id());
1848 }
1849 }
1850 Err(e) => {
1851 let ts_now = clock.get_time_ns();
1852 emitter.emit_order_rejected(order, &format!("HTTP request failed: {e}"), ts_now, false);
1853 pending_cancels
1854 .lock()
1855 .expect(MUTEX_POISONED)
1856 .remove(&order.client_order_id());
1857 }
1858 }
1859 None
1860}
1861
1862async fn execute_deferred_cancel(
1863 submitter: &OrderSubmitter,
1864 order: &OrderAny,
1865 order_id_str: &str,
1866 venue_order_id: VenueOrderId,
1867 emitter: &ExecutionEventEmitter,
1868 clock: &'static AtomicTime,
1869) {
1870 match submitter.cancel_order(order_id_str).await {
1871 Ok(response) => {
1872 process_cancel_result(
1873 &response,
1874 order_id_str,
1875 order,
1876 venue_order_id,
1877 emitter,
1878 clock,
1879 );
1880 }
1881 Err(e) => {
1882 let ts_now = clock.get_time_ns();
1883 emitter.emit_order_cancel_rejected(
1884 order,
1885 Some(venue_order_id),
1886 &format!("Deferred cancel failed: {e}"),
1887 ts_now,
1888 );
1889 }
1890 }
1891}
1892
1893#[expect(clippy::too_many_arguments)]
1900async fn check_fok_status(
1901 submitter: &OrderSubmitter,
1902 order_id: &str,
1903 fill_tracker: &Arc<OrderFillTrackerMap>,
1904 emitter: &ExecutionEventEmitter,
1905 account_id: AccountId,
1906 instrument_id: InstrumentId,
1907 order_side: OrderSide,
1908 size_precision: u8,
1909 price_precision: u8,
1910 clock: &'static AtomicTime,
1911) {
1912 const FOK_CHECK_DELAY: Duration = Duration::from_secs(5);
1913
1914 tokio::time::sleep(FOK_CHECK_DELAY).await;
1915
1916 let venue_order_id = VenueOrderId::from(order_id);
1917 if fill_tracker.has_fills_or_settled(&venue_order_id) {
1918 return;
1919 }
1920
1921 log::info!("FOK order {order_id} unresolved after 5s, checking REST status");
1922
1923 let venue_order = match submitter.get_order(order_id).await {
1924 Ok(Some(o)) => o,
1925 Ok(None) => {
1926 log::info!("FOK order {order_id} not found (empty response), WS will reconcile");
1927 return;
1928 }
1929 Err(e) => {
1930 log::warn!("FOK status check failed for {order_id}: {e}");
1931 return;
1932 }
1933 };
1934
1935 let order_status = OrderStatus::from(venue_order.status);
1936
1937 if !matches!(
1938 order_status,
1939 OrderStatus::Rejected | OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Filled
1940 ) {
1941 return;
1942 }
1943
1944 let quantity = Quantity::new(
1945 venue_order
1946 .original_size
1947 .to_string()
1948 .parse::<f64>()
1949 .unwrap_or(0.0),
1950 size_precision,
1951 );
1952 let filled_qty = Quantity::new(
1953 venue_order
1954 .size_matched
1955 .to_string()
1956 .parse::<f64>()
1957 .unwrap_or(0.0),
1958 size_precision,
1959 );
1960 let price = Price::new(
1961 venue_order.price.to_string().parse::<f64>().unwrap_or(0.0),
1962 price_precision,
1963 );
1964
1965 let ts_now = clock.get_time_ns();
1966 let mut report = OrderStatusReport::new(
1967 account_id,
1968 instrument_id,
1969 None,
1970 venue_order_id,
1971 order_side,
1972 OrderType::Limit,
1973 TimeInForce::Ioc,
1974 order_status,
1975 quantity,
1976 filled_qty,
1977 ts_now,
1978 ts_now,
1979 ts_now,
1980 None,
1981 );
1982 report.price = Some(price);
1983
1984 log::info!("FOK order {order_id} resolved via REST as {order_status:?}");
1985
1986 emitter.send_order_status_report(report);
1987}
1988
1989pub fn get_pusd_currency() -> Currency {
1990 Currency::pUSD()
1991}
1992
1993async fn fetch_and_emit_account_state(
1994 http_client: &PolymarketClobHttpClient,
1995 emitter: &ExecutionEventEmitter,
1996 clock: &'static AtomicTime,
1997 signature_type: SignatureType,
1998) -> anyhow::Result<()> {
1999 use anyhow::Context;
2000
2001 let params = GetBalanceAllowanceParams {
2002 asset_type: Some(crate::http::query::AssetType::Collateral),
2003 signature_type: Some(signature_type),
2004 ..Default::default()
2005 };
2006
2007 let balance_allowance = http_client
2008 .get_balance_allowance(params)
2009 .await
2010 .context("failed to fetch balance allowance")?;
2011
2012 let pusd = get_pusd_currency();
2013 let account_balance = parse_balance_allowance(balance_allowance.balance, pusd)
2014 .context("failed to parse balance allowance")?;
2015
2016 let ts_event = clock.get_time_ns();
2017 log::info!(
2018 "Account state updated: balance={} pUSD",
2019 account_balance.total
2020 );
2021 emitter.emit_account_state(vec![account_balance], vec![], true, ts_event);
2022 Ok(())
2023}
2024
2025async fn fetch_collateral_balance_pusd(
2029 http_client: &PolymarketClobHttpClient,
2030 signature_type: SignatureType,
2031) -> anyhow::Result<Decimal> {
2032 use anyhow::Context;
2033
2034 let params = GetBalanceAllowanceParams {
2035 asset_type: Some(crate::http::query::AssetType::Collateral),
2036 signature_type: Some(signature_type),
2037 ..Default::default()
2038 };
2039
2040 let balance_allowance = http_client
2041 .get_balance_allowance(params)
2042 .await
2043 .context("failed to fetch balance allowance")?;
2044
2045 let usdc_scale = Decimal::from(1_000_000u32);
2047 Ok(balance_allowance.balance / usdc_scale)
2048}