Skip to main content

nautilus_polymarket/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for the Polymarket CLOB API.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use nautilus_network::{
24    RECONNECTED,
25    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
26};
27use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; // tokio-import-ok
28use tokio_tungstenite::tungstenite::Message;
29
30use super::{
31    client::WsChannel,
32    messages::{
33        MarketInitialSubscribeRequest, MarketSubscribeRequest, MarketUnsubscribeRequest,
34        MarketWsMessage, PolymarketWsAuth, PolymarketWsMessage, UserSubscribeRequest,
35        UserWsMessage,
36    },
37};
38use crate::common::credential::Credential;
39
/// Commands sent from the outer client to the inner message handler.
#[derive(Debug)]
pub enum HandlerCommand {
    /// Set the WebSocketClient for the handler to use.
    SetClient(WebSocketClient),
    /// Disconnect the WebSocket connection, set the stop signal and end the handler loop.
    Disconnect,
    /// Add asset IDs to the market-channel subscription set and send a subscribe message.
    SubscribeMarket(Vec<String>),
    /// Remove asset IDs from the subscription set and send an unsubscribe message.
    UnsubscribeMarket(Vec<String>),
    /// Send the authenticated subscribe message on the user channel.
    SubscribeUser,
}
54
/// Inner message handler for a single Polymarket WebSocket channel.
///
/// Applies [`HandlerCommand`]s received on `cmd_rx`, parses raw frames received on `raw_rx`
/// according to `channel`, and yields the resulting [`PolymarketWsMessage`]s from `next()`.
pub(super) struct FeedHandler {
    // Stop flag; set to true when a Disconnect command is handled
    signal: Arc<AtomicBool>,
    // Channel served by this handler; selects parsing and resubscribe behavior
    channel: WsChannel,
    // None until a SetClient command is received
    client: Option<WebSocketClient>,
    // Incoming commands from the outer client
    cmd_rx: UnboundedReceiver<HandlerCommand>,
    // Incoming raw WebSocket frames
    raw_rx: UnboundedReceiver<Message>,
    // Outbound channel used by `send()`
    out_tx: UnboundedSender<PolymarketWsMessage>,
    // Required for the user-channel subscribe; without it the subscribe is skipped
    credential: Option<Credential>,
    // Tracks market asset subscription state; its topics are replayed on reconnect
    subscriptions: SubscriptionState,
    // Tracks user-channel authentication state
    auth_tracker: AuthTracker,
    // True once SubscribeUser has been explicitly requested by the caller
    user_subscribed: bool,
    // True once the current market-channel session has sent its initial subscribe payload.
    market_subscription_initialized: bool,
    // Overflow buffer for batched frames, drained before reading the next raw message
    message_buffer: Vec<PolymarketWsMessage>,
    // Whether to include `custom_feature_enabled: true` in the initial subscribe
    subscribe_new_markets: bool,
}
74
75impl FeedHandler {
76    #[expect(clippy::too_many_arguments)]
77    pub(super) fn new(
78        signal: Arc<AtomicBool>,
79        channel: WsChannel,
80        cmd_rx: UnboundedReceiver<HandlerCommand>,
81        raw_rx: UnboundedReceiver<Message>,
82        out_tx: UnboundedSender<PolymarketWsMessage>,
83        credential: Option<Credential>,
84        subscriptions: SubscriptionState,
85        auth_tracker: AuthTracker,
86        user_subscribed: bool,
87        subscribe_new_markets: bool,
88    ) -> Self {
89        Self {
90            signal,
91            channel,
92            client: None,
93            cmd_rx,
94            raw_rx,
95            out_tx,
96            credential,
97            subscriptions,
98            auth_tracker,
99            user_subscribed,
100            market_subscription_initialized: false,
101            message_buffer: Vec::new(),
102            subscribe_new_markets,
103        }
104    }
105
106    pub(super) fn send(&self, msg: PolymarketWsMessage) -> Result<(), String> {
107        self.out_tx
108            .send(msg)
109            .map_err(|e| format!("Failed to send message: {e}"))
110    }
111
112    pub(super) fn is_stopped(&self) -> bool {
113        self.signal.load(Ordering::Relaxed)
114    }
115
116    async fn send_subscribe_market(&mut self, asset_ids: &[String]) {
117        let Some(ref client) = self.client else {
118            log::warn!("No client available for market subscribe");
119            return;
120        };
121
122        for id in asset_ids {
123            self.subscriptions.mark_subscribe(id);
124        }
125
126        let payload = if self.market_subscription_initialized {
127            serde_json::to_string(&MarketSubscribeRequest {
128                assets_ids: asset_ids.to_vec(),
129                operation: "subscribe",
130                custom_feature_enabled: self.subscribe_new_markets,
131            })
132        } else {
133            serde_json::to_string(&MarketInitialSubscribeRequest {
134                assets_ids: asset_ids.to_vec(),
135                msg_type: "market",
136                custom_feature_enabled: self.subscribe_new_markets,
137            })
138        };
139
140        match payload {
141            Ok(payload) => {
142                if let Err(e) = client.send_text(payload, None).await {
143                    for id in asset_ids {
144                        self.subscriptions.mark_failure(id);
145                    }
146                    log::error!("Failed to send market subscribe: {e}");
147                } else {
148                    self.market_subscription_initialized = true;
149                    // Polymarket has no server ACK, treat successful send as confirmation
150                    for id in asset_ids {
151                        self.subscriptions.confirm_subscribe(id);
152                    }
153                }
154            }
155            Err(e) => {
156                for id in asset_ids {
157                    self.subscriptions.mark_failure(id);
158                }
159                log::error!("Failed to serialize market subscribe request: {e}");
160            }
161        }
162    }
163
164    async fn send_unsubscribe_market(&self, asset_ids: &[String]) {
165        let Some(ref client) = self.client else {
166            log::warn!("No client available for market unsubscribe");
167            return;
168        };
169
170        let req = MarketUnsubscribeRequest {
171            assets_ids: asset_ids.to_vec(),
172            operation: "unsubscribe",
173        };
174
175        match serde_json::to_string(&req) {
176            Ok(payload) => {
177                if let Err(e) = client.send_text(payload, None).await {
178                    log::error!("Failed to send market unsubscribe: {e}");
179                }
180            }
181            Err(e) => log::error!("Failed to serialize market unsubscribe request: {e}"),
182        }
183    }
184
185    async fn send_subscribe_user(&self) {
186        let Some(ref client) = self.client else {
187            log::warn!("No client available for user subscribe");
188            return;
189        };
190        let Some(cred) = &self.credential else {
191            log::error!("User channel subscribe requires credential");
192            return;
193        };
194
195        let req = UserSubscribeRequest {
196            auth: PolymarketWsAuth {
197                api_key: cred.api_key().to_string(),
198                secret: cred.api_secret(),
199                passphrase: cred.passphrase().to_string(),
200            },
201            markets: vec![],
202            assets_ids: vec![],
203            msg_type: "user",
204        };
205
206        // Begin auth tracking; discard receiver, state is queried via is_authenticated()
207        drop(self.auth_tracker.begin());
208
209        match serde_json::to_string(&req) {
210            Ok(payload) => {
211                // auth_tracker.succeed() is NOT called here; sending the request only
212                // confirms delivery to the server, not that the credentials were accepted.
213                // succeed() is called in next() when the server actually sends user-channel
214                // data, which is the real confirmation that authentication worked.
215                if let Err(e) = client.send_text(payload, None).await {
216                    self.auth_tracker.fail(e.to_string());
217                    log::error!("Failed to send user subscribe: {e}");
218                }
219            }
220            Err(e) => {
221                self.auth_tracker.fail(format!("Serialize error: {e}"));
222                log::error!("Failed to serialize user subscribe request: {e}");
223            }
224        }
225    }
226
227    async fn resubscribe_all(&mut self) {
228        match self.channel {
229            WsChannel::Market => {
230                let ids = self.subscriptions.all_topics();
231                if ids.is_empty() {
232                    return;
233                }
234                log::info!(
235                    "Resubscribing to {} market assets after reconnect",
236                    ids.len()
237                );
238                self.send_subscribe_market(&ids).await;
239            }
240            WsChannel::User => {
241                if self.user_subscribed {
242                    log::info!("Re-authenticating user channel after reconnect");
243                    self.send_subscribe_user().await;
244                }
245            }
246        }
247    }
248
249    fn parse_messages(&self, text: &str) -> Vec<PolymarketWsMessage> {
250        // When `subscribe_new_markets` is enabled, Polymarket's WSS periodically
251        // sends the plain-text string "NO NEW ASSETS" as a heartbeat/ack.
252        if text == "NO NEW ASSETS" {
253            return vec![];
254        }
255
256        match self.channel {
257            WsChannel::Market => {
258                if let Ok(msgs) = serde_json::from_str::<Vec<MarketWsMessage>>(text) {
259                    msgs.into_iter().map(PolymarketWsMessage::Market).collect()
260                } else if let Ok(msg) = serde_json::from_str::<MarketWsMessage>(text) {
261                    vec![PolymarketWsMessage::Market(msg)]
262                } else {
263                    log::warn!("Failed to parse market WS message: {text}");
264                    vec![]
265                }
266            }
267            WsChannel::User => {
268                if let Ok(msgs) = serde_json::from_str::<Vec<UserWsMessage>>(text) {
269                    msgs.into_iter().map(PolymarketWsMessage::User).collect()
270                } else if let Ok(msg) = serde_json::from_str::<UserWsMessage>(text) {
271                    vec![PolymarketWsMessage::User(msg)]
272                } else {
273                    log::warn!("Failed to parse user WS message: {text}");
274                    vec![]
275                }
276            }
277        }
278    }
279
280    pub(super) async fn next(&mut self) -> Option<PolymarketWsMessage> {
281        if !self.message_buffer.is_empty() {
282            return Some(self.message_buffer.remove(0));
283        }
284
285        loop {
286            tokio::select! {
287                Some(cmd) = self.cmd_rx.recv() => {
288                    match cmd {
289                        HandlerCommand::SetClient(client) => {
290                            log::debug!("Setting WebSocket client in handler");
291                            self.client = Some(client);
292                        }
293                        HandlerCommand::Disconnect => {
294                            log::debug!("Handler received disconnect command");
295
296                            if let Some(ref client) = self.client {
297                                client.disconnect().await;
298                            }
299                            self.signal.store(true, Ordering::SeqCst);
300                            return None;
301                        }
302                        HandlerCommand::SubscribeMarket(ids) => {
303                            self.send_subscribe_market(&ids).await;
304                        }
305                        HandlerCommand::UnsubscribeMarket(ids) => {
306                            for id in &ids {
307                                self.subscriptions.mark_unsubscribe(id);
308                            }
309                            self.send_unsubscribe_market(&ids).await;
310                            for id in &ids {
311                                self.subscriptions.confirm_unsubscribe(id);
312                            }
313                        }
314                        HandlerCommand::SubscribeUser => {
315                            self.user_subscribed = true;
316                            self.send_subscribe_user().await;
317                        }
318                    }
319                }
320                Some(raw) = self.raw_rx.recv() => {
321                    match raw {
322                        Message::Text(text) => {
323                            if text == RECONNECTED {
324                                self.market_subscription_initialized = false;
325                                self.resubscribe_all().await;
326                                return Some(PolymarketWsMessage::Reconnected);
327                            }
328                            let msgs = self.parse_messages(&text);
329                            if msgs.is_empty() {
330                                continue;
331                            }
332                            // Receiving any user-channel data confirms the server accepted the
333                            // credentials; mark auth as successful on the first delivery.
334                            if self.channel == WsChannel::User {
335                                self.auth_tracker.succeed();
336                            }
337                            // Buffer msgs[1..] so they are returned in order on subsequent
338                            // next() calls; returning first directly preserves 0,1,2,...,n order
339                            let mut iter = msgs.into_iter();
340                            let first = iter.next().unwrap();
341                            self.message_buffer.extend(iter);
342                            return Some(first);
343                        }
344                        Message::Ping(data) => {
345                            if let Some(ref client) = self.client
346                                && let Err(e) = client.send_pong(data.to_vec()).await
347                            {
348                                log::warn!("Failed to send pong: {e}");
349                            }
350                        }
351                        Message::Close(_) => {
352                            log::info!("WebSocket close frame received");
353                            return None;
354                        }
355                        _ => {}
356                    }
357                }
358                else => return None,
359            }
360        }
361    }
362}