1use std::{collections::HashMap, fmt::Debug, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::{
21 RECONNECTED,
22 http::USER_AGENT,
23 websocket::{TransportBackend, WebSocketClient, WebSocketConfig, channel_message_handler},
24};
25use tokio_tungstenite::tungstenite::Message;
26
27use crate::rpc::{
28 error::BlockchainRpcClientError,
29 types::{BlockchainMessage, RpcEventType},
30 utils::{
31 extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
32 },
33};
34
35pub struct CoreBlockchainRpcClient {
44 chain: Chain,
46 wss_rpc_url: String,
48 request_id: u64,
50 pending_subscription_request: HashMap<u64, RpcEventType>,
52 subscription_event_types: HashMap<String, RpcEventType>,
55 wss_client: Option<Arc<WebSocketClient>>,
57 wss_consumer_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Message>>,
59 subscriptions: Arc<tokio::sync::RwLock<HashMap<RpcEventType, String>>>,
61 transport_backend: TransportBackend,
63 proxy_url: Option<String>,
65}
66
67impl Debug for CoreBlockchainRpcClient {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct(stringify!(CoreBlockchainRpcClient))
70 .field("chain", &self.chain)
71 .field("wss_rpc_url", &self.wss_rpc_url)
72 .field("request_id", &self.request_id)
73 .field(
74 "pending_subscription_request",
75 &self.pending_subscription_request,
76 )
77 .field("subscription_event_types", &self.subscription_event_types)
78 .field(
79 "wss_client",
80 &self.wss_client.as_ref().map(|_| "<WebSocketClient>"),
81 )
82 .field(
83 "wss_consumer_rx",
84 &self.wss_consumer_rx.as_ref().map(|_| "<Receiver>"),
85 )
86 .field("confirmed_subscriptions", &"<RwLock<HashMap>>")
87 .finish()
88 }
89}
90
91impl CoreBlockchainRpcClient {
92 #[must_use]
93 pub fn new(chain: Chain, wss_rpc_url: String, proxy_url: Option<String>) -> Self {
94 Self {
95 chain,
96 wss_rpc_url,
97 request_id: 1,
98 wss_client: None,
99 pending_subscription_request: HashMap::new(),
100 subscription_event_types: HashMap::new(),
101 wss_consumer_rx: None,
102 subscriptions: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
103 transport_backend: TransportBackend::default(),
104 proxy_url,
105 }
106 }
107
108 #[must_use]
110 pub fn with_transport_backend(mut self, backend: TransportBackend) -> Self {
111 self.transport_backend = backend;
112 self
113 }
114
115 pub fn set_transport_backend(&mut self, backend: TransportBackend) {
117 self.transport_backend = backend;
118 }
119
120 pub async fn connect(&mut self) -> anyhow::Result<()> {
129 let (handler, rx) = channel_message_handler();
130
131 let heartbeat_interval = 30;
133
134 let config = WebSocketConfig {
135 url: self.wss_rpc_url.clone(),
136 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
137 heartbeat: Some(heartbeat_interval),
138 heartbeat_msg: None,
139 reconnect_timeout_ms: Some(10_000),
140 reconnect_delay_initial_ms: Some(1_000),
141 reconnect_delay_max_ms: Some(30_000),
142 reconnect_backoff_factor: Some(2.0),
143 reconnect_jitter_ms: Some(1_000),
144 reconnect_max_attempts: None,
145 idle_timeout_ms: None,
146 backend: self.transport_backend,
147 proxy_url: self.proxy_url.clone(),
148 };
149
150 let client =
151 WebSocketClient::connect(config, Some(handler), None, None, vec![], None).await?;
152
153 self.wss_client = Some(Arc::new(client));
154 self.wss_consumer_rx = Some(rx);
155
156 Ok(())
157 }
158
159 async fn subscribe_events(
161 &mut self,
162 event_type: RpcEventType,
163 subscription_id: String,
164 ) -> Result<(), BlockchainRpcClientError> {
165 if let Some(client) = &self.wss_client {
166 log::info!(
167 "Subscribing to '{}' on chain '{}'",
168 subscription_id,
169 self.chain.name
170 );
171 let msg = serde_json::json!({
172 "method": "eth_subscribe",
173 "id": self.request_id,
174 "jsonrpc": "2.0",
175 "params": [subscription_id]
176 });
177 self.pending_subscription_request
178 .insert(self.request_id, event_type.clone());
179 self.request_id += 1;
180
181 if let Err(e) = client.send_text(msg.to_string(), None).await {
182 log::error!("Error sending subscribe message: {e:?}");
183 }
184
185 let mut confirmed = self.subscriptions.write().await;
187 confirmed.insert(event_type, subscription_id);
188
189 Ok(())
190 } else {
191 Err(BlockchainRpcClientError::ClientError(String::from(
192 "Client not connected",
193 )))
194 }
195 }
196
197 async fn resubscribe_all(&mut self) -> Result<(), BlockchainRpcClientError> {
199 let subscriptions = self.subscriptions.read().await;
200
201 if subscriptions.is_empty() {
202 log::debug!(
203 "No subscriptions to re-establish for chain '{}'",
204 self.chain.name
205 );
206 return Ok(());
207 }
208
209 log::info!(
210 "Re-establishing {} subscription(s) for chain '{}'",
211 subscriptions.len(),
212 self.chain.name
213 );
214
215 let subs_to_restore: Vec<(RpcEventType, String)> = subscriptions
216 .iter()
217 .map(|(event_type, sub_id)| (event_type.clone(), sub_id.clone()))
218 .collect();
219
220 drop(subscriptions);
221
222 for (event_type, subscription_id) in subs_to_restore {
223 if let Some(client) = &self.wss_client {
224 log::debug!(
225 "Re-subscribing to '{}' on chain '{}'",
226 subscription_id,
227 self.chain.name
228 );
229
230 let msg = serde_json::json!({
231 "method": "eth_subscribe",
232 "id": self.request_id,
233 "jsonrpc": "2.0",
234 "params": [subscription_id]
235 });
236
237 self.pending_subscription_request
238 .insert(self.request_id, event_type);
239 self.request_id += 1;
240
241 if let Err(e) = client.send_text(msg.to_string(), None).await {
242 log::error!("Error re-subscribing after reconnection: {e:?}");
243 }
244 }
245 }
246
247 Ok(())
248 }
249
250 async fn unsubscribe_events(
252 &self,
253 subscription_id: String,
254 ) -> Result<(), BlockchainRpcClientError> {
255 if let Some(client) = &self.wss_client {
256 log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
257 let msg = serde_json::json!({
258 "method": "eth_unsubscribe",
259 "id": 1,
260 "jsonrpc": "2.0",
261 "params": [subscription_id]
262 });
263
264 if let Err(e) = client.send_text(msg.to_string(), None).await {
265 log::error!("Error sending unsubscribe message: {e:?}");
266 }
267 Ok(())
268 } else {
269 Err(BlockchainRpcClientError::ClientError(String::from(
270 "Client not connected",
271 )))
272 }
273 }
274
275 pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
277 match &mut self.wss_consumer_rx {
278 Some(rx) => rx.recv().await,
279 None => None,
280 }
281 }
282
283 pub async fn next_rpc_message(
295 &mut self,
296 ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
297 while let Some(msg) = self.wait_on_rpc_channel().await {
298 match msg {
299 Message::Text(text) => {
300 if text == RECONNECTED {
301 log::info!("Detected reconnection for chain '{}'", self.chain.name);
302
303 if let Err(e) = self.resubscribe_all().await {
304 log::error!("Failed to re-establish subscriptions: {e:?}");
305 }
306 continue;
307 }
308
309 match serde_json::from_str::<serde_json::Value>(&text) {
310 Ok(json) => {
311 if is_subscription_confirmation_response(&json) {
312 let subscription_request_id =
313 json.get("id").unwrap().as_u64().unwrap();
314 let result = json.get("result").unwrap().as_str().unwrap();
315 let event_type = self
316 .pending_subscription_request
317 .get(&subscription_request_id)
318 .unwrap();
319 self.subscription_event_types
320 .insert(result.to_string(), event_type.clone());
321 self.pending_subscription_request
322 .remove(&subscription_request_id);
323 continue;
324 } else if is_subscription_event(&json) {
325 let subscription_id = match extract_rpc_subscription_id(&json) {
326 Some(id) => id,
327 None => {
328 return Err(BlockchainRpcClientError::InternalRpcClientError(
329 "Error parsing subscription id from valid rpc response"
330 .to_string(),
331 ));
332 }
333 };
334
335 if let Some(event_type) =
336 self.subscription_event_types.get(subscription_id)
337 {
338 match event_type {
339 RpcEventType::NewBlock => {
340 return match serde_json::from_value::<
341 RpcNodeWssResponse<Block>,
342 >(
343 json
344 ) {
345 Ok(block_response) => {
346 let block = block_response.params.result;
347 Ok(BlockchainMessage::Block(block))
348 }
349 Err(e) => Err(
350 BlockchainRpcClientError::MessageParsingError(
351 format!(
352 "Error parsing rpc response to block with error {e}"
353 ),
354 ),
355 ),
356 };
357 }
358 }
359 }
360 return Err(BlockchainRpcClientError::InternalRpcClientError(
361 format!(
362 "Event type not found for defined subscription id {subscription_id}"
363 ),
364 ));
365 }
366 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
367 json.to_string(),
368 ));
369 }
370 Err(e) => {
371 return Err(BlockchainRpcClientError::MessageParsingError(
372 e.to_string(),
373 ));
374 }
375 }
376 }
377 Message::Pong(_) => {}
378 _ => {
379 return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
380 msg.to_string(),
381 ));
382 }
383 }
384 }
385
386 Err(BlockchainRpcClientError::NoMessageReceived)
387 }
388
389 pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
395 self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
396 .await
397 }
398
399 pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
405 self.unsubscribe_events(String::from("newHeads")).await?;
406
407 let subscription_ids_to_remove: Vec<String> = self
408 .subscription_event_types
409 .iter()
410 .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
411 .map(|(id, _)| id.clone())
412 .collect();
413
414 for id in subscription_ids_to_remove {
415 self.subscription_event_types.remove(&id);
416 }
417
418 let mut confirmed = self.subscriptions.write().await;
419 confirmed.remove(&RpcEventType::NewBlock);
420
421 Ok(())
422 }
423}