Skip to main content

nautilus_polymarket/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parse functions for converting Polymarket WebSocket messages to Nautilus data types.
17
18use nautilus_core::{
19    UnixNanos,
20    correctness::{CorrectnessError, CorrectnessResult},
21    datetime::NANOSECONDS_IN_MILLISECOND,
22};
23use nautilus_model::{
24    data::{BookOrder, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
25    enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
26    identifiers::InstrumentId,
27    types::{Price, Quantity},
28};
29
30use super::messages::{PolymarketBookSnapshot, PolymarketQuote, PolymarketQuotes, PolymarketTrade};
31use crate::common::{enums::PolymarketOrderSide, parse::determine_trade_id};
32
33/// Parses a millisecond epoch timestamp string into [`UnixNanos`].
34pub fn parse_timestamp_ms(ts: &str) -> anyhow::Result<UnixNanos> {
35    let ms: u64 = ts
36        .parse()
37        .map_err(|e| anyhow::anyhow!("Invalid timestamp '{ts}': {e}"))?;
38    let ns = ms
39        .checked_mul(NANOSECONDS_IN_MILLISECOND)
40        .ok_or_else(|| anyhow::anyhow!("Timestamp overflow for '{ts}'"))?;
41    Ok(UnixNanos::from(ns))
42}
43
44pub(crate) fn parse_price(s: &str, precision: u8) -> CorrectnessResult<Price> {
45    let value: f64 = s
46        .parse()
47        .map_err(|e| CorrectnessError::PredicateViolation {
48            message: format!("Invalid price '{s}': {e}"),
49        })?;
50    Price::new_checked(value, precision)
51}
52
53pub(crate) fn parse_quantity(s: &str, precision: u8) -> CorrectnessResult<Quantity> {
54    let value: f64 = s
55        .parse()
56        .map_err(|e| CorrectnessError::PredicateViolation {
57            message: format!("Invalid quantity '{s}': {e}"),
58        })?;
59    Quantity::new_checked(value, precision)
60}
61
62/// Parses a book snapshot into [`OrderBookDeltas`] (CLEAR + ADD).
63pub fn parse_book_snapshot(
64    snap: &PolymarketBookSnapshot,
65    instrument_id: InstrumentId,
66    price_precision: u8,
67    size_precision: u8,
68    ts_init: UnixNanos,
69) -> anyhow::Result<OrderBookDeltas> {
70    let ts_event = parse_timestamp_ms(&snap.timestamp)?;
71
72    let bids_len = snap.bids.len();
73    let asks_len = snap.asks.len();
74
75    if bids_len == 0 && asks_len == 0 {
76        anyhow::bail!("Empty book snapshot for {instrument_id}");
77    }
78
79    let total = bids_len + asks_len;
80    let mut deltas = Vec::with_capacity(total + 1);
81
82    // Every snapshot delta (including the opening CLEAR) carries F_SNAPSHOT so
83    // downstream consumers can recognise the rebuild; F_LAST closes the batch
84    // on the final delta. `OrderBookDelta::clear` already sets F_SNAPSHOT.
85    let snapshot_flag = RecordFlag::F_SNAPSHOT as u8;
86    deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_init));
87
88    let mut count = 0;
89
90    for level in &snap.bids {
91        count += 1;
92        let price = parse_price(&level.price, price_precision)?;
93        let size = parse_quantity(&level.size, size_precision)?;
94        let order = BookOrder::new(OrderSide::Buy, price, size, 0);
95
96        let mut flags = snapshot_flag;
97        if count == total {
98            flags |= RecordFlag::F_LAST as u8;
99        }
100
101        deltas.push(OrderBookDelta::new_checked(
102            instrument_id,
103            BookAction::Add,
104            order,
105            flags,
106            0,
107            ts_event,
108            ts_init,
109        )?);
110    }
111
112    for level in &snap.asks {
113        count += 1;
114        let price = parse_price(&level.price, price_precision)?;
115        let size = parse_quantity(&level.size, size_precision)?;
116        let order = BookOrder::new(OrderSide::Sell, price, size, 0);
117
118        let mut flags = snapshot_flag;
119        if count == total {
120            flags |= RecordFlag::F_LAST as u8;
121        }
122
123        deltas.push(OrderBookDelta::new_checked(
124            instrument_id,
125            BookAction::Add,
126            order,
127            flags,
128            0,
129            ts_event,
130            ts_init,
131        )?);
132    }
133
134    Ok(OrderBookDeltas::new(instrument_id, deltas))
135}
136
137/// Parses price change quotes into incremental [`OrderBookDeltas`].
138pub fn parse_book_deltas(
139    quotes: &PolymarketQuotes,
140    instrument_id: InstrumentId,
141    price_precision: u8,
142    size_precision: u8,
143    ts_init: UnixNanos,
144) -> anyhow::Result<OrderBookDeltas> {
145    let ts_event = parse_timestamp_ms(&quotes.timestamp)?;
146
147    let total = quotes.price_changes.len();
148    let mut deltas = Vec::with_capacity(total);
149
150    for (idx, change) in quotes.price_changes.iter().enumerate() {
151        let price = parse_price(&change.price, price_precision)?;
152        let size = parse_quantity(&change.size, size_precision)?;
153        let side = match change.side {
154            PolymarketOrderSide::Buy => OrderSide::Buy,
155            PolymarketOrderSide::Sell => OrderSide::Sell,
156        };
157
158        let (action, order_size) = if size.is_zero() {
159            (BookAction::Delete, Quantity::new(0.0, size_precision))
160        } else {
161            (BookAction::Update, size)
162        };
163
164        let order = BookOrder::new(side, price, order_size, 0);
165        let flags = if idx == total - 1 {
166            RecordFlag::F_LAST as u8
167        } else {
168            0
169        };
170
171        deltas.push(OrderBookDelta::new_checked(
172            instrument_id,
173            action,
174            order,
175            flags,
176            0,
177            ts_event,
178            ts_init,
179        )?);
180    }
181
182    Ok(OrderBookDeltas::new(instrument_id, deltas))
183}
184
185/// Parses a trade message into a [`TradeTick`].
186pub fn parse_trade_tick(
187    trade: &PolymarketTrade,
188    instrument_id: InstrumentId,
189    price_precision: u8,
190    size_precision: u8,
191    ts_init: UnixNanos,
192) -> anyhow::Result<TradeTick> {
193    let price = parse_price(&trade.price, price_precision)?;
194    let size = parse_quantity(&trade.size, size_precision)?;
195    let aggressor_side = match trade.side {
196        PolymarketOrderSide::Buy => AggressorSide::Buyer,
197        PolymarketOrderSide::Sell => AggressorSide::Seller,
198    };
199    let ts_event = parse_timestamp_ms(&trade.timestamp)?;
200
201    let trade_id = determine_trade_id(
202        &trade.asset_id,
203        trade.side,
204        &trade.price,
205        &trade.size,
206        &trade.timestamp,
207    );
208
209    TradeTick::new_checked(
210        instrument_id,
211        price,
212        size,
213        aggressor_side,
214        trade_id,
215        ts_event,
216        ts_init,
217    )
218}
219
220/// Extracts a top-of-book [`QuoteTick`] from a book snapshot.
221///
222/// Returns `None` if either side is empty.
223///
224/// # Panics
225///
226/// Cannot panic: `.expect()` calls are guarded by the empty-side
227/// early return above.
228pub fn parse_quote_from_snapshot(
229    snap: &PolymarketBookSnapshot,
230    instrument_id: InstrumentId,
231    price_precision: u8,
232    size_precision: u8,
233    ts_init: UnixNanos,
234) -> anyhow::Result<Option<QuoteTick>> {
235    if snap.bids.is_empty() || snap.asks.is_empty() {
236        return Ok(None);
237    }
238
239    let ts_event = parse_timestamp_ms(&snap.timestamp)?;
240
241    // Polymarket sends bids ascending and asks descending, so best-of-book is last
242    let best_bid = snap.bids.last().expect("bids not empty");
243    let best_ask = snap.asks.last().expect("asks not empty");
244
245    let bid_price = parse_price(&best_bid.price, price_precision)?;
246    let ask_price = parse_price(&best_ask.price, price_precision)?;
247    let bid_size = parse_quantity(&best_bid.size, size_precision)?;
248    let ask_size = parse_quantity(&best_ask.size, size_precision)?;
249
250    Ok(Some(QuoteTick::new_checked(
251        instrument_id,
252        bid_price,
253        ask_price,
254        bid_size,
255        ask_size,
256        ts_event,
257        ts_init,
258    )?))
259}
260
261/// Parses a quote tick from a price change message using its best_bid/best_ask fields.
262///
263/// Returns `None` when either best_bid or best_ask is absent (empty book side).
264/// When `last_quote` is provided the opposite side's size is carried forward
265/// instead of being set to zero, matching the Python adapter's behavior.
266pub fn parse_quote_from_price_change(
267    quote: &PolymarketQuote,
268    instrument_id: InstrumentId,
269    price_precision: u8,
270    size_precision: u8,
271    last_quote: Option<&QuoteTick>,
272    ts_event: UnixNanos,
273    ts_init: UnixNanos,
274) -> anyhow::Result<Option<QuoteTick>> {
275    let (Some(best_bid), Some(best_ask)) = (&quote.best_bid, &quote.best_ask) else {
276        return Ok(None);
277    };
278    let bid_price = parse_price(best_bid, price_precision)?;
279    let ask_price = parse_price(best_ask, price_precision)?;
280    let changed_price = parse_price(&quote.price, price_precision)?;
281
282    let size = parse_quantity(&quote.size, size_precision)?;
283    let zero = || Quantity::new(0.0, size_precision);
284
285    // Only use the changed level's size when it matches the best price,
286    // otherwise preserve the previous quote's size for that side
287    let (bid_size, ask_size) = match quote.side {
288        PolymarketOrderSide::Buy => {
289            let bid_size = if changed_price == bid_price {
290                size
291            } else {
292                last_quote.map_or_else(zero, |q| q.bid_size)
293            };
294            let ask_size = last_quote.map_or_else(zero, |q| q.ask_size);
295            (bid_size, ask_size)
296        }
297        PolymarketOrderSide::Sell => {
298            let ask_size = if changed_price == ask_price {
299                size
300            } else {
301                last_quote.map_or_else(zero, |q| q.ask_size)
302            };
303            let bid_size = last_quote.map_or_else(zero, |q| q.bid_size);
304            (bid_size, ask_size)
305        }
306    };
307
308    Ok(Some(QuoteTick::new_checked(
309        instrument_id,
310        bid_price,
311        ask_price,
312        bid_size,
313        ask_size,
314        ts_event,
315        ts_init,
316    )?))
317}
318
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use nautilus_model::instruments::{Instrument, InstrumentAny};
    use rstest::rstest;

    use super::*;
    use crate::http::parse::{create_instrument_from_def, parse_gamma_market};

    /// Instrument parameters and init timestamp handed to every parser under test.
    struct Ctx {
        instrument_id: InstrumentId,
        price_precision: u8,
        size_precision: u8,
        ts_init: UnixNanos,
    }

    /// Deserializes a JSON fixture from the `test_data` directory.
    fn load<T: serde::de::DeserializeOwned>(filename: &str) -> T {
        let path = format!("test_data/{filename}");
        let content = std::fs::read_to_string(path).expect("test data missing");
        serde_json::from_str(&content).expect("parse failed")
    }

    /// Builds the instrument for the first definition in the Gamma market fixture.
    fn test_instrument() -> InstrumentAny {
        let market: crate::http::models::GammaMarket = load("gamma_market.json");
        let defs = parse_gamma_market(&market).unwrap();
        create_instrument_from_def(&defs[0], UnixNanos::from(1_000_000_000u64)).unwrap()
    }

    /// Collects the parser arguments derived from the test instrument.
    fn ctx() -> Ctx {
        let instrument = test_instrument();
        Ctx {
            instrument_id: instrument.id(),
            price_precision: instrument.price_precision(),
            size_precision: instrument.size_precision(),
            ts_init: UnixNanos::from(1_000_000_000u64),
        }
    }

    /// Returns whether `delta` has `flag` set.
    fn has_flag(delta: &OrderBookDelta, flag: RecordFlag) -> bool {
        delta.flags & flag as u8 != 0
    }

    /// Asserts that exactly one delta carries F_LAST and that it is the final one.
    fn assert_f_last_only_on_final(deltas: &OrderBookDeltas) {
        let f_last_count = deltas
            .deltas
            .iter()
            .filter(|d| has_flag(d, RecordFlag::F_LAST))
            .count();
        assert_eq!(f_last_count, 1);
        assert!(has_flag(deltas.deltas.last().unwrap(), RecordFlag::F_LAST));
    }

    #[rstest]
    fn test_parse_timestamp_ms() {
        let expected = UnixNanos::from(1_703_875_200_000_000_000u64);
        assert_eq!(parse_timestamp_ms("1703875200000").unwrap(), expected);
    }

    #[rstest]
    fn test_parse_timestamp_ms_invalid() {
        let result = parse_timestamp_ms("not_a_number");
        assert!(result.is_err());
    }

    #[rstest]
    fn test_parse_book_snapshot() {
        let snap: PolymarketBookSnapshot = load("ws_book_snapshot.json");
        let c = ctx();

        let deltas = parse_book_snapshot(
            &snap,
            c.instrument_id,
            c.price_precision,
            c.size_precision,
            c.ts_init,
        )
        .unwrap();

        // CLEAR + 3 bids + 3 asks = 7 deltas
        assert_eq!(deltas.deltas.len(), 7);

        let first_bid = &deltas.deltas[1];
        let first_ask = &deltas.deltas[4];
        assert_eq!(deltas.deltas[0].action, BookAction::Clear);
        assert_eq!(first_bid.action, BookAction::Add);
        assert_eq!(first_bid.order.side, OrderSide::Buy);
        assert_eq!(first_ask.action, BookAction::Add);
        assert_eq!(first_ask.order.side, OrderSide::Sell);

        // Every snapshot delta carries F_SNAPSHOT
        for delta in &deltas.deltas {
            assert!(has_flag(delta, RecordFlag::F_SNAPSHOT));
        }

        assert_f_last_only_on_final(&deltas);
    }

    #[rstest]
    fn test_parse_book_deltas() {
        let quotes: PolymarketQuotes = load("ws_quotes.json");
        let c = ctx();

        let deltas = parse_book_deltas(
            &quotes,
            c.instrument_id,
            c.price_precision,
            c.size_precision,
            c.ts_init,
        )
        .unwrap();

        assert_eq!(deltas.deltas.len(), 2);
        assert_f_last_only_on_final(&deltas);
    }

    #[rstest]
    fn test_parse_book_deltas_zero_size_is_delete() {
        let mut quotes: PolymarketQuotes = load("ws_quotes.json");
        quotes.price_changes[0].size = "0".to_string();
        let c = ctx();

        let deltas = parse_book_deltas(
            &quotes,
            c.instrument_id,
            c.price_precision,
            c.size_precision,
            c.ts_init,
        )
        .unwrap();

        assert_eq!(deltas.deltas[0].action, BookAction::Delete);
    }

    #[rstest]
    fn test_parse_trade_tick() {
        let trade: PolymarketTrade = load("ws_last_trade.json");
        let c = ctx();

        let tick = parse_trade_tick(
            &trade,
            c.instrument_id,
            c.price_precision,
            c.size_precision,
            c.ts_init,
        )
        .unwrap();

        assert_eq!(tick.instrument_id, c.instrument_id);
        assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
        assert_eq!(tick.ts_event, UnixNanos::from(1_703_875_202_000_000_000u64));
    }

    #[rstest]
    fn test_parse_trade_tick_deterministic_id() {
        let trade: PolymarketTrade = load("ws_last_trade.json");
        let c = ctx();

        // Parsing the same message twice must yield the same trade ID
        let parse = || {
            parse_trade_tick(
                &trade,
                c.instrument_id,
                c.price_precision,
                c.size_precision,
                c.ts_init,
            )
            .unwrap()
        };
        let tick1 = parse();
        let tick2 = parse();

        assert_eq!(tick1.trade_id, tick2.trade_id);
    }

    #[rstest]
    fn test_parse_quote_from_snapshot() {
        let snap: PolymarketBookSnapshot = load("ws_book_snapshot.json");
        let c = ctx();

        let quote = parse_quote_from_snapshot(
            &snap,
            c.instrument_id,
            c.price_precision,
            c.size_precision,
            c.ts_init,
        )
        .unwrap()
        .unwrap();

        assert_eq!(quote.instrument_id, c.instrument_id);
        assert_eq!(quote.bid_price, Price::from("0.50"));
        assert_eq!(quote.ask_price, Price::from("0.51"));
        assert_eq!(
            quote.ts_event,
            UnixNanos::from(1_703_875_200_000_000_000u64)
        );
    }

    #[rstest]
    fn test_parse_quote_from_snapshot_empty_side_returns_none() {
        let mut snap: PolymarketBookSnapshot = load("ws_book_snapshot.json");
        snap.bids.clear();
        let c = ctx();

        let result = parse_quote_from_snapshot(
            &snap,
            c.instrument_id,
            c.price_precision,
            c.size_precision,
            c.ts_init,
        )
        .unwrap();

        assert!(result.is_none());
    }

    #[rstest]
    fn test_parse_quote_from_price_change() {
        let quotes: PolymarketQuotes = load("ws_quotes.json");
        let c = ctx();
        let ts_event = parse_timestamp_ms(&quotes.timestamp).unwrap();

        let quote = parse_quote_from_price_change(
            &quotes.price_changes[0],
            c.instrument_id,
            c.price_precision,
            c.size_precision,
            None,
            ts_event,
            c.ts_init,
        )
        .unwrap()
        .expect("quote should be Some when best_bid/best_ask present");

        assert_eq!(quote.instrument_id, c.instrument_id);
    }
}