Skip to main content

nautilus_polymarket/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Polymarket adapter.
17
18use std::sync::{
19    Arc, Mutex as StdMutex,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashSet;
24use anyhow::Context;
25use dashmap::DashMap;
26use nautilus_common::{
27    clients::DataClient,
28    live::{get_runtime, runner::get_data_event_sender},
29    messages::{
30        DataEvent, DataResponse,
31        data::{
32            BookResponse, InstrumentResponse, InstrumentsResponse, RequestBookSnapshot,
33            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBookDeltas,
34            SubscribeInstruments, SubscribeQuotes, SubscribeTrades, TradesResponse,
35            UnsubscribeBookDeltas, UnsubscribeQuotes, UnsubscribeTrades,
36        },
37    },
38    providers::InstrumentProvider,
39};
40use nautilus_core::{
41    AtomicMap, AtomicSet,
42    datetime::datetime_to_unix_nanos,
43    time::{AtomicTime, get_atomic_clock_realtime},
44};
45use nautilus_model::{
46    data::{Data as NautilusData, InstrumentStatus, OrderBookDeltas_API, QuoteTick},
47    enums::{BookType, MarketStatusAction},
48    identifiers::{ClientId, InstrumentId, Venue},
49    instruments::{Instrument, InstrumentAny},
50    orderbook::OrderBook,
51};
52use tokio::task::JoinHandle;
53use tokio_util::sync::CancellationToken;
54use ustr::Ustr;
55
56use crate::{
57    common::consts::POLYMARKET_VENUE,
58    config::PolymarketDataClientConfig,
59    filters::InstrumentFilter,
60    http::{
61        clob::PolymarketClobPublicClient, data_api::PolymarketDataApiHttpClient,
62        gamma::PolymarketGammaHttpClient, parse::rebuild_instrument_with_tick_size,
63        query::GetGammaMarketsParams,
64    },
65    providers::{PolymarketInstrumentProvider, extract_condition_id, fetch_instruments},
66    websocket::{
67        client::PolymarketWebSocketClient,
68        messages::{MarketWsMessage, PolymarketQuotes, PolymarketWsMessage},
69        parse::{
70            parse_book_deltas, parse_book_snapshot, parse_quote_from_price_change,
71            parse_quote_from_snapshot, parse_timestamp_ms, parse_trade_tick,
72        },
73    },
74};
75
// Maximum number of condition IDs placed in a single Gamma markets query.
// The auto-load task splits larger batches into chunks of this size and
// merges the results.
const GAMMA_CONDITION_ID_CHUNK: usize = 100;
77
78fn resolve_token_id_from(
79    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
80    instrument_id: InstrumentId,
81) -> anyhow::Result<String> {
82    let loaded = instruments.load();
83    let instrument = loaded
84        .get(&instrument_id)
85        .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
86    Ok(instrument.raw_symbol().as_str().to_string())
87}
88
89// Reconciles the WS subscription for `instrument_id` with the union of caller
90// intents. Holds `ws_sub_mutex` across the async WS send so concurrent
91// subscribe/unsubscribe calls arrive at the WS handler in mutex-release order;
92// that makes the final wire state consistent with the last writer.
93#[allow(
94    clippy::too_many_arguments,
95    reason = "shared state comes in as Arc refs"
96)]
97async fn sync_ws_subscription_async(
98    instrument_id: InstrumentId,
99    token_id_str: String,
100    active_quote_subs: Arc<AtomicSet<InstrumentId>>,
101    active_delta_subs: Arc<AtomicSet<InstrumentId>>,
102    active_trade_subs: Arc<AtomicSet<InstrumentId>>,
103    ws_open_tokens: Arc<AtomicSet<Ustr>>,
104    ws_sub_mutex: Arc<tokio::sync::Mutex<()>>,
105    ws: crate::websocket::client::WsSubscriptionHandle,
106) {
107    let token_id = Ustr::from(token_id_str.as_str());
108    let _guard = ws_sub_mutex.lock().await;
109
110    let wants_subscribe = active_quote_subs.contains(&instrument_id)
111        || active_delta_subs.contains(&instrument_id)
112        || active_trade_subs.contains(&instrument_id);
113    let is_open = ws_open_tokens.contains(&token_id);
114
115    if wants_subscribe && !is_open {
116        ws_open_tokens.insert(token_id);
117
118        if let Err(e) = ws.subscribe_market(vec![token_id_str]).await {
119            log::error!("Failed to subscribe to market data: {e:?}");
120            // Roll back tracked WS state so a retry can take effect.
121            ws_open_tokens.remove(&token_id);
122        }
123    } else if !wants_subscribe && is_open {
124        ws_open_tokens.remove(&token_id);
125
126        if let Err(e) = ws.unsubscribe_market(vec![token_id_str]).await {
127            log::error!("Failed to unsubscribe from market data: {e:?}");
128        }
129    }
130}
131
// Routing metadata stored per WS token ID: the instrument the token maps to,
// plus the price and size precisions handed to the WS parse functions.
#[derive(Clone, Copy, Debug)]
struct TokenMeta {
    // Instrument that messages for this token are attributed to.
    instrument_id: InstrumentId,
    // Taken from the instrument's `price_precision()` when it is cached.
    price_precision: u8,
    // Taken from the instrument's `size_precision()` when it is cached.
    size_precision: u8,
}
138
139// Inserts `instrument` into the live instrument cache and updates the
140// `token_meta` routing index in one step. Every path that populates the live
141// cache must go through here so WS messages can always resolve token_id back
142// to an InstrumentId.
143fn cache_instrument(
144    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
145    token_meta: &Arc<DashMap<Ustr, TokenMeta>>,
146    instrument: &InstrumentAny,
147) {
148    let instrument_id = instrument.id();
149    token_meta.insert(
150        Ustr::from(instrument.raw_symbol().as_str()),
151        TokenMeta {
152            instrument_id,
153            price_precision: instrument.price_precision(),
154            size_precision: instrument.size_precision(),
155        },
156    );
157    instruments.insert(instrument_id, instrument.clone());
158}
159
// Shared state handed to the WS message handler task. Built once in
// `spawn_message_handler` and passed by reference to the `handle_*` functions.
struct WsMessageContext {
    // Clock used to stamp `ts_init` on parsed data.
    clock: &'static AtomicTime,
    // Channel on which parsed data events are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    // token_id -> routing metadata; messages for unknown tokens are dropped.
    token_meta: Arc<DashMap<Ustr, TokenMeta>>,
    // Live instrument cache shared with the client.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    // Clone of the provider's HTTP client.
    // NOTE(review): its use by the handler is outside the visible code.
    gamma_client: PolymarketGammaHttpClient,
    // Instrument filters copied from the provider.
    filters: Vec<Arc<dyn InstrumentFilter>>,
    // Local order books, updated from book snapshots and price changes.
    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
    // Last quote per instrument, fed into price-change quote parsing.
    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
    // Instruments with an active quote subscription.
    active_quote_subs: Arc<AtomicSet<InstrumentId>>,
    // Instruments with an active book-delta subscription.
    active_delta_subs: Arc<AtomicSet<InstrumentId>>,
    // Instruments with an active trade subscription.
    active_trade_subs: Arc<AtomicSet<InstrumentId>>,
    // Copied from `config.subscribe_new_markets`.
    subscribe_new_markets: bool,
    // Copied from `config.new_market_filter`.
    new_market_filter: Option<Arc<dyn InstrumentFilter>>,
    // Clone of the client's cancellation token.
    cancellation_token: CancellationToken,
}
176
/// Polymarket data client for live market data streaming.
///
/// Integrates with the Nautilus DataEngine to provide:
/// - Real-time order book snapshots and deltas via WebSocket
/// - Quote ticks synthesized from book data
/// - Trade ticks from last trade price messages
/// - Automatic instrument discovery from the Gamma API
#[derive(Debug)]
pub struct PolymarketDataClient {
    // Realtime atomic clock obtained in `new`.
    clock: &'static AtomicTime,
    client_id: ClientId,
    config: PolymarketDataClientConfig,
    // Instrument provider built around the Gamma HTTP client.
    provider: PolymarketInstrumentProvider,
    clob_public_client: PolymarketClobPublicClient,
    data_api_client: PolymarketDataApiHttpClient,
    ws_client: PolymarketWebSocketClient,
    // Connection flag read by `is_connected`.
    is_connected: AtomicBool,
    // Cloned into spawned tasks so they can observe cancellation.
    cancellation_token: CancellationToken,
    // Join handles of spawned background tasks (e.g. the WS message handler).
    tasks: Vec<JoinHandle<()>>,
    // Channel used to publish `DataEvent`s.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    // Live instrument cache keyed by instrument ID.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    // token_id -> routing metadata used by the WS message handler.
    token_meta: Arc<DashMap<Ustr, TokenMeta>>,
    // Local order books maintained from WS book messages.
    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
    // Last quote seen per instrument.
    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
    // Caller intent: instruments with an active quote subscription.
    active_quote_subs: Arc<AtomicSet<InstrumentId>>,
    // Caller intent: instruments with an active book-delta subscription.
    active_delta_subs: Arc<AtomicSet<InstrumentId>>,
    // Caller intent: instruments with an active trade subscription.
    active_trade_subs: Arc<AtomicSet<InstrumentId>>,
    // Tokens currently tracked as subscribed on the WS connection.
    ws_open_tokens: Arc<AtomicSet<Ustr>>,
    // Serializes WS subscription reconciliation across tasks.
    ws_sub_mutex: Arc<tokio::sync::Mutex<()>>,
    // Instrument IDs waiting to be fetched by the auto-load task.
    pending_auto_loads: Arc<StdMutex<AHashSet<InstrumentId>>>,
    // Set while an auto-load task is scheduled or running.
    auto_load_scheduled: Arc<AtomicBool>,
}
209
210impl PolymarketDataClient {
211    /// Creates a new [`PolymarketDataClient`].
212    pub fn new(
213        client_id: ClientId,
214        config: PolymarketDataClientConfig,
215        gamma_client: PolymarketGammaHttpClient,
216        clob_public_client: PolymarketClobPublicClient,
217        data_api_client: PolymarketDataApiHttpClient,
218        ws_client: PolymarketWebSocketClient,
219    ) -> Self {
220        let clock = get_atomic_clock_realtime();
221        let data_sender = get_data_event_sender();
222        let provider = PolymarketInstrumentProvider::new(gamma_client);
223
224        Self {
225            clock,
226            client_id,
227            config,
228            provider,
229            clob_public_client,
230            data_api_client,
231            ws_client,
232            is_connected: AtomicBool::new(false),
233            cancellation_token: CancellationToken::new(),
234            tasks: Vec::new(),
235            data_sender,
236            instruments: Arc::new(AtomicMap::new()),
237            token_meta: Arc::new(DashMap::new()),
238            order_books: Arc::new(DashMap::new()),
239            last_quotes: Arc::new(DashMap::new()),
240            active_quote_subs: Arc::new(AtomicSet::new()),
241            active_delta_subs: Arc::new(AtomicSet::new()),
242            active_trade_subs: Arc::new(AtomicSet::new()),
243            ws_open_tokens: Arc::new(AtomicSet::new()),
244            ws_sub_mutex: Arc::new(tokio::sync::Mutex::new(())),
245            pending_auto_loads: Arc::new(StdMutex::new(AHashSet::new())),
246            auto_load_scheduled: Arc::new(AtomicBool::new(false)),
247        }
248    }
249
    /// Returns a reference to the [`PolymarketDataClientConfig`] held by this
    /// client.
    #[must_use]
    pub fn config(&self) -> &PolymarketDataClientConfig {
        &self.config
    }
255
    /// Returns the venue for this data client.
    ///
    /// This is always a copy of the `POLYMARKET_VENUE` constant.
    #[must_use]
    pub fn venue(&self) -> Venue {
        *POLYMARKET_VENUE
    }
261
    /// Returns a reference to the [`PolymarketInstrumentProvider`] owned by
    /// this client.
    #[must_use]
    pub fn provider(&self) -> &PolymarketInstrumentProvider {
        &self.provider
    }
267
    /// Adds an instrument filter on the underlying provider.
    ///
    /// The filter is forwarded unchanged to the provider's `add_filter`.
    pub fn add_instrument_filter(&mut self, filter: Arc<dyn InstrumentFilter>) {
        self.provider.add_filter(filter);
    }
272
    /// Returns `true` when the client is connected.
    ///
    /// Reads the internal connection flag with `Ordering::Relaxed`.
    #[must_use]
    pub fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }
