1use std::{collections::HashMap, result::Result as StdResult, sync::Arc};
19
20use nautilus_core::{
21 UnixNanos,
22 consts::NAUTILUS_USER_AGENT,
23 time::{AtomicTime, get_atomic_clock_realtime},
24};
25use nautilus_model::instruments::InstrumentAny;
26use nautilus_network::{
27 http::{HttpClient, HttpClientError, Method, USER_AGENT},
28 retry::{RetryConfig, RetryManager},
29};
30use serde::Serialize;
31use serde_json::Value;
32
33use crate::{
34 common::urls::gamma_api_url,
35 http::{
36 error::{Error, Result},
37 models::{GammaEvent, GammaMarket, GammaTag, SearchResponse},
38 parse::{create_instrument_from_def, parse_gamma_market},
39 query::{GetGammaEventsParams, GetGammaMarketsParams, GetSearchParams},
40 rate_limits::POLYMARKET_GAMMA_REST_QUOTA,
41 },
42};
43
44#[derive(Debug, Clone)]
49pub struct PolymarketGammaRawHttpClient {
50 client: HttpClient,
51 base_url: String,
52}
53
54impl PolymarketGammaRawHttpClient {
55 pub fn new(base_url: Option<String>, timeout_secs: u64) -> StdResult<Self, HttpClientError> {
61 Ok(Self {
62 client: HttpClient::new(
63 Self::default_headers(),
64 vec![],
65 vec![],
66 Some(*POLYMARKET_GAMMA_REST_QUOTA),
67 Some(timeout_secs),
68 None,
69 )?,
70 base_url: base_url
71 .unwrap_or_else(|| gamma_api_url().to_string())
72 .trim_end_matches('/')
73 .to_string(),
74 })
75 }
76
77 fn default_headers() -> HashMap<String, String> {
78 HashMap::from([
79 (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
80 ("Content-Type".to_string(), "application/json".to_string()),
81 ])
82 }
83
84 fn url(&self, path: &str) -> String {
85 format!("{}{path}", self.base_url)
86 }
87
88 async fn send_get<P: Serialize, T: serde::de::DeserializeOwned>(
89 &self,
90 path: &str,
91 params: Option<&P>,
92 ) -> Result<T> {
93 let url = self.url(path);
94 let response = self
95 .client
96 .request_with_params(Method::GET, url, params, None, None, None, None)
97 .await
98 .map_err(Error::from_http_client)?;
99
100 if response.status.is_success() {
101 serde_json::from_slice(&response.body).map_err(Error::Serde)
102 } else {
103 Err(Error::from_status_code(
104 response.status.as_u16(),
105 &response.body,
106 ))
107 }
108 }
109
110 pub async fn get_gamma_markets(
114 &self,
115 params: GetGammaMarketsParams,
116 ) -> Result<Vec<GammaMarket>> {
117 let value: Value = self.send_get("/markets", Some(¶ms)).await?;
118
119 let array = match value {
120 Value::Array(_) => value,
121 Value::Object(ref map) if map.contains_key("data") => {
122 map.get("data").cloned().unwrap_or(Value::Array(vec![]))
123 }
124 _ => {
125 return Err(Error::decode(
126 "Unrecognized Gamma markets response schema".to_string(),
127 ));
128 }
129 };
130
131 serde_json::from_value(array).map_err(Error::Serde)
132 }
133
134 pub async fn get_gamma_market(&self, market_id: &str) -> Result<GammaMarket> {
136 let path = format!("/markets/{market_id}");
137 self.send_get::<(), _>(&path, None::<&()>).await
138 }
139
140 pub async fn get_gamma_events_by_slug(&self, slug: &str) -> Result<Vec<GammaEvent>> {
142 #[derive(Serialize)]
143 struct EventSlugParams<'a> {
144 slug: &'a str,
145 }
146 let params = EventSlugParams { slug };
147 self.send_get("/events", Some(¶ms)).await
148 }
149
150 pub async fn get_gamma_events(&self, params: GetGammaEventsParams) -> Result<Vec<GammaEvent>> {
152 self.send_get("/events", Some(¶ms)).await
153 }
154
155 pub async fn get_gamma_tags(&self) -> Result<Vec<GammaTag>> {
157 self.send_get::<(), _>("/tags", None::<&()>).await
158 }
159
160 pub async fn get_public_search(&self, params: GetSearchParams) -> Result<SearchResponse> {
162 self.send_get("/public-search", Some(¶ms)).await
163 }
164}
165
166fn parse_markets_to_instruments(markets: &[GammaMarket], ts_init: UnixNanos) -> Vec<InstrumentAny> {
167 let mut instruments = Vec::new();
168 let mut skipped_empty = 0u32;
169
170 for market in markets {
171 if market.clob_token_ids.is_empty() {
173 skipped_empty += 1;
174 continue;
175 }
176
177 match parse_gamma_market(market) {
178 Ok(defs) => {
179 for def in defs {
180 match create_instrument_from_def(&def, ts_init) {
181 Ok(instrument) => instruments.push(instrument),
182 Err(e) => log::warn!("Failed to create instrument: {e}"),
183 }
184 }
185 }
186 Err(e) => log::warn!("Failed to parse gamma market: {e}"),
187 }
188 }
189
190 if skipped_empty > 0 {
191 log::debug!(
192 "Skipped {skipped_empty} markets with empty clob_token_ids (currently not tradeable)"
193 );
194 }
195 instruments
196}
197
198fn flatten_event_markets(events: Vec<GammaEvent>) -> Vec<GammaMarket> {
199 events
200 .into_iter()
201 .flat_map(|event| {
202 let event_game_id = event.game_id;
203 event.markets.into_iter().map(move |mut market| {
204 if market.game_id.is_none() {
205 market.game_id = event_game_id;
206 }
207 market
208 })
209 })
210 .collect()
211}
212
213#[derive(Debug, Clone)]
219pub struct PolymarketGammaHttpClient {
220 inner: Arc<PolymarketGammaRawHttpClient>,
221 clock: &'static AtomicTime,
222 retry_manager: Arc<RetryManager<Error>>,
223}
224
225impl PolymarketGammaHttpClient {
226 pub fn new(
232 gamma_base_url: Option<String>,
233 timeout_secs: u64,
234 retry_config: RetryConfig,
235 ) -> StdResult<Self, HttpClientError> {
236 Ok(Self {
237 inner: Arc::new(PolymarketGammaRawHttpClient::new(
238 gamma_base_url,
239 timeout_secs,
240 )?),
241 clock: get_atomic_clock_realtime(),
242 retry_manager: Arc::new(RetryManager::new(retry_config)),
243 })
244 }
245
246 async fn fetch_gamma_markets_paginated(
248 &self,
249 base_params: GetGammaMarketsParams,
250 ) -> anyhow::Result<Vec<GammaMarket>> {
251 const PAGE_LIMIT: u32 = 500;
252 let page_size = base_params.limit.unwrap_or(PAGE_LIMIT);
253 let max_markets = base_params.max_markets;
254 let mut all_markets = Vec::new();
255 let mut offset: u32 = base_params.offset.unwrap_or(0);
256 let mut page_num = 0u32;
257
258 loop {
259 let params = GetGammaMarketsParams {
260 limit: Some(page_size),
261 offset: Some(offset),
262 ..base_params.clone()
263 };
264
265 let page = self.inner.get_gamma_markets(params).await?;
266 let page_len = page.len() as u32;
267 page_num += 1;
268 all_markets.extend(page);
269
270 log::info!(
271 "Fetched markets page {page_num}: {page_len} markets (total: {})",
272 all_markets.len(),
273 );
274
275 if let Some(cap) = max_markets
276 && all_markets.len() as u32 >= cap
277 {
278 all_markets.truncate(cap as usize);
279 break;
280 }
281
282 if page_len < page_size {
283 break;
284 }
285
286 offset += page_size;
287 }
288
289 Ok(all_markets)
290 }
291
292 async fn fetch_all_gamma_markets(&self) -> anyhow::Result<Vec<GammaMarket>> {
294 self.fetch_gamma_markets_paginated(GetGammaMarketsParams {
295 active: Some(true),
296 closed: Some(false),
297 ..Default::default()
298 })
299 .await
300 }
301
302 pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
308 let markets = self.fetch_all_gamma_markets().await?;
309 let ts_init = self.clock.get_time_ns();
310 let instruments = parse_markets_to_instruments(&markets, ts_init);
311 log::info!("Parsed {} instruments from Gamma API", instruments.len());
312 Ok(instruments)
313 }
314
315 pub async fn request_instruments_by_slugs(
325 &self,
326 slugs: Vec<String>,
327 ) -> anyhow::Result<Vec<InstrumentAny>> {
328 let ts_init = self.clock.get_time_ns();
329
330 let futures = slugs.into_iter().map(|slug| {
331 let inner = Arc::clone(&self.inner);
332 async move {
333 let params = GetGammaMarketsParams {
334 slug: Some(slug.clone()),
335 ..Default::default()
336 };
337
338 match inner.get_gamma_markets(params).await {
339 Ok(markets) => Some((slug, markets)),
340 Err(e) => {
341 log::warn!("Failed to fetch slug '{slug}': {e}");
342 None
343 }
344 }
345 }
346 });
347
348 let results = futures_util::future::join_all(futures).await;
349
350 let total_slugs = results.len();
351 let succeeded = results.iter().filter(|r| r.is_some()).count();
352 let mut instruments = Vec::new();
353
354 for result in results.into_iter().flatten() {
355 let (slug, markets) = result;
356 if markets.is_empty() {
357 log::debug!("No markets found for slug '{slug}'");
358 continue;
359 }
360 instruments.extend(parse_markets_to_instruments(&markets, ts_init));
361 }
362
363 if succeeded == 0 && total_slugs > 0 {
364 anyhow::bail!("All {total_slugs} slug requests failed");
365 }
366
367 log::info!("Parsed {} instruments from slug queries", instruments.len());
368 Ok(instruments)
369 }
370
371 pub async fn request_instruments_by_slugs_with_retry(
378 &self,
379 slugs: Vec<String>,
380 ) -> anyhow::Result<Vec<InstrumentAny>> {
381 let inner = Arc::clone(&self.inner);
382 let ts_init = self.clock.get_time_ns();
383
384 self.retry_manager
385 .execute_with_retry(
386 "gamma_fetch_by_slugs",
387 || {
388 let inner = Arc::clone(&inner);
389 let slugs = slugs.clone();
390 async move {
391 let futures = slugs.into_iter().map(|slug| {
392 let inner = Arc::clone(&inner);
393 async move {
394 let params = GetGammaMarketsParams {
395 slug: Some(slug.clone()),
396 ..Default::default()
397 };
398 inner
399 .get_gamma_markets(params)
400 .await
401 .map(|markets| (slug, markets))
402 }
403 });
404
405 let results: Vec<_> = futures_util::future::join_all(futures)
406 .await
407 .into_iter()
408 .collect::<StdResult<Vec<_>, _>>()?;
409
410 let instruments: Vec<InstrumentAny> = results
411 .into_iter()
412 .flat_map(|(_, markets)| {
413 parse_markets_to_instruments(&markets, ts_init)
414 })
415 .collect();
416
417 if instruments.is_empty() {
418 return Err(Error::transport(
419 "Gamma returned no instruments (indexing lag)",
420 ));
421 }
422
423 Ok(instruments)
424 }
425 },
426 |e| e.is_retryable(),
427 Error::transport,
428 )
429 .await
430 .map_err(|e| anyhow::anyhow!("{e}"))
431 }
432
433 pub async fn request_instruments_by_event_slugs(
438 &self,
439 event_slugs: Vec<String>,
440 ) -> anyhow::Result<Vec<InstrumentAny>> {
441 let ts_init = self.clock.get_time_ns();
442
443 let futures = event_slugs.into_iter().map(|slug| {
444 let inner = Arc::clone(&self.inner);
445 async move {
446 match inner.get_gamma_events_by_slug(&slug).await {
447 Ok(events) => Some((slug, events)),
448 Err(e) => {
449 log::warn!("Failed to fetch event slug '{slug}': {e}");
450 None
451 }
452 }
453 }
454 });
455
456 let results = futures_util::future::join_all(futures).await;
457
458 let total = results.len();
459 let succeeded = results.iter().filter(|r| r.is_some()).count();
460 let mut instruments = Vec::new();
461
462 for result in results.into_iter().flatten() {
463 let (slug, events) = result;
464 let markets = flatten_event_markets(events);
465 if markets.is_empty() {
466 log::warn!("No markets found in event slug '{slug}'");
467 continue;
468 }
469 instruments.extend(parse_markets_to_instruments(&markets, ts_init));
470 }
471
472 if succeeded == 0 && total > 0 {
473 anyhow::bail!("All {total} event slug requests failed");
474 }
475
476 log::info!(
477 "Parsed {} instruments from event slug queries",
478 instruments.len()
479 );
480 Ok(instruments)
481 }
482
483 pub async fn request_instruments_by_params(
485 &self,
486 base_params: GetGammaMarketsParams,
487 ) -> anyhow::Result<Vec<InstrumentAny>> {
488 let markets = self.fetch_gamma_markets_paginated(base_params).await?;
489 let ts_init = self.clock.get_time_ns();
490 let instruments = parse_markets_to_instruments(&markets, ts_init);
491 log::debug!("Parsed {} instruments from params query", instruments.len());
492 Ok(instruments)
493 }
494
495 pub async fn request_instruments_by_event_query(
504 &self,
505 event_slug: &str,
506 params: GetGammaMarketsParams,
507 ) -> anyhow::Result<Vec<InstrumentAny>> {
508 let events = self.inner.get_gamma_events_by_slug(event_slug).await?;
509 let mut markets = flatten_event_markets(events);
510
511 if markets.is_empty() {
512 log::warn!("No markets found in event slug '{event_slug}'");
513 return Ok(Vec::new());
514 }
515
516 log::debug!("Event '{event_slug}' returned {} markets", markets.len());
517
518 if let Some(ref order_field) = params.order {
520 let ascending = params.ascending.unwrap_or(false);
521 markets.sort_by(|a, b| {
522 let cmp = match order_field.as_str() {
523 "liquidity" => a
524 .liquidity_num
525 .unwrap_or(0.0)
526 .partial_cmp(&b.liquidity_num.unwrap_or(0.0)),
527 "volume" => a
528 .volume_num
529 .unwrap_or(0.0)
530 .partial_cmp(&b.volume_num.unwrap_or(0.0)),
531 "volume24hr" => a
532 .volume_24hr
533 .unwrap_or(0.0)
534 .partial_cmp(&b.volume_24hr.unwrap_or(0.0)),
535 "competitive" => a
536 .competitive
537 .unwrap_or(0.0)
538 .partial_cmp(&b.competitive.unwrap_or(0.0)),
539 "spread" => a
540 .spread
541 .unwrap_or(f64::MAX)
542 .partial_cmp(&b.spread.unwrap_or(f64::MAX)),
543 "best_bid" => a
544 .best_bid
545 .unwrap_or(0.0)
546 .partial_cmp(&b.best_bid.unwrap_or(0.0)),
547 "one_day_price_change" => a
548 .one_day_price_change
549 .unwrap_or(0.0)
550 .partial_cmp(&b.one_day_price_change.unwrap_or(0.0)),
551 "volume_1wk" => a
552 .volume_1wk
553 .unwrap_or(0.0)
554 .partial_cmp(&b.volume_1wk.unwrap_or(0.0)),
555 _ => None,
556 };
557 let cmp = cmp.unwrap_or(std::cmp::Ordering::Equal);
558 if ascending { cmp } else { cmp.reverse() }
559 });
560 }
561
562 if let Some(cap) = params.max_markets {
564 markets.truncate(cap as usize);
565 }
566
567 let ts_init = self.clock.get_time_ns();
568 let instruments = parse_markets_to_instruments(&markets, ts_init);
569 log::debug!(
570 "Parsed {} instruments from event query '{event_slug}'",
571 instruments.len()
572 );
573 Ok(instruments)
574 }
575
576 async fn fetch_gamma_events_paginated(
578 &self,
579 base_params: GetGammaEventsParams,
580 ) -> anyhow::Result<Vec<GammaEvent>> {
581 const PAGE_LIMIT: u32 = 100;
582 let page_size = base_params.limit.unwrap_or(PAGE_LIMIT);
583 let max_events = base_params.max_events;
584 let mut all_events = Vec::new();
585 let mut offset: u32 = base_params.offset.unwrap_or(0);
586 let mut page_num = 0u32;
587
588 loop {
589 let params = GetGammaEventsParams {
590 limit: Some(page_size),
591 offset: Some(offset),
592 ..base_params.clone()
593 };
594
595 let page = self.inner.get_gamma_events(params).await?;
596 let page_len = page.len() as u32;
597 page_num += 1;
598 let market_count: usize = page.iter().map(|e| e.markets.len()).sum();
599 all_events.extend(page);
600
601 log::info!(
602 "Fetched events page {page_num}: {page_len} events, {market_count} markets (total events: {})",
603 all_events.len(),
604 );
605
606 if let Some(cap) = max_events
607 && all_events.len() as u32 >= cap
608 {
609 all_events.truncate(cap as usize);
610 break;
611 }
612
613 if page_len < page_size {
614 break;
615 }
616
617 offset += page_size;
618 }
619
620 Ok(all_events)
621 }
622
623 pub async fn request_instruments_by_event_params(
625 &self,
626 params: GetGammaEventsParams,
627 ) -> anyhow::Result<Vec<InstrumentAny>> {
628 let events = self.fetch_gamma_events_paginated(params).await?;
629 let ts_init = self.clock.get_time_ns();
630 let total_events = events.len();
631 let markets = flatten_event_markets(events);
632 let total_markets = markets.len();
633 let instruments = parse_markets_to_instruments(&markets, ts_init);
634 log::info!(
635 "Parsed {} instruments from {total_events} events ({total_markets} markets)",
636 instruments.len(),
637 );
638 Ok(instruments)
639 }
640
641 pub async fn request_instruments_by_search(
643 &self,
644 params: GetSearchParams,
645 ) -> anyhow::Result<Vec<InstrumentAny>> {
646 let response = self.inner.get_public_search(params).await?;
647 let ts_init = self.clock.get_time_ns();
648
649 let mut instruments = Vec::new();
650
651 if let Some(markets) = &response.markets {
652 instruments.extend(parse_markets_to_instruments(markets, ts_init));
653 }
654
655 if let Some(events) = &response.events {
656 let event_markets = flatten_event_markets(events.clone());
657 instruments.extend(parse_markets_to_instruments(&event_markets, ts_init));
658 }
659
660 log::debug!("Parsed {} instruments from search query", instruments.len());
661 Ok(instruments)
662 }
663
664 pub async fn request_tags(&self) -> anyhow::Result<Vec<GammaTag>> {
666 Ok(self.inner.get_gamma_tags().await?)
667 }
668
669 #[must_use]
671 pub fn inner(&self) -> &Arc<PolymarketGammaRawHttpClient> {
672 &self.inner
673 }
674}