nautilus_blockchain/rpc/
http.rs1use std::{collections::HashMap, num::NonZeroU32, str::FromStr};
17
18use alloy::primitives::{Address, U256};
19use bytes::Bytes;
20use nautilus_core::hex;
21use nautilus_model::defi::rpc::{RpcLog, RpcNodeHttpResponse};
22use nautilus_network::{
23 http::{HttpClient, Method},
24 ratelimiter::quota::Quota,
25};
26use serde::de::DeserializeOwned;
27
28use crate::rpc::error::BlockchainRpcClientError;
29
30#[derive(Debug)]
35pub struct BlockchainHttpRpcClient {
36 http_rpc_url: String,
38 http_client: HttpClient,
40}
41
42impl BlockchainHttpRpcClient {
43 #[must_use]
51 pub fn new(
52 http_rpc_url: String,
53 rpc_request_per_second: Option<u32>,
54 proxy_url: Option<String>,
55 ) -> Self {
56 let default_quota =
57 rpc_request_per_second.and_then(|rps| Quota::per_second(NonZeroU32::new(rps)?));
58 let http_client = HttpClient::new(
59 HashMap::new(),
60 vec![],
61 Vec::new(),
62 default_quota,
63 None, proxy_url,
65 )
66 .expect("Failed to create HTTP client");
67 Self {
68 http_rpc_url,
69 http_client,
70 }
71 }
72
73 async fn send_rpc_request(
75 &self,
76 rpc_request: serde_json::Value,
77 ) -> Result<Bytes, BlockchainRpcClientError> {
78 let body_bytes = serde_json::to_vec(&rpc_request).map_err(|e| {
79 BlockchainRpcClientError::ClientError(format!("Failed to serialize request: {e}"))
80 })?;
81
82 let mut headers = HashMap::new();
83 headers.insert("Content-Type".to_string(), "application/json".to_string());
84
85 match self
86 .http_client
87 .request(
88 Method::POST,
89 self.http_rpc_url.clone(),
90 None,
91 Some(headers),
92 Some(body_bytes),
93 None,
94 None,
95 )
96 .await
97 {
98 Ok(response) => Ok(response.body),
99 Err(e) => Err(BlockchainRpcClientError::ClientError(e.to_string())),
100 }
101 }
102
103 pub async fn execute_rpc_call<T: DeserializeOwned>(
109 &self,
110 rpc_request: serde_json::Value,
111 ) -> anyhow::Result<T> {
112 match self.send_rpc_request(rpc_request).await {
113 Ok(bytes) => match serde_json::from_slice::<RpcNodeHttpResponse<T>>(bytes.as_ref()) {
114 Ok(parsed) => {
115 if parsed.jsonrpc.is_none()
118 && let (Some(code), Some(message)) = (parsed.code, parsed.message)
119 {
120 anyhow::bail!("RPC provider error {code}: {message}");
121 }
122
123 if let Some(error) = parsed.error {
124 Err(anyhow::anyhow!(
125 "RPC error {}: {}",
126 error.code,
127 error.message
128 ))
129 } else if let Some(result) = parsed.result {
130 Ok(result)
131 } else {
132 Err(anyhow::anyhow!(
133 "Response missing both result and error fields"
134 ))
135 }
136 }
137 Err(e) => {
138 let raw_response = String::from_utf8_lossy(bytes.as_ref());
140 let preview = if raw_response.len() > 500 {
141 format!(
142 "{}... (truncated, {} bytes total)",
143 &raw_response[..500],
144 raw_response.len()
145 )
146 } else {
147 raw_response.to_string()
148 };
149
150 Err(anyhow::anyhow!(
151 "Failed to parse eth call response: {e}\nRaw response: {preview}"
152 ))
153 }
154 },
155 Err(e) => Err(anyhow::anyhow!(
156 "Failed to execute eth call RPC request: {e}"
157 )),
158 }
159 }
160
161 #[must_use]
163 pub fn construct_eth_call(
164 &self,
165 to: &str,
166 call_data: &[u8],
167 block: Option<u64>,
168 ) -> serde_json::Value {
169 let encoded_data = hex::encode_prefixed(call_data);
170 let call = serde_json::json!({
171 "to": to,
172 "data": encoded_data
173 });
174
175 let block_param = if let Some(block_number) = block {
176 serde_json::json!(format!("0x{:x}", block_number))
177 } else {
178 serde_json::json!("latest")
179 };
180
181 serde_json::json!({
182 "jsonrpc": "2.0",
183 "id": 1,
184 "method": "eth_call",
185 "params": [call, block_param]
186 })
187 }
188
189 pub async fn get_balance(&self, address: &Address, block: Option<u64>) -> anyhow::Result<U256> {
195 let block_param = if let Some(block_number) = block {
196 serde_json::json!(format!("0x{:x}", block_number))
197 } else {
198 serde_json::json!("latest")
199 };
200
201 let request = serde_json::json!({
202 "jsonrpc": "2.0",
203 "id": 1,
204 "method": "eth_getBalance",
205 "params": [address, block_param]
206 });
207 let hex_string: String = self.execute_rpc_call(request).await?;
208
209 U256::from_str(&hex_string)
210 .map_err(|e| anyhow::anyhow!("Failed to parse balance hex string '{hex_string}': {e}"))
211 }
212
213 pub async fn get_logs(
222 &self,
223 address: Option<&Address>,
224 topics: Option<Vec<Option<String>>>,
225 from_block: u64,
226 to_block: u64,
227 ) -> anyhow::Result<Vec<RpcLog>> {
228 let mut filter = serde_json::Map::new();
229
230 filter.insert(
231 "fromBlock".to_string(),
232 serde_json::json!(format!("0x{:x}", from_block)),
233 );
234 filter.insert(
235 "toBlock".to_string(),
236 serde_json::json!(format!("0x{:x}", to_block)),
237 );
238
239 if let Some(addr) = address {
240 filter.insert(
241 "address".to_string(),
242 serde_json::json!(format!("{:?}", addr)),
243 );
244 }
245
246 if let Some(topics) = topics {
247 filter.insert("topics".to_string(), serde_json::json!(topics));
248 }
249
250 let request = serde_json::json!({
251 "jsonrpc": "2.0",
252 "id": 1,
253 "method": "eth_getLogs",
254 "params": [filter]
255 });
256
257 self.execute_rpc_call(request).await
258 }
259}