nautilus_polymarket/websocket/
handler.rs1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use nautilus_network::{
24 RECONNECTED,
25 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
26};
27use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio_tungstenite::tungstenite::Message;
29
30use super::{
31 client::WsChannel,
32 messages::{
33 MarketInitialSubscribeRequest, MarketSubscribeRequest, MarketUnsubscribeRequest,
34 MarketWsMessage, PolymarketWsAuth, PolymarketWsMessage, UserSubscribeRequest,
35 UserWsMessage,
36 },
37};
38use crate::common::credential::Credential;
39
40#[derive(Debug)]
42pub enum HandlerCommand {
43 SetClient(WebSocketClient),
45 Disconnect,
47 SubscribeMarket(Vec<String>),
49 UnsubscribeMarket(Vec<String>),
51 SubscribeUser,
53}
54
55pub(super) struct FeedHandler {
56 signal: Arc<AtomicBool>,
57 channel: WsChannel,
58 client: Option<WebSocketClient>,
59 cmd_rx: UnboundedReceiver<HandlerCommand>,
60 raw_rx: UnboundedReceiver<Message>,
61 out_tx: UnboundedSender<PolymarketWsMessage>,
62 credential: Option<Credential>,
63 subscriptions: SubscriptionState,
64 auth_tracker: AuthTracker,
65 user_subscribed: bool,
67 market_subscription_initialized: bool,
69 message_buffer: Vec<PolymarketWsMessage>,
71 subscribe_new_markets: bool,
73}
74
75impl FeedHandler {
76 #[expect(clippy::too_many_arguments)]
77 pub(super) fn new(
78 signal: Arc<AtomicBool>,
79 channel: WsChannel,
80 cmd_rx: UnboundedReceiver<HandlerCommand>,
81 raw_rx: UnboundedReceiver<Message>,
82 out_tx: UnboundedSender<PolymarketWsMessage>,
83 credential: Option<Credential>,
84 subscriptions: SubscriptionState,
85 auth_tracker: AuthTracker,
86 user_subscribed: bool,
87 subscribe_new_markets: bool,
88 ) -> Self {
89 Self {
90 signal,
91 channel,
92 client: None,
93 cmd_rx,
94 raw_rx,
95 out_tx,
96 credential,
97 subscriptions,
98 auth_tracker,
99 user_subscribed,
100 market_subscription_initialized: false,
101 message_buffer: Vec::new(),
102 subscribe_new_markets,
103 }
104 }
105
106 pub(super) fn send(&self, msg: PolymarketWsMessage) -> Result<(), String> {
107 self.out_tx
108 .send(msg)
109 .map_err(|e| format!("Failed to send message: {e}"))
110 }
111
112 pub(super) fn is_stopped(&self) -> bool {
113 self.signal.load(Ordering::Relaxed)
114 }
115
116 async fn send_subscribe_market(&mut self, asset_ids: &[String]) {
117 let Some(ref client) = self.client else {
118 log::warn!("No client available for market subscribe");
119 return;
120 };
121
122 for id in asset_ids {
123 self.subscriptions.mark_subscribe(id);
124 }
125
126 let payload = if self.market_subscription_initialized {
127 serde_json::to_string(&MarketSubscribeRequest {
128 assets_ids: asset_ids.to_vec(),
129 operation: "subscribe",
130 custom_feature_enabled: self.subscribe_new_markets,
131 })
132 } else {
133 serde_json::to_string(&MarketInitialSubscribeRequest {
134 assets_ids: asset_ids.to_vec(),
135 msg_type: "market",
136 custom_feature_enabled: self.subscribe_new_markets,
137 })
138 };
139
140 match payload {
141 Ok(payload) => {
142 if let Err(e) = client.send_text(payload, None).await {
143 for id in asset_ids {
144 self.subscriptions.mark_failure(id);
145 }
146 log::error!("Failed to send market subscribe: {e}");
147 } else {
148 self.market_subscription_initialized = true;
149 for id in asset_ids {
151 self.subscriptions.confirm_subscribe(id);
152 }
153 }
154 }
155 Err(e) => {
156 for id in asset_ids {
157 self.subscriptions.mark_failure(id);
158 }
159 log::error!("Failed to serialize market subscribe request: {e}");
160 }
161 }
162 }
163
164 async fn send_unsubscribe_market(&self, asset_ids: &[String]) {
165 let Some(ref client) = self.client else {
166 log::warn!("No client available for market unsubscribe");
167 return;
168 };
169
170 let req = MarketUnsubscribeRequest {
171 assets_ids: asset_ids.to_vec(),
172 operation: "unsubscribe",
173 };
174
175 match serde_json::to_string(&req) {
176 Ok(payload) => {
177 if let Err(e) = client.send_text(payload, None).await {
178 log::error!("Failed to send market unsubscribe: {e}");
179 }
180 }
181 Err(e) => log::error!("Failed to serialize market unsubscribe request: {e}"),
182 }
183 }
184
185 async fn send_subscribe_user(&self) {
186 let Some(ref client) = self.client else {
187 log::warn!("No client available for user subscribe");
188 return;
189 };
190 let Some(cred) = &self.credential else {
191 log::error!("User channel subscribe requires credential");
192 return;
193 };
194
195 let req = UserSubscribeRequest {
196 auth: PolymarketWsAuth {
197 api_key: cred.api_key().to_string(),
198 secret: cred.api_secret(),
199 passphrase: cred.passphrase().to_string(),
200 },
201 markets: vec![],
202 assets_ids: vec![],
203 msg_type: "user",
204 };
205
206 drop(self.auth_tracker.begin());
208
209 match serde_json::to_string(&req) {
210 Ok(payload) => {
211 if let Err(e) = client.send_text(payload, None).await {
216 self.auth_tracker.fail(e.to_string());
217 log::error!("Failed to send user subscribe: {e}");
218 }
219 }
220 Err(e) => {
221 self.auth_tracker.fail(format!("Serialize error: {e}"));
222 log::error!("Failed to serialize user subscribe request: {e}");
223 }
224 }
225 }
226
227 async fn resubscribe_all(&mut self) {
228 match self.channel {
229 WsChannel::Market => {
230 let ids = self.subscriptions.all_topics();
231 if ids.is_empty() {
232 return;
233 }
234 log::info!(
235 "Resubscribing to {} market assets after reconnect",
236 ids.len()
237 );
238 self.send_subscribe_market(&ids).await;
239 }
240 WsChannel::User => {
241 if self.user_subscribed {
242 log::info!("Re-authenticating user channel after reconnect");
243 self.send_subscribe_user().await;
244 }
245 }
246 }
247 }
248
249 fn parse_messages(&self, text: &str) -> Vec<PolymarketWsMessage> {
250 if text == "NO NEW ASSETS" {
253 return vec![];
254 }
255
256 match self.channel {
257 WsChannel::Market => {
258 if let Ok(msgs) = serde_json::from_str::<Vec<MarketWsMessage>>(text) {
259 msgs.into_iter().map(PolymarketWsMessage::Market).collect()
260 } else if let Ok(msg) = serde_json::from_str::<MarketWsMessage>(text) {
261 vec![PolymarketWsMessage::Market(msg)]
262 } else {
263 log::warn!("Failed to parse market WS message: {text}");
264 vec![]
265 }
266 }
267 WsChannel::User => {
268 if let Ok(msgs) = serde_json::from_str::<Vec<UserWsMessage>>(text) {
269 msgs.into_iter().map(PolymarketWsMessage::User).collect()
270 } else if let Ok(msg) = serde_json::from_str::<UserWsMessage>(text) {
271 vec![PolymarketWsMessage::User(msg)]
272 } else {
273 log::warn!("Failed to parse user WS message: {text}");
274 vec![]
275 }
276 }
277 }
278 }
279
280 pub(super) async fn next(&mut self) -> Option<PolymarketWsMessage> {
281 if !self.message_buffer.is_empty() {
282 return Some(self.message_buffer.remove(0));
283 }
284
285 loop {
286 tokio::select! {
287 Some(cmd) = self.cmd_rx.recv() => {
288 match cmd {
289 HandlerCommand::SetClient(client) => {
290 log::debug!("Setting WebSocket client in handler");
291 self.client = Some(client);
292 }
293 HandlerCommand::Disconnect => {
294 log::debug!("Handler received disconnect command");
295
296 if let Some(ref client) = self.client {
297 client.disconnect().await;
298 }
299 self.signal.store(true, Ordering::SeqCst);
300 return None;
301 }
302 HandlerCommand::SubscribeMarket(ids) => {
303 self.send_subscribe_market(&ids).await;
304 }
305 HandlerCommand::UnsubscribeMarket(ids) => {
306 for id in &ids {
307 self.subscriptions.mark_unsubscribe(id);
308 }
309 self.send_unsubscribe_market(&ids).await;
310 for id in &ids {
311 self.subscriptions.confirm_unsubscribe(id);
312 }
313 }
314 HandlerCommand::SubscribeUser => {
315 self.user_subscribed = true;
316 self.send_subscribe_user().await;
317 }
318 }
319 }
320 Some(raw) = self.raw_rx.recv() => {
321 match raw {
322 Message::Text(text) => {
323 if text == RECONNECTED {
324 self.market_subscription_initialized = false;
325 self.resubscribe_all().await;
326 return Some(PolymarketWsMessage::Reconnected);
327 }
328 let msgs = self.parse_messages(&text);
329 if msgs.is_empty() {
330 continue;
331 }
332 if self.channel == WsChannel::User {
335 self.auth_tracker.succeed();
336 }
337 let mut iter = msgs.into_iter();
340 let first = iter.next().unwrap();
341 self.message_buffer.extend(iter);
342 return Some(first);
343 }
344 Message::Ping(data) => {
345 if let Some(ref client) = self.client
346 && let Err(e) = client.send_pong(data.to_vec()).await
347 {
348 log::warn!("Failed to send pong: {e}");
349 }
350 }
351 Message::Close(_) => {
352 log::info!("WebSocket close frame received");
353 return None;
354 }
355 _ => {}
356 }
357 }
358 else => return None,
359 }
360 }
361 }
362}