278
279    fn resolve_token_id(&self, instrument_id: InstrumentId) -> anyhow::Result<String> {
280        let instruments = self.instruments.load();
281        let instrument = instruments
282            .get(&instrument_id)
283            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
284        Ok(instrument.raw_symbol().as_str().to_string())
285    }
286
287    // Spawns an async task that reconciles the WS subscription for
288    // `instrument_id`. The task holds `ws_sub_mutex` across the wire send so
289    // concurrent subscribe/unsubscribe calls deliver commands to the WS handler
290    // in a consistent order with the final `active_*_subs` state.
291    fn sync_ws_subscription(&self, instrument_id: InstrumentId) {
292        let token_id_str = match self.resolve_token_id(instrument_id) {
293            Ok(s) => s,
294            Err(_) => return,
295        };
296        let active_quote_subs = self.active_quote_subs.clone();
297        let active_delta_subs = self.active_delta_subs.clone();
298        let active_trade_subs = self.active_trade_subs.clone();
299        let ws_open_tokens = self.ws_open_tokens.clone();
300        let ws_sub_mutex = self.ws_sub_mutex.clone();
301        let ws = self.ws_client.clone_subscription_handle();
302
303        get_runtime().spawn(sync_ws_subscription_async(
304            instrument_id,
305            token_id_str,
306            active_quote_subs,
307            active_delta_subs,
308            active_trade_subs,
309            ws_open_tokens,
310            ws_sub_mutex,
311            ws,
312        ));
313    }
314
315    fn queue_pending_load(&self, instrument_id: InstrumentId) {
316        {
317            let mut pending = self
318                .pending_auto_loads
319                .lock()
320                .expect("pending_auto_loads mutex poisoned");
321            pending.insert(instrument_id);
322        }
323
324        self.ensure_auto_load_task();
325    }
326
327    fn drop_pending_if_unwanted(&self, instrument_id: InstrumentId) {
328        if self.active_quote_subs.contains(&instrument_id)
329            || self.active_delta_subs.contains(&instrument_id)
330            || self.active_trade_subs.contains(&instrument_id)
331        {
332            return;
333        }
334        let mut pending = self
335            .pending_auto_loads
336            .lock()
337            .expect("pending_auto_loads mutex poisoned");
338        pending.remove(&instrument_id);
339    }
340
341    fn ensure_auto_load_task(&self) {
342        if self
343            .auto_load_scheduled
344            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
345            .is_err()
346        {
347            return;
348        }
349
350        let pending = self.pending_auto_loads.clone();
351        let scheduled = self.auto_load_scheduled.clone();
352        let debounce_ms = self.config.auto_load_debounce_ms;
353        let http = self.provider.http_client().clone();
354        let filters = self.provider.filters();
355        let instruments = self.instruments.clone();
356        let token_meta = self.token_meta.clone();
357        let active_quote_subs = self.active_quote_subs.clone();
358        let active_delta_subs = self.active_delta_subs.clone();
359        let active_trade_subs = self.active_trade_subs.clone();
360        let ws_open_tokens = self.ws_open_tokens.clone();
361        let ws_sub_mutex = self.ws_sub_mutex.clone();
362        let ws_client = self.ws_client.clone_subscription_handle();
363        let data_sender = self.data_sender.clone();
364        let cancellation = self.cancellation_token.clone();
365
366        get_runtime().spawn(async move {
367            // Loop until the pending map is quiescent. Each iteration runs one
368            // debounce window, then snapshots, fetches, and applies. A chunk
369            // failure or a late-arriving miss keeps us in the loop; we exit
370            // (releasing `scheduled`) only once `pending` is empty. This means a
371            // transient Gamma failure is retried on the next debounce without
372            // relying on some unrelated future miss to trigger it.
373            loop {
374                tokio::select! {
375                    () = tokio::time::sleep(tokio::time::Duration::from_millis(debounce_ms)) => {}
376                    () = cancellation.cancelled() => {
377                        scheduled.store(false, Ordering::Release);
378                        return;
379                    }
380                }
381
382                let ids: Vec<InstrumentId> = {
383                    let guard = pending.lock().expect("pending_auto_loads mutex poisoned");
384                    guard.iter().copied().collect()
385                };
386
387                if ids.is_empty() {
388                    scheduled.store(false, Ordering::Release);
389                    return;
390                }
391
392                log::info!("Auto-loading {} missing instrument(s): {ids:?}", ids.len());
393
394                let mut condition_ids: Vec<String> = ids
395                    .iter()
396                    .filter_map(|id| extract_condition_id(id).ok())
397                    .collect();
398                condition_ids.sort();
399                condition_ids.dedup();
400
401                if condition_ids.is_empty() {
402                    log::error!("Auto-load aborted: no condition_ids could be extracted");
403                    // Drop the stranded entries so we do not loop forever.
404                    let mut guard = pending.lock().expect("pending_auto_loads mutex poisoned");
405                    for id in &ids {
406                        guard.remove(id);
407                    }
408                    continue;
409                }
410
411                // Gamma rejects condition_id queries larger than ~100, so chunk
412                // the request and merge the results. This matches the provider's
413                // own `_load_ids_using_gamma_markets` chunking policy.
414                let mut loaded: Vec<InstrumentAny> =
415                    Vec::with_capacity(condition_ids.len().min(GAMMA_CONDITION_ID_CHUNK));
416                let mut chunk_failed = false;
417
418                for chunk in condition_ids.chunks(GAMMA_CONDITION_ID_CHUNK) {
419                    let params = GetGammaMarketsParams {
420                        condition_ids: Some(chunk.join(",")),
421                        ..Default::default()
422                    };
423
424                    match http.request_instruments_by_params(params).await {
425                        Ok(insts) => loaded.extend(insts),
426                        Err(e) => {
427                            log::error!(
428                                "Auto-load batch failed for chunk of {} condition_id(s): {e:?}",
429                                chunk.len()
430                            );
431                            chunk_failed = true;
432                            break;
433                        }
434                    }
435                }
436
437                if chunk_failed {
438                    // Leave entries in `pending` and loop around; the next
439                    // iteration retries after another debounce window.
440                    continue;
441                }
442
443                for inst in loaded {
444                    if !filters.iter().all(|f| f.accept(&inst)) {
445                        log::debug!("Auto-loaded instrument {} filtered out", inst.id());
446                        continue;
447                    }
448
449                    cache_instrument(&instruments, &token_meta, &inst);
450
451                    let instrument_id = inst.id();
452                    if let Err(e) = data_sender.send(DataEvent::Instrument(inst)) {
453                        log::error!("Failed to emit auto-loaded instrument {instrument_id}: {e}");
454                    }
455                }
456
457                for instrument_id in ids {
458                    // Pop the pending entry under the lock; if `unsubscribe_*`
459                    // already cleared it, skip.
460                    let was_pending = {
461                        let mut guard = pending.lock().expect("pending_auto_loads mutex poisoned");
462                        guard.remove(&instrument_id)
463                    };
464
465                    if !was_pending {
466                        continue;
467                    }
468
469                    let Ok(token_id) = resolve_token_id_from(&instruments, instrument_id) else {
470                        log::error!("Auto-load did not return instrument {instrument_id}");
471                        continue;
472                    };
473
474                    // Reconcile WS state with whichever `active_*_subs` still
475                    // hold intent. A concurrent unsubscribe makes this a no-op.
476                    sync_ws_subscription_async(
477                        instrument_id,
478                        token_id,
479                        active_quote_subs.clone(),
480                        active_delta_subs.clone(),
481                        active_trade_subs.clone(),
482                        ws_open_tokens.clone(),
483                        ws_sub_mutex.clone(),
484                        ws_client.clone(),
485                    )
486                    .await;
487                }
488            }
489        });
490    }
491
492    async fn bootstrap_instruments(&mut self) -> anyhow::Result<()> {
493        self.provider.load_all(None).await?;
494
495        let all_instruments = self.provider.store().list_all();
496        let total = all_instruments.len();
497        for instrument in all_instruments {
498            cache_instrument(&self.instruments, &self.token_meta, instrument);
499            let instrument_id = instrument.id();
500
501            if let Err(e) = self
502                .data_sender
503                .send(DataEvent::Instrument(instrument.clone()))
504            {
505                log::warn!("Failed to publish instrument {instrument_id}: {e}");
506            }
507        }
508
509        log::info!("Published all {total} instruments to data engine");
510        Ok(())
511    }
512
513    fn spawn_message_handler(
514        &mut self,
515        mut rx: tokio::sync::mpsc::UnboundedReceiver<PolymarketWsMessage>,
516    ) {
517        let cancellation = self.cancellation_token.clone();
518
519        for (token_id, instrument) in self.provider.build_token_map() {
520            self.token_meta.insert(
521                token_id,
522                TokenMeta {
523                    instrument_id: instrument.id(),
524                    price_precision: instrument.price_precision(),
525                    size_precision: instrument.size_precision(),
526                },
527            );
528        }
529
530        let ctx = WsMessageContext {
531            clock: self.clock,
532            data_sender: self.data_sender.clone(),
533            token_meta: self.token_meta.clone(),
534            instruments: self.instruments.clone(),
535            gamma_client: self.provider.http_client().clone(),
536            filters: self.provider.filters(),
537            order_books: self.order_books.clone(),
538            last_quotes: self.last_quotes.clone(),
539            active_quote_subs: self.active_quote_subs.clone(),
540            active_delta_subs: self.active_delta_subs.clone(),
541            active_trade_subs: self.active_trade_subs.clone(),
542            subscribe_new_markets: self.config.subscribe_new_markets,
543            new_market_filter: self.config.new_market_filter.clone(),
544            cancellation_token: cancellation.clone(),
545        };
546
547        let handle = get_runtime().spawn(async move {
548            log::debug!("Polymarket message handler started");
549
550            loop {
551                tokio::select! {
552                    maybe_msg = rx.recv() => {
553                        match maybe_msg {
554                            Some(msg) => Self::handle_ws_message(msg, &ctx),
555                            None => {
556                                log::debug!("WebSocket message channel closed");
557                                break;
558                            }
559                        }
560                    }
561                    () = cancellation.cancelled() => {
562                        log::debug!("Polymarket message handler cancelled");
563                        break;
564                    }
565                }
566            }
567
568            log::debug!("Polymarket message handler ended");
569        });
570
571        self.tasks.push(handle);
572    }
573
574    fn handle_ws_message(message: PolymarketWsMessage, ctx: &WsMessageContext) {
575        match message {
576            PolymarketWsMessage::Market(market_msg) => {
577                Self::handle_market_message(market_msg, ctx);
578            }
579            PolymarketWsMessage::User(_) => {
580                log::debug!("Ignoring user message on data client");
581            }
582            PolymarketWsMessage::Reconnected => {
583                log::info!("Polymarket WS reconnected");
584            }
585        }
586    }
587
588    fn handle_market_message(message: MarketWsMessage, ctx: &WsMessageContext) {
589        match message {
590            MarketWsMessage::Book(snap) => {
591                let token_id = Ustr::from(snap.asset_id.as_str());
592                let meta = match ctx.token_meta.get(&token_id) {
593                    Some(m) => *m,
594                    None => {
595                        log::debug!("No instrument for token_id {token_id}");
596                        return;
597                    }
598                };
599                let instrument_id = meta.instrument_id;
600                let ts_init = ctx.clock.get_time_ns();
601
602                if ctx.active_delta_subs.contains(&instrument_id) {
603                    match parse_book_snapshot(
604                        &snap,
605                        instrument_id,
606                        meta.price_precision,
607                        meta.size_precision,
608                        ts_init,
609                    ) {
610                        Ok(deltas) => {
611                            let mut book = ctx
612                                .order_books
613                                .entry(instrument_id)
614                                .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
615
616                            if let Err(e) = book.apply_deltas(&deltas) {
617                                log::error!(
618                                    "Failed to apply book snapshot for {instrument_id}: {e}"
619                                );
620                            }
621
622                            let data: NautilusData = OrderBookDeltas_API::new(deltas).into();
623                            if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
624                                log::error!("Failed to emit book deltas: {e}");
625                            }
626                        }
627                        Err(e) => log::error!("Failed to parse book snapshot: {e}"),
628                    }
629                }
630
631                if ctx.active_quote_subs.contains(&instrument_id) {
632                    match parse_quote_from_snapshot(
633                        &snap,
634                        instrument_id,
635                        meta.price_precision,
636                        meta.size_precision,
637                        ts_init,
638                    ) {
639                        Ok(Some(quote)) => {
640                            Self::emit_quote_if_changed(ctx, instrument_id, quote);
641                        }
642                        Ok(None) => {}
643                        Err(e) => log::error!("Failed to parse quote from snapshot: {e}"),
644                    }
645                }
646            }
647
648            MarketWsMessage::PriceChange(quotes) => {
649                let ts_init = ctx.clock.get_time_ns();
650                let ts_event = match parse_timestamp_ms(&quotes.timestamp) {
651                    Ok(ts) => ts,
652                    Err(e) => {
653                        log::error!("Failed to parse price change timestamp: {e}");
654                        return;
655                    }
656                };
657
658                // Each change may belong to a different asset, so resolve per-change
659                for change in &quotes.price_changes {
660                    let token_id = Ustr::from(change.asset_id.as_str());
661                    let meta = match ctx.token_meta.get(&token_id) {
662                        Some(m) => *m,
663                        None => {
664                            log::debug!("No instrument for token_id {token_id}");
665                            continue;
666                        }
667                    };
668                    let instrument_id = meta.instrument_id;
669
670                    if ctx.active_delta_subs.contains(&instrument_id) {
671                        let per_asset = PolymarketQuotes {
672                            market: quotes.market,
673                            price_changes: vec![change.clone()],
674                            timestamp: quotes.timestamp.clone(),
675                        };
676
677                        match parse_book_deltas(
678                            &per_asset,
679                            instrument_id,
680                            meta.price_precision,
681                            meta.size_precision,
682                            ts_init,
683                        ) {
684                            Ok(deltas) => {
685                                if let Some(mut book) = ctx.order_books.get_mut(&instrument_id)
686                                    && let Err(e) = book.apply_deltas(&deltas)
687                                {
688                                    log::error!(
689                                        "Failed to apply book deltas for {instrument_id}: {e}"
690                                    );
691                                }
692
693                                let data: NautilusData = OrderBookDeltas_API::new(deltas).into();
694
695                                if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
696                                    log::error!("Failed to emit book deltas: {e}");
697                                }
698                            }
699                            Err(e) => log::error!("Failed to parse book deltas: {e}"),
700                        }
701                    }
702
703                    if ctx.active_quote_subs.contains(&instrument_id) {
704                        // Clone and drop guard before emit to avoid DashMap deadlock
705                        let last_quote = ctx.last_quotes.get(&instrument_id).map(|r| *r);
706
707                        match parse_quote_from_price_change(
708                            change,
709                            instrument_id,
710                            meta.price_precision,
711                            meta.size_precision,
712                            last_quote.as_ref(),
713                            ts_event,
714                            ts_init,
715                        ) {
716                            Ok(Some(quote)) => {
717                                Self::emit_quote_if_changed(ctx, instrument_id, quote);
718                            }
719                            Ok(None) => {} // Missing best_bid/best_ask
720                            Err(e) => {
721                                log::error!("Failed to parse quote from price change: {e}");
722                            }
723                        }
724                    }
725                }
726            }
727
728            MarketWsMessage::LastTradePrice(trade) => {
729                let token_id = Ustr::from(trade.asset_id.as_str());
730                let meta = match ctx.token_meta.get(&token_id) {
731                    Some(m) => *m,
732                    None => {
733                        log::debug!("No instrument for token_id {token_id}");
734                        return;
735                    }
736                };
737                let instrument_id = meta.instrument_id;
738
739                if ctx.active_trade_subs.contains(&instrument_id) {
740                    let ts_init = ctx.clock.get_time_ns();
741
742                    match parse_trade_tick(
743                        &trade,
744                        instrument_id,
745                        meta.price_precision,
746                        meta.size_precision,
747                        ts_init,
748                    ) {
749                        Ok(tick) => {
750                            if let Err(e) = ctx
751                                .data_sender
752                                .send(DataEvent::Data(NautilusData::Trade(tick)))
753                            {
754                                log::error!("Failed to emit trade tick: {e}");
755                            }
756                        }
757                        Err(e) => log::error!("Failed to parse trade tick: {e}"),
758                    }
759                }
760            }
761
762            MarketWsMessage::TickSizeChange(change) => {
763                log::info!(
764                    "Tick size changed for {}: {} -> {}",
765                    change.asset_id,
766                    change.old_tick_size,
767                    change.new_tick_size
768                );
769
770                let token_id = Ustr::from(change.asset_id.as_str());
771                let meta = match ctx.token_meta.get(&token_id) {
772                    Some(m) => *m,
773                    None => {
774                        log::error!("No instrument for token_id {token_id}");
775                        return;
776                    }
777                };
778
779                let tick_size: rust_decimal::Decimal = match change.new_tick_size.parse() {
780                    Ok(d) => d,
781                    Err(e) => {
782                        log::error!(
783                            "Failed to parse new tick size '{}': {e}",
784                            change.new_tick_size
785                        );
786                        return;
787                    }
788                };
789                let new_price_precision = tick_size.scale() as u8;
790
791                // Update hot-path precision
792                ctx.token_meta.insert(
793                    token_id,
794                    TokenMeta {
795                        price_precision: new_price_precision,
796                        ..meta
797                    },
798                );
799
800                // Rebuild and emit the full instrument to update cache.
801                let instruments = ctx.instruments.load();
802                if let Some(existing) = instruments.get(&meta.instrument_id) {
803                    let ts_init = ctx.clock.get_time_ns();
804
805                    match rebuild_instrument_with_tick_size(
806                        existing,
807                        &change.new_tick_size,
808                        ts_init,
809                        ts_init,
810                    ) {
811                        Ok(rebuilt) => {
812                            ctx.instruments.insert(rebuilt.id(), rebuilt.clone());
813                            if let Err(e) = ctx.data_sender.send(DataEvent::Instrument(rebuilt)) {
814                                log::error!("Failed to emit rebuilt instrument: {e}");
815                            }
816                        }
817                        Err(e) => {
818                            log::error!("Failed to rebuild instrument for tick size change: {e}");
819                        }
820                    }
821                }
822            }
823
824            MarketWsMessage::NewMarket(nm) => {
825                if !ctx.subscribe_new_markets {
826                    log::trace!("Ignoring new market event (subscribe_new_markets=false)");
827                    return;
828                }
829
830                if let Some(ref nf) = ctx.new_market_filter
831                    && !nf.accept_new_market(&nm)
832                {
833                    log::debug!("New market slug={} rejected by new_market_filter", nm.slug);
834                    return;
835                }
836
837                let gamma_client = ctx.gamma_client.clone();
838                let filters = ctx.filters.clone();
839                let token_meta = ctx.token_meta.clone();
840                let instruments = ctx.instruments.clone();
841                let data_sender = ctx.data_sender.clone();
842                let clock = ctx.clock;
843                let cancellation = ctx.cancellation_token.clone();
844                let slug = nm.slug;
845                let active = nm.active;
846
847                get_runtime().spawn(async move {
848                    let fetch = gamma_client
849                        .request_instruments_by_slugs_with_retry(vec![slug.clone()]);
850
851                    let result = tokio::select! {
852                        r = fetch => r,
853                        () = cancellation.cancelled() => {
854                            log::debug!("New market fetch for '{slug}' cancelled during shutdown");
855                            return;
856                        }
857                    };
858
859                    match result {
860                        Ok(new_instruments) => {
861                            for inst in new_instruments {
862                                if cancellation.is_cancelled() {
863                                    log::debug!("New market processing cancelled during shutdown");
864                                    return;
865                                }
866
867                                if !filters.iter().all(|f| f.accept(&inst)) {
868                                    log::debug!("New market instrument {} filtered out", inst.id());
869                                    continue;
870                                }
871
872                                cache_instrument(&instruments, &token_meta, &inst);
873
874                                let instrument_id = inst.id();
875                                if let Err(e) = data_sender.send(DataEvent::Instrument(inst)) {
876                                    log::error!(
877                                        "Failed to emit new market instrument {instrument_id}: {e}"
878                                    );
879                                }
880
881                                // Emit instrument status based on WS active flag
882                                let ts_now = clock.get_time_ns();
883                                let action = if active {
884                                    MarketStatusAction::Trading
885                                } else {
886                                    MarketStatusAction::PreOpen
887                                };
888                                let status = InstrumentStatus::new(
889                                    instrument_id,
890                                    action,
891                                    ts_now,
892                                    ts_now,
893                                    None,
894                                    None,
895                                    None,
896                                    None,
897                                    None,
898                                );
899
900                                if let Err(e) =
901                                    data_sender.send(DataEvent::InstrumentStatus(status))
902                                {
903                                    log::error!(
904                                        "Failed to emit instrument status for {instrument_id}: {e}"
905                                    );
906                                }
907                            }
908                        }
909                        Err(e) => log::warn!(
910                            "Failed to fetch instruments for new market slug '{slug}' after retries: {e}"
911                        ),
912                    }
913                });
914            }
915
916            MarketWsMessage::MarketResolved(resolved) => {
917                log::info!(
918                    "Market resolved: {} winner={} ({})",
919                    resolved.market,
920                    resolved.winning_asset_id,
921                    resolved.winning_outcome
922                );
923
924                let ts_init = ctx.clock.get_time_ns();
925                let reason = Ustr::from(&format!(
926                    "Winner: {} ({})",
927                    resolved.winning_asset_id, resolved.winning_outcome
928                ));
929
930                for asset_id in &resolved.assets_ids {
931                    let token_id = Ustr::from(asset_id.as_str());
932                    if let Some(meta) = ctx.token_meta.get(&token_id) {
933                        let status = InstrumentStatus::new(
934                            meta.instrument_id,
935                            MarketStatusAction::Close,
936                            ts_init,
937                            ts_init,
938                            Some(reason),
939                            None,
940                            Some(false),
941                            None,
942                            None,
943                        );
944
945                        if let Err(e) = ctx.data_sender.send(DataEvent::InstrumentStatus(status)) {
946                            log::error!(
947                                "Failed to emit instrument status for {}: {e}",
948                                meta.instrument_id
949                            );
950                        }
951                    }
952                }
953            }
954
955            MarketWsMessage::BestBidAsk(bba) => {
956                log::trace!(
957                    "best_bid_ask for {}: bid={} ask={}",
958                    bba.asset_id,
959                    bba.best_bid,
960                    bba.best_ask
961                );
962            }
963        }
964    }
965
966    fn emit_quote_if_changed(
967        ctx: &WsMessageContext,
968        instrument_id: InstrumentId,
969        quote: QuoteTick,
970    ) {
971        // Compare prices and sizes only; timestamps always differ between messages
972        let emit = !matches!(
973            ctx.last_quotes.get(&instrument_id),
974            Some(existing) if existing.bid_price == quote.bid_price
975                && existing.ask_price == quote.ask_price
976                && existing.bid_size == quote.bid_size
977                && existing.ask_size == quote.ask_size
978        );
979
980        if emit {
981            ctx.last_quotes.insert(instrument_id, quote);
982            if let Err(e) = ctx
983                .data_sender
984                .send(DataEvent::Data(NautilusData::Quote(quote)))
985            {
986                log::error!("Failed to emit quote tick: {e}");
987            }
988        }
989    }
990
991    async fn await_tasks_with_timeout(&mut self, timeout: tokio::time::Duration) {
992        for handle in self.tasks.drain(..) {
993            let _ = tokio::time::timeout(timeout, handle).await;
994        }
995    }
996}
997
998#[async_trait::async_trait(?Send)]
999impl DataClient for PolymarketDataClient {
1000    fn client_id(&self) -> ClientId {
1001        self.client_id
1002    }
1003
1004    fn venue(&self) -> Option<Venue> {
1005        Some(*POLYMARKET_VENUE)
1006    }
1007
1008    fn start(&mut self) -> anyhow::Result<()> {
1009        log::info!("Starting Polymarket data client: {}", self.client_id);
1010        Ok(())
1011    }
1012
1013    fn stop(&mut self) -> anyhow::Result<()> {
1014        log::info!("Stopping Polymarket data client: {}", self.client_id);
1015        self.cancellation_token.cancel();
1016        self.is_connected.store(false, Ordering::Relaxed);
1017        Ok(())
1018    }
1019
1020    fn reset(&mut self) -> anyhow::Result<()> {
1021        log::debug!("Resetting Polymarket data client: {}", self.client_id);
1022        self.is_connected.store(false, Ordering::Relaxed);
1023        self.cancellation_token = CancellationToken::new();
1024
1025        for handle in self.tasks.drain(..) {
1026            handle.abort();
1027        }
1028        Ok(())
1029    }
1030
1031    fn dispose(&mut self) -> anyhow::Result<()> {
1032        self.stop()
1033    }
1034
1035    async fn connect(&mut self) -> anyhow::Result<()> {
1036        if self.is_connected() {
1037            return Ok(());
1038        }
1039
1040        self.cancellation_token = CancellationToken::new();
1041
1042        log::info!("Connecting Polymarket data client");
1043
1044        log::info!("Bootstrapping instruments from Gamma API...");
1045        self.bootstrap_instruments().await?;
1046        log::info!(
1047            "Bootstrap complete, {} instruments loaded",
1048            self.instruments.load().len(),
1049        );
1050
1051        self.ws_client.connect().await?;
1052
1053        if self.config.subscribe_new_markets {
1054            log::info!("Subscribing to new markets...");
1055            self.ws_client.subscribe_market(vec![]).await?;
1056        }
1057
1058        let rx = self
1059            .ws_client
1060            .take_message_receiver()
1061            .ok_or_else(|| anyhow::anyhow!("WS message receiver not available after connect"))?;
1062
1063        self.spawn_message_handler(rx);
1064
1065        self.is_connected.store(true, Ordering::Relaxed);
1066        log::info!("Connected Polymarket data client");
1067
1068        Ok(())
1069    }
1070
1071    async fn disconnect(&mut self) -> anyhow::Result<()> {
1072        if !self.is_connected() {
1073            return Ok(());
1074        }
1075
1076        log::info!("Disconnecting Polymarket data client");
1077
1078        self.cancellation_token.cancel();
1079        self.await_tasks_with_timeout(tokio::time::Duration::from_secs(5))
1080            .await;
1081
1082        self.ws_client.disconnect().await?;
1083
1084        self.is_connected.store(false, Ordering::Relaxed);
1085        log::info!("Disconnected Polymarket data client");
1086
1087        Ok(())
1088    }
1089
1090    fn is_connected(&self) -> bool {
1091        self.is_connected.load(Ordering::Relaxed)
1092    }
1093
1094    fn is_disconnected(&self) -> bool {
1095        !self.is_connected()
1096    }
1097
1098    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1099        let http = self.provider.http_client().clone();
1100        let filters = self.provider.filters();
1101        let sender = self.data_sender.clone();
1102        let instruments_cache = self.instruments.clone();
1103        let token_meta = self.token_meta.clone();
1104        let request_id = request.request_id;
1105        let client_id = request.client_id.unwrap_or(self.client_id);
1106        let venue = *POLYMARKET_VENUE;
1107        let start_nanos = datetime_to_unix_nanos(request.start);
1108        let end_nanos = datetime_to_unix_nanos(request.end);
1109        let params = request.params;
1110        let clock = self.clock;
1111
1112        get_runtime().spawn(async move {
1113            match fetch_instruments(&http, &filters).await {
1114                Ok(instruments) => {
1115                    log::info!("Fetched {} instruments from Gamma API", instruments.len());
1116
1117                    for instrument in &instruments {
1118                        cache_instrument(&instruments_cache, &token_meta, instrument);
1119                    }
1120
1121                    let response = DataResponse::Instruments(InstrumentsResponse::new(
1122                        request_id,
1123                        client_id,
1124                        venue,
1125                        instruments,
1126                        start_nanos,
1127                        end_nanos,
1128                        clock.get_time_ns(),
1129                        params,
1130                    ));
1131
1132                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1133                        log::error!("Failed to send instruments response: {e}");
1134                    }
1135                }
1136                Err(e) => {
1137                    log::error!("Failed to fetch instruments from Gamma API: {e:?}");
1138                }
1139            }
1140        });
1141
1142        Ok(())
1143    }
1144
1145    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1146        let instrument_id = request.instrument_id;
1147        let http = self.provider.http_client().clone();
1148        let sender = self.data_sender.clone();
1149        let instruments_cache = self.instruments.clone();
1150        let token_meta = self.token_meta.clone();
1151        let client_id = request.client_id.unwrap_or(self.client_id);
1152        let request_id = request.request_id;
1153        let start = request.start;
1154        let end = request.end;
1155        let params = request.params;
1156        let clock = self.clock;
1157
1158        get_runtime().spawn(async move {
1159            let condition_id = match extract_condition_id(&instrument_id) {
1160                Ok(cid) => cid,
1161                Err(e) => {
1162                    log::error!("Failed to extract condition_id for {instrument_id}: {e}");
1163                    return;
1164                }
1165            };
1166
1167            let query_params = GetGammaMarketsParams {
1168                condition_ids: Some(condition_id),
1169                ..Default::default()
1170            };
1171
1172            let instrument = match http.request_instruments_by_params(query_params).await {
1173                Ok(instruments) => instruments.into_iter().find(|i| i.id() == instrument_id),
1174                Err(e) => {
1175                    log::error!("Failed to fetch instrument {instrument_id} from Gamma API: {e}");
1176                    return;
1177                }
1178            };
1179
1180            if let Some(inst) = instrument {
1181                cache_instrument(&instruments_cache, &token_meta, &inst);
1182
1183                // Publish onto the data bus so other clients (e.g. the exec
1184                // client's token map) can update from the same fetch.
1185                if let Err(e) = sender.send(DataEvent::Instrument(inst.clone())) {
1186                    log::warn!("Failed to publish instrument {instrument_id}: {e}");
1187                }
1188
1189                let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1190                    request_id,
1191                    client_id,
1192                    instrument_id,
1193                    inst,
1194                    datetime_to_unix_nanos(start),
1195                    datetime_to_unix_nanos(end),
1196                    clock.get_time_ns(),
1197                    params,
1198                )));
1199
1200                if let Err(e) = sender.send(DataEvent::Response(response)) {
1201                    log::error!("Failed to send instrument response: {e}");
1202                }
1203            } else {
1204                log::error!("Instrument {instrument_id} not found on Polymarket");
1205            }
1206        });
1207
1208        Ok(())
1209    }
1210
1211    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1212        let instrument_id = request.instrument_id;
1213        let instruments = self.instruments.load();
1214        let instrument = instruments
1215            .get(&instrument_id)
1216            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1217
1218        let token_id = instrument.raw_symbol().as_str().to_string();
1219        let price_precision = instrument.price_precision();
1220        let size_precision = instrument.size_precision();
1221
1222        let clob_client = self.clob_public_client.clone();
1223        let sender = self.data_sender.clone();
1224        let client_id = request.client_id.unwrap_or(self.client_id);
1225        let request_id = request.request_id;
1226        let params = request.params;
1227        let clock = self.clock;
1228
1229        get_runtime().spawn(async move {
1230            match clob_client
1231                .request_book_snapshot(instrument_id, &token_id, price_precision, size_precision)
1232                .await
1233                .context("failed to request book snapshot from Polymarket")
1234            {
1235                Ok(book) => {
1236                    let response = DataResponse::Book(BookResponse::new(
1237                        request_id,
1238                        client_id,
1239                        instrument_id,
1240                        book,
1241                        None,
1242                        None,
1243                        clock.get_time_ns(),
1244                        params,
1245                    ));
1246
1247                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1248                        log::error!("Failed to send book snapshot response: {e}");
1249                    }
1250                }
1251                Err(e) => log::error!("Book snapshot request failed: {e:?}"),
1252            }
1253        });
1254
1255        Ok(())
1256    }
1257
1258    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1259        let instrument_id = request.instrument_id;
1260        let instruments = self.instruments.load();
1261        let instrument = instruments
1262            .get(&instrument_id)
1263            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1264
1265        let condition_id = extract_condition_id(&instrument_id)?;
1266        let token_id = instrument.raw_symbol().as_str().to_string();
1267        let price_precision = instrument.price_precision();
1268        let size_precision = instrument.size_precision();
1269        let limit = request.limit.map(|n| n.get() as u32);
1270
1271        let data_api_client = self.data_api_client.clone();
1272        let sender = self.data_sender.clone();
1273        let client_id = request.client_id.unwrap_or(self.client_id);
1274        let request_id = request.request_id;
1275        let params = request.params;
1276        let clock = self.clock;
1277        let start_nanos = datetime_to_unix_nanos(request.start);
1278        let end_nanos = datetime_to_unix_nanos(request.end);
1279
1280        get_runtime().spawn(async move {
1281            match data_api_client
1282                .request_trade_ticks(
1283                    instrument_id,
1284                    &condition_id,
1285                    &token_id,
1286                    price_precision,
1287                    size_precision,
1288                    limit,
1289                )
1290                .await
1291                .context("failed to request trades from Polymarket Data API")
1292            {
1293                Ok(trades) => {
1294                    let response = DataResponse::Trades(TradesResponse::new(
1295                        request_id,
1296                        client_id,
1297                        instrument_id,
1298                        trades,
1299                        start_nanos,
1300                        end_nanos,
1301                        clock.get_time_ns(),
1302                        params,
1303                    ));
1304
1305                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1306                        log::error!("Failed to send trades response: {e}");
1307                    }
1308                }
1309                Err(e) => log::error!("Trade request failed for {instrument_id}: {e:?}"),
1310            }
1311        });
1312
1313        Ok(())
1314    }
1315
1316    fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
1317        log::debug!("subscribe_instruments: subscribed individually via data subscription methods");
1318        Ok(())
1319    }
1320
1321    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
1322        if cmd.book_type != BookType::L2_MBP {
1323            anyhow::bail!(
1324                "Polymarket only supports L2_MBP order book deltas, received {:?}",
1325                cmd.book_type
1326            );
1327        }
1328
1329        let instrument_id = cmd.instrument_id;
1330        let cached = self.instruments.load().contains_key(&instrument_id);
1331
1332        if !cached && !self.config.auto_load_missing_instruments {
1333            anyhow::bail!(
1334                "Instrument {instrument_id} not found, and `auto_load_missing_instruments` is disabled"
1335            );
1336        }
1337
1338        // Mark intent before routing so unsubscribe can race-safely clear it.
1339        self.active_delta_subs.insert(instrument_id);
1340        self.order_books
1341            .entry(instrument_id)
1342            .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
1343
1344        if !cached {
1345            self.queue_pending_load(instrument_id);
1346            return Ok(());
1347        }
1348
1349        self.sync_ws_subscription(instrument_id);
1350        log::debug!("Subscribed to book deltas for {instrument_id}");
1351        Ok(())
1352    }
1353
1354    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
1355        let instrument_id = cmd.instrument_id;
1356        let cached = self.instruments.load().contains_key(&instrument_id);
1357
1358        if !cached && !self.config.auto_load_missing_instruments {
1359            anyhow::bail!(
1360                "Instrument {instrument_id} not found, and `auto_load_missing_instruments` is disabled"
1361            );
1362        }
1363
1364        self.active_quote_subs.insert(instrument_id);
1365
1366        if !cached {
1367            self.queue_pending_load(instrument_id);
1368            return Ok(());
1369        }
1370
1371        self.sync_ws_subscription(instrument_id);
1372        log::debug!("Subscribed to quotes for {instrument_id}");
1373        Ok(())
1374    }
1375
1376    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
1377        let instrument_id = cmd.instrument_id;
1378        let cached = self.instruments.load().contains_key(&instrument_id);
1379
1380        if !cached && !self.config.auto_load_missing_instruments {
1381            anyhow::bail!(
1382                "Instrument {instrument_id} not found, and `auto_load_missing_instruments` is disabled"
1383            );
1384        }
1385
1386        self.active_trade_subs.insert(instrument_id);
1387
1388        if !cached {
1389            self.queue_pending_load(instrument_id);
1390            return Ok(());
1391        }
1392
1393        self.sync_ws_subscription(instrument_id);
1394        log::debug!("Subscribed to trades for {instrument_id}");
1395        Ok(())
1396    }
1397
1398    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1399        let instrument_id = cmd.instrument_id;
1400        self.active_delta_subs.remove(&instrument_id);
1401        self.drop_pending_if_unwanted(instrument_id);
1402        self.sync_ws_subscription(instrument_id);
1403        log::debug!("Unsubscribed from book deltas for {instrument_id}");
1404        Ok(())
1405    }
1406
1407    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1408        let instrument_id = cmd.instrument_id;
1409        self.active_quote_subs.remove(&instrument_id);
1410        self.drop_pending_if_unwanted(instrument_id);
1411        self.sync_ws_subscription(instrument_id);
1412        log::debug!("Unsubscribed from quotes for {instrument_id}");
1413        Ok(())
1414    }
1415
1416    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1417        let instrument_id = cmd.instrument_id;
1418        self.active_trade_subs.remove(&instrument_id);
1419        self.drop_pending_if_unwanted(instrument_id);
1420        self.sync_ws_subscription(instrument_id);
1421        log::debug!("Unsubscribed from trades for {instrument_id}");
1422        Ok(())
1423    }
1424}
1425
1426#[cfg(test)]
1427mod tests {
1428    use nautilus_core::UnixNanos;
1429    use nautilus_model::{
1430        enums::AssetClass,
1431        identifiers::{InstrumentId, Symbol},
1432        instruments::BinaryOption,
1433        types::{Currency, Price, Quantity},
1434    };
1435    use rstest::rstest;
1436
1437    use super::*;
1438    use crate::websocket::{client::WsSubscriptionHandle, handler::HandlerCommand};
1439
1440    fn make_handle() -> (
1441        WsSubscriptionHandle,
1442        tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
1443    ) {
1444        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
1445        (WsSubscriptionHandle::from_sender(tx), rx)
1446    }
1447
1448    type ActiveSet = Arc<AtomicSet<InstrumentId>>;
1449    type OpenTokens = Arc<AtomicSet<Ustr>>;
1450    type WsMutex = Arc<tokio::sync::Mutex<()>>;
1451
1452    fn make_state() -> (ActiveSet, ActiveSet, ActiveSet, OpenTokens, WsMutex) {
1453        (
1454            Arc::new(AtomicSet::new()),
1455            Arc::new(AtomicSet::new()),
1456            Arc::new(AtomicSet::new()),
1457            Arc::new(AtomicSet::new()),
1458            Arc::new(tokio::sync::Mutex::new(())),
1459        )
1460    }
1461
1462    fn instrument_id() -> InstrumentId {
1463        InstrumentId::from("0xCOND-0xTOKEN.POLYMARKET")
1464    }
1465
1466    fn token_ustr() -> Ustr {
1467        Ustr::from("0xCOND-0xTOKEN")
1468    }
1469
1470    #[rstest]
1471    #[tokio::test]
1472    async fn sync_ws_subscribes_when_intent_present_and_ws_closed() {
1473        let (ws, mut rx) = make_handle();
1474        let (quotes, deltas, trades, open, mutex) = make_state();
1475
1476        // Intent: quotes subscribed.
1477        let inst = instrument_id();
1478        quotes.insert(inst);
1479
1480        sync_ws_subscription_async(
1481            inst,
1482            inst.symbol.as_str().to_string(),
1483            quotes.clone(),
1484            deltas,
1485            trades,
1486            open.clone(),
1487            mutex,
1488            ws,
1489        )
1490        .await;
1491
1492        assert!(open.contains(&token_ustr()));
1493
1494        match rx.try_recv().expect("expected SubscribeMarket command") {
1495            HandlerCommand::SubscribeMarket(ids) => {
1496                assert_eq!(ids, vec![inst.symbol.as_str().to_string()]);
1497            }
1498            other => panic!("unexpected command: {other:?}"),
1499        }
1500        assert!(rx.try_recv().is_err());
1501    }
1502
1503    #[rstest]
1504    #[tokio::test]
1505    async fn sync_ws_unsubscribes_when_intent_absent_and_ws_open() {
1506        let (ws, mut rx) = make_handle();
1507        let (quotes, deltas, trades, open, mutex) = make_state();
1508
1509        // WS currently open, but no caller wants it anymore.
1510        let inst = instrument_id();
1511        open.insert(token_ustr());
1512
1513        sync_ws_subscription_async(
1514            inst,
1515            inst.symbol.as_str().to_string(),
1516            quotes,
1517            deltas,
1518            trades,
1519            open.clone(),
1520            mutex,
1521            ws,
1522        )
1523        .await;
1524
1525        assert!(!open.contains(&token_ustr()));
1526
1527        match rx.try_recv().expect("expected UnsubscribeMarket command") {
1528            HandlerCommand::UnsubscribeMarket(ids) => {
1529                assert_eq!(ids, vec![inst.symbol.as_str().to_string()]);
1530            }
1531            other => panic!("unexpected command: {other:?}"),
1532        }
1533    }
1534
1535    #[rstest]
1536    #[case::intent_matches_open(true, true, false)]
1537    #[case::no_intent_not_open(false, false, false)]
1538    #[tokio::test]
1539    async fn sync_ws_no_op_when_state_already_matches(
1540        #[case] want: bool,
1541        #[case] is_open_initial: bool,
1542        #[case] expect_command: bool,
1543    ) {
1544        let (ws, mut rx) = make_handle();
1545        let (quotes, deltas, trades, open, mutex) = make_state();
1546
1547        let inst = instrument_id();
1548
1549        if want {
1550            quotes.insert(inst);
1551        }
1552
1553        if is_open_initial {
1554            open.insert(token_ustr());
1555        }
1556
1557        sync_ws_subscription_async(
1558            inst,
1559            inst.symbol.as_str().to_string(),
1560            quotes,
1561            deltas,
1562            trades,
1563            open.clone(),
1564            mutex,
1565            ws,
1566        )
1567        .await;
1568
1569        // State is preserved either way.
1570        assert_eq!(open.contains(&token_ustr()), is_open_initial);
1571        assert_eq!(rx.try_recv().is_ok(), expect_command);
1572    }
1573
1574    #[rstest]
1575    #[tokio::test]
1576    async fn sync_ws_rolls_back_open_tokens_on_send_failure() {
1577        // Drop the receiver so the channel send fails.
1578        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
1579        drop(rx);
1580        let ws = WsSubscriptionHandle::from_sender(tx);
1581
1582        let (quotes, deltas, trades, open, mutex) = make_state();
1583
1584        let inst = instrument_id();
1585        quotes.insert(inst);
1586
1587        sync_ws_subscription_async(
1588            inst,
1589            inst.symbol.as_str().to_string(),
1590            quotes,
1591            deltas,
1592            trades,
1593            open.clone(),
1594            mutex,
1595            ws,
1596        )
1597        .await;
1598
1599        // Send failed, so the tracked WS state must be rolled back.
1600        assert!(!open.contains(&token_ustr()));
1601    }
1602
1603    #[rstest]
1604    #[case::any_kind(true, false, false)]
1605    #[case::another_kind(false, true, false)]
1606    #[case::third_kind(false, false, true)]
1607    #[tokio::test]
1608    async fn sync_ws_opens_for_any_active_kind(#[case] q: bool, #[case] d: bool, #[case] t: bool) {
1609        let (ws, mut rx) = make_handle();
1610        let (quotes, deltas, trades, open, mutex) = make_state();
1611
1612        let inst = instrument_id();
1613
1614        if q {
1615            quotes.insert(inst);
1616        }
1617
1618        if d {
1619            deltas.insert(inst);
1620        }
1621
1622        if t {
1623            trades.insert(inst);
1624        }
1625
1626        sync_ws_subscription_async(
1627            inst,
1628            inst.symbol.as_str().to_string(),
1629            quotes,
1630            deltas,
1631            trades,
1632            open.clone(),
1633            mutex,
1634            ws,
1635        )
1636        .await;
1637
1638        assert!(open.contains(&token_ustr()));
1639        assert!(matches!(
1640            rx.try_recv(),
1641            Ok(HandlerCommand::SubscribeMarket(_))
1642        ));
1643    }
1644
1645    fn stub_instrument(
1646        raw_symbol: &str,
1647        price_increment: Price,
1648        size_increment: Quantity,
1649    ) -> InstrumentAny {
1650        let price_precision = price_increment.precision;
1651        let size_precision = size_increment.precision;
1652        InstrumentAny::BinaryOption(BinaryOption::new(
1653            InstrumentId::from(format!("{raw_symbol}.POLYMARKET").as_str()),
1654            Symbol::new(raw_symbol),
1655            AssetClass::Alternative,
1656            Currency::pUSD(),
1657            UnixNanos::default(),
1658            UnixNanos::from(u64::MAX),
1659            price_precision,
1660            size_precision,
1661            price_increment,
1662            size_increment,
1663            None,
1664            None,
1665            None,
1666            None,
1667            None,
1668            None,
1669            None,
1670            None,
1671            None,
1672            None,
1673            None,
1674            None,
1675            None,
1676            UnixNanos::default(),
1677            UnixNanos::default(),
1678        ))
1679    }
1680
1681    #[rstest]
1682    #[case::p3_s2("token-a", Price::from("0.001"), Quantity::from("0.01"))]
1683    #[case::p5_s4("token-b", Price::from("0.00001"), Quantity::from("0.0001"))]
1684    fn cache_instrument_writes_both_maps(
1685        #[case] raw_symbol: &str,
1686        #[case] price_increment: Price,
1687        #[case] size_increment: Quantity,
1688    ) {
1689        let instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>> = Arc::new(AtomicMap::new());
1690        let token_meta: Arc<DashMap<Ustr, TokenMeta>> = Arc::new(DashMap::new());
1691        let inst = stub_instrument(raw_symbol, price_increment, size_increment);
1692        let expected_id = inst.id();
1693        let expected_token = Ustr::from(raw_symbol);
1694        let expected_price_precision = price_increment.precision;
1695        let expected_size_precision = size_increment.precision;
1696
1697        cache_instrument(&instruments, &token_meta, &inst);
1698
1699        let loaded = instruments.load();
1700        let cached = loaded
1701            .get(&expected_id)
1702            .expect("instrument inserted into live cache");
1703        assert_eq!(cached.id(), expected_id);
1704        assert_eq!(cached.raw_symbol().as_str(), raw_symbol);
1705
1706        let meta = token_meta
1707            .get(&expected_token)
1708            .expect("token_meta inserted for raw_symbol");
1709        assert_eq!(meta.instrument_id, expected_id);
1710        assert_eq!(meta.price_precision, expected_price_precision);
1711        assert_eq!(meta.size_precision, expected_size_precision);
1712    }
1713
1714    #[rstest]
1715    fn cache_instrument_overwrites_precisions_on_second_call() {
1716        let instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>> = Arc::new(AtomicMap::new());
1717        let token_meta: Arc<DashMap<Ustr, TokenMeta>> = Arc::new(DashMap::new());
1718        let raw_symbol = "token-overwrite";
1719
1720        let first = stub_instrument(raw_symbol, Price::from("0.01"), Quantity::from("0.1"));
1721        cache_instrument(&instruments, &token_meta, &first);
1722
1723        let second = stub_instrument(raw_symbol, Price::from("0.0001"), Quantity::from("0.001"));
1724        cache_instrument(&instruments, &token_meta, &second);
1725
1726        let meta = token_meta
1727            .get(&Ustr::from(raw_symbol))
1728            .expect("token_meta present after overwrite");
1729        assert_eq!(meta.price_precision, 4);
1730        assert_eq!(meta.size_precision, 3);
1731        assert_eq!(token_meta.len(), 1);
1732        assert_eq!(instruments.load().len(), 1);
1733    }
1734
1735    #[rstest]
1736    fn cache_instrument_maintains_dual_cache_invariant() {
1737        let instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>> = Arc::new(AtomicMap::new());
1738        let token_meta: Arc<DashMap<Ustr, TokenMeta>> = Arc::new(DashMap::new());
1739
1740        let samples = [
1741            stub_instrument("token-1", Price::from("0.001"), Quantity::from("0.01")),
1742            stub_instrument("token-2", Price::from("0.0001"), Quantity::from("0.01")),
1743            stub_instrument("token-3", Price::from("0.00001"), Quantity::from("0.001")),
1744        ];
1745
1746        for inst in &samples {
1747            cache_instrument(&instruments, &token_meta, inst);
1748        }
1749
1750        let loaded = instruments.load();
1751        assert_eq!(loaded.len(), samples.len());
1752        for inst in loaded.values() {
1753            let token_id = Ustr::from(inst.raw_symbol().as_str());
1754            let meta = token_meta
1755                .get(&token_id)
1756                .unwrap_or_else(|| panic!("missing token_meta for {token_id}"));
1757            assert_eq!(meta.instrument_id, inst.id());
1758        }
1759    }
1760}