1use std::sync::{
19 Arc, Mutex as StdMutex,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashSet;
24use anyhow::Context;
25use dashmap::DashMap;
26use nautilus_common::{
27 clients::DataClient,
28 live::{get_runtime, runner::get_data_event_sender},
29 messages::{
30 DataEvent, DataResponse,
31 data::{
32 BookResponse, InstrumentResponse, InstrumentsResponse, RequestBookSnapshot,
33 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBookDeltas,
34 SubscribeInstruments, SubscribeQuotes, SubscribeTrades, TradesResponse,
35 UnsubscribeBookDeltas, UnsubscribeQuotes, UnsubscribeTrades,
36 },
37 },
38 providers::InstrumentProvider,
39};
40use nautilus_core::{
41 AtomicMap, AtomicSet,
42 datetime::datetime_to_unix_nanos,
43 time::{AtomicTime, get_atomic_clock_realtime},
44};
45use nautilus_model::{
46 data::{Data as NautilusData, InstrumentStatus, OrderBookDeltas_API, QuoteTick},
47 enums::{BookType, MarketStatusAction},
48 identifiers::{ClientId, InstrumentId, Venue},
49 instruments::{Instrument, InstrumentAny},
50 orderbook::OrderBook,
51};
52use tokio::task::JoinHandle;
53use tokio_util::sync::CancellationToken;
54use ustr::Ustr;
55
56use crate::{
57 common::consts::POLYMARKET_VENUE,
58 config::PolymarketDataClientConfig,
59 filters::InstrumentFilter,
60 http::{
61 clob::PolymarketClobPublicClient, data_api::PolymarketDataApiHttpClient,
62 gamma::PolymarketGammaHttpClient, parse::rebuild_instrument_with_tick_size,
63 query::GetGammaMarketsParams,
64 },
65 providers::{PolymarketInstrumentProvider, extract_condition_id, fetch_instruments},
66 websocket::{
67 client::PolymarketWebSocketClient,
68 messages::{MarketWsMessage, PolymarketQuotes, PolymarketWsMessage},
69 parse::{
70 parse_book_deltas, parse_book_snapshot, parse_quote_from_price_change,
71 parse_quote_from_snapshot, parse_timestamp_ms, parse_trade_tick,
72 },
73 },
74};
75
/// Maximum number of condition ids joined into a single Gamma markets query
/// by the auto-load task.
const GAMMA_CONDITION_ID_CHUNK: usize = 100;
77
78fn resolve_token_id_from(
79 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
80 instrument_id: InstrumentId,
81) -> anyhow::Result<String> {
82 let loaded = instruments.load();
83 let instrument = loaded
84 .get(&instrument_id)
85 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
86 Ok(instrument.raw_symbol().as_str().to_string())
87}
88
89#[allow(
94 clippy::too_many_arguments,
95 reason = "shared state comes in as Arc refs"
96)]
97async fn sync_ws_subscription_async(
98 instrument_id: InstrumentId,
99 token_id_str: String,
100 active_quote_subs: Arc<AtomicSet<InstrumentId>>,
101 active_delta_subs: Arc<AtomicSet<InstrumentId>>,
102 active_trade_subs: Arc<AtomicSet<InstrumentId>>,
103 ws_open_tokens: Arc<AtomicSet<Ustr>>,
104 ws_sub_mutex: Arc<tokio::sync::Mutex<()>>,
105 ws: crate::websocket::client::WsSubscriptionHandle,
106) {
107 let token_id = Ustr::from(token_id_str.as_str());
108 let _guard = ws_sub_mutex.lock().await;
109
110 let wants_subscribe = active_quote_subs.contains(&instrument_id)
111 || active_delta_subs.contains(&instrument_id)
112 || active_trade_subs.contains(&instrument_id);
113 let is_open = ws_open_tokens.contains(&token_id);
114
115 if wants_subscribe && !is_open {
116 ws_open_tokens.insert(token_id);
117
118 if let Err(e) = ws.subscribe_market(vec![token_id_str]).await {
119 log::error!("Failed to subscribe to market data: {e:?}");
120 ws_open_tokens.remove(&token_id);
122 }
123 } else if !wants_subscribe && is_open {
124 ws_open_tokens.remove(&token_id);
125
126 if let Err(e) = ws.unsubscribe_market(vec![token_id_str]).await {
127 log::error!("Failed to unsubscribe from market data: {e:?}");
128 }
129 }
130}
131
132#[derive(Clone, Copy, Debug)]
133struct TokenMeta {
134 instrument_id: InstrumentId,
135 price_precision: u8,
136 size_precision: u8,
137}
138
139fn cache_instrument(
144 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
145 token_meta: &Arc<DashMap<Ustr, TokenMeta>>,
146 instrument: &InstrumentAny,
147) {
148 let instrument_id = instrument.id();
149 token_meta.insert(
150 Ustr::from(instrument.raw_symbol().as_str()),
151 TokenMeta {
152 instrument_id,
153 price_precision: instrument.price_precision(),
154 size_precision: instrument.size_precision(),
155 },
156 );
157 instruments.insert(instrument_id, instrument.clone());
158}
159
160struct WsMessageContext {
161 clock: &'static AtomicTime,
162 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
163 token_meta: Arc<DashMap<Ustr, TokenMeta>>,
164 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
165 gamma_client: PolymarketGammaHttpClient,
166 filters: Vec<Arc<dyn InstrumentFilter>>,
167 order_books: Arc<DashMap<InstrumentId, OrderBook>>,
168 last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
169 active_quote_subs: Arc<AtomicSet<InstrumentId>>,
170 active_delta_subs: Arc<AtomicSet<InstrumentId>>,
171 active_trade_subs: Arc<AtomicSet<InstrumentId>>,
172 subscribe_new_markets: bool,
173 new_market_filter: Option<Arc<dyn InstrumentFilter>>,
174 cancellation_token: CancellationToken,
175}
176
177#[derive(Debug)]
185pub struct PolymarketDataClient {
186 clock: &'static AtomicTime,
187 client_id: ClientId,
188 config: PolymarketDataClientConfig,
189 provider: PolymarketInstrumentProvider,
190 clob_public_client: PolymarketClobPublicClient,
191 data_api_client: PolymarketDataApiHttpClient,
192 ws_client: PolymarketWebSocketClient,
193 is_connected: AtomicBool,
194 cancellation_token: CancellationToken,
195 tasks: Vec<JoinHandle<()>>,
196 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
197 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
198 token_meta: Arc<DashMap<Ustr, TokenMeta>>,
199 order_books: Arc<DashMap<InstrumentId, OrderBook>>,
200 last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
201 active_quote_subs: Arc<AtomicSet<InstrumentId>>,
202 active_delta_subs: Arc<AtomicSet<InstrumentId>>,
203 active_trade_subs: Arc<AtomicSet<InstrumentId>>,
204 ws_open_tokens: Arc<AtomicSet<Ustr>>,
205 ws_sub_mutex: Arc<tokio::sync::Mutex<()>>,
206 pending_auto_loads: Arc<StdMutex<AHashSet<InstrumentId>>>,
207 auto_load_scheduled: Arc<AtomicBool>,
208}
209
210impl PolymarketDataClient {
211 pub fn new(
213 client_id: ClientId,
214 config: PolymarketDataClientConfig,
215 gamma_client: PolymarketGammaHttpClient,
216 clob_public_client: PolymarketClobPublicClient,
217 data_api_client: PolymarketDataApiHttpClient,
218 ws_client: PolymarketWebSocketClient,
219 ) -> Self {
220 let clock = get_atomic_clock_realtime();
221 let data_sender = get_data_event_sender();
222 let provider = PolymarketInstrumentProvider::new(gamma_client);
223
224 Self {
225 clock,
226 client_id,
227 config,
228 provider,
229 clob_public_client,
230 data_api_client,
231 ws_client,
232 is_connected: AtomicBool::new(false),
233 cancellation_token: CancellationToken::new(),
234 tasks: Vec::new(),
235 data_sender,
236 instruments: Arc::new(AtomicMap::new()),
237 token_meta: Arc::new(DashMap::new()),
238 order_books: Arc::new(DashMap::new()),
239 last_quotes: Arc::new(DashMap::new()),
240 active_quote_subs: Arc::new(AtomicSet::new()),
241 active_delta_subs: Arc::new(AtomicSet::new()),
242 active_trade_subs: Arc::new(AtomicSet::new()),
243 ws_open_tokens: Arc::new(AtomicSet::new()),
244 ws_sub_mutex: Arc::new(tokio::sync::Mutex::new(())),
245 pending_auto_loads: Arc::new(StdMutex::new(AHashSet::new())),
246 auto_load_scheduled: Arc::new(AtomicBool::new(false)),
247 }
248 }
249
250 #[must_use]
252 pub fn config(&self) -> &PolymarketDataClientConfig {
253 &self.config
254 }
255
256 #[must_use]
258 pub fn venue(&self) -> Venue {
259 *POLYMARKET_VENUE
260 }
261
262 #[must_use]
264 pub fn provider(&self) -> &PolymarketInstrumentProvider {
265 &self.provider
266 }
267
268 pub fn add_instrument_filter(&mut self, filter: Arc<dyn InstrumentFilter>) {
270 self.provider.add_filter(filter);
271 }
272
273 #[must_use]
275 pub fn is_connected(&self) -> bool {
276 self.is_connected.load(Ordering::Relaxed)
277 }
278
279 fn resolve_token_id(&self, instrument_id: InstrumentId) -> anyhow::Result<String> {
280 let instruments = self.instruments.load();
281 let instrument = instruments
282 .get(&instrument_id)
283 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
284 Ok(instrument.raw_symbol().as_str().to_string())
285 }
286
287 fn sync_ws_subscription(&self, instrument_id: InstrumentId) {
292 let token_id_str = match self.resolve_token_id(instrument_id) {
293 Ok(s) => s,
294 Err(_) => return,
295 };
296 let active_quote_subs = self.active_quote_subs.clone();
297 let active_delta_subs = self.active_delta_subs.clone();
298 let active_trade_subs = self.active_trade_subs.clone();
299 let ws_open_tokens = self.ws_open_tokens.clone();
300 let ws_sub_mutex = self.ws_sub_mutex.clone();
301 let ws = self.ws_client.clone_subscription_handle();
302
303 get_runtime().spawn(sync_ws_subscription_async(
304 instrument_id,
305 token_id_str,
306 active_quote_subs,
307 active_delta_subs,
308 active_trade_subs,
309 ws_open_tokens,
310 ws_sub_mutex,
311 ws,
312 ));
313 }
314
315 fn queue_pending_load(&self, instrument_id: InstrumentId) {
316 {
317 let mut pending = self
318 .pending_auto_loads
319 .lock()
320 .expect("pending_auto_loads mutex poisoned");
321 pending.insert(instrument_id);
322 }
323
324 self.ensure_auto_load_task();
325 }
326
327 fn drop_pending_if_unwanted(&self, instrument_id: InstrumentId) {
328 if self.active_quote_subs.contains(&instrument_id)
329 || self.active_delta_subs.contains(&instrument_id)
330 || self.active_trade_subs.contains(&instrument_id)
331 {
332 return;
333 }
334 let mut pending = self
335 .pending_auto_loads
336 .lock()
337 .expect("pending_auto_loads mutex poisoned");
338 pending.remove(&instrument_id);
339 }
340
341 fn ensure_auto_load_task(&self) {
342 if self
343 .auto_load_scheduled
344 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
345 .is_err()
346 {
347 return;
348 }
349
350 let pending = self.pending_auto_loads.clone();
351 let scheduled = self.auto_load_scheduled.clone();
352 let debounce_ms = self.config.auto_load_debounce_ms;
353 let http = self.provider.http_client().clone();
354 let filters = self.provider.filters();
355 let instruments = self.instruments.clone();
356 let token_meta = self.token_meta.clone();
357 let active_quote_subs = self.active_quote_subs.clone();
358 let active_delta_subs = self.active_delta_subs.clone();
359 let active_trade_subs = self.active_trade_subs.clone();
360 let ws_open_tokens = self.ws_open_tokens.clone();
361 let ws_sub_mutex = self.ws_sub_mutex.clone();
362 let ws_client = self.ws_client.clone_subscription_handle();
363 let data_sender = self.data_sender.clone();
364 let cancellation = self.cancellation_token.clone();
365
366 get_runtime().spawn(async move {
367 loop {
374 tokio::select! {
375 () = tokio::time::sleep(tokio::time::Duration::from_millis(debounce_ms)) => {}
376 () = cancellation.cancelled() => {
377 scheduled.store(false, Ordering::Release);
378 return;
379 }
380 }
381
382 let ids: Vec<InstrumentId> = {
383 let guard = pending.lock().expect("pending_auto_loads mutex poisoned");
384 guard.iter().copied().collect()
385 };
386
387 if ids.is_empty() {
388 scheduled.store(false, Ordering::Release);
389 return;
390 }
391
392 log::info!("Auto-loading {} missing instrument(s): {ids:?}", ids.len());
393
394 let mut condition_ids: Vec<String> = ids
395 .iter()
396 .filter_map(|id| extract_condition_id(id).ok())
397 .collect();
398 condition_ids.sort();
399 condition_ids.dedup();
400
401 if condition_ids.is_empty() {
402 log::error!("Auto-load aborted: no condition_ids could be extracted");
403 let mut guard = pending.lock().expect("pending_auto_loads mutex poisoned");
405 for id in &ids {
406 guard.remove(id);
407 }
408 continue;
409 }
410
411 let mut loaded: Vec<InstrumentAny> =
415 Vec::with_capacity(condition_ids.len().min(GAMMA_CONDITION_ID_CHUNK));
416 let mut chunk_failed = false;
417
418 for chunk in condition_ids.chunks(GAMMA_CONDITION_ID_CHUNK) {
419 let params = GetGammaMarketsParams {
420 condition_ids: Some(chunk.join(",")),
421 ..Default::default()
422 };
423
424 match http.request_instruments_by_params(params).await {
425 Ok(insts) => loaded.extend(insts),
426 Err(e) => {
427 log::error!(
428 "Auto-load batch failed for chunk of {} condition_id(s): {e:?}",
429 chunk.len()
430 );
431 chunk_failed = true;
432 break;
433 }
434 }
435 }
436
437 if chunk_failed {
438 continue;
441 }
442
443 for inst in loaded {
444 if !filters.iter().all(|f| f.accept(&inst)) {
445 log::debug!("Auto-loaded instrument {} filtered out", inst.id());
446 continue;
447 }
448
449 cache_instrument(&instruments, &token_meta, &inst);
450
451 let instrument_id = inst.id();
452 if let Err(e) = data_sender.send(DataEvent::Instrument(inst)) {
453 log::error!("Failed to emit auto-loaded instrument {instrument_id}: {e}");
454 }
455 }
456
457 for instrument_id in ids {
458 let was_pending = {
461 let mut guard = pending.lock().expect("pending_auto_loads mutex poisoned");
462 guard.remove(&instrument_id)
463 };
464
465 if !was_pending {
466 continue;
467 }
468
469 let Ok(token_id) = resolve_token_id_from(&instruments, instrument_id) else {
470 log::error!("Auto-load did not return instrument {instrument_id}");
471 continue;
472 };
473
474 sync_ws_subscription_async(
477 instrument_id,
478 token_id,
479 active_quote_subs.clone(),
480 active_delta_subs.clone(),
481 active_trade_subs.clone(),
482 ws_open_tokens.clone(),
483 ws_sub_mutex.clone(),
484 ws_client.clone(),
485 )
486 .await;
487 }
488 }
489 });
490 }
491
492 async fn bootstrap_instruments(&mut self) -> anyhow::Result<()> {
493 self.provider.load_all(None).await?;
494
495 let all_instruments = self.provider.store().list_all();
496 let total = all_instruments.len();
497 for instrument in all_instruments {
498 cache_instrument(&self.instruments, &self.token_meta, instrument);
499 let instrument_id = instrument.id();
500
501 if let Err(e) = self
502 .data_sender
503 .send(DataEvent::Instrument(instrument.clone()))
504 {
505 log::warn!("Failed to publish instrument {instrument_id}: {e}");
506 }
507 }
508
509 log::info!("Published all {total} instruments to data engine");
510 Ok(())
511 }
512
513 fn spawn_message_handler(
514 &mut self,
515 mut rx: tokio::sync::mpsc::UnboundedReceiver<PolymarketWsMessage>,
516 ) {
517 let cancellation = self.cancellation_token.clone();
518
519 for (token_id, instrument) in self.provider.build_token_map() {
520 self.token_meta.insert(
521 token_id,
522 TokenMeta {
523 instrument_id: instrument.id(),
524 price_precision: instrument.price_precision(),
525 size_precision: instrument.size_precision(),
526 },
527 );
528 }
529
530 let ctx = WsMessageContext {
531 clock: self.clock,
532 data_sender: self.data_sender.clone(),
533 token_meta: self.token_meta.clone(),
534 instruments: self.instruments.clone(),
535 gamma_client: self.provider.http_client().clone(),
536 filters: self.provider.filters(),
537 order_books: self.order_books.clone(),
538 last_quotes: self.last_quotes.clone(),
539 active_quote_subs: self.active_quote_subs.clone(),
540 active_delta_subs: self.active_delta_subs.clone(),
541 active_trade_subs: self.active_trade_subs.clone(),
542 subscribe_new_markets: self.config.subscribe_new_markets,
543 new_market_filter: self.config.new_market_filter.clone(),
544 cancellation_token: cancellation.clone(),
545 };
546
547 let handle = get_runtime().spawn(async move {
548 log::debug!("Polymarket message handler started");
549
550 loop {
551 tokio::select! {
552 maybe_msg = rx.recv() => {
553 match maybe_msg {
554 Some(msg) => Self::handle_ws_message(msg, &ctx),
555 None => {
556 log::debug!("WebSocket message channel closed");
557 break;
558 }
559 }
560 }
561 () = cancellation.cancelled() => {
562 log::debug!("Polymarket message handler cancelled");
563 break;
564 }
565 }
566 }
567
568 log::debug!("Polymarket message handler ended");
569 });
570
571 self.tasks.push(handle);
572 }
573
574 fn handle_ws_message(message: PolymarketWsMessage, ctx: &WsMessageContext) {
575 match message {
576 PolymarketWsMessage::Market(market_msg) => {
577 Self::handle_market_message(market_msg, ctx);
578 }
579 PolymarketWsMessage::User(_) => {
580 log::debug!("Ignoring user message on data client");
581 }
582 PolymarketWsMessage::Reconnected => {
583 log::info!("Polymarket WS reconnected");
584 }
585 }
586 }
587
588 fn handle_market_message(message: MarketWsMessage, ctx: &WsMessageContext) {
589 match message {
590 MarketWsMessage::Book(snap) => {
591 let token_id = Ustr::from(snap.asset_id.as_str());
592 let meta = match ctx.token_meta.get(&token_id) {
593 Some(m) => *m,
594 None => {
595 log::debug!("No instrument for token_id {token_id}");
596 return;
597 }
598 };
599 let instrument_id = meta.instrument_id;
600 let ts_init = ctx.clock.get_time_ns();
601
602 if ctx.active_delta_subs.contains(&instrument_id) {
603 match parse_book_snapshot(
604 &snap,
605 instrument_id,
606 meta.price_precision,
607 meta.size_precision,
608 ts_init,
609 ) {
610 Ok(deltas) => {
611 let mut book = ctx
612 .order_books
613 .entry(instrument_id)
614 .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
615
616 if let Err(e) = book.apply_deltas(&deltas) {
617 log::error!(
618 "Failed to apply book snapshot for {instrument_id}: {e}"
619 );
620 }
621
622 let data: NautilusData = OrderBookDeltas_API::new(deltas).into();
623 if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
624 log::error!("Failed to emit book deltas: {e}");
625 }
626 }
627 Err(e) => log::error!("Failed to parse book snapshot: {e}"),
628 }
629 }
630
631 if ctx.active_quote_subs.contains(&instrument_id) {
632 match parse_quote_from_snapshot(
633 &snap,
634 instrument_id,
635 meta.price_precision,
636 meta.size_precision,
637 ts_init,
638 ) {
639 Ok(Some(quote)) => {
640 Self::emit_quote_if_changed(ctx, instrument_id, quote);
641 }
642 Ok(None) => {}
643 Err(e) => log::error!("Failed to parse quote from snapshot: {e}"),
644 }
645 }
646 }
647
648 MarketWsMessage::PriceChange(quotes) => {
649 let ts_init = ctx.clock.get_time_ns();
650 let ts_event = match parse_timestamp_ms("es.timestamp) {
651 Ok(ts) => ts,
652 Err(e) => {
653 log::error!("Failed to parse price change timestamp: {e}");
654 return;
655 }
656 };
657
658 for change in "es.price_changes {
660 let token_id = Ustr::from(change.asset_id.as_str());
661 let meta = match ctx.token_meta.get(&token_id) {
662 Some(m) => *m,
663 None => {
664 log::debug!("No instrument for token_id {token_id}");
665 continue;
666 }
667 };
668 let instrument_id = meta.instrument_id;
669
670 if ctx.active_delta_subs.contains(&instrument_id) {
671 let per_asset = PolymarketQuotes {
672 market: quotes.market,
673 price_changes: vec![change.clone()],
674 timestamp: quotes.timestamp.clone(),
675 };
676
677 match parse_book_deltas(
678 &per_asset,
679 instrument_id,
680 meta.price_precision,
681 meta.size_precision,
682 ts_init,
683 ) {
684 Ok(deltas) => {
685 if let Some(mut book) = ctx.order_books.get_mut(&instrument_id)
686 && let Err(e) = book.apply_deltas(&deltas)
687 {
688 log::error!(
689 "Failed to apply book deltas for {instrument_id}: {e}"
690 );
691 }
692
693 let data: NautilusData = OrderBookDeltas_API::new(deltas).into();
694
695 if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
696 log::error!("Failed to emit book deltas: {e}");
697 }
698 }
699 Err(e) => log::error!("Failed to parse book deltas: {e}"),
700 }
701 }
702
703 if ctx.active_quote_subs.contains(&instrument_id) {
704 let last_quote = ctx.last_quotes.get(&instrument_id).map(|r| *r);
706
707 match parse_quote_from_price_change(
708 change,
709 instrument_id,
710 meta.price_precision,
711 meta.size_precision,
712 last_quote.as_ref(),
713 ts_event,
714 ts_init,
715 ) {
716 Ok(Some(quote)) => {
717 Self::emit_quote_if_changed(ctx, instrument_id, quote);
718 }
719 Ok(None) => {} Err(e) => {
721 log::error!("Failed to parse quote from price change: {e}");
722 }
723 }
724 }
725 }
726 }
727
728 MarketWsMessage::LastTradePrice(trade) => {
729 let token_id = Ustr::from(trade.asset_id.as_str());
730 let meta = match ctx.token_meta.get(&token_id) {
731 Some(m) => *m,
732 None => {
733 log::debug!("No instrument for token_id {token_id}");
734 return;
735 }
736 };
737 let instrument_id = meta.instrument_id;
738
739 if ctx.active_trade_subs.contains(&instrument_id) {
740 let ts_init = ctx.clock.get_time_ns();
741
742 match parse_trade_tick(
743 &trade,
744 instrument_id,
745 meta.price_precision,
746 meta.size_precision,
747 ts_init,
748 ) {
749 Ok(tick) => {
750 if let Err(e) = ctx
751 .data_sender
752 .send(DataEvent::Data(NautilusData::Trade(tick)))
753 {
754 log::error!("Failed to emit trade tick: {e}");
755 }
756 }
757 Err(e) => log::error!("Failed to parse trade tick: {e}"),
758 }
759 }
760 }
761
762 MarketWsMessage::TickSizeChange(change) => {
763 log::info!(
764 "Tick size changed for {}: {} -> {}",
765 change.asset_id,
766 change.old_tick_size,
767 change.new_tick_size
768 );
769
770 let token_id = Ustr::from(change.asset_id.as_str());
771 let meta = match ctx.token_meta.get(&token_id) {
772 Some(m) => *m,
773 None => {
774 log::error!("No instrument for token_id {token_id}");
775 return;
776 }
777 };
778
779 let tick_size: rust_decimal::Decimal = match change.new_tick_size.parse() {
780 Ok(d) => d,
781 Err(e) => {
782 log::error!(
783 "Failed to parse new tick size '{}': {e}",
784 change.new_tick_size
785 );
786 return;
787 }
788 };
789 let new_price_precision = tick_size.scale() as u8;
790
791 ctx.token_meta.insert(
793 token_id,
794 TokenMeta {
795 price_precision: new_price_precision,
796 ..meta
797 },
798 );
799
800 let instruments = ctx.instruments.load();
802 if let Some(existing) = instruments.get(&meta.instrument_id) {
803 let ts_init = ctx.clock.get_time_ns();
804
805 match rebuild_instrument_with_tick_size(
806 existing,
807 &change.new_tick_size,
808 ts_init,
809 ts_init,
810 ) {
811 Ok(rebuilt) => {
812 ctx.instruments.insert(rebuilt.id(), rebuilt.clone());
813 if let Err(e) = ctx.data_sender.send(DataEvent::Instrument(rebuilt)) {
814 log::error!("Failed to emit rebuilt instrument: {e}");
815 }
816 }
817 Err(e) => {
818 log::error!("Failed to rebuild instrument for tick size change: {e}");
819 }
820 }
821 }
822 }
823
824 MarketWsMessage::NewMarket(nm) => {
825 if !ctx.subscribe_new_markets {
826 log::trace!("Ignoring new market event (subscribe_new_markets=false)");
827 return;
828 }
829
830 if let Some(ref nf) = ctx.new_market_filter
831 && !nf.accept_new_market(&nm)
832 {
833 log::debug!("New market slug={} rejected by new_market_filter", nm.slug);
834 return;
835 }
836
837 let gamma_client = ctx.gamma_client.clone();
838 let filters = ctx.filters.clone();
839 let token_meta = ctx.token_meta.clone();
840 let instruments = ctx.instruments.clone();
841 let data_sender = ctx.data_sender.clone();
842 let clock = ctx.clock;
843 let cancellation = ctx.cancellation_token.clone();
844 let slug = nm.slug;
845 let active = nm.active;
846
847 get_runtime().spawn(async move {
848 let fetch = gamma_client
849 .request_instruments_by_slugs_with_retry(vec![slug.clone()]);
850
851 let result = tokio::select! {
852 r = fetch => r,
853 () = cancellation.cancelled() => {
854 log::debug!("New market fetch for '{slug}' cancelled during shutdown");
855 return;
856 }
857 };
858
859 match result {
860 Ok(new_instruments) => {
861 for inst in new_instruments {
862 if cancellation.is_cancelled() {
863 log::debug!("New market processing cancelled during shutdown");
864 return;
865 }
866
867 if !filters.iter().all(|f| f.accept(&inst)) {
868 log::debug!("New market instrument {} filtered out", inst.id());
869 continue;
870 }
871
872 cache_instrument(&instruments, &token_meta, &inst);
873
874 let instrument_id = inst.id();
875 if let Err(e) = data_sender.send(DataEvent::Instrument(inst)) {
876 log::error!(
877 "Failed to emit new market instrument {instrument_id}: {e}"
878 );
879 }
880
881 let ts_now = clock.get_time_ns();
883 let action = if active {
884 MarketStatusAction::Trading
885 } else {
886 MarketStatusAction::PreOpen
887 };
888 let status = InstrumentStatus::new(
889 instrument_id,
890 action,
891 ts_now,
892 ts_now,
893 None,
894 None,
895 None,
896 None,
897 None,
898 );
899
900 if let Err(e) =
901 data_sender.send(DataEvent::InstrumentStatus(status))
902 {
903 log::error!(
904 "Failed to emit instrument status for {instrument_id}: {e}"
905 );
906 }
907 }
908 }
909 Err(e) => log::warn!(
910 "Failed to fetch instruments for new market slug '{slug}' after retries: {e}"
911 ),
912 }
913 });
914 }
915
916 MarketWsMessage::MarketResolved(resolved) => {
917 log::info!(
918 "Market resolved: {} winner={} ({})",
919 resolved.market,
920 resolved.winning_asset_id,
921 resolved.winning_outcome
922 );
923
924 let ts_init = ctx.clock.get_time_ns();
925 let reason = Ustr::from(&format!(
926 "Winner: {} ({})",
927 resolved.winning_asset_id, resolved.winning_outcome
928 ));
929
930 for asset_id in &resolved.assets_ids {
931 let token_id = Ustr::from(asset_id.as_str());
932 if let Some(meta) = ctx.token_meta.get(&token_id) {
933 let status = InstrumentStatus::new(
934 meta.instrument_id,
935 MarketStatusAction::Close,
936 ts_init,
937 ts_init,
938 Some(reason),
939 None,
940 Some(false),
941 None,
942 None,
943 );
944
945 if let Err(e) = ctx.data_sender.send(DataEvent::InstrumentStatus(status)) {
946 log::error!(
947 "Failed to emit instrument status for {}: {e}",
948 meta.instrument_id
949 );
950 }
951 }
952 }
953 }
954
955 MarketWsMessage::BestBidAsk(bba) => {
956 log::trace!(
957 "best_bid_ask for {}: bid={} ask={}",
958 bba.asset_id,
959 bba.best_bid,
960 bba.best_ask
961 );
962 }
963 }
964 }
965
966 fn emit_quote_if_changed(
967 ctx: &WsMessageContext,
968 instrument_id: InstrumentId,
969 quote: QuoteTick,
970 ) {
971 let emit = !matches!(
973 ctx.last_quotes.get(&instrument_id),
974 Some(existing) if existing.bid_price == quote.bid_price
975 && existing.ask_price == quote.ask_price
976 && existing.bid_size == quote.bid_size
977 && existing.ask_size == quote.ask_size
978 );
979
980 if emit {
981 ctx.last_quotes.insert(instrument_id, quote);
982 if let Err(e) = ctx
983 .data_sender
984 .send(DataEvent::Data(NautilusData::Quote(quote)))
985 {
986 log::error!("Failed to emit quote tick: {e}");
987 }
988 }
989 }
990
991 async fn await_tasks_with_timeout(&mut self, timeout: tokio::time::Duration) {
992 for handle in self.tasks.drain(..) {
993 let _ = tokio::time::timeout(timeout, handle).await;
994 }
995 }
996}
997
998#[async_trait::async_trait(?Send)]
999impl DataClient for PolymarketDataClient {
1000 fn client_id(&self) -> ClientId {
1001 self.client_id
1002 }
1003
1004 fn venue(&self) -> Option<Venue> {
1005 Some(*POLYMARKET_VENUE)
1006 }
1007
1008 fn start(&mut self) -> anyhow::Result<()> {
1009 log::info!("Starting Polymarket data client: {}", self.client_id);
1010 Ok(())
1011 }
1012
1013 fn stop(&mut self) -> anyhow::Result<()> {
1014 log::info!("Stopping Polymarket data client: {}", self.client_id);
1015 self.cancellation_token.cancel();
1016 self.is_connected.store(false, Ordering::Relaxed);
1017 Ok(())
1018 }
1019
1020 fn reset(&mut self) -> anyhow::Result<()> {
1021 log::debug!("Resetting Polymarket data client: {}", self.client_id);
1022 self.is_connected.store(false, Ordering::Relaxed);
1023 self.cancellation_token = CancellationToken::new();
1024
1025 for handle in self.tasks.drain(..) {
1026 handle.abort();
1027 }
1028 Ok(())
1029 }
1030
1031 fn dispose(&mut self) -> anyhow::Result<()> {
1032 self.stop()
1033 }
1034
1035 async fn connect(&mut self) -> anyhow::Result<()> {
1036 if self.is_connected() {
1037 return Ok(());
1038 }
1039
1040 self.cancellation_token = CancellationToken::new();
1041
1042 log::info!("Connecting Polymarket data client");
1043
1044 log::info!("Bootstrapping instruments from Gamma API...");
1045 self.bootstrap_instruments().await?;
1046 log::info!(
1047 "Bootstrap complete, {} instruments loaded",
1048 self.instruments.load().len(),
1049 );
1050
1051 self.ws_client.connect().await?;
1052
1053 if self.config.subscribe_new_markets {
1054 log::info!("Subscribing to new markets...");
1055 self.ws_client.subscribe_market(vec![]).await?;
1056 }
1057
1058 let rx = self
1059 .ws_client
1060 .take_message_receiver()
1061 .ok_or_else(|| anyhow::anyhow!("WS message receiver not available after connect"))?;
1062
1063 self.spawn_message_handler(rx);
1064
1065 self.is_connected.store(true, Ordering::Relaxed);
1066 log::info!("Connected Polymarket data client");
1067
1068 Ok(())
1069 }
1070
1071 async fn disconnect(&mut self) -> anyhow::Result<()> {
1072 if !self.is_connected() {
1073 return Ok(());
1074 }
1075
1076 log::info!("Disconnecting Polymarket data client");
1077
1078 self.cancellation_token.cancel();
1079 self.await_tasks_with_timeout(tokio::time::Duration::from_secs(5))
1080 .await;
1081
1082 self.ws_client.disconnect().await?;
1083
1084 self.is_connected.store(false, Ordering::Relaxed);
1085 log::info!("Disconnected Polymarket data client");
1086
1087 Ok(())
1088 }
1089
1090 fn is_connected(&self) -> bool {
1091 self.is_connected.load(Ordering::Relaxed)
1092 }
1093
1094 fn is_disconnected(&self) -> bool {
1095 !self.is_connected()
1096 }
1097
1098 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1099 let http = self.provider.http_client().clone();
1100 let filters = self.provider.filters();
1101 let sender = self.data_sender.clone();
1102 let instruments_cache = self.instruments.clone();
1103 let token_meta = self.token_meta.clone();
1104 let request_id = request.request_id;
1105 let client_id = request.client_id.unwrap_or(self.client_id);
1106 let venue = *POLYMARKET_VENUE;
1107 let start_nanos = datetime_to_unix_nanos(request.start);
1108 let end_nanos = datetime_to_unix_nanos(request.end);
1109 let params = request.params;
1110 let clock = self.clock;
1111
1112 get_runtime().spawn(async move {
1113 match fetch_instruments(&http, &filters).await {
1114 Ok(instruments) => {
1115 log::info!("Fetched {} instruments from Gamma API", instruments.len());
1116
1117 for instrument in &instruments {
1118 cache_instrument(&instruments_cache, &token_meta, instrument);
1119 }
1120
1121 let response = DataResponse::Instruments(InstrumentsResponse::new(
1122 request_id,
1123 client_id,
1124 venue,
1125 instruments,
1126 start_nanos,
1127 end_nanos,
1128 clock.get_time_ns(),
1129 params,
1130 ));
1131
1132 if let Err(e) = sender.send(DataEvent::Response(response)) {
1133 log::error!("Failed to send instruments response: {e}");
1134 }
1135 }
1136 Err(e) => {
1137 log::error!("Failed to fetch instruments from Gamma API: {e:?}");
1138 }
1139 }
1140 });
1141
1142 Ok(())
1143 }
1144
1145 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1146 let instrument_id = request.instrument_id;
1147 let http = self.provider.http_client().clone();
1148 let sender = self.data_sender.clone();
1149 let instruments_cache = self.instruments.clone();
1150 let token_meta = self.token_meta.clone();
1151 let client_id = request.client_id.unwrap_or(self.client_id);
1152 let request_id = request.request_id;
1153 let start = request.start;
1154 let end = request.end;
1155 let params = request.params;
1156 let clock = self.clock;
1157
1158 get_runtime().spawn(async move {
1159 let condition_id = match extract_condition_id(&instrument_id) {
1160 Ok(cid) => cid,
1161 Err(e) => {
1162 log::error!("Failed to extract condition_id for {instrument_id}: {e}");
1163 return;
1164 }
1165 };
1166
1167 let query_params = GetGammaMarketsParams {
1168 condition_ids: Some(condition_id),
1169 ..Default::default()
1170 };
1171
1172 let instrument = match http.request_instruments_by_params(query_params).await {
1173 Ok(instruments) => instruments.into_iter().find(|i| i.id() == instrument_id),
1174 Err(e) => {
1175 log::error!("Failed to fetch instrument {instrument_id} from Gamma API: {e}");
1176 return;
1177 }
1178 };
1179
1180 if let Some(inst) = instrument {
1181 cache_instrument(&instruments_cache, &token_meta, &inst);
1182
1183 if let Err(e) = sender.send(DataEvent::Instrument(inst.clone())) {
1186 log::warn!("Failed to publish instrument {instrument_id}: {e}");
1187 }
1188
1189 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1190 request_id,
1191 client_id,
1192 instrument_id,
1193 inst,
1194 datetime_to_unix_nanos(start),
1195 datetime_to_unix_nanos(end),
1196 clock.get_time_ns(),
1197 params,
1198 )));
1199
1200 if let Err(e) = sender.send(DataEvent::Response(response)) {
1201 log::error!("Failed to send instrument response: {e}");
1202 }
1203 } else {
1204 log::error!("Instrument {instrument_id} not found on Polymarket");
1205 }
1206 });
1207
1208 Ok(())
1209 }
1210
1211 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1212 let instrument_id = request.instrument_id;
1213 let instruments = self.instruments.load();
1214 let instrument = instruments
1215 .get(&instrument_id)
1216 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1217
1218 let token_id = instrument.raw_symbol().as_str().to_string();
1219 let price_precision = instrument.price_precision();
1220 let size_precision = instrument.size_precision();
1221
1222 let clob_client = self.clob_public_client.clone();
1223 let sender = self.data_sender.clone();
1224 let client_id = request.client_id.unwrap_or(self.client_id);
1225 let request_id = request.request_id;
1226 let params = request.params;
1227 let clock = self.clock;
1228
1229 get_runtime().spawn(async move {
1230 match clob_client
1231 .request_book_snapshot(instrument_id, &token_id, price_precision, size_precision)
1232 .await
1233 .context("failed to request book snapshot from Polymarket")
1234 {
1235 Ok(book) => {
1236 let response = DataResponse::Book(BookResponse::new(
1237 request_id,
1238 client_id,
1239 instrument_id,
1240 book,
1241 None,
1242 None,
1243 clock.get_time_ns(),
1244 params,
1245 ));
1246
1247 if let Err(e) = sender.send(DataEvent::Response(response)) {
1248 log::error!("Failed to send book snapshot response: {e}");
1249 }
1250 }
1251 Err(e) => log::error!("Book snapshot request failed: {e:?}"),
1252 }
1253 });
1254
1255 Ok(())
1256 }
1257
1258 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1259 let instrument_id = request.instrument_id;
1260 let instruments = self.instruments.load();
1261 let instrument = instruments
1262 .get(&instrument_id)
1263 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1264
1265 let condition_id = extract_condition_id(&instrument_id)?;
1266 let token_id = instrument.raw_symbol().as_str().to_string();
1267 let price_precision = instrument.price_precision();
1268 let size_precision = instrument.size_precision();
1269 let limit = request.limit.map(|n| n.get() as u32);
1270
1271 let data_api_client = self.data_api_client.clone();
1272 let sender = self.data_sender.clone();
1273 let client_id = request.client_id.unwrap_or(self.client_id);
1274 let request_id = request.request_id;
1275 let params = request.params;
1276 let clock = self.clock;
1277 let start_nanos = datetime_to_unix_nanos(request.start);
1278 let end_nanos = datetime_to_unix_nanos(request.end);
1279
1280 get_runtime().spawn(async move {
1281 match data_api_client
1282 .request_trade_ticks(
1283 instrument_id,
1284 &condition_id,
1285 &token_id,
1286 price_precision,
1287 size_precision,
1288 limit,
1289 )
1290 .await
1291 .context("failed to request trades from Polymarket Data API")
1292 {
1293 Ok(trades) => {
1294 let response = DataResponse::Trades(TradesResponse::new(
1295 request_id,
1296 client_id,
1297 instrument_id,
1298 trades,
1299 start_nanos,
1300 end_nanos,
1301 clock.get_time_ns(),
1302 params,
1303 ));
1304
1305 if let Err(e) = sender.send(DataEvent::Response(response)) {
1306 log::error!("Failed to send trades response: {e}");
1307 }
1308 }
1309 Err(e) => log::error!("Trade request failed for {instrument_id}: {e:?}"),
1310 }
1311 });
1312
1313 Ok(())
1314 }
1315
1316 fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
1317 log::debug!("subscribe_instruments: subscribed individually via data subscription methods");
1318 Ok(())
1319 }
1320
1321 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
1322 if cmd.book_type != BookType::L2_MBP {
1323 anyhow::bail!(
1324 "Polymarket only supports L2_MBP order book deltas, received {:?}",
1325 cmd.book_type
1326 );
1327 }
1328
1329 let instrument_id = cmd.instrument_id;
1330 let cached = self.instruments.load().contains_key(&instrument_id);
1331
1332 if !cached && !self.config.auto_load_missing_instruments {
1333 anyhow::bail!(
1334 "Instrument {instrument_id} not found, and `auto_load_missing_instruments` is disabled"
1335 );
1336 }
1337
1338 self.active_delta_subs.insert(instrument_id);
1340 self.order_books
1341 .entry(instrument_id)
1342 .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
1343
1344 if !cached {
1345 self.queue_pending_load(instrument_id);
1346 return Ok(());
1347 }
1348
1349 self.sync_ws_subscription(instrument_id);
1350 log::debug!("Subscribed to book deltas for {instrument_id}");
1351 Ok(())
1352 }
1353
1354 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
1355 let instrument_id = cmd.instrument_id;
1356 let cached = self.instruments.load().contains_key(&instrument_id);
1357
1358 if !cached && !self.config.auto_load_missing_instruments {
1359 anyhow::bail!(
1360 "Instrument {instrument_id} not found, and `auto_load_missing_instruments` is disabled"
1361 );
1362 }
1363
1364 self.active_quote_subs.insert(instrument_id);
1365
1366 if !cached {
1367 self.queue_pending_load(instrument_id);
1368 return Ok(());
1369 }
1370
1371 self.sync_ws_subscription(instrument_id);
1372 log::debug!("Subscribed to quotes for {instrument_id}");
1373 Ok(())
1374 }
1375
1376 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
1377 let instrument_id = cmd.instrument_id;
1378 let cached = self.instruments.load().contains_key(&instrument_id);
1379
1380 if !cached && !self.config.auto_load_missing_instruments {
1381 anyhow::bail!(
1382 "Instrument {instrument_id} not found, and `auto_load_missing_instruments` is disabled"
1383 );
1384 }
1385
1386 self.active_trade_subs.insert(instrument_id);
1387
1388 if !cached {
1389 self.queue_pending_load(instrument_id);
1390 return Ok(());
1391 }
1392
1393 self.sync_ws_subscription(instrument_id);
1394 log::debug!("Subscribed to trades for {instrument_id}");
1395 Ok(())
1396 }
1397
1398 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1399 let instrument_id = cmd.instrument_id;
1400 self.active_delta_subs.remove(&instrument_id);
1401 self.drop_pending_if_unwanted(instrument_id);
1402 self.sync_ws_subscription(instrument_id);
1403 log::debug!("Unsubscribed from book deltas for {instrument_id}");
1404 Ok(())
1405 }
1406
1407 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1408 let instrument_id = cmd.instrument_id;
1409 self.active_quote_subs.remove(&instrument_id);
1410 self.drop_pending_if_unwanted(instrument_id);
1411 self.sync_ws_subscription(instrument_id);
1412 log::debug!("Unsubscribed from quotes for {instrument_id}");
1413 Ok(())
1414 }
1415
1416 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1417 let instrument_id = cmd.instrument_id;
1418 self.active_trade_subs.remove(&instrument_id);
1419 self.drop_pending_if_unwanted(instrument_id);
1420 self.sync_ws_subscription(instrument_id);
1421 log::debug!("Unsubscribed from trades for {instrument_id}");
1422 Ok(())
1423 }
1424}
1425
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use nautilus_model::{
        enums::AssetClass,
        identifiers::{InstrumentId, Symbol},
        instruments::BinaryOption,
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;

    use super::*;
    use crate::websocket::{client::WsSubscriptionHandle, handler::HandlerCommand};

    type ActiveSet = Arc<AtomicSet<InstrumentId>>;
    type OpenTokens = Arc<AtomicSet<Ustr>>;
    type WsMutex = Arc<tokio::sync::Mutex<()>>;
    type InstrumentCache = Arc<AtomicMap<InstrumentId, InstrumentAny>>;
    type TokenMetaCache = Arc<DashMap<Ustr, TokenMeta>>;

    /// Shared sets and lock handed to `sync_ws_subscription_async` by the sync tests.
    struct SyncState {
        quotes: ActiveSet,
        deltas: ActiveSet,
        trades: ActiveSet,
        open: OpenTokens,
        mutex: WsMutex,
    }

    impl SyncState {
        /// Builds a state in which every set is empty.
        fn new() -> Self {
            Self {
                quotes: Arc::new(AtomicSet::new()),
                deltas: Arc::new(AtomicSet::new()),
                trades: Arc::new(AtomicSet::new()),
                open: Arc::new(AtomicSet::new()),
                mutex: Arc::new(tokio::sync::Mutex::new(())),
            }
        }

        /// Runs one sync pass for `inst`, using its symbol string as the token ID.
        async fn sync(&self, inst: InstrumentId, ws: WsSubscriptionHandle) {
            sync_ws_subscription_async(
                inst,
                inst.symbol.as_str().to_string(),
                Arc::clone(&self.quotes),
                Arc::clone(&self.deltas),
                Arc::clone(&self.trades),
                Arc::clone(&self.open),
                Arc::clone(&self.mutex),
                ws,
            )
            .await;
        }

        /// Reports whether the test token is currently recorded as open.
        fn is_open(&self) -> bool {
            self.open.contains(&token_ustr())
        }
    }

    /// Creates a subscription handle together with the receiving end of its command channel.
    fn make_handle() -> (
        WsSubscriptionHandle,
        tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    ) {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
        let handle = WsSubscriptionHandle::from_sender(sender);
        (handle, receiver)
    }

    /// Instrument ID used by every sync test.
    fn instrument_id() -> InstrumentId {
        InstrumentId::from("0xCOND-0xTOKEN.POLYMARKET")
    }

    /// Token key matching the symbol part of [`instrument_id`].
    fn token_ustr() -> Ustr {
        Ustr::from("0xCOND-0xTOKEN")
    }

    /// Creates an empty instrument cache and an empty token metadata map.
    fn empty_caches() -> (InstrumentCache, TokenMetaCache) {
        (Arc::new(AtomicMap::new()), Arc::new(DashMap::new()))
    }

    #[rstest]
    #[tokio::test]
    async fn sync_ws_subscribes_when_intent_present_and_ws_closed() {
        let (ws, mut rx) = make_handle();
        let state = SyncState::new();
        let inst = instrument_id();
        state.quotes.insert(inst);

        state.sync(inst, ws).await;

        assert!(state.is_open());

        let expected_ids = vec![inst.symbol.as_str().to_string()];
        match rx.try_recv().expect("expected SubscribeMarket command") {
            HandlerCommand::SubscribeMarket(ids) => assert_eq!(ids, expected_ids),
            other => panic!("unexpected command: {other:?}"),
        }
        // Exactly one command must have been emitted.
        assert!(rx.try_recv().is_err());
    }

    #[rstest]
    #[tokio::test]
    async fn sync_ws_unsubscribes_when_intent_absent_and_ws_open() {
        let (ws, mut rx) = make_handle();
        let state = SyncState::new();
        let inst = instrument_id();
        state.open.insert(token_ustr());

        state.sync(inst, ws).await;

        assert!(!state.is_open());

        let expected_ids = vec![inst.symbol.as_str().to_string()];
        match rx.try_recv().expect("expected UnsubscribeMarket command") {
            HandlerCommand::UnsubscribeMarket(ids) => assert_eq!(ids, expected_ids),
            other => panic!("unexpected command: {other:?}"),
        }
    }

    #[rstest]
    #[case::intent_matches_open(true, true, false)]
    #[case::no_intent_not_open(false, false, false)]
    #[tokio::test]
    async fn sync_ws_no_op_when_state_already_matches(
        #[case] want: bool,
        #[case] is_open_initial: bool,
        #[case] expect_command: bool,
    ) {
        let (ws, mut rx) = make_handle();
        let state = SyncState::new();
        let inst = instrument_id();

        if want {
            state.quotes.insert(inst);
        }

        if is_open_initial {
            state.open.insert(token_ustr());
        }

        state.sync(inst, ws).await;

        // The open set must be untouched and the command channel must match expectations.
        assert_eq!(state.is_open(), is_open_initial);
        assert_eq!(rx.try_recv().is_ok(), expect_command);
    }

    #[rstest]
    #[tokio::test]
    async fn sync_ws_rolls_back_open_tokens_on_send_failure() {
        // Dropping the receiver first makes every send on the handle fail.
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
        drop(rx);
        let ws = WsSubscriptionHandle::from_sender(tx);

        let state = SyncState::new();
        let inst = instrument_id();
        state.quotes.insert(inst);

        state.sync(inst, ws).await;

        assert!(!state.is_open());
    }

    #[rstest]
    #[case::any_kind(true, false, false)]
    #[case::another_kind(false, true, false)]
    #[case::third_kind(false, false, true)]
    #[tokio::test]
    async fn sync_ws_opens_for_any_active_kind(#[case] q: bool, #[case] d: bool, #[case] t: bool) {
        let (ws, mut rx) = make_handle();
        let state = SyncState::new();
        let inst = instrument_id();

        // Activate only the subscription kinds selected by the case.
        for (enabled, set) in [(q, &state.quotes), (d, &state.deltas), (t, &state.trades)] {
            if enabled {
                set.insert(inst);
            }
        }

        state.sync(inst, ws).await;

        assert!(state.is_open());
        assert!(matches!(
            rx.try_recv(),
            Ok(HandlerCommand::SubscribeMarket(_))
        ));
    }

    /// Builds a minimal binary option whose precisions are taken from the given increments.
    fn stub_instrument(
        raw_symbol: &str,
        price_increment: Price,
        size_increment: Quantity,
    ) -> InstrumentAny {
        let id = InstrumentId::from(format!("{raw_symbol}.POLYMARKET").as_str());
        let option = BinaryOption::new(
            id,
            Symbol::new(raw_symbol),
            AssetClass::Alternative,
            Currency::pUSD(),
            UnixNanos::default(),
            UnixNanos::from(u64::MAX),
            price_increment.precision,
            size_increment.precision,
            price_increment,
            size_increment,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );

        InstrumentAny::BinaryOption(option)
    }

    #[rstest]
    #[case::p3_s2("token-a", Price::from("0.001"), Quantity::from("0.01"))]
    #[case::p5_s4("token-b", Price::from("0.00001"), Quantity::from("0.0001"))]
    fn cache_instrument_writes_both_maps(
        #[case] raw_symbol: &str,
        #[case] price_increment: Price,
        #[case] size_increment: Quantity,
    ) {
        let (instruments, token_meta) = empty_caches();
        let inst = stub_instrument(raw_symbol, price_increment, size_increment);
        let expected_id = inst.id();

        cache_instrument(&instruments, &token_meta, &inst);

        // The instrument cache is keyed by instrument ID.
        let loaded = instruments.load();
        let cached = loaded
            .get(&expected_id)
            .expect("instrument inserted into live cache");
        assert_eq!(cached.id(), expected_id);
        assert_eq!(cached.raw_symbol().as_str(), raw_symbol);

        // The token metadata map is keyed by raw symbol.
        let meta = token_meta
            .get(&Ustr::from(raw_symbol))
            .expect("token_meta inserted for raw_symbol");
        assert_eq!(meta.instrument_id, expected_id);
        assert_eq!(meta.price_precision, price_increment.precision);
        assert_eq!(meta.size_precision, size_increment.precision);
    }

    #[rstest]
    fn cache_instrument_overwrites_precisions_on_second_call() {
        let (instruments, token_meta) = empty_caches();
        let raw_symbol = "token-overwrite";

        // Cache the same symbol twice; the second increments are finer than the first.
        let increments = [
            (Price::from("0.01"), Quantity::from("0.1")),
            (Price::from("0.0001"), Quantity::from("0.001")),
        ];
        for (price_increment, size_increment) in increments {
            let inst = stub_instrument(raw_symbol, price_increment, size_increment);
            cache_instrument(&instruments, &token_meta, &inst);
        }

        let meta = token_meta
            .get(&Ustr::from(raw_symbol))
            .expect("token_meta present after overwrite");
        assert_eq!(meta.price_precision, 4);
        assert_eq!(meta.size_precision, 3);
        assert_eq!(token_meta.len(), 1);
        assert_eq!(instruments.load().len(), 1);
    }

    #[rstest]
    fn cache_instrument_maintains_dual_cache_invariant() {
        let (instruments, token_meta) = empty_caches();

        let samples = [
            stub_instrument("token-1", Price::from("0.001"), Quantity::from("0.01")),
            stub_instrument("token-2", Price::from("0.0001"), Quantity::from("0.01")),
            stub_instrument("token-3", Price::from("0.00001"), Quantity::from("0.001")),
        ];
        samples
            .iter()
            .for_each(|inst| cache_instrument(&instruments, &token_meta, inst));

        let loaded = instruments.load();
        assert_eq!(loaded.len(), samples.len());

        // Every cached instrument must have a metadata entry pointing back at it.
        for inst in loaded.values() {
            let token_id = Ustr::from(inst.raw_symbol().as_str());
            let meta = token_meta
                .get(&token_id)
                .unwrap_or_else(|| panic!("missing token_meta for {token_id}"));
            assert_eq!(meta.instrument_id, inst.id());
        }
    }
}