1use cosmrs::Tx;
22use prost::Message as ProstMessage;
23use tonic::transport::Channel;
24
25use crate::{
26 error::DydxError,
27 proto::{
28 AccountAuthenticator, AccountPlusClient, GetAuthenticatorsRequest,
29 cosmos_sdk_proto::cosmos::{
30 auth::v1beta1::{
31 BaseAccount, QueryAccountRequest, query_client::QueryClient as AuthClient,
32 },
33 bank::v1beta1::{QueryAllBalancesRequest, query_client::QueryClient as BankClient},
34 base::{
35 tendermint::v1beta1::{
36 Block, GetLatestBlockRequest, GetNodeInfoRequest, GetNodeInfoResponse,
37 service_client::ServiceClient as BaseClient,
38 },
39 v1beta1::Coin,
40 },
41 tx::v1beta1::{
42 BroadcastMode, BroadcastTxRequest, GetTxRequest, SimulateRequest,
43 service_client::ServiceClient as TxClient,
44 },
45 },
46 dydxprotocol::{
47 clob::{ClobPair, QueryAllClobPairRequest, query_client::QueryClient as ClobClient},
48 perpetuals::{
49 Perpetual, QueryAllPerpetualsRequest, query_client::QueryClient as PerpetualsClient,
50 },
51 subaccounts::{
52 QueryGetSubaccountRequest, Subaccount as SubaccountInfo,
53 query_client::QueryClient as SubaccountsClient,
54 },
55 },
56 },
57};
58
/// Transaction hash, represented as a plain string.
///
/// This is the value handed back by [`DydxGrpcClient::broadcast_tx`] on success.
pub type TxHash = String;
61
/// Block height, wrapped in a newtype so it cannot be confused with other integers.
///
/// Ordering and equality follow the wrapped `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Height(
    /// The raw block number.
    pub u32,
);
65
66#[derive(Debug, Clone)]
74pub struct DydxGrpcClient {
75 channel: Channel,
76 auth: AuthClient<Channel>,
77 bank: BankClient<Channel>,
78 base: BaseClient<Channel>,
79 tx: TxClient<Channel>,
80 clob: ClobClient<Channel>,
81 perpetuals: PerpetualsClient<Channel>,
82 subaccounts: SubaccountsClient<Channel>,
83 accountplus: AccountPlusClient<Channel>,
84 current_url: String,
85}
86
87impl DydxGrpcClient {
88 pub async fn new(grpc_url: String) -> Result<Self, DydxError> {
94 let mut endpoint = Channel::from_shared(grpc_url.clone())
95 .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))?
96 .connect_timeout(std::time::Duration::from_secs(10))
97 .timeout(std::time::Duration::from_secs(30));
98
99 if grpc_url.starts_with("https://") {
101 let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
102 endpoint = endpoint
103 .tls_config(tls)
104 .map_err(|e| DydxError::Config(format!("TLS config failed: {e}")))?;
105 }
106
107 let channel = endpoint.connect().await.map_err(|e| {
108 DydxError::Grpc(Box::new(tonic::Status::unavailable(format!(
109 "Connection failed: {e}"
110 ))))
111 })?;
112
113 Ok(Self {
114 auth: AuthClient::new(channel.clone()),
115 bank: BankClient::new(channel.clone()),
116 base: BaseClient::new(channel.clone()),
117 tx: TxClient::new(channel.clone()),
118 clob: ClobClient::new(channel.clone()),
119 perpetuals: PerpetualsClient::new(channel.clone()),
120 subaccounts: SubaccountsClient::new(channel.clone()),
121 accountplus: AccountPlusClient::new(channel.clone()),
122 channel,
123 current_url: grpc_url,
124 })
125 }
126
127 pub async fn new_with_fallback(grpc_urls: &[impl AsRef<str>]) -> Result<Self, DydxError> {
137 if grpc_urls.is_empty() {
138 return Err(DydxError::Config("No gRPC URLs provided".to_string()));
139 }
140
141 let mut last_error = None;
142
143 for (idx, url) in grpc_urls.iter().enumerate() {
144 let url_str = url.as_ref();
145 log::debug!(
146 "Attempting to connect to gRPC node: {url_str} (attempt {}/{})",
147 idx + 1,
148 grpc_urls.len()
149 );
150
151 match Self::new(url_str.to_string()).await {
152 Ok(client) => {
153 log::info!("Successfully connected to gRPC node: {url_str}");
154 return Ok(client);
155 }
156 Err(e) => {
157 log::warn!("Failed to connect to gRPC node {url_str}: {e}");
158 last_error = Some(e);
159 }
160 }
161 }
162
163 Err(last_error.unwrap_or_else(|| {
164 DydxError::Grpc(Box::new(tonic::Status::unavailable(
165 "All gRPC connection attempts failed".to_string(),
166 )))
167 }))
168 }
169
170 pub async fn reconnect_with_fallback(
180 &mut self,
181 grpc_urls: &[impl AsRef<str>],
182 ) -> Result<(), DydxError> {
183 if grpc_urls.is_empty() {
184 return Err(DydxError::Config("No gRPC URLs provided".to_string()));
185 }
186
187 let mut last_error = None;
188
189 for (idx, url) in grpc_urls.iter().enumerate() {
190 let url_str = url.as_ref();
191
192 if url_str == self.current_url {
194 log::debug!("Skipping current URL: {url_str}");
195 continue;
196 }
197
198 log::debug!(
199 "Attempting to reconnect to gRPC node: {url_str} (attempt {}/{})",
200 idx + 1,
201 grpc_urls.len()
202 );
203
204 let mut endpoint = match Channel::from_shared(url_str.to_string())
205 .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))
206 {
207 Ok(ep) => ep
208 .connect_timeout(std::time::Duration::from_secs(10))
209 .timeout(std::time::Duration::from_secs(30)),
210 Err(e) => {
211 last_error = Some(e);
212 continue;
213 }
214 };
215
216 if url_str.starts_with("https://") {
218 let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
219 endpoint = match endpoint.tls_config(tls) {
220 Ok(ep) => ep,
221 Err(e) => {
222 last_error = Some(DydxError::Config(format!("TLS config failed: {e}")));
223 continue;
224 }
225 };
226 }
227
228 match endpoint.connect().await {
229 Ok(connected_channel) => {
230 log::info!("Successfully reconnected to gRPC node: {url_str}");
231
232 self.channel = connected_channel.clone();
234 self.auth = AuthClient::new(connected_channel.clone());
235 self.bank = BankClient::new(connected_channel.clone());
236 self.base = BaseClient::new(connected_channel.clone());
237 self.tx = TxClient::new(connected_channel.clone());
238 self.clob = ClobClient::new(connected_channel.clone());
239 self.perpetuals = PerpetualsClient::new(connected_channel.clone());
240 self.subaccounts = SubaccountsClient::new(connected_channel);
241 self.current_url = url_str.to_string();
242
243 return Ok(());
244 }
245 Err(e) => {
246 log::warn!("Failed to reconnect to gRPC node {url_str}: {e}");
247 last_error = Some(DydxError::Grpc(Box::new(tonic::Status::unavailable(
248 format!("Connection failed: {e}"),
249 ))));
250 }
251 }
252 }
253
254 Err(last_error.unwrap_or_else(|| {
255 DydxError::Grpc(Box::new(tonic::Status::unavailable(
256 "All gRPC reconnection attempts failed".to_string(),
257 )))
258 }))
259 }
260
261 #[must_use]
263 pub fn current_url(&self) -> &str {
264 &self.current_url
265 }
266
267 #[must_use]
271 pub fn channel(&self) -> &Channel {
272 &self.channel
273 }
274
275 pub async fn query_address(&mut self, address: &str) -> Result<(u64, u64), anyhow::Error> {
283 let req = QueryAccountRequest {
284 address: address.to_string(),
285 };
286 let resp = self
287 .auth
288 .account(req)
289 .await?
290 .into_inner()
291 .account
292 .ok_or_else(|| {
293 anyhow::anyhow!("Query account request failure, account should exist")
294 })?;
295
296 let account = BaseAccount::decode(&*resp.value)?;
297 Ok((account.account_number, account.sequence))
298 }
299
300 pub async fn get_account(&mut self, address: &str) -> Result<BaseAccount, anyhow::Error> {
307 let req = QueryAccountRequest {
308 address: address.to_string(),
309 };
310 let resp = self
311 .auth
312 .account(req)
313 .await?
314 .into_inner()
315 .account
316 .ok_or_else(|| {
317 anyhow::anyhow!("Query account request failure, account should exist")
318 })?;
319
320 Ok(BaseAccount::decode(&*resp.value)?)
321 }
322
323 pub async fn get_account_balances(
330 &mut self,
331 address: &str,
332 ) -> Result<Vec<Coin>, anyhow::Error> {
333 let req = QueryAllBalancesRequest {
334 address: address.to_string(),
335 resolve_denom: false,
336 pagination: None,
337 };
338 let balances = self.bank.all_balances(req).await?.into_inner().balances;
339 Ok(balances)
340 }
341
342 pub async fn get_authenticators(
351 &mut self,
352 address: &str,
353 ) -> Result<Vec<AccountAuthenticator>, anyhow::Error> {
354 let req = GetAuthenticatorsRequest {
355 account: address.to_string(),
356 };
357 let resp = self.accountplus.get_authenticators(req).await?.into_inner();
358 Ok(resp.account_authenticators)
359 }
360
361 pub async fn get_node_info(&mut self) -> Result<GetNodeInfoResponse, anyhow::Error> {
367 let req = GetNodeInfoRequest {};
368 let info = self.base.get_node_info(req).await?.into_inner();
369 Ok(info)
370 }
371
372 pub async fn latest_block(&mut self) -> Result<Block, anyhow::Error> {
378 let req = GetLatestBlockRequest::default();
379 let latest_block = self
380 .base
381 .get_latest_block(req)
382 .await?
383 .into_inner()
384 .sdk_block
385 .ok_or_else(|| anyhow::anyhow!("The latest block is empty"))?;
386 Ok(latest_block)
387 }
388
389 pub async fn latest_block_height(&mut self) -> Result<Height, anyhow::Error> {
395 let latest_block = self.latest_block().await?;
396 let header = latest_block
397 .header
398 .ok_or_else(|| anyhow::anyhow!("The block doesn't contain a header"))?;
399 let height = Height(header.height.try_into()?);
400 Ok(height)
401 }
402
403 pub async fn get_perpetuals(&mut self) -> Result<Vec<Perpetual>, anyhow::Error> {
409 let req = QueryAllPerpetualsRequest { pagination: None };
410 let response = self.perpetuals.all_perpetuals(req).await?.into_inner();
411 Ok(response.perpetual)
412 }
413
414 pub async fn get_clob_pairs(&mut self) -> Result<Vec<ClobPair>, anyhow::Error> {
420 let req = QueryAllClobPairRequest { pagination: None };
421 let pairs = self.clob.clob_pair_all(req).await?.into_inner().clob_pair;
422 Ok(pairs)
423 }
424
425 pub async fn get_subaccount(
431 &mut self,
432 address: &str,
433 number: u32,
434 ) -> Result<SubaccountInfo, anyhow::Error> {
435 let req = QueryGetSubaccountRequest {
436 owner: address.to_string(),
437 number,
438 };
439 let subaccount = self
440 .subaccounts
441 .subaccount(req)
442 .await?
443 .into_inner()
444 .subaccount
445 .ok_or_else(|| {
446 anyhow::anyhow!("Subaccount query response does not contain subaccount")
447 })?;
448 Ok(subaccount)
449 }
450
451 #[allow(deprecated)]
457 pub async fn simulate_tx(&mut self, tx_bytes: Vec<u8>) -> Result<u64, anyhow::Error> {
458 let req = SimulateRequest { tx_bytes, tx: None };
459 let gas_used = self
460 .tx
461 .simulate(req)
462 .await?
463 .into_inner()
464 .gas_info
465 .ok_or_else(|| anyhow::anyhow!("Simulation response does not contain gas info"))?
466 .gas_used;
467 Ok(gas_used)
468 }
469
470 pub async fn broadcast_tx(&mut self, tx_bytes: Vec<u8>) -> Result<TxHash, anyhow::Error> {
476 let req = BroadcastTxRequest {
477 tx_bytes,
478 mode: BroadcastMode::Sync as i32,
479 };
480 let response = self.tx.broadcast_tx(req).await?.into_inner();
481
482 if let Some(tx_response) = response.tx_response {
483 if tx_response.code != 0 {
484 anyhow::bail!(
485 "Transaction broadcast failed: code={}, log={}",
486 tx_response.code,
487 tx_response.raw_log
488 );
489 }
490 Ok(tx_response.txhash)
491 } else {
492 Err(anyhow::anyhow!(
493 "Broadcast response does not contain tx_response"
494 ))
495 }
496 }
497
498 pub async fn get_tx(&mut self, hash: &str) -> Result<Tx, anyhow::Error> {
504 let req = GetTxRequest {
505 hash: hash.to_string(),
506 };
507 let response = self.tx.get_tx(req).await?.into_inner();
508
509 if let Some(tx) = response.tx {
510 let tx_bytes = tx.encode_to_vec();
512 Tx::try_from(tx_bytes.as_slice()).map_err(|e| anyhow::anyhow!("{e}"))
513 } else {
514 anyhow::bail!("Transaction not found")
515 }
516 }
517}
518
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// `Height` values compare and test equal by their wrapped number.
    #[rstest]
    fn test_height_ordering() {
        let (lower, higher) = (Height(100), Height(200));
        assert!(lower < higher);
        assert_eq!(lower, Height(100));
    }

    /// An empty URL list is rejected up front with a `Config` error.
    #[tokio::test]
    async fn test_new_with_fallback_empty_urls() {
        let no_urls: &[&str] = &[];
        let result = DydxGrpcClient::new_with_fallback(no_urls).await;
        assert!(result.is_err());
        match result {
            Err(DydxError::Config(msg)) => assert_eq!(msg, "No gRPC URLs provided"),
            _ => panic!("Expected Config error"),
        }
    }

    /// When no URL in the list can be connected to, the fallback constructor returns an error.
    #[tokio::test]
    async fn test_new_with_fallback_invalid_urls() {
        let invalid_urls = ["invalid://bad-url", "http://0.0.0.0:1"];
        let result = DydxGrpcClient::new_with_fallback(invalid_urls.as_slice()).await;

        assert!(result.is_err());
    }
}