nautilus_polymarket/
providers.rs1use std::{collections::HashMap, fmt::Debug, sync::Arc};
19
20use ahash::{AHashMap, AHashSet};
21use async_trait::async_trait;
22use nautilus_common::providers::{InstrumentProvider, InstrumentStore};
23use nautilus_model::{
24 identifiers::InstrumentId,
25 instruments::{Instrument, InstrumentAny},
26};
27use ustr::Ustr;
28
29use crate::{
30 filters::InstrumentFilter,
31 http::{gamma::PolymarketGammaHttpClient, models::GammaTag, query::GetGammaMarketsParams},
32};
33
/// Instrument provider for Polymarket that keeps loaded instruments in an
/// [`InstrumentStore`] alongside a lookup index keyed by raw symbol.
pub struct PolymarketInstrumentProvider {
    /// Holds the instruments that have been loaded so far.
    store: InstrumentStore,
    /// Gamma HTTP client used for every instrument and tag request.
    http_client: PolymarketGammaHttpClient,
    /// Maps an instrument's raw symbol (the key used for token-ID lookups) to its instrument ID.
    token_index: AHashMap<Ustr, InstrumentId>,
    /// Filters consulted when loading; an empty list means loading is unfiltered.
    filters: Vec<Arc<dyn InstrumentFilter>>,
}
47
48impl Debug for PolymarketInstrumentProvider {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct(stringify!(PolymarketInstrumentProvider))
51 .field("store", &self.store)
52 .field("http_client", &self.http_client)
53 .field("token_index_len", &self.token_index.len())
54 .field("filters", &self.filters)
55 .finish()
56 }
57}
58
59impl PolymarketInstrumentProvider {
60 #[must_use]
62 pub fn new(http_client: PolymarketGammaHttpClient) -> Self {
63 Self {
64 store: InstrumentStore::new(),
65 http_client,
66 token_index: AHashMap::new(),
67 filters: Vec::new(),
68 }
69 }
70
71 #[must_use]
73 pub fn with_filters(
74 http_client: PolymarketGammaHttpClient,
75 filters: Vec<Arc<dyn InstrumentFilter>>,
76 ) -> Self {
77 Self {
78 store: InstrumentStore::new(),
79 http_client,
80 token_index: AHashMap::new(),
81 filters,
82 }
83 }
84
85 #[must_use]
87 pub fn with_filter(
88 http_client: PolymarketGammaHttpClient,
89 filter: Arc<dyn InstrumentFilter>,
90 ) -> Self {
91 Self {
92 store: InstrumentStore::new(),
93 http_client,
94 token_index: AHashMap::new(),
95 filters: vec![filter],
96 }
97 }
98
99 pub fn add_filter(&mut self, filter: Arc<dyn InstrumentFilter>) {
101 self.filters.push(filter);
102 }
103
104 pub fn clear_filters(&mut self) {
106 self.filters.clear();
107 }
108
109 #[must_use]
111 pub fn get_by_token_id(&self, token_id: &Ustr) -> Option<&InstrumentAny> {
112 let instrument_id = self.token_index.get(token_id)?;
113 self.store.find(instrument_id)
114 }
115
116 #[must_use]
121 pub fn build_token_map(&self) -> AHashMap<Ustr, InstrumentAny> {
122 self.token_index
123 .iter()
124 .filter_map(|(token_id, instrument_id)| {
125 self.store
126 .find(instrument_id)
127 .map(|inst| (*token_id, inst.clone()))
128 })
129 .collect()
130 }
131
132 pub async fn load_by_slugs(&mut self, slugs: Vec<String>) -> anyhow::Result<()> {
142 let instruments = self.http_client.request_instruments_by_slugs(slugs).await?;
143
144 for instrument in &instruments {
145 self.token_index.insert(
146 Ustr::from(instrument.raw_symbol().as_str()),
147 instrument.id(),
148 );
149 }
150
151 self.store.add_bulk(instruments);
152
153 Ok(())
154 }
155
156 #[must_use]
158 pub fn filters(&self) -> Vec<Arc<dyn InstrumentFilter>> {
159 self.filters.clone()
160 }
161
162 #[must_use]
164 pub fn http_client(&self) -> &PolymarketGammaHttpClient {
165 &self.http_client
166 }
167
168 pub async fn list_tags(&self) -> anyhow::Result<Vec<GammaTag>> {
170 self.http_client.request_tags().await
171 }
172
173 pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
174 for inst in &instruments {
175 self.token_index
176 .insert(Ustr::from(inst.raw_symbol().as_str()), inst.id());
177 }
178 self.store.add_bulk(instruments);
179 }
180
181 async fn load_filtered(&self) -> anyhow::Result<Vec<InstrumentAny>> {
184 fetch_instruments(&self.http_client, &self.filters).await
185 }
186}
187
188pub async fn fetch_instruments(
190 http_client: &PolymarketGammaHttpClient,
191 filters: &[Arc<dyn InstrumentFilter>],
192) -> anyhow::Result<Vec<InstrumentAny>> {
193 if filters.is_empty() {
194 return http_client.request_instruments().await;
195 }
196
197 let mut instruments = Vec::new();
198
199 for filter in filters {
200 if let Some(slugs) = filter.market_slugs()
201 && !slugs.is_empty()
202 {
203 let result = http_client.request_instruments_by_slugs(slugs).await?;
204 instruments.extend(result);
205 }
206
207 if let Some(event_slugs) = filter.event_slugs()
208 && !event_slugs.is_empty()
209 {
210 let result = http_client
211 .request_instruments_by_event_slugs(event_slugs)
212 .await?;
213 instruments.extend(result);
214 }
215
216 if let Some(params) = filter.query_params() {
217 let result = http_client.request_instruments_by_params(params).await?;
218 instruments.extend(result);
219 }
220
221 if let Some(event_queries) = filter.event_queries() {
222 for (event_slug, params) in event_queries {
223 let result = http_client
224 .request_instruments_by_event_query(&event_slug, params)
225 .await?;
226 instruments.extend(result);
227 }
228 }
229
230 if let Some(params) = filter.event_params() {
231 let result = http_client
232 .request_instruments_by_event_params(params)
233 .await?;
234 instruments.extend(result);
235 }
236
237 if let Some(params) = filter.search_params() {
238 let result = http_client.request_instruments_by_search(params).await?;
239 instruments.extend(result);
240 }
241 }
242
243 let mut seen = AHashSet::new();
244 instruments.retain(|inst| seen.insert(inst.id()));
245 instruments.retain(|inst| filters.iter().all(|f| f.accept(inst)));
246
247 Ok(instruments)
248}
249
250pub fn extract_condition_id(instrument_id: &InstrumentId) -> anyhow::Result<String> {
256 let symbol = instrument_id.symbol.as_str();
257 symbol
258 .rfind('-')
259 .map(|idx| symbol[..idx].to_string())
260 .ok_or_else(|| {
261 anyhow::anyhow!("Cannot extract condition_id from symbol '{symbol}': no '-' separator")
262 })
263}
264
265pub fn build_gamma_params_from_hashmap(map: &HashMap<String, String>) -> GetGammaMarketsParams {
267 let mut params = GetGammaMarketsParams::default();
268
269 if let Some(v) = map.get("active") {
270 params.active = v.parse().ok();
271 }
272
273 if let Some(v) = map.get("closed") {
274 params.closed = v.parse().ok();
275 }
276
277 if let Some(v) = map.get("archived") {
278 params.archived = v.parse().ok();
279 }
280
281 if let Some(v) = map.get("slug") {
282 params.slug = Some(v.clone());
283 }
284
285 if let Some(v) = map.get("tag_id") {
286 params.tag_id = Some(v.clone());
287 }
288
289 if let Some(v) = map.get("condition_ids") {
290 params.condition_ids = Some(v.clone());
291 }
292
293 if let Some(v) = map.get("clob_token_ids") {
294 params.clob_token_ids = Some(v.clone());
295 }
296
297 if let Some(v) = map.get("liquidity_num_min") {
298 params.liquidity_num_min = v.parse().ok();
299 }
300
301 if let Some(v) = map.get("liquidity_num_max") {
302 params.liquidity_num_max = v.parse().ok();
303 }
304
305 if let Some(v) = map.get("volume_num_min") {
306 params.volume_num_min = v.parse().ok();
307 }
308
309 if let Some(v) = map.get("volume_num_max") {
310 params.volume_num_max = v.parse().ok();
311 }
312
313 if let Some(v) = map.get("order") {
314 params.order = Some(v.clone());
315 }
316
317 if let Some(v) = map.get("ascending") {
318 params.ascending = v.parse().ok();
319 }
320
321 if let Some(v) = map.get("limit") {
322 params.limit = v.parse().ok();
323 }
324
325 if let Some(v) = map.get("max_markets") {
326 params.max_markets = v.parse().ok();
327 }
328
329 params
330}
331
332pub async fn resolve_tag_slug(
334 client: &PolymarketGammaHttpClient,
335 slug: &str,
336) -> anyhow::Result<String> {
337 let tags = client.request_tags().await?;
338 tags.iter()
339 .find(|t| t.slug.as_deref() == Some(slug))
340 .map(|t| t.id.clone())
341 .ok_or_else(|| anyhow::anyhow!("Tag slug '{slug}' not found"))
342}
343
344#[async_trait(?Send)]
345impl InstrumentProvider for PolymarketInstrumentProvider {
346 fn store(&self) -> &InstrumentStore {
347 &self.store
348 }
349
350 fn store_mut(&mut self) -> &mut InstrumentStore {
351 &mut self.store
352 }
353
354 async fn load_all(&mut self, filters: Option<&HashMap<String, String>>) -> anyhow::Result<()> {
355 let instruments = if self.filters.is_empty() {
356 if let Some(map) = filters {
358 if map.is_empty() {
359 self.http_client.request_instruments().await?
360 } else {
361 let params = build_gamma_params_from_hashmap(map);
362 self.http_client
363 .request_instruments_by_params(params)
364 .await?
365 }
366 } else {
367 self.http_client.request_instruments().await?
368 }
369 } else {
370 self.load_filtered().await?
371 };
372
373 self.store.clear();
374 self.token_index.clear();
375 self.add_instruments(instruments);
376 self.store.set_initialized();
377
378 Ok(())
379 }
380
381 async fn load_ids(
382 &mut self,
383 instrument_ids: &[InstrumentId],
384 filters: Option<&HashMap<String, String>>,
385 ) -> anyhow::Result<()> {
386 let missing: Vec<_> = instrument_ids
387 .iter()
388 .filter(|id| !self.store.contains(id))
389 .collect();
390
391 if missing.is_empty() {
392 return Ok(());
393 }
394
395 let mut condition_ids: Vec<String> = missing
398 .iter()
399 .filter_map(|id| extract_condition_id(id).ok())
400 .collect();
401 condition_ids.sort();
402 condition_ids.dedup();
403
404 if !condition_ids.is_empty() && condition_ids.len() <= 100 {
405 let params = GetGammaMarketsParams {
406 condition_ids: Some(condition_ids.join(",")),
407 ..Default::default()
408 };
409 let instruments = self
410 .http_client
411 .request_instruments_by_params(params)
412 .await?;
413 self.add_instruments(instruments);
414 } else {
415 self.load_all(filters).await?;
417 }
418
419 Ok(())
420 }
421
422 async fn load(
423 &mut self,
424 instrument_id: &InstrumentId,
425 filters: Option<&HashMap<String, String>>,
426 ) -> anyhow::Result<()> {
427 if self.store.contains(instrument_id) {
428 return Ok(());
429 }
430
431 if let Ok(cid) = extract_condition_id(instrument_id) {
433 let params = GetGammaMarketsParams {
434 condition_ids: Some(cid),
435 ..Default::default()
436 };
437
438 if let Ok(instruments) = self.http_client.request_instruments_by_params(params).await {
439 self.add_instruments(instruments);
440
441 if self.store.contains(instrument_id) {
442 return Ok(());
443 }
444 }
445 }
446
447 if !self.store.is_initialized() {
449 self.load_all(filters).await?;
450 }
451
452 if self.store.contains(instrument_id) {
453 Ok(())
454 } else {
455 anyhow::bail!("Instrument {instrument_id} not found on Polymarket")
456 }
457 }
458}