Skip to main content

nautilus_polymarket/http/
gamma.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client for the Polymarket Gamma API.
17
18use std::{collections::HashMap, result::Result as StdResult, sync::Arc};
19
20use nautilus_core::{
21    UnixNanos,
22    consts::NAUTILUS_USER_AGENT,
23    time::{AtomicTime, get_atomic_clock_realtime},
24};
25use nautilus_model::instruments::InstrumentAny;
26use nautilus_network::{
27    http::{HttpClient, HttpClientError, Method, USER_AGENT},
28    retry::{RetryConfig, RetryManager},
29};
30use serde::Serialize;
31use serde_json::Value;
32
33use crate::{
34    common::urls::gamma_api_url,
35    http::{
36        error::{Error, Result},
37        models::{GammaEvent, GammaMarket, GammaTag, SearchResponse},
38        parse::{create_instrument_from_def, parse_gamma_market},
39        query::{GetGammaEventsParams, GetGammaMarketsParams, GetSearchParams},
40        rate_limits::POLYMARKET_GAMMA_REST_QUOTA,
41    },
42};
43
/// Provides a raw HTTP client for the Polymarket Gamma API.
///
/// Handles HTTP transport for fetching market data from the public Gamma API.
/// No authentication is required.
///
/// Each endpoint method returns the deserialized Gamma models as-is; conversion
/// into Nautilus instruments happens outside this type.
#[derive(Debug, Clone)]
pub struct PolymarketGammaRawHttpClient {
    // Underlying HTTP client used for every request issued by this type.
    client: HttpClient,
    // Base URL that request paths are appended to; stored without trailing `/`.
    base_url: String,
}
53
54impl PolymarketGammaRawHttpClient {
55    /// Creates a new [`PolymarketGammaRawHttpClient`].
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if the HTTP client cannot be created.
60    pub fn new(base_url: Option<String>, timeout_secs: u64) -> StdResult<Self, HttpClientError> {
61        Ok(Self {
62            client: HttpClient::new(
63                Self::default_headers(),
64                vec![],
65                vec![],
66                Some(*POLYMARKET_GAMMA_REST_QUOTA),
67                Some(timeout_secs),
68                None,
69            )?,
70            base_url: base_url
71                .unwrap_or_else(|| gamma_api_url().to_string())
72                .trim_end_matches('/')
73                .to_string(),
74        })
75    }
76
77    fn default_headers() -> HashMap<String, String> {
78        HashMap::from([
79            (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
80            ("Content-Type".to_string(), "application/json".to_string()),
81        ])
82    }
83
84    fn url(&self, path: &str) -> String {
85        format!("{}{path}", self.base_url)
86    }
87
88    async fn send_get<P: Serialize, T: serde::de::DeserializeOwned>(
89        &self,
90        path: &str,
91        params: Option<&P>,
92    ) -> Result<T> {
93        let url = self.url(path);
94        let response = self
95            .client
96            .request_with_params(Method::GET, url, params, None, None, None, None)
97            .await
98            .map_err(Error::from_http_client)?;
99
100        if response.status.is_success() {
101            serde_json::from_slice(&response.body).map_err(Error::Serde)
102        } else {
103            Err(Error::from_status_code(
104                response.status.as_u16(),
105                &response.body,
106            ))
107        }
108    }
109
110    /// Fetches markets from the Gamma API.
111    ///
112    /// Handles both bare array and `{"data": [...]}` response schemas.
113    pub async fn get_gamma_markets(
114        &self,
115        params: GetGammaMarketsParams,
116    ) -> Result<Vec<GammaMarket>> {
117        let value: Value = self.send_get("/markets", Some(&params)).await?;
118
119        let array = match value {
120            Value::Array(_) => value,
121            Value::Object(ref map) if map.contains_key("data") => {
122                map.get("data").cloned().unwrap_or(Value::Array(vec![]))
123            }
124            _ => {
125                return Err(Error::decode(
126                    "Unrecognized Gamma markets response schema".to_string(),
127                ));
128            }
129        };
130
131        serde_json::from_value(array).map_err(Error::Serde)
132    }
133
134    /// Fetches a single market by ID from the Gamma API.
135    pub async fn get_gamma_market(&self, market_id: &str) -> Result<GammaMarket> {
136        let path = format!("/markets/{market_id}");
137        self.send_get::<(), _>(&path, None::<&()>).await
138    }
139
140    /// Fetches events from the Gamma API `GET /events?slug=`.
141    pub async fn get_gamma_events_by_slug(&self, slug: &str) -> Result<Vec<GammaEvent>> {
142        #[derive(Serialize)]
143        struct EventSlugParams<'a> {
144            slug: &'a str,
145        }
146        let params = EventSlugParams { slug };
147        self.send_get("/events", Some(&params)).await
148    }
149
150    /// Fetches events from the Gamma API `GET /events` with full query params.
151    pub async fn get_gamma_events(&self, params: GetGammaEventsParams) -> Result<Vec<GammaEvent>> {
152        self.send_get("/events", Some(&params)).await
153    }
154
155    /// Fetches available tags from the Gamma API `GET /tags`.
156    pub async fn get_gamma_tags(&self) -> Result<Vec<GammaTag>> {
157        self.send_get::<(), _>("/tags", None::<&()>).await
158    }
159
160    /// Searches the Gamma API via `GET /public-search`.
161    pub async fn get_public_search(&self, params: GetSearchParams) -> Result<SearchResponse> {
162        self.send_get("/public-search", Some(&params)).await
163    }
164}
165
166fn parse_markets_to_instruments(markets: &[GammaMarket], ts_init: UnixNanos) -> Vec<InstrumentAny> {
167    let mut instruments = Vec::new();
168    let mut skipped_empty = 0u32;
169
170    for market in markets {
171        // Markets without CLOB token IDs are not tradeable (resolved, pending, etc.)
172        if market.clob_token_ids.is_empty() {
173            skipped_empty += 1;
174            continue;
175        }
176
177        match parse_gamma_market(market) {
178            Ok(defs) => {
179                for def in defs {
180                    match create_instrument_from_def(&def, ts_init) {
181                        Ok(instrument) => instruments.push(instrument),
182                        Err(e) => log::warn!("Failed to create instrument: {e}"),
183                    }
184                }
185            }
186            Err(e) => log::warn!("Failed to parse gamma market: {e}"),
187        }
188    }
189
190    if skipped_empty > 0 {
191        log::debug!(
192            "Skipped {skipped_empty} markets with empty clob_token_ids (currently not tradeable)"
193        );
194    }
195    instruments
196}
197
198fn flatten_event_markets(events: Vec<GammaEvent>) -> Vec<GammaMarket> {
199    events
200        .into_iter()
201        .flat_map(|event| {
202            let event_game_id = event.game_id;
203            event.markets.into_iter().map(move |mut market| {
204                if market.game_id.is_none() {
205                    market.game_id = event_game_id;
206                }
207                market
208            })
209        })
210        .collect()
211}
212
/// Provides a domain HTTP client for Polymarket instrument fetching.
///
/// Wraps [`PolymarketGammaRawHttpClient`] with instrument parsing: fetch from
/// the Gamma API and parse into Nautilus types. Stateless with respect to
/// instrument storage; caching is handled by the instrument provider.
///
/// Cloning is cheap: the raw client and retry manager are held behind [`Arc`].
#[derive(Debug, Clone)]
pub struct PolymarketGammaHttpClient {
    // Shared raw client that performs the actual Gamma API requests.
    inner: Arc<PolymarketGammaRawHttpClient>,
    // Clock read to produce the `ts_init` timestamp passed to instrument parsing.
    clock: &'static AtomicTime,
    // Retry policy used by `request_instruments_by_slugs_with_retry`.
    retry_manager: Arc<RetryManager<Error>>,
}
224
225impl PolymarketGammaHttpClient {
226    /// Creates a new [`PolymarketGammaHttpClient`].
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if the underlying HTTP client cannot be created.
231    pub fn new(
232        gamma_base_url: Option<String>,
233        timeout_secs: u64,
234        retry_config: RetryConfig,
235    ) -> StdResult<Self, HttpClientError> {
236        Ok(Self {
237            inner: Arc::new(PolymarketGammaRawHttpClient::new(
238                gamma_base_url,
239                timeout_secs,
240            )?),
241            clock: get_atomic_clock_realtime(),
242            retry_manager: Arc::new(RetryManager::new(retry_config)),
243        })
244    }
245
246    /// Fetches markets from the Gamma API with the given base params, paginating automatically.
247    async fn fetch_gamma_markets_paginated(
248        &self,
249        base_params: GetGammaMarketsParams,
250    ) -> anyhow::Result<Vec<GammaMarket>> {
251        const PAGE_LIMIT: u32 = 500;
252        let page_size = base_params.limit.unwrap_or(PAGE_LIMIT);
253        let max_markets = base_params.max_markets;
254        let mut all_markets = Vec::new();
255        let mut offset: u32 = base_params.offset.unwrap_or(0);
256        let mut page_num = 0u32;
257
258        loop {
259            let params = GetGammaMarketsParams {
260                limit: Some(page_size),
261                offset: Some(offset),
262                ..base_params.clone()
263            };
264
265            let page = self.inner.get_gamma_markets(params).await?;
266            let page_len = page.len() as u32;
267            page_num += 1;
268            all_markets.extend(page);
269
270            log::info!(
271                "Fetched markets page {page_num}: {page_len} markets (total: {})",
272                all_markets.len(),
273            );
274
275            if let Some(cap) = max_markets
276                && all_markets.len() as u32 >= cap
277            {
278                all_markets.truncate(cap as usize);
279                break;
280            }
281
282            if page_len < page_size {
283                break;
284            }
285
286            offset += page_size;
287        }
288
289        Ok(all_markets)
290    }
291
292    /// Fetches all active markets from the Gamma API, paginating automatically.
293    async fn fetch_all_gamma_markets(&self) -> anyhow::Result<Vec<GammaMarket>> {
294        self.fetch_gamma_markets_paginated(GetGammaMarketsParams {
295            active: Some(true),
296            closed: Some(false),
297            ..Default::default()
298        })
299        .await
300    }
301
302    /// Fetches instruments from the Gamma API and returns Nautilus domain types.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if the HTTP request or parsing fails.
307    pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
308        let markets = self.fetch_all_gamma_markets().await?;
309        let ts_init = self.clock.get_time_ns();
310        let instruments = parse_markets_to_instruments(&markets, ts_init);
311        log::info!("Parsed {} instruments from Gamma API", instruments.len());
312        Ok(instruments)
313    }
314
315    /// Fetches instruments for the given slugs concurrently.
316    ///
317    /// Each slug is queried individually via the Gamma API. Missing or
318    /// unparsable slugs are logged and skipped.
319    ///
320    /// # Errors
321    ///
322    /// Returns an error if all slug requests fail. Individual slug failures
323    /// are warned and skipped when at least one slug succeeds.
324    pub async fn request_instruments_by_slugs(
325        &self,
326        slugs: Vec<String>,
327    ) -> anyhow::Result<Vec<InstrumentAny>> {
328        let ts_init = self.clock.get_time_ns();
329
330        let futures = slugs.into_iter().map(|slug| {
331            let inner = Arc::clone(&self.inner);
332            async move {
333                let params = GetGammaMarketsParams {
334                    slug: Some(slug.clone()),
335                    ..Default::default()
336                };
337
338                match inner.get_gamma_markets(params).await {
339                    Ok(markets) => Some((slug, markets)),
340                    Err(e) => {
341                        log::warn!("Failed to fetch slug '{slug}': {e}");
342                        None
343                    }
344                }
345            }
346        });
347
348        let results = futures_util::future::join_all(futures).await;
349
350        let total_slugs = results.len();
351        let succeeded = results.iter().filter(|r| r.is_some()).count();
352        let mut instruments = Vec::new();
353
354        for result in results.into_iter().flatten() {
355            let (slug, markets) = result;
356            if markets.is_empty() {
357                log::debug!("No markets found for slug '{slug}'");
358                continue;
359            }
360            instruments.extend(parse_markets_to_instruments(&markets, ts_init));
361        }
362
363        if succeeded == 0 && total_slugs > 0 {
364            anyhow::bail!("All {total_slugs} slug requests failed");
365        }
366
367        log::info!("Parsed {} instruments from slug queries", instruments.len());
368        Ok(instruments)
369    }
370
371    /// Fetches instruments for the given slugs with retry on empty results.
372    ///
373    /// Uses the client's [`RetryManager`] with exponential backoff. Gamma API
374    /// may not have indexed a newly created market yet, so empty results are
375    /// treated as retryable (indexing lag). HTTP errors are also retried per
376    /// the standard `is_retryable()` classification.
377    pub async fn request_instruments_by_slugs_with_retry(
378        &self,
379        slugs: Vec<String>,
380    ) -> anyhow::Result<Vec<InstrumentAny>> {
381        let inner = Arc::clone(&self.inner);
382        let ts_init = self.clock.get_time_ns();
383
384        self.retry_manager
385            .execute_with_retry(
386                "gamma_fetch_by_slugs",
387                || {
388                    let inner = Arc::clone(&inner);
389                    let slugs = slugs.clone();
390                    async move {
391                        let futures = slugs.into_iter().map(|slug| {
392                            let inner = Arc::clone(&inner);
393                            async move {
394                                let params = GetGammaMarketsParams {
395                                    slug: Some(slug.clone()),
396                                    ..Default::default()
397                                };
398                                inner
399                                    .get_gamma_markets(params)
400                                    .await
401                                    .map(|markets| (slug, markets))
402                            }
403                        });
404
405                        let results: Vec<_> = futures_util::future::join_all(futures)
406                            .await
407                            .into_iter()
408                            .collect::<StdResult<Vec<_>, _>>()?;
409
410                        let instruments: Vec<InstrumentAny> = results
411                            .into_iter()
412                            .flat_map(|(_, markets)| {
413                                parse_markets_to_instruments(&markets, ts_init)
414                            })
415                            .collect();
416
417                        if instruments.is_empty() {
418                            return Err(Error::transport(
419                                "Gamma returned no instruments (indexing lag)",
420                            ));
421                        }
422
423                        Ok(instruments)
424                    }
425                },
426                |e| e.is_retryable(),
427                Error::transport,
428            )
429            .await
430            .map_err(|e| anyhow::anyhow!("{e}"))
431    }
432
433    /// Fetches instruments from event slugs concurrently.
434    ///
435    /// Each slug queries `GET /events?slug=`, extracts the markets array from
436    /// the first matching event, and parses each market into instruments.
437    pub async fn request_instruments_by_event_slugs(
438        &self,
439        event_slugs: Vec<String>,
440    ) -> anyhow::Result<Vec<InstrumentAny>> {
441        let ts_init = self.clock.get_time_ns();
442
443        let futures = event_slugs.into_iter().map(|slug| {
444            let inner = Arc::clone(&self.inner);
445            async move {
446                match inner.get_gamma_events_by_slug(&slug).await {
447                    Ok(events) => Some((slug, events)),
448                    Err(e) => {
449                        log::warn!("Failed to fetch event slug '{slug}': {e}");
450                        None
451                    }
452                }
453            }
454        });
455
456        let results = futures_util::future::join_all(futures).await;
457
458        let total = results.len();
459        let succeeded = results.iter().filter(|r| r.is_some()).count();
460        let mut instruments = Vec::new();
461
462        for result in results.into_iter().flatten() {
463            let (slug, events) = result;
464            let markets = flatten_event_markets(events);
465            if markets.is_empty() {
466                log::warn!("No markets found in event slug '{slug}'");
467                continue;
468            }
469            instruments.extend(parse_markets_to_instruments(&markets, ts_init));
470        }
471
472        if succeeded == 0 && total > 0 {
473            anyhow::bail!("All {total} event slug requests failed");
474        }
475
476        log::info!(
477            "Parsed {} instruments from event slug queries",
478            instruments.len()
479        );
480        Ok(instruments)
481    }
482
483    /// Fetches instruments using arbitrary Gamma API query params with auto-pagination.
484    pub async fn request_instruments_by_params(
485        &self,
486        base_params: GetGammaMarketsParams,
487    ) -> anyhow::Result<Vec<InstrumentAny>> {
488        let markets = self.fetch_gamma_markets_paginated(base_params).await?;
489        let ts_init = self.clock.get_time_ns();
490        let instruments = parse_markets_to_instruments(&markets, ts_init);
491        log::debug!("Parsed {} instruments from params query", instruments.len());
492        Ok(instruments)
493    }
494
495    /// Fetches instruments from an event slug with client-side sorting and limiting.
496    ///
497    /// The `/events?slug=` response already includes the full markets array,
498    /// so no second API call is needed. Sorting and truncation are applied
499    /// client-side using fields from `GetGammaMarketsParams`:
500    /// - `order`: sort field (`"liquidity"`, `"volume"`, `"volume24hr"`)
501    /// - `ascending`: sort direction (default: descending)
502    /// - `max_markets`: truncate after sorting
503    pub async fn request_instruments_by_event_query(
504        &self,
505        event_slug: &str,
506        params: GetGammaMarketsParams,
507    ) -> anyhow::Result<Vec<InstrumentAny>> {
508        let events = self.inner.get_gamma_events_by_slug(event_slug).await?;
509        let mut markets = flatten_event_markets(events);
510
511        if markets.is_empty() {
512            log::warn!("No markets found in event slug '{event_slug}'");
513            return Ok(Vec::new());
514        }
515
516        log::debug!("Event '{event_slug}' returned {} markets", markets.len());
517
518        // Client-side sort
519        if let Some(ref order_field) = params.order {
520            let ascending = params.ascending.unwrap_or(false);
521            markets.sort_by(|a, b| {
522                let cmp = match order_field.as_str() {
523                    "liquidity" => a
524                        .liquidity_num
525                        .unwrap_or(0.0)
526                        .partial_cmp(&b.liquidity_num.unwrap_or(0.0)),
527                    "volume" => a
528                        .volume_num
529                        .unwrap_or(0.0)
530                        .partial_cmp(&b.volume_num.unwrap_or(0.0)),
531                    "volume24hr" => a
532                        .volume_24hr
533                        .unwrap_or(0.0)
534                        .partial_cmp(&b.volume_24hr.unwrap_or(0.0)),
535                    "competitive" => a
536                        .competitive
537                        .unwrap_or(0.0)
538                        .partial_cmp(&b.competitive.unwrap_or(0.0)),
539                    "spread" => a
540                        .spread
541                        .unwrap_or(f64::MAX)
542                        .partial_cmp(&b.spread.unwrap_or(f64::MAX)),
543                    "best_bid" => a
544                        .best_bid
545                        .unwrap_or(0.0)
546                        .partial_cmp(&b.best_bid.unwrap_or(0.0)),
547                    "one_day_price_change" => a
548                        .one_day_price_change
549                        .unwrap_or(0.0)
550                        .partial_cmp(&b.one_day_price_change.unwrap_or(0.0)),
551                    "volume_1wk" => a
552                        .volume_1wk
553                        .unwrap_or(0.0)
554                        .partial_cmp(&b.volume_1wk.unwrap_or(0.0)),
555                    _ => None,
556                };
557                let cmp = cmp.unwrap_or(std::cmp::Ordering::Equal);
558                if ascending { cmp } else { cmp.reverse() }
559            });
560        }
561
562        // Client-side truncation
563        if let Some(cap) = params.max_markets {
564            markets.truncate(cap as usize);
565        }
566
567        let ts_init = self.clock.get_time_ns();
568        let instruments = parse_markets_to_instruments(&markets, ts_init);
569        log::debug!(
570            "Parsed {} instruments from event query '{event_slug}'",
571            instruments.len()
572        );
573        Ok(instruments)
574    }
575
576    /// Fetches events from the Gamma API with the given base params, paginating automatically.
577    async fn fetch_gamma_events_paginated(
578        &self,
579        base_params: GetGammaEventsParams,
580    ) -> anyhow::Result<Vec<GammaEvent>> {
581        const PAGE_LIMIT: u32 = 100;
582        let page_size = base_params.limit.unwrap_or(PAGE_LIMIT);
583        let max_events = base_params.max_events;
584        let mut all_events = Vec::new();
585        let mut offset: u32 = base_params.offset.unwrap_or(0);
586        let mut page_num = 0u32;
587
588        loop {
589            let params = GetGammaEventsParams {
590                limit: Some(page_size),
591                offset: Some(offset),
592                ..base_params.clone()
593            };
594
595            let page = self.inner.get_gamma_events(params).await?;
596            let page_len = page.len() as u32;
597            page_num += 1;
598            let market_count: usize = page.iter().map(|e| e.markets.len()).sum();
599            all_events.extend(page);
600
601            log::info!(
602                "Fetched events page {page_num}: {page_len} events, {market_count} markets (total events: {})",
603                all_events.len(),
604            );
605
606            if let Some(cap) = max_events
607                && all_events.len() as u32 >= cap
608            {
609                all_events.truncate(cap as usize);
610                break;
611            }
612
613            if page_len < page_size {
614                break;
615            }
616
617            offset += page_size;
618        }
619
620        Ok(all_events)
621    }
622
623    /// Fetches instruments from events matching full query params (paginated).
624    pub async fn request_instruments_by_event_params(
625        &self,
626        params: GetGammaEventsParams,
627    ) -> anyhow::Result<Vec<InstrumentAny>> {
628        let events = self.fetch_gamma_events_paginated(params).await?;
629        let ts_init = self.clock.get_time_ns();
630        let total_events = events.len();
631        let markets = flatten_event_markets(events);
632        let total_markets = markets.len();
633        let instruments = parse_markets_to_instruments(&markets, ts_init);
634        log::info!(
635            "Parsed {} instruments from {total_events} events ({total_markets} markets)",
636            instruments.len(),
637        );
638        Ok(instruments)
639    }
640
641    /// Searches for instruments via the Gamma public search endpoint.
642    pub async fn request_instruments_by_search(
643        &self,
644        params: GetSearchParams,
645    ) -> anyhow::Result<Vec<InstrumentAny>> {
646        let response = self.inner.get_public_search(params).await?;
647        let ts_init = self.clock.get_time_ns();
648
649        let mut instruments = Vec::new();
650
651        if let Some(markets) = &response.markets {
652            instruments.extend(parse_markets_to_instruments(markets, ts_init));
653        }
654
655        if let Some(events) = &response.events {
656            let event_markets = flatten_event_markets(events.clone());
657            instruments.extend(parse_markets_to_instruments(&event_markets, ts_init));
658        }
659
660        log::debug!("Parsed {} instruments from search query", instruments.len());
661        Ok(instruments)
662    }
663
664    /// Fetches available tags from the Gamma API.
665    pub async fn request_tags(&self) -> anyhow::Result<Vec<GammaTag>> {
666        Ok(self.inner.get_gamma_tags().await?)
667    }
668
669    /// Returns a reference to the underlying raw HTTP client.
670    #[must_use]
671    pub fn inner(&self) -> &Arc<PolymarketGammaRawHttpClient> {
672        &self.inner
673    }
674}