Skip to main content

nautilus_polymarket/
providers.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Instrument provider for the Polymarket adapter.
17
18use std::{collections::HashMap, fmt::Debug, sync::Arc};
19
20use ahash::{AHashMap, AHashSet};
21use async_trait::async_trait;
22use nautilus_common::providers::{InstrumentProvider, InstrumentStore};
23use nautilus_model::{
24    identifiers::InstrumentId,
25    instruments::{Instrument, InstrumentAny},
26};
27use ustr::Ustr;
28
29use crate::{
30    filters::InstrumentFilter,
31    http::{gamma::PolymarketGammaHttpClient, models::GammaTag, query::GetGammaMarketsParams},
32};
33
34/// Provides Polymarket instruments via the Gamma API.
35///
36/// Wraps [`PolymarketGammaHttpClient`] with an [`InstrumentStore`] and a
37/// token_id index for resolving WebSocket asset IDs to instruments.
38///
39/// Optional [`InstrumentFilter`]s control which instruments are loaded
40/// during `load_all()`. Without filters, all active markets are fetched.
41pub struct PolymarketInstrumentProvider {
42    store: InstrumentStore,
43    http_client: PolymarketGammaHttpClient,
44    token_index: AHashMap<Ustr, InstrumentId>,
45    filters: Vec<Arc<dyn InstrumentFilter>>,
46}
47
48impl Debug for PolymarketInstrumentProvider {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct(stringify!(PolymarketInstrumentProvider))
51            .field("store", &self.store)
52            .field("http_client", &self.http_client)
53            .field("token_index_len", &self.token_index.len())
54            .field("filters", &self.filters)
55            .finish()
56    }
57}
58
59impl PolymarketInstrumentProvider {
60    /// Creates a new [`PolymarketInstrumentProvider`] with an empty store and no filters.
61    #[must_use]
62    pub fn new(http_client: PolymarketGammaHttpClient) -> Self {
63        Self {
64            store: InstrumentStore::new(),
65            http_client,
66            token_index: AHashMap::new(),
67            filters: Vec::new(),
68        }
69    }
70
71    /// Creates a new [`PolymarketInstrumentProvider`] with multiple filters.
72    #[must_use]
73    pub fn with_filters(
74        http_client: PolymarketGammaHttpClient,
75        filters: Vec<Arc<dyn InstrumentFilter>>,
76    ) -> Self {
77        Self {
78            store: InstrumentStore::new(),
79            http_client,
80            token_index: AHashMap::new(),
81            filters,
82        }
83    }
84
85    /// Creates a new [`PolymarketInstrumentProvider`] with a single filter.
86    #[must_use]
87    pub fn with_filter(
88        http_client: PolymarketGammaHttpClient,
89        filter: Arc<dyn InstrumentFilter>,
90    ) -> Self {
91        Self {
92            store: InstrumentStore::new(),
93            http_client,
94            token_index: AHashMap::new(),
95            filters: vec![filter],
96        }
97    }
98
99    /// Adds an instrument filter for subsequent `load_all()` calls.
100    pub fn add_filter(&mut self, filter: Arc<dyn InstrumentFilter>) {
101        self.filters.push(filter);
102    }
103
104    /// Clears all instrument filters, reverting to bulk load behavior.
105    pub fn clear_filters(&mut self) {
106        self.filters.clear();
107    }
108
109    /// Returns the instrument for the given token ID, if found.
110    #[must_use]
111    pub fn get_by_token_id(&self, token_id: &Ustr) -> Option<&InstrumentAny> {
112        let instrument_id = self.token_index.get(token_id)?;
113        self.store.find(instrument_id)
114    }
115
116    /// Builds a frozen snapshot mapping token IDs to instruments.
117    ///
118    /// Used to provide the WS handler task with a read-only lookup
119    /// table after instruments have been loaded.
120    #[must_use]
121    pub fn build_token_map(&self) -> AHashMap<Ustr, InstrumentAny> {
122        self.token_index
123            .iter()
124            .filter_map(|(token_id, instrument_id)| {
125                self.store
126                    .find(instrument_id)
127                    .map(|inst| (*token_id, inst.clone()))
128            })
129            .collect()
130    }
131
132    /// Loads instruments for the given slugs additively into the store.
133    ///
134    /// Unlike [`Self::load_all`], this does **not** clear existing instruments or
135    /// mark the store as initialized, allowing incremental loading of
136    /// slug-based markets alongside bulk data.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the HTTP request or parsing fails.
141    pub async fn load_by_slugs(&mut self, slugs: Vec<String>) -> anyhow::Result<()> {
142        let instruments = self.http_client.request_instruments_by_slugs(slugs).await?;
143
144        for instrument in &instruments {
145            self.token_index.insert(
146                Ustr::from(instrument.raw_symbol().as_str()),
147                instrument.id(),
148            );
149        }
150
151        self.store.add_bulk(instruments);
152
153        Ok(())
154    }
155
156    /// Returns a clone of the configured instrument filters.
157    #[must_use]
158    pub fn filters(&self) -> Vec<Arc<dyn InstrumentFilter>> {
159        self.filters.clone()
160    }
161
162    /// Returns a reference to the underlying HTTP client.
163    #[must_use]
164    pub fn http_client(&self) -> &PolymarketGammaHttpClient {
165        &self.http_client
166    }
167
168    /// Fetches available tags from the Gamma API.
169    pub async fn list_tags(&self) -> anyhow::Result<Vec<GammaTag>> {
170        self.http_client.request_tags().await
171    }
172
173    pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
174        for inst in &instruments {
175            self.token_index
176                .insert(Ustr::from(inst.raw_symbol().as_str()), inst.id());
177        }
178        self.store.add_bulk(instruments);
179    }
180
181    /// Loads instruments using all configured filters, combining results from
182    /// each filter's methods that return `Some`.
183    async fn load_filtered(&self) -> anyhow::Result<Vec<InstrumentAny>> {
184        fetch_instruments(&self.http_client, &self.filters).await
185    }
186}
187
188/// Fetches instruments from the Gamma API, respecting any configured filters.
189pub async fn fetch_instruments(
190    http_client: &PolymarketGammaHttpClient,
191    filters: &[Arc<dyn InstrumentFilter>],
192) -> anyhow::Result<Vec<InstrumentAny>> {
193    if filters.is_empty() {
194        return http_client.request_instruments().await;
195    }
196
197    let mut instruments = Vec::new();
198
199    for filter in filters {
200        if let Some(slugs) = filter.market_slugs()
201            && !slugs.is_empty()
202        {
203            let result = http_client.request_instruments_by_slugs(slugs).await?;
204            instruments.extend(result);
205        }
206
207        if let Some(event_slugs) = filter.event_slugs()
208            && !event_slugs.is_empty()
209        {
210            let result = http_client
211                .request_instruments_by_event_slugs(event_slugs)
212                .await?;
213            instruments.extend(result);
214        }
215
216        if let Some(params) = filter.query_params() {
217            let result = http_client.request_instruments_by_params(params).await?;
218            instruments.extend(result);
219        }
220
221        if let Some(event_queries) = filter.event_queries() {
222            for (event_slug, params) in event_queries {
223                let result = http_client
224                    .request_instruments_by_event_query(&event_slug, params)
225                    .await?;
226                instruments.extend(result);
227            }
228        }
229
230        if let Some(params) = filter.event_params() {
231            let result = http_client
232                .request_instruments_by_event_params(params)
233                .await?;
234            instruments.extend(result);
235        }
236
237        if let Some(params) = filter.search_params() {
238            let result = http_client.request_instruments_by_search(params).await?;
239            instruments.extend(result);
240        }
241    }
242
243    let mut seen = AHashSet::new();
244    instruments.retain(|inst| seen.insert(inst.id()));
245    instruments.retain(|inst| filters.iter().all(|f| f.accept(inst)));
246
247    Ok(instruments)
248}
249
250/// Extracts the condition ID from an instrument symbol.
251///
252/// Polymarket instrument symbols follow the pattern `{condition_id}-{token_id}`.
253/// The condition_id is a hex string (e.g. `0xabc123...`) and the token_id is a
254/// large decimal number. This extracts the condition_id by splitting at the last `-`.
255pub fn extract_condition_id(instrument_id: &InstrumentId) -> anyhow::Result<String> {
256    let symbol = instrument_id.symbol.as_str();
257    symbol
258        .rfind('-')
259        .map(|idx| symbol[..idx].to_string())
260        .ok_or_else(|| {
261            anyhow::anyhow!("Cannot extract condition_id from symbol '{symbol}': no '-' separator")
262        })
263}
264
265/// Builds `GetGammaMarketsParams` from a `HashMap<String, String>`.
266pub fn build_gamma_params_from_hashmap(map: &HashMap<String, String>) -> GetGammaMarketsParams {
267    let mut params = GetGammaMarketsParams::default();
268
269    if let Some(v) = map.get("active") {
270        params.active = v.parse().ok();
271    }
272
273    if let Some(v) = map.get("closed") {
274        params.closed = v.parse().ok();
275    }
276
277    if let Some(v) = map.get("archived") {
278        params.archived = v.parse().ok();
279    }
280
281    if let Some(v) = map.get("slug") {
282        params.slug = Some(v.clone());
283    }
284
285    if let Some(v) = map.get("tag_id") {
286        params.tag_id = Some(v.clone());
287    }
288
289    if let Some(v) = map.get("condition_ids") {
290        params.condition_ids = Some(v.clone());
291    }
292
293    if let Some(v) = map.get("clob_token_ids") {
294        params.clob_token_ids = Some(v.clone());
295    }
296
297    if let Some(v) = map.get("liquidity_num_min") {
298        params.liquidity_num_min = v.parse().ok();
299    }
300
301    if let Some(v) = map.get("liquidity_num_max") {
302        params.liquidity_num_max = v.parse().ok();
303    }
304
305    if let Some(v) = map.get("volume_num_min") {
306        params.volume_num_min = v.parse().ok();
307    }
308
309    if let Some(v) = map.get("volume_num_max") {
310        params.volume_num_max = v.parse().ok();
311    }
312
313    if let Some(v) = map.get("order") {
314        params.order = Some(v.clone());
315    }
316
317    if let Some(v) = map.get("ascending") {
318        params.ascending = v.parse().ok();
319    }
320
321    if let Some(v) = map.get("limit") {
322        params.limit = v.parse().ok();
323    }
324
325    if let Some(v) = map.get("max_markets") {
326        params.max_markets = v.parse().ok();
327    }
328
329    params
330}
331
332/// Resolves a tag slug to a tag ID by querying the Gamma tags endpoint.
333pub async fn resolve_tag_slug(
334    client: &PolymarketGammaHttpClient,
335    slug: &str,
336) -> anyhow::Result<String> {
337    let tags = client.request_tags().await?;
338    tags.iter()
339        .find(|t| t.slug.as_deref() == Some(slug))
340        .map(|t| t.id.clone())
341        .ok_or_else(|| anyhow::anyhow!("Tag slug '{slug}' not found"))
342}
343
344#[async_trait(?Send)]
345impl InstrumentProvider for PolymarketInstrumentProvider {
346    fn store(&self) -> &InstrumentStore {
347        &self.store
348    }
349
350    fn store_mut(&mut self) -> &mut InstrumentStore {
351        &mut self.store
352    }
353
354    async fn load_all(&mut self, filters: Option<&HashMap<String, String>>) -> anyhow::Result<()> {
355        let instruments = if self.filters.is_empty() {
356            // If HashMap filters are provided, convert to Gamma params
357            if let Some(map) = filters {
358                if map.is_empty() {
359                    self.http_client.request_instruments().await?
360                } else {
361                    let params = build_gamma_params_from_hashmap(map);
362                    self.http_client
363                        .request_instruments_by_params(params)
364                        .await?
365                }
366            } else {
367                self.http_client.request_instruments().await?
368            }
369        } else {
370            self.load_filtered().await?
371        };
372
373        self.store.clear();
374        self.token_index.clear();
375        self.add_instruments(instruments);
376        self.store.set_initialized();
377
378        Ok(())
379    }
380
381    async fn load_ids(
382        &mut self,
383        instrument_ids: &[InstrumentId],
384        filters: Option<&HashMap<String, String>>,
385    ) -> anyhow::Result<()> {
386        let missing: Vec<_> = instrument_ids
387            .iter()
388            .filter(|id| !self.store.contains(id))
389            .collect();
390
391        if missing.is_empty() {
392            return Ok(());
393        }
394
395        // Extract unique condition IDs from instrument symbols
396        // Symbol format: "{condition_id}-{token_id}"
397        let mut condition_ids: Vec<String> = missing
398            .iter()
399            .filter_map(|id| extract_condition_id(id).ok())
400            .collect();
401        condition_ids.sort();
402        condition_ids.dedup();
403
404        if !condition_ids.is_empty() && condition_ids.len() <= 100 {
405            let params = GetGammaMarketsParams {
406                condition_ids: Some(condition_ids.join(",")),
407                ..Default::default()
408            };
409            let instruments = self
410                .http_client
411                .request_instruments_by_params(params)
412                .await?;
413            self.add_instruments(instruments);
414        } else {
415            // Too many to batch, fall back to full load
416            self.load_all(filters).await?;
417        }
418
419        Ok(())
420    }
421
422    async fn load(
423        &mut self,
424        instrument_id: &InstrumentId,
425        filters: Option<&HashMap<String, String>>,
426    ) -> anyhow::Result<()> {
427        if self.store.contains(instrument_id) {
428            return Ok(());
429        }
430
431        // Try direct fetch via condition_id extracted from symbol
432        if let Ok(cid) = extract_condition_id(instrument_id) {
433            let params = GetGammaMarketsParams {
434                condition_ids: Some(cid),
435                ..Default::default()
436            };
437
438            if let Ok(instruments) = self.http_client.request_instruments_by_params(params).await {
439                self.add_instruments(instruments);
440
441                if self.store.contains(instrument_id) {
442                    return Ok(());
443                }
444            }
445        }
446
447        // Fallback: full load_all if not initialized
448        if !self.store.is_initialized() {
449            self.load_all(filters).await?;
450        }
451
452        if self.store.contains(instrument_id) {
453            Ok(())
454        } else {
455            anyhow::bail!("Instrument {instrument_id} not found on Polymarket")
456        }
457    }
458}