Skip to main content

nautilus_blockchain/rpc/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, fmt::Debug, sync::Arc};
17
18use nautilus_core::consts::NAUTILUS_USER_AGENT;
19use nautilus_model::defi::{Block, Chain, rpc::RpcNodeWssResponse};
20use nautilus_network::{
21    RECONNECTED,
22    http::USER_AGENT,
23    websocket::{TransportBackend, WebSocketClient, WebSocketConfig, channel_message_handler},
24};
25use tokio_tungstenite::tungstenite::Message;
26
27use crate::rpc::{
28    error::BlockchainRpcClientError,
29    types::{BlockchainMessage, RpcEventType},
30    utils::{
31        extract_rpc_subscription_id, is_subscription_confirmation_response, is_subscription_event,
32    },
33};
34
35/// Core implementation of a blockchain RPC client that serves as the base for all chain-specific clients.
36///
37/// It provides a shared implementation of common blockchain RPC functionality, handling:
38/// - WebSocket connection management with blockchain RPC node.
39/// - Subscription lifecycle (creation, tracking, and termination).
40/// - Message serialization and deserialization of RPC messages.
41/// - Event type mapping and dispatching.
42/// - Automatic subscription re-establishment on reconnection.
43pub struct CoreBlockchainRpcClient {
44    /// The blockchain network type this client connects to.
45    chain: Chain,
46    /// WebSocket secure URL for the blockchain node's RPC endpoint.
47    wss_rpc_url: String,
48    /// Auto-incrementing counter for generating unique RPC request IDs.
49    request_id: u64,
50    /// Tracks in-flight subscription requests by mapping request IDs to their event types.
51    pending_subscription_request: HashMap<u64, RpcEventType>,
52    /// Maps active subscription IDs to their corresponding event types for message
53    /// deserialization.
54    subscription_event_types: HashMap<String, RpcEventType>,
55    /// The active WebSocket client connection.
56    wss_client: Option<Arc<WebSocketClient>>,
57    /// Channel receiver for consuming WebSocket messages.
58    wss_consumer_rx: Option<tokio::sync::mpsc::UnboundedReceiver<Message>>,
59    /// Tracks confirmed subscriptions that need to be re-established on reconnection.
60    subscriptions: Arc<tokio::sync::RwLock<HashMap<RpcEventType, String>>>,
61    /// WebSocket transport backend (defaults to `Tungstenite`).
62    transport_backend: TransportBackend,
63    /// Optional proxy URL for the WebSocket connection.
64    proxy_url: Option<String>,
65}
66
67impl Debug for CoreBlockchainRpcClient {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct(stringify!(CoreBlockchainRpcClient))
70            .field("chain", &self.chain)
71            .field("wss_rpc_url", &self.wss_rpc_url)
72            .field("request_id", &self.request_id)
73            .field(
74                "pending_subscription_request",
75                &self.pending_subscription_request,
76            )
77            .field("subscription_event_types", &self.subscription_event_types)
78            .field(
79                "wss_client",
80                &self.wss_client.as_ref().map(|_| "<WebSocketClient>"),
81            )
82            .field(
83                "wss_consumer_rx",
84                &self.wss_consumer_rx.as_ref().map(|_| "<Receiver>"),
85            )
86            .field("confirmed_subscriptions", &"<RwLock<HashMap>>")
87            .finish()
88    }
89}
90
91impl CoreBlockchainRpcClient {
92    #[must_use]
93    pub fn new(chain: Chain, wss_rpc_url: String, proxy_url: Option<String>) -> Self {
94        Self {
95            chain,
96            wss_rpc_url,
97            request_id: 1,
98            wss_client: None,
99            pending_subscription_request: HashMap::new(),
100            subscription_event_types: HashMap::new(),
101            wss_consumer_rx: None,
102            subscriptions: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
103            transport_backend: TransportBackend::default(),
104            proxy_url,
105        }
106    }
107
108    /// Sets the transport backend for the next [`Self::connect`].
109    #[must_use]
110    pub fn with_transport_backend(mut self, backend: TransportBackend) -> Self {
111        self.transport_backend = backend;
112        self
113    }
114
115    /// Updates the transport backend in place.
116    pub fn set_transport_backend(&mut self, backend: TransportBackend) {
117        self.transport_backend = backend;
118    }
119
120    /// Establishes a WebSocket connection to the blockchain node and sets up the message channel.
121    ///
122    /// Configures automatic reconnection with exponential backoff and subscription re-establishment.
123    /// Reconnection is handled via the `RECONNECTED` message in the message stream.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the WebSocket connection fails.
128    pub async fn connect(&mut self) -> anyhow::Result<()> {
129        let (handler, rx) = channel_message_handler();
130
131        // Most blockchain RPC nodes require a heartbeat to keep the connection alive
132        let heartbeat_interval = 30;
133
134        let config = WebSocketConfig {
135            url: self.wss_rpc_url.clone(),
136            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
137            heartbeat: Some(heartbeat_interval),
138            heartbeat_msg: None,
139            reconnect_timeout_ms: Some(10_000),
140            reconnect_delay_initial_ms: Some(1_000),
141            reconnect_delay_max_ms: Some(30_000),
142            reconnect_backoff_factor: Some(2.0),
143            reconnect_jitter_ms: Some(1_000),
144            reconnect_max_attempts: None,
145            idle_timeout_ms: None,
146            backend: self.transport_backend,
147            proxy_url: self.proxy_url.clone(),
148        };
149
150        let client =
151            WebSocketClient::connect(config, Some(handler), None, None, vec![], None).await?;
152
153        self.wss_client = Some(Arc::new(client));
154        self.wss_consumer_rx = Some(rx);
155
156        Ok(())
157    }
158
159    /// Registers a subscription for the specified event type and records it internally with the given ID.
160    async fn subscribe_events(
161        &mut self,
162        event_type: RpcEventType,
163        subscription_id: String,
164    ) -> Result<(), BlockchainRpcClientError> {
165        if let Some(client) = &self.wss_client {
166            log::info!(
167                "Subscribing to '{}' on chain '{}'",
168                subscription_id,
169                self.chain.name
170            );
171            let msg = serde_json::json!({
172                "method": "eth_subscribe",
173                "id": self.request_id,
174                "jsonrpc": "2.0",
175                "params": [subscription_id]
176            });
177            self.pending_subscription_request
178                .insert(self.request_id, event_type.clone());
179            self.request_id += 1;
180
181            if let Err(e) = client.send_text(msg.to_string(), None).await {
182                log::error!("Error sending subscribe message: {e:?}");
183            }
184
185            // Track subscription for re-establishment on reconnect
186            let mut confirmed = self.subscriptions.write().await;
187            confirmed.insert(event_type, subscription_id);
188
189            Ok(())
190        } else {
191            Err(BlockchainRpcClientError::ClientError(String::from(
192                "Client not connected",
193            )))
194        }
195    }
196
197    /// Re-establishes all confirmed subscriptions after reconnection.
198    async fn resubscribe_all(&mut self) -> Result<(), BlockchainRpcClientError> {
199        let subscriptions = self.subscriptions.read().await;
200
201        if subscriptions.is_empty() {
202            log::debug!(
203                "No subscriptions to re-establish for chain '{}'",
204                self.chain.name
205            );
206            return Ok(());
207        }
208
209        log::info!(
210            "Re-establishing {} subscription(s) for chain '{}'",
211            subscriptions.len(),
212            self.chain.name
213        );
214
215        let subs_to_restore: Vec<(RpcEventType, String)> = subscriptions
216            .iter()
217            .map(|(event_type, sub_id)| (event_type.clone(), sub_id.clone()))
218            .collect();
219
220        drop(subscriptions);
221
222        for (event_type, subscription_id) in subs_to_restore {
223            if let Some(client) = &self.wss_client {
224                log::debug!(
225                    "Re-subscribing to '{}' on chain '{}'",
226                    subscription_id,
227                    self.chain.name
228                );
229
230                let msg = serde_json::json!({
231                    "method": "eth_subscribe",
232                    "id": self.request_id,
233                    "jsonrpc": "2.0",
234                    "params": [subscription_id]
235                });
236
237                self.pending_subscription_request
238                    .insert(self.request_id, event_type);
239                self.request_id += 1;
240
241                if let Err(e) = client.send_text(msg.to_string(), None).await {
242                    log::error!("Error re-subscribing after reconnection: {e:?}");
243                }
244            }
245        }
246
247        Ok(())
248    }
249
250    /// Terminates a subscription with the blockchain node using the provided subscription ID.
251    async fn unsubscribe_events(
252        &self,
253        subscription_id: String,
254    ) -> Result<(), BlockchainRpcClientError> {
255        if let Some(client) = &self.wss_client {
256            log::info!("Unsubscribing to new blocks on chain {}", self.chain.name);
257            let msg = serde_json::json!({
258                "method": "eth_unsubscribe",
259                "id": 1,
260                "jsonrpc": "2.0",
261                "params": [subscription_id]
262            });
263
264            if let Err(e) = client.send_text(msg.to_string(), None).await {
265                log::error!("Error sending unsubscribe message: {e:?}");
266            }
267            Ok(())
268        } else {
269            Err(BlockchainRpcClientError::ClientError(String::from(
270                "Client not connected",
271            )))
272        }
273    }
274
275    /// Waits for and returns the next available message from the WebSocket channel.
276    pub async fn wait_on_rpc_channel(&mut self) -> Option<Message> {
277        match &mut self.wss_consumer_rx {
278            Some(rx) => rx.recv().await,
279            None => None,
280        }
281    }
282
283    /// Retrieves, parses, and returns the next blockchain RPC message as a structured `BlockchainRpcMessage` type.
284    ///
285    /// Handles subscription confirmations, events, and reconnection signals automatically.
286    ///
287    /// # Panics
288    ///
289    /// Panics if expected fields (`id`, `result`) are missing or cannot be converted when handling subscription confirmations or events.
290    ///
291    /// # Errors
292    ///
293    /// Returns an error if the RPC channel encounters an error or if deserialization of the message fails.
294    pub async fn next_rpc_message(
295        &mut self,
296    ) -> Result<BlockchainMessage, BlockchainRpcClientError> {
297        while let Some(msg) = self.wait_on_rpc_channel().await {
298            match msg {
299                Message::Text(text) => {
300                    if text == RECONNECTED {
301                        log::info!("Detected reconnection for chain '{}'", self.chain.name);
302
303                        if let Err(e) = self.resubscribe_all().await {
304                            log::error!("Failed to re-establish subscriptions: {e:?}");
305                        }
306                        continue;
307                    }
308
309                    match serde_json::from_str::<serde_json::Value>(&text) {
310                        Ok(json) => {
311                            if is_subscription_confirmation_response(&json) {
312                                let subscription_request_id =
313                                    json.get("id").unwrap().as_u64().unwrap();
314                                let result = json.get("result").unwrap().as_str().unwrap();
315                                let event_type = self
316                                    .pending_subscription_request
317                                    .get(&subscription_request_id)
318                                    .unwrap();
319                                self.subscription_event_types
320                                    .insert(result.to_string(), event_type.clone());
321                                self.pending_subscription_request
322                                    .remove(&subscription_request_id);
323                                continue;
324                            } else if is_subscription_event(&json) {
325                                let subscription_id = match extract_rpc_subscription_id(&json) {
326                                    Some(id) => id,
327                                    None => {
328                                        return Err(BlockchainRpcClientError::InternalRpcClientError(
329                                        "Error parsing subscription id from valid rpc response"
330                                            .to_string(),
331                                    ));
332                                    }
333                                };
334
335                                if let Some(event_type) =
336                                    self.subscription_event_types.get(subscription_id)
337                                {
338                                    match event_type {
339                                        RpcEventType::NewBlock => {
340                                            return match serde_json::from_value::<
341                                                RpcNodeWssResponse<Block>,
342                                            >(
343                                                json
344                                            ) {
345                                                Ok(block_response) => {
346                                                    let block = block_response.params.result;
347                                                    Ok(BlockchainMessage::Block(block))
348                                                }
349                                                Err(e) => Err(
350                                                    BlockchainRpcClientError::MessageParsingError(
351                                                        format!(
352                                                            "Error parsing rpc response to block with error {e}"
353                                                        ),
354                                                    ),
355                                                ),
356                                            };
357                                        }
358                                    }
359                                }
360                                return Err(BlockchainRpcClientError::InternalRpcClientError(
361                                    format!(
362                                        "Event type not found for defined subscription id {subscription_id}"
363                                    ),
364                                ));
365                            }
366                            return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
367                                json.to_string(),
368                            ));
369                        }
370                        Err(e) => {
371                            return Err(BlockchainRpcClientError::MessageParsingError(
372                                e.to_string(),
373                            ));
374                        }
375                    }
376                }
377                Message::Pong(_) => {}
378                _ => {
379                    return Err(BlockchainRpcClientError::UnsupportedRpcResponseType(
380                        msg.to_string(),
381                    ));
382                }
383            }
384        }
385
386        Err(BlockchainRpcClientError::NoMessageReceived)
387    }
388
389    /// Subscribes to real-time block updates from the blockchain node.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if the subscription request fails or if the client is not connected.
394    pub async fn subscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
395        self.subscribe_events(RpcEventType::NewBlock, String::from("newHeads"))
396            .await
397    }
398
399    /// Cancels the subscription to real-time block updates.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if the unsubscription request fails or if the client is not connected.
404    pub async fn unsubscribe_blocks(&mut self) -> Result<(), BlockchainRpcClientError> {
405        self.unsubscribe_events(String::from("newHeads")).await?;
406
407        let subscription_ids_to_remove: Vec<String> = self
408            .subscription_event_types
409            .iter()
410            .filter(|(_, event_type)| **event_type == RpcEventType::NewBlock)
411            .map(|(id, _)| id.clone())
412            .collect();
413
414        for id in subscription_ids_to_remove {
415            self.subscription_event_types.remove(&id);
416        }
417
418        let mut confirmed = self.subscriptions.write().await;
419        confirmed.remove(&RpcEventType::NewBlock);
420
421        Ok(())
422    }
423}