1use nautilus_core::{
19 UnixNanos,
20 correctness::{CorrectnessError, CorrectnessResult},
21 datetime::NANOSECONDS_IN_MILLISECOND,
22};
23use nautilus_model::{
24 data::{BookOrder, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
25 enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
26 identifiers::InstrumentId,
27 types::{Price, Quantity},
28};
29
30use super::messages::{PolymarketBookSnapshot, PolymarketQuote, PolymarketQuotes, PolymarketTrade};
31use crate::common::{enums::PolymarketOrderSide, parse::determine_trade_id};
32
33pub fn parse_timestamp_ms(ts: &str) -> anyhow::Result<UnixNanos> {
35 let ms: u64 = ts
36 .parse()
37 .map_err(|e| anyhow::anyhow!("Invalid timestamp '{ts}': {e}"))?;
38 let ns = ms
39 .checked_mul(NANOSECONDS_IN_MILLISECOND)
40 .ok_or_else(|| anyhow::anyhow!("Timestamp overflow for '{ts}'"))?;
41 Ok(UnixNanos::from(ns))
42}
43
44pub(crate) fn parse_price(s: &str, precision: u8) -> CorrectnessResult<Price> {
45 let value: f64 = s
46 .parse()
47 .map_err(|e| CorrectnessError::PredicateViolation {
48 message: format!("Invalid price '{s}': {e}"),
49 })?;
50 Price::new_checked(value, precision)
51}
52
53pub(crate) fn parse_quantity(s: &str, precision: u8) -> CorrectnessResult<Quantity> {
54 let value: f64 = s
55 .parse()
56 .map_err(|e| CorrectnessError::PredicateViolation {
57 message: format!("Invalid quantity '{s}': {e}"),
58 })?;
59 Quantity::new_checked(value, precision)
60}
61
62pub fn parse_book_snapshot(
64 snap: &PolymarketBookSnapshot,
65 instrument_id: InstrumentId,
66 price_precision: u8,
67 size_precision: u8,
68 ts_init: UnixNanos,
69) -> anyhow::Result<OrderBookDeltas> {
70 let ts_event = parse_timestamp_ms(&snap.timestamp)?;
71
72 let bids_len = snap.bids.len();
73 let asks_len = snap.asks.len();
74
75 if bids_len == 0 && asks_len == 0 {
76 anyhow::bail!("Empty book snapshot for {instrument_id}");
77 }
78
79 let total = bids_len + asks_len;
80 let mut deltas = Vec::with_capacity(total + 1);
81
82 let snapshot_flag = RecordFlag::F_SNAPSHOT as u8;
86 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_init));
87
88 let mut count = 0;
89
90 for level in &snap.bids {
91 count += 1;
92 let price = parse_price(&level.price, price_precision)?;
93 let size = parse_quantity(&level.size, size_precision)?;
94 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
95
96 let mut flags = snapshot_flag;
97 if count == total {
98 flags |= RecordFlag::F_LAST as u8;
99 }
100
101 deltas.push(OrderBookDelta::new_checked(
102 instrument_id,
103 BookAction::Add,
104 order,
105 flags,
106 0,
107 ts_event,
108 ts_init,
109 )?);
110 }
111
112 for level in &snap.asks {
113 count += 1;
114 let price = parse_price(&level.price, price_precision)?;
115 let size = parse_quantity(&level.size, size_precision)?;
116 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
117
118 let mut flags = snapshot_flag;
119 if count == total {
120 flags |= RecordFlag::F_LAST as u8;
121 }
122
123 deltas.push(OrderBookDelta::new_checked(
124 instrument_id,
125 BookAction::Add,
126 order,
127 flags,
128 0,
129 ts_event,
130 ts_init,
131 )?);
132 }
133
134 Ok(OrderBookDeltas::new(instrument_id, deltas))
135}
136
137pub fn parse_book_deltas(
139 quotes: &PolymarketQuotes,
140 instrument_id: InstrumentId,
141 price_precision: u8,
142 size_precision: u8,
143 ts_init: UnixNanos,
144) -> anyhow::Result<OrderBookDeltas> {
145 let ts_event = parse_timestamp_ms("es.timestamp)?;
146
147 let total = quotes.price_changes.len();
148 let mut deltas = Vec::with_capacity(total);
149
150 for (idx, change) in quotes.price_changes.iter().enumerate() {
151 let price = parse_price(&change.price, price_precision)?;
152 let size = parse_quantity(&change.size, size_precision)?;
153 let side = match change.side {
154 PolymarketOrderSide::Buy => OrderSide::Buy,
155 PolymarketOrderSide::Sell => OrderSide::Sell,
156 };
157
158 let (action, order_size) = if size.is_zero() {
159 (BookAction::Delete, Quantity::new(0.0, size_precision))
160 } else {
161 (BookAction::Update, size)
162 };
163
164 let order = BookOrder::new(side, price, order_size, 0);
165 let flags = if idx == total - 1 {
166 RecordFlag::F_LAST as u8
167 } else {
168 0
169 };
170
171 deltas.push(OrderBookDelta::new_checked(
172 instrument_id,
173 action,
174 order,
175 flags,
176 0,
177 ts_event,
178 ts_init,
179 )?);
180 }
181
182 Ok(OrderBookDeltas::new(instrument_id, deltas))
183}
184
185pub fn parse_trade_tick(
187 trade: &PolymarketTrade,
188 instrument_id: InstrumentId,
189 price_precision: u8,
190 size_precision: u8,
191 ts_init: UnixNanos,
192) -> anyhow::Result<TradeTick> {
193 let price = parse_price(&trade.price, price_precision)?;
194 let size = parse_quantity(&trade.size, size_precision)?;
195 let aggressor_side = match trade.side {
196 PolymarketOrderSide::Buy => AggressorSide::Buyer,
197 PolymarketOrderSide::Sell => AggressorSide::Seller,
198 };
199 let ts_event = parse_timestamp_ms(&trade.timestamp)?;
200
201 let trade_id = determine_trade_id(
202 &trade.asset_id,
203 trade.side,
204 &trade.price,
205 &trade.size,
206 &trade.timestamp,
207 );
208
209 TradeTick::new_checked(
210 instrument_id,
211 price,
212 size,
213 aggressor_side,
214 trade_id,
215 ts_event,
216 ts_init,
217 )
218}
219
220pub fn parse_quote_from_snapshot(
229 snap: &PolymarketBookSnapshot,
230 instrument_id: InstrumentId,
231 price_precision: u8,
232 size_precision: u8,
233 ts_init: UnixNanos,
234) -> anyhow::Result<Option<QuoteTick>> {
235 if snap.bids.is_empty() || snap.asks.is_empty() {
236 return Ok(None);
237 }
238
239 let ts_event = parse_timestamp_ms(&snap.timestamp)?;
240
241 let best_bid = snap.bids.last().expect("bids not empty");
243 let best_ask = snap.asks.last().expect("asks not empty");
244
245 let bid_price = parse_price(&best_bid.price, price_precision)?;
246 let ask_price = parse_price(&best_ask.price, price_precision)?;
247 let bid_size = parse_quantity(&best_bid.size, size_precision)?;
248 let ask_size = parse_quantity(&best_ask.size, size_precision)?;
249
250 Ok(Some(QuoteTick::new_checked(
251 instrument_id,
252 bid_price,
253 ask_price,
254 bid_size,
255 ask_size,
256 ts_event,
257 ts_init,
258 )?))
259}
260
261pub fn parse_quote_from_price_change(
267 quote: &PolymarketQuote,
268 instrument_id: InstrumentId,
269 price_precision: u8,
270 size_precision: u8,
271 last_quote: Option<&QuoteTick>,
272 ts_event: UnixNanos,
273 ts_init: UnixNanos,
274) -> anyhow::Result<Option<QuoteTick>> {
275 let (Some(best_bid), Some(best_ask)) = ("e.best_bid, "e.best_ask) else {
276 return Ok(None);
277 };
278 let bid_price = parse_price(best_bid, price_precision)?;
279 let ask_price = parse_price(best_ask, price_precision)?;
280 let changed_price = parse_price("e.price, price_precision)?;
281
282 let size = parse_quantity("e.size, size_precision)?;
283 let zero = || Quantity::new(0.0, size_precision);
284
285 let (bid_size, ask_size) = match quote.side {
288 PolymarketOrderSide::Buy => {
289 let bid_size = if changed_price == bid_price {
290 size
291 } else {
292 last_quote.map_or_else(zero, |q| q.bid_size)
293 };
294 let ask_size = last_quote.map_or_else(zero, |q| q.ask_size);
295 (bid_size, ask_size)
296 }
297 PolymarketOrderSide::Sell => {
298 let ask_size = if changed_price == ask_price {
299 size
300 } else {
301 last_quote.map_or_else(zero, |q| q.ask_size)
302 };
303 let bid_size = last_quote.map_or_else(zero, |q| q.bid_size);
304 (bid_size, ask_size)
305 }
306 };
307
308 Ok(Some(QuoteTick::new_checked(
309 instrument_id,
310 bid_price,
311 ask_price,
312 bid_size,
313 ask_size,
314 ts_event,
315 ts_init,
316 )?))
317}
318
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use nautilus_model::instruments::{Instrument, InstrumentAny};
    use rstest::rstest;

    use super::*;
    use crate::http::parse::{create_instrument_from_def, parse_gamma_market};

    /// Deserializes the JSON fixture `test_data/{filename}` into `T`.
    fn load<T: serde::de::DeserializeOwned>(filename: &str) -> T {
        let path = format!("test_data/{filename}");
        let raw = std::fs::read_to_string(path).expect("test data missing");
        serde_json::from_str(&raw).expect("parse failed")
    }

    /// Initialization timestamp shared by every test.
    fn ts_init() -> UnixNanos {
        UnixNanos::from(1_000_000_000u64)
    }

    /// Builds the first instrument defined by the `gamma_market.json` fixture.
    fn test_instrument() -> InstrumentAny {
        let market: crate::http::models::GammaMarket = load("gamma_market.json");
        let defs = parse_gamma_market(&market).unwrap();
        create_instrument_from_def(&defs[0], ts_init()).unwrap()
    }

    /// Returns `true` when the `F_LAST` bit is set in `flags`.
    fn is_last(flags: u8) -> bool {
        flags & RecordFlag::F_LAST as u8 != 0
    }

    /// Asserts that exactly one delta carries `F_LAST` and that it is the final one.
    fn assert_only_final_delta_is_last(deltas: &OrderBookDeltas) {
        let flagged = deltas.deltas.iter().filter(|d| is_last(d.flags)).count();
        assert_eq!(flagged, 1);
        assert!(is_last(deltas.deltas.last().unwrap().flags));
    }

    #[rstest]
    fn test_parse_timestamp_ms() {
        let parsed = parse_timestamp_ms("1703875200000").unwrap();
        let expected = UnixNanos::from(1_703_875_200_000_000_000u64);
        assert_eq!(parsed, expected);
    }

    #[rstest]
    fn test_parse_timestamp_ms_invalid() {
        let result = parse_timestamp_ms("not_a_number");
        assert!(result.is_err());
    }

    #[rstest]
    fn test_parse_book_snapshot() {
        let snap: PolymarketBookSnapshot = load("ws_book_snapshot.json");
        let instrument = test_instrument();

        let parsed = parse_book_snapshot(
            &snap,
            instrument.id(),
            instrument.price_precision(),
            instrument.size_precision(),
            ts_init(),
        )
        .unwrap();
        let deltas = &parsed.deltas;

        assert_eq!(deltas.len(), 7);
        assert_eq!(deltas[0].action, BookAction::Clear);

        for (idx, side) in [(1, OrderSide::Buy), (4, OrderSide::Sell)] {
            assert_eq!(deltas[idx].action, BookAction::Add);
            assert_eq!(deltas[idx].order.side, side);
        }

        // Every delta in the batch must be marked as part of a snapshot.
        for delta in deltas {
            assert_ne!(delta.flags & RecordFlag::F_SNAPSHOT as u8, 0);
        }

        assert_only_final_delta_is_last(&parsed);
    }

    #[rstest]
    fn test_parse_book_deltas() {
        let quotes: PolymarketQuotes = load("ws_quotes.json");
        let instrument = test_instrument();

        let parsed = parse_book_deltas(
            &quotes,
            instrument.id(),
            instrument.price_precision(),
            instrument.size_precision(),
            ts_init(),
        )
        .unwrap();

        assert_eq!(parsed.deltas.len(), 2);
        assert_only_final_delta_is_last(&parsed);
    }

    #[rstest]
    fn test_parse_book_deltas_zero_size_is_delete() {
        let mut quotes: PolymarketQuotes = load("ws_quotes.json");
        quotes.price_changes[0].size = "0".to_string();
        let instrument = test_instrument();

        let parsed = parse_book_deltas(
            &quotes,
            instrument.id(),
            instrument.price_precision(),
            instrument.size_precision(),
            ts_init(),
        )
        .unwrap();

        assert_eq!(parsed.deltas[0].action, BookAction::Delete);
    }

    #[rstest]
    fn test_parse_trade_tick() {
        let trade: PolymarketTrade = load("ws_last_trade.json");
        let instrument = test_instrument();

        let tick = parse_trade_tick(
            &trade,
            instrument.id(),
            instrument.price_precision(),
            instrument.size_precision(),
            ts_init(),
        )
        .unwrap();

        assert_eq!(tick.instrument_id, instrument.id());
        assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
        assert_eq!(tick.ts_event, UnixNanos::from(1_703_875_202_000_000_000u64));
    }

    #[rstest]
    fn test_parse_trade_tick_deterministic_id() {
        let trade: PolymarketTrade = load("ws_last_trade.json");
        let instrument = test_instrument();
        let init = ts_init();

        // Parsing the same message twice must yield the same trade ID.
        let parse_once = || {
            parse_trade_tick(
                &trade,
                instrument.id(),
                instrument.price_precision(),
                instrument.size_precision(),
                init,
            )
            .unwrap()
        };
        let first = parse_once();
        let second = parse_once();

        assert_eq!(first.trade_id, second.trade_id);
    }

    #[rstest]
    fn test_parse_quote_from_snapshot() {
        let snap: PolymarketBookSnapshot = load("ws_book_snapshot.json");
        let instrument = test_instrument();

        let maybe_quote = parse_quote_from_snapshot(
            &snap,
            instrument.id(),
            instrument.price_precision(),
            instrument.size_precision(),
            ts_init(),
        )
        .unwrap();
        let quote = maybe_quote.unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, Price::from("0.50"));
        assert_eq!(quote.ask_price, Price::from("0.51"));
        assert_eq!(
            quote.ts_event,
            UnixNanos::from(1_703_875_200_000_000_000u64)
        );
    }

    #[rstest]
    fn test_parse_quote_from_snapshot_empty_side_returns_none() {
        let mut snap: PolymarketBookSnapshot = load("ws_book_snapshot.json");
        snap.bids.clear();
        let instrument = test_instrument();

        let maybe_quote = parse_quote_from_snapshot(
            &snap,
            instrument.id(),
            instrument.price_precision(),
            instrument.size_precision(),
            ts_init(),
        )
        .unwrap();

        assert!(maybe_quote.is_none());
    }

    #[rstest]
    fn test_parse_quote_from_price_change() {
        let quotes: PolymarketQuotes = load("ws_quotes.json");
        let instrument = test_instrument();
        let ts_event = parse_timestamp_ms(&quotes.timestamp).unwrap();

        let maybe_quote = parse_quote_from_price_change(
            &quotes.price_changes[0],
            instrument.id(),
            instrument.price_precision(),
            instrument.size_precision(),
            None,
            ts_event,
            ts_init(),
        )
        .unwrap();
        let quote = maybe_quote.expect("quote should be Some when best_bid/best_ask present");

        assert_eq!(quote.instrument_id, instrument.id());
    }
}