Skip to main content

nautilus_dydx/grpc/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! gRPC client implementation for dYdX v4 protocol.
17//!
18//! This module provides the main gRPC client for interacting with dYdX v4 validator nodes.
19//! It handles transaction signing, broadcasting, and querying account state.
20
21use cosmrs::Tx;
22use prost::Message as ProstMessage;
23use tonic::transport::Channel;
24
25use crate::{
26    error::DydxError,
27    proto::{
28        AccountAuthenticator, AccountPlusClient, GetAuthenticatorsRequest,
29        cosmos_sdk_proto::cosmos::{
30            auth::v1beta1::{
31                BaseAccount, QueryAccountRequest, query_client::QueryClient as AuthClient,
32            },
33            bank::v1beta1::{QueryAllBalancesRequest, query_client::QueryClient as BankClient},
34            base::{
35                tendermint::v1beta1::{
36                    Block, GetLatestBlockRequest, GetNodeInfoRequest, GetNodeInfoResponse,
37                    service_client::ServiceClient as BaseClient,
38                },
39                v1beta1::Coin,
40            },
41            tx::v1beta1::{
42                BroadcastMode, BroadcastTxRequest, GetTxRequest, SimulateRequest,
43                service_client::ServiceClient as TxClient,
44            },
45        },
46        dydxprotocol::{
47            clob::{ClobPair, QueryAllClobPairRequest, query_client::QueryClient as ClobClient},
48            perpetuals::{
49                Perpetual, QueryAllPerpetualsRequest, query_client::QueryClient as PerpetualsClient,
50            },
51            subaccounts::{
52                QueryGetSubaccountRequest, Subaccount as SubaccountInfo,
53                query_client::QueryClient as SubaccountsClient,
54            },
55        },
56    },
57};
58
/// Transaction hash in the string form reported by the node.
///
/// This is a plain [`String`] alias: [`DydxGrpcClient::broadcast_tx`] returns the `txhash`
/// field of the broadcast response unchanged, and [`DydxGrpcClient::get_tx`] takes a hash
/// string for lookups.
pub type TxHash = String;
61
/// Block height, as a newtype over `u32`.
///
/// Equality and ordering follow the wrapped number.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct Height(pub u32);
65
66/// gRPC client for dYdX v4 protocol operations.
67///
68/// This client handles:
69/// - Transaction signing and broadcasting.
70/// - Account query operations.
71/// - Order placement and management via Cosmos SDK messages.
72/// - Connection management and automatic failover to fallback nodes.
73#[derive(Debug, Clone)]
74pub struct DydxGrpcClient {
75    channel: Channel,
76    auth: AuthClient<Channel>,
77    bank: BankClient<Channel>,
78    base: BaseClient<Channel>,
79    tx: TxClient<Channel>,
80    clob: ClobClient<Channel>,
81    perpetuals: PerpetualsClient<Channel>,
82    subaccounts: SubaccountsClient<Channel>,
83    accountplus: AccountPlusClient<Channel>,
84    current_url: String,
85}
86
87impl DydxGrpcClient {
88    /// Create a new gRPC client with a single URL.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if the gRPC connection cannot be established.
93    pub async fn new(grpc_url: String) -> Result<Self, DydxError> {
94        let mut endpoint = Channel::from_shared(grpc_url.clone())
95            .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))?
96            .connect_timeout(std::time::Duration::from_secs(10))
97            .timeout(std::time::Duration::from_secs(30));
98
99        // Enable TLS for HTTPS URLs (required for public gRPC nodes)
100        if grpc_url.starts_with("https://") {
101            let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
102            endpoint = endpoint
103                .tls_config(tls)
104                .map_err(|e| DydxError::Config(format!("TLS config failed: {e}")))?;
105        }
106
107        let channel = endpoint.connect().await.map_err(|e| {
108            DydxError::Grpc(Box::new(tonic::Status::unavailable(format!(
109                "Connection failed: {e}"
110            ))))
111        })?;
112
113        Ok(Self {
114            auth: AuthClient::new(channel.clone()),
115            bank: BankClient::new(channel.clone()),
116            base: BaseClient::new(channel.clone()),
117            tx: TxClient::new(channel.clone()),
118            clob: ClobClient::new(channel.clone()),
119            perpetuals: PerpetualsClient::new(channel.clone()),
120            subaccounts: SubaccountsClient::new(channel.clone()),
121            accountplus: AccountPlusClient::new(channel.clone()),
122            channel,
123            current_url: grpc_url,
124        })
125    }
126
127    /// Create a new gRPC client with fallback support.
128    ///
129    /// Attempts to connect to each URL in the provided list until a successful
130    /// connection is established. This is useful for DEX environments where nodes
131    /// can fail and fallback options are needed.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if none of the provided URLs can establish a connection.
136    pub async fn new_with_fallback(grpc_urls: &[impl AsRef<str>]) -> Result<Self, DydxError> {
137        if grpc_urls.is_empty() {
138            return Err(DydxError::Config("No gRPC URLs provided".to_string()));
139        }
140
141        let mut last_error = None;
142
143        for (idx, url) in grpc_urls.iter().enumerate() {
144            let url_str = url.as_ref();
145            log::debug!(
146                "Attempting to connect to gRPC node: {url_str} (attempt {}/{})",
147                idx + 1,
148                grpc_urls.len()
149            );
150
151            match Self::new(url_str.to_string()).await {
152                Ok(client) => {
153                    log::info!("Successfully connected to gRPC node: {url_str}");
154                    return Ok(client);
155                }
156                Err(e) => {
157                    log::warn!("Failed to connect to gRPC node {url_str}: {e}");
158                    last_error = Some(e);
159                }
160            }
161        }
162
163        Err(last_error.unwrap_or_else(|| {
164            DydxError::Grpc(Box::new(tonic::Status::unavailable(
165                "All gRPC connection attempts failed".to_string(),
166            )))
167        }))
168    }
169
170    /// Reconnect to a different gRPC node from the fallback list.
171    ///
172    /// Attempts to establish a new connection to each URL in the provided list
173    /// until successful. This is useful when the current node fails and you need
174    /// to failover to a different validator node.
175    ///
176    /// # Errors
177    ///
178    /// Returns an error if none of the provided URLs can establish a connection.
179    pub async fn reconnect_with_fallback(
180        &mut self,
181        grpc_urls: &[impl AsRef<str>],
182    ) -> Result<(), DydxError> {
183        if grpc_urls.is_empty() {
184            return Err(DydxError::Config("No gRPC URLs provided".to_string()));
185        }
186
187        let mut last_error = None;
188
189        for (idx, url) in grpc_urls.iter().enumerate() {
190            let url_str = url.as_ref();
191
192            // Skip if it's the same URL we're currently connected to
193            if url_str == self.current_url {
194                log::debug!("Skipping current URL: {url_str}");
195                continue;
196            }
197
198            log::debug!(
199                "Attempting to reconnect to gRPC node: {url_str} (attempt {}/{})",
200                idx + 1,
201                grpc_urls.len()
202            );
203
204            let mut endpoint = match Channel::from_shared(url_str.to_string())
205                .map_err(|e| DydxError::Config(format!("Invalid gRPC URL: {e}")))
206            {
207                Ok(ep) => ep
208                    .connect_timeout(std::time::Duration::from_secs(10))
209                    .timeout(std::time::Duration::from_secs(30)),
210                Err(e) => {
211                    last_error = Some(e);
212                    continue;
213                }
214            };
215
216            // Enable TLS for HTTPS URLs (required for public gRPC nodes)
217            if url_str.starts_with("https://") {
218                let tls = tonic::transport::ClientTlsConfig::new().with_enabled_roots();
219                endpoint = match endpoint.tls_config(tls) {
220                    Ok(ep) => ep,
221                    Err(e) => {
222                        last_error = Some(DydxError::Config(format!("TLS config failed: {e}")));
223                        continue;
224                    }
225                };
226            }
227
228            match endpoint.connect().await {
229                Ok(connected_channel) => {
230                    log::info!("Successfully reconnected to gRPC node: {url_str}");
231
232                    // Update all service clients with the new channel
233                    self.channel = connected_channel.clone();
234                    self.auth = AuthClient::new(connected_channel.clone());
235                    self.bank = BankClient::new(connected_channel.clone());
236                    self.base = BaseClient::new(connected_channel.clone());
237                    self.tx = TxClient::new(connected_channel.clone());
238                    self.clob = ClobClient::new(connected_channel.clone());
239                    self.perpetuals = PerpetualsClient::new(connected_channel.clone());
240                    self.subaccounts = SubaccountsClient::new(connected_channel);
241                    self.current_url = url_str.to_string();
242
243                    return Ok(());
244                }
245                Err(e) => {
246                    log::warn!("Failed to reconnect to gRPC node {url_str}: {e}");
247                    last_error = Some(DydxError::Grpc(Box::new(tonic::Status::unavailable(
248                        format!("Connection failed: {e}"),
249                    ))));
250                }
251            }
252        }
253
254        Err(last_error.unwrap_or_else(|| {
255            DydxError::Grpc(Box::new(tonic::Status::unavailable(
256                "All gRPC reconnection attempts failed".to_string(),
257            )))
258        }))
259    }
260
261    /// Get the currently connected gRPC node URL.
262    #[must_use]
263    pub fn current_url(&self) -> &str {
264        &self.current_url
265    }
266
267    /// Get the underlying gRPC channel.
268    ///
269    /// This can be used to create custom gRPC service clients.
270    #[must_use]
271    pub fn channel(&self) -> &Channel {
272        &self.channel
273    }
274
275    /// Query account information for a given address.
276    ///
277    /// Returns the account number and sequence number needed for transaction signing.
278    ///
279    /// # Errors
280    ///
281    /// Returns an error if the query fails or the account does not exist.
282    pub async fn query_address(&mut self, address: &str) -> Result<(u64, u64), anyhow::Error> {
283        let req = QueryAccountRequest {
284            address: address.to_string(),
285        };
286        let resp = self
287            .auth
288            .account(req)
289            .await?
290            .into_inner()
291            .account
292            .ok_or_else(|| {
293                anyhow::anyhow!("Query account request failure, account should exist")
294            })?;
295
296        let account = BaseAccount::decode(&*resp.value)?;
297        Ok((account.account_number, account.sequence))
298    }
299
300    /// Query for [an account](https://github.com/cosmos/cosmos-sdk/tree/main/x/auth#account-1)
301    /// by its address.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if the query fails or the account does not exist.
306    pub async fn get_account(&mut self, address: &str) -> Result<BaseAccount, anyhow::Error> {
307        let req = QueryAccountRequest {
308            address: address.to_string(),
309        };
310        let resp = self
311            .auth
312            .account(req)
313            .await?
314            .into_inner()
315            .account
316            .ok_or_else(|| {
317                anyhow::anyhow!("Query account request failure, account should exist")
318            })?;
319
320        Ok(BaseAccount::decode(&*resp.value)?)
321    }
322
323    /// Query for [account balances](https://github.com/cosmos/cosmos-sdk/tree/main/x/bank#allbalances)
324    /// by address for all denominations.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if the query fails.
329    pub async fn get_account_balances(
330        &mut self,
331        address: &str,
332    ) -> Result<Vec<Coin>, anyhow::Error> {
333        let req = QueryAllBalancesRequest {
334            address: address.to_string(),
335            resolve_denom: false,
336            pagination: None,
337        };
338        let balances = self.bank.all_balances(req).await?.into_inner().balances;
339        Ok(balances)
340    }
341
342    /// Query for authenticators registered for an account.
343    ///
344    /// Authenticators enable permissioned key trading, allowing API wallets
345    /// to sign transactions on behalf of a main account.
346    ///
347    /// # Errors
348    ///
349    /// Returns an error if the query fails.
350    pub async fn get_authenticators(
351        &mut self,
352        address: &str,
353    ) -> Result<Vec<AccountAuthenticator>, anyhow::Error> {
354        let req = GetAuthenticatorsRequest {
355            account: address.to_string(),
356        };
357        let resp = self.accountplus.get_authenticators(req).await?.into_inner();
358        Ok(resp.account_authenticators)
359    }
360
361    /// Query for node info.
362    ///
363    /// # Errors
364    ///
365    /// Returns an error if the query fails.
366    pub async fn get_node_info(&mut self) -> Result<GetNodeInfoResponse, anyhow::Error> {
367        let req = GetNodeInfoRequest {};
368        let info = self.base.get_node_info(req).await?.into_inner();
369        Ok(info)
370    }
371
372    /// Query for the latest block.
373    ///
374    /// # Errors
375    ///
376    /// Returns an error if the query fails.
377    pub async fn latest_block(&mut self) -> Result<Block, anyhow::Error> {
378        let req = GetLatestBlockRequest::default();
379        let latest_block = self
380            .base
381            .get_latest_block(req)
382            .await?
383            .into_inner()
384            .sdk_block
385            .ok_or_else(|| anyhow::anyhow!("The latest block is empty"))?;
386        Ok(latest_block)
387    }
388
389    /// Query for the latest block height.
390    ///
391    /// # Errors
392    ///
393    /// Returns an error if the query fails.
394    pub async fn latest_block_height(&mut self) -> Result<Height, anyhow::Error> {
395        let latest_block = self.latest_block().await?;
396        let header = latest_block
397            .header
398            .ok_or_else(|| anyhow::anyhow!("The block doesn't contain a header"))?;
399        let height = Height(header.height.try_into()?);
400        Ok(height)
401    }
402
403    /// Query for all perpetual markets.
404    ///
405    /// # Errors
406    ///
407    /// Returns an error if the query fails.
408    pub async fn get_perpetuals(&mut self) -> Result<Vec<Perpetual>, anyhow::Error> {
409        let req = QueryAllPerpetualsRequest { pagination: None };
410        let response = self.perpetuals.all_perpetuals(req).await?.into_inner();
411        Ok(response.perpetual)
412    }
413
414    /// Query for all CLOB pairs.
415    ///
416    /// # Errors
417    ///
418    /// Returns an error if the query fails.
419    pub async fn get_clob_pairs(&mut self) -> Result<Vec<ClobPair>, anyhow::Error> {
420        let req = QueryAllClobPairRequest { pagination: None };
421        let pairs = self.clob.clob_pair_all(req).await?.into_inner().clob_pair;
422        Ok(pairs)
423    }
424
425    /// Query for subaccount information.
426    ///
427    /// # Errors
428    ///
429    /// Returns an error if the query fails.
430    pub async fn get_subaccount(
431        &mut self,
432        address: &str,
433        number: u32,
434    ) -> Result<SubaccountInfo, anyhow::Error> {
435        let req = QueryGetSubaccountRequest {
436            owner: address.to_string(),
437            number,
438        };
439        let subaccount = self
440            .subaccounts
441            .subaccount(req)
442            .await?
443            .into_inner()
444            .subaccount
445            .ok_or_else(|| {
446                anyhow::anyhow!("Subaccount query response does not contain subaccount")
447            })?;
448        Ok(subaccount)
449    }
450
451    /// Simulate a transaction to estimate gas usage.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if simulation fails.
456    #[allow(deprecated)]
457    pub async fn simulate_tx(&mut self, tx_bytes: Vec<u8>) -> Result<u64, anyhow::Error> {
458        let req = SimulateRequest { tx_bytes, tx: None };
459        let gas_used = self
460            .tx
461            .simulate(req)
462            .await?
463            .into_inner()
464            .gas_info
465            .ok_or_else(|| anyhow::anyhow!("Simulation response does not contain gas info"))?
466            .gas_used;
467        Ok(gas_used)
468    }
469
470    /// Broadcast a signed transaction.
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if broadcasting fails.
475    pub async fn broadcast_tx(&mut self, tx_bytes: Vec<u8>) -> Result<TxHash, anyhow::Error> {
476        let req = BroadcastTxRequest {
477            tx_bytes,
478            mode: BroadcastMode::Sync as i32,
479        };
480        let response = self.tx.broadcast_tx(req).await?.into_inner();
481
482        if let Some(tx_response) = response.tx_response {
483            if tx_response.code != 0 {
484                anyhow::bail!(
485                    "Transaction broadcast failed: code={}, log={}",
486                    tx_response.code,
487                    tx_response.raw_log
488                );
489            }
490            Ok(tx_response.txhash)
491        } else {
492            Err(anyhow::anyhow!(
493                "Broadcast response does not contain tx_response"
494            ))
495        }
496    }
497
498    /// Query transaction by hash.
499    ///
500    /// # Errors
501    ///
502    /// Returns an error if the query fails.
503    pub async fn get_tx(&mut self, hash: &str) -> Result<Tx, anyhow::Error> {
504        let req = GetTxRequest {
505            hash: hash.to_string(),
506        };
507        let response = self.tx.get_tx(req).await?.into_inner();
508
509        if let Some(tx) = response.tx {
510            // Convert through bytes since the types are incompatible
511            let tx_bytes = tx.encode_to_vec();
512            Tx::try_from(tx_bytes.as_slice()).map_err(|e| anyhow::anyhow!("{e}"))
513        } else {
514            anyhow::bail!("Transaction not found")
515        }
516    }
517}
518
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// `Height` values compare by their wrapped number.
    #[rstest]
    fn test_height_ordering() {
        let lower = Height(100);
        let higher = Height(200);
        assert!(lower < higher);
        assert_eq!(lower, Height(100));
    }

    /// An empty URL list is rejected with a `Config` error before any connection attempt.
    #[tokio::test]
    async fn test_new_with_fallback_empty_urls() {
        let no_urls: &[&str] = &[];
        let result = DydxGrpcClient::new_with_fallback(no_urls).await;
        assert!(result.is_err());
        match result {
            Err(DydxError::Config(msg)) => assert_eq!(msg, "No gRPC URLs provided"),
            _ => panic!("Expected Config error"),
        }
    }

    /// URLs that cannot be connected to make the fallback constructor fail.
    #[tokio::test]
    async fn test_new_with_fallback_invalid_urls() {
        let invalid_urls = vec!["invalid://bad-url", "http://0.0.0.0:1"];

        // Either a Config or a Grpc error is acceptable here.
        let outcome = DydxGrpcClient::new_with_fallback(&invalid_urls).await;
        assert!(outcome.is_err());
    }
}
553}