Skip to main content

nautilus_dydx/execution/
tx_manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Transaction manager for dYdX v4 protocol.
17//!
18//! This module provides centralized transaction management including:
19//! - Atomic sequence number tracking for stateful (long-term/conditional) orders
20//! - Transaction building and signing
21//! - Chain synchronization for sequence recovery
22//!
23//! # Sequence Management
24//!
25//! dYdX has two transaction types with different sequence behavior:
26//!
27//! - **Stateful orders** (long-term, conditional): Use Cosmos SDK sequences for replay
28//!   protection. Each transaction requires a unique, incrementing sequence number.
29//! - **Short-term orders**: Use Good-Til-Block (GTB) for replay protection. The chain's
30//!   `ClobDecorator` ante handler skips sequence checking, so sequences are not consumed.
31//!   Use [`TransactionManager::get_cached_sequence`] for these — it returns the current value
32//!   without incrementing.
33//!
34//! For stateful orders, this module provides:
35//! 1. `AtomicU64` for lock-free sequence allocation via [`TransactionManager::allocate_sequence`]
36//! 2. Lazy initialization from chain on first use
37//! 3. [`TransactionManager::resync_sequence`] for recovery after mismatch errors
38//! 4. Batch allocation via [`TransactionManager::allocate_sequences`] for parallel stateful
39//!    broadcasts
40
41use std::sync::{
42    Arc, RwLock,
43    atomic::{AtomicU64, Ordering},
44};
45
46use cosmrs::Any;
47
48use super::{types::PreparedTransaction, wallet::Wallet};
49use crate::{
50    error::DydxError,
51    grpc::{DydxGrpcClient, TxBuilder, types::ChainId},
52    proto::AccountAuthenticator,
53};
54
55/// Sentinel value indicating sequence is uninitialized.
56pub const SEQUENCE_UNINITIALIZED: u64 = u64::MAX;
57
58/// Default fee denomination for dYdX transactions.
59const FEE_DENOM: &str = "adydx";
60
61/// Transaction manager responsible for wallet, sequence tracking, and transaction building.
62///
63/// This is the single source of truth for:
64/// - Wallet and signing operations
65/// - Sequence numbers (ensuring concurrent order operations don't race)
66/// - Authenticator resolution for permissioned key trading
67///
68/// # Thread Safety
69///
70/// All methods are safe to call from multiple tasks concurrently. Sequence
71/// allocation uses atomic compare-exchange operations for lock-free performance.
72#[derive(Debug)]
73pub struct TransactionManager {
74    /// gRPC client for chain queries.
75    grpc_client: DydxGrpcClient,
76    /// Wallet for transaction signing (created from private key).
77    wallet: Wallet,
78    /// Main account address (for account lookups).
79    /// May differ from wallet's signing address when using permissioned keys.
80    wallet_address: String,
81    /// Chain ID for transaction building.
82    chain_id: ChainId,
83    /// Authenticator IDs for permissioned key trading.
84    authenticator_ids: RwLock<Vec<u64>>,
85    /// Atomic sequence counter. Value `SEQUENCE_UNINITIALIZED` means uninitialized.
86    sequence_number: Arc<AtomicU64>,
87    /// Cached account number (never changes for a given address).
88    /// Value 0 means uninitialized.
89    account_number: AtomicU64,
90}
91
92impl TransactionManager {
93    /// Creates a new transaction manager.
94    ///
95    /// Creates wallet from private key internally. The sequence number is initialized
96    /// to `SEQUENCE_UNINITIALIZED` and will be fetched from chain on first use, or
97    /// can be proactively initialized by calling [`Self::initialize_sequence`].
98    ///
99    /// # Errors
100    ///
101    /// Returns error if wallet creation from private key fails.
102    pub fn new(
103        grpc_client: DydxGrpcClient,
104        private_key: &str,
105        wallet_address: String,
106        chain_id: ChainId,
107    ) -> Result<Self, DydxError> {
108        let wallet = Wallet::from_private_key(private_key)
109            .map_err(|e| DydxError::Wallet(format!("Failed to create wallet: {e}")))?;
110
111        Ok(Self {
112            grpc_client,
113            wallet,
114            wallet_address,
115            chain_id,
116            authenticator_ids: RwLock::new(Vec::new()),
117            sequence_number: Arc::new(AtomicU64::new(SEQUENCE_UNINITIALIZED)),
118            account_number: AtomicU64::new(0),
119        })
120    }
121
122    /// Proactively initializes the sequence number from chain.
123    ///
124    /// Call this during connect() to ensure orders can be submitted immediately
125    /// without first-transaction latency penalty. Also catches auth errors early.
126    ///
127    /// Returns the initialized sequence number.
128    ///
129    /// # Errors
130    ///
131    /// Returns error if chain query fails.
132    pub async fn initialize_sequence(&self) -> Result<u64, DydxError> {
133        let mut grpc = self.grpc_client.clone();
134        let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
135            DydxError::Grpc(Box::new(tonic::Status::internal(format!(
136                "Failed to fetch account for sequence init: {e}"
137            ))))
138        })?;
139
140        let chain_seq = base_account.sequence;
141        self.sequence_number.store(chain_seq, Ordering::SeqCst);
142        log::debug!("Initialized sequence from chain: {chain_seq}");
143        Ok(chain_seq)
144    }
145
146    /// Resolves authenticator IDs if using permissioned keys (API wallet).
147    ///
148    /// Compares the wallet's signing address with the main account address.
149    /// If they differ, fetches authenticators from chain and finds the one
150    /// matching this wallet's public key.
151    ///
152    /// Call this during connect() after creating the TransactionManager.
153    ///
154    /// # Errors
155    ///
156    /// Returns error if:
157    /// - Using permissioned key but no authenticators found for main account
158    /// - No authenticator matches the wallet's public key
159    /// - gRPC query fails
160    ///
161    /// # Panics
162    ///
163    /// Panics if the internal `RwLock` is poisoned.
164    pub async fn resolve_authenticators(&self) -> Result<(), DydxError> {
165        // Check if we already have authenticator IDs configured
166        {
167            let ids = self.authenticator_ids.read().expect("RwLock poisoned");
168            if !ids.is_empty() {
169                log::debug!("Using pre-configured authenticator IDs: {:?}", *ids);
170                return Ok(());
171            }
172        }
173
174        // Get the wallet's address (derived from private key)
175        let account = self
176            .wallet
177            .account_offline()
178            .map_err(|e| DydxError::Wallet(format!("Failed to derive account: {e}")))?;
179        let signing_address = account.address.clone();
180        let signing_pubkey = account.public_key();
181
182        // Check if we're using an API wallet (signing address != main account)
183        if signing_address == self.wallet_address {
184            log::debug!(
185                "Signing wallet matches main account {}, no authenticator needed",
186                self.wallet_address
187            );
188            return Ok(());
189        }
190
191        log::info!(
192            "Detected permissioned key setup: signing with {} for main account {}",
193            signing_address,
194            self.wallet_address
195        );
196
197        // Fetch authenticators for the main account
198        let mut grpc = self.grpc_client.clone();
199        let authenticators = grpc
200            .get_authenticators(&self.wallet_address)
201            .await
202            .map_err(|e| {
203                DydxError::Grpc(Box::new(tonic::Status::internal(format!(
204                    "Failed to fetch authenticators from chain: {e}"
205                ))))
206            })?;
207
208        if authenticators.is_empty() {
209            return Err(DydxError::Config(format!(
210                "No authenticators found for {}. \
211                 Please create an API Trading Key in the dYdX UI first.",
212                self.wallet_address
213            )));
214        }
215
216        log::debug!(
217            "Found {} authenticator(s) for {}",
218            authenticators.len(),
219            self.wallet_address
220        );
221
222        // Find authenticators matching the API wallet's public key
223        let signing_pubkey_bytes = signing_pubkey.to_bytes();
224        let signing_pubkey_b64 = base64::Engine::encode(
225            &base64::engine::general_purpose::STANDARD,
226            &signing_pubkey_bytes,
227        );
228
229        let mut matching_ids = Vec::new();
230
231        for auth in &authenticators {
232            if Self::authenticator_matches_pubkey(auth, &signing_pubkey_b64) {
233                matching_ids.push(auth.id);
234                log::info!("Found matching authenticator: id={}", auth.id);
235            }
236        }
237
238        if matching_ids.is_empty() {
239            return Err(DydxError::Config(format!(
240                "No authenticator matches the API wallet's public key. \
241                 Ensure the API Trading Key was created for wallet {}. \
242                 Available authenticators: {:?}",
243                signing_address,
244                authenticators.iter().map(|a| a.id).collect::<Vec<_>>()
245            )));
246        }
247
248        // Store the resolved authenticator IDs
249        {
250            let mut ids = self.authenticator_ids.write().expect("RwLock poisoned");
251            *ids = matching_ids.clone();
252        }
253        log::info!("Resolved authenticator IDs: {matching_ids:?}");
254
255        Ok(())
256    }
257
258    /// Checks if an authenticator contains a SignatureVerification matching the public key.
259    ///
260    /// Expected authenticator config format (JSON array of sub-authenticators):
261    /// ```json
262    /// [{"type": "SignatureVerification", "config": "<base64-pubkey>"}, ...]
263    /// ```
264    fn authenticator_matches_pubkey(auth: &AccountAuthenticator, pubkey_b64: &str) -> bool {
265        #[derive(serde::Deserialize)]
266        struct SubAuth {
267            #[serde(rename = "type")]
268            auth_type: String,
269            config: String,
270        }
271
272        // auth.config is raw bytes (Vec<u8>) containing JSON
273        let config_str = match String::from_utf8(auth.config.clone()) {
274            Ok(s) => s,
275            Err(e) => {
276                log::warn!(
277                    "Authenticator id={} has invalid UTF-8 config (len={}): {}",
278                    auth.id,
279                    auth.config.len(),
280                    e
281                );
282                return false;
283            }
284        };
285
286        log::debug!(
287            "Checking authenticator id={}, type={}, config={}",
288            auth.id,
289            auth.r#type,
290            config_str
291        );
292
293        match serde_json::from_str::<Vec<SubAuth>>(&config_str) {
294            Ok(sub_auths) => {
295                for sub in sub_auths {
296                    log::debug!(
297                        "  Sub-authenticator: type={}, config={}",
298                        sub.auth_type,
299                        sub.config
300                    );
301
302                    if sub.auth_type == "SignatureVerification" && sub.config == pubkey_b64 {
303                        log::debug!("  -> MATCH! pubkey_b64={pubkey_b64}");
304                        return true;
305                    }
306                }
307            }
308            Err(e) => {
309                log::warn!(
310                    "Authenticator id={} config is not in expected JSON array format: {} (config={})",
311                    auth.id,
312                    e,
313                    config_str
314                );
315            }
316        }
317
318        false
319    }
320
321    /// Allocates the next sequence number atomically.
322    ///
323    /// If the sequence is uninitialized (0), fetches from chain first.
324    /// Uses compare-exchange for lock-free concurrent access.
325    ///
326    /// # Errors
327    ///
328    /// Returns error if chain query fails during initialization.
329    pub async fn allocate_sequence(&self) -> Result<u64, DydxError> {
330        loop {
331            let current = self.sequence_number.load(Ordering::SeqCst);
332            if current == SEQUENCE_UNINITIALIZED {
333                // Initialize from chain
334                self.initialize_sequence_from_chain().await?;
335                continue;
336            }
337            // Atomic get-and-increment
338            if self
339                .sequence_number
340                .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
341                .is_ok()
342            {
343                return Ok(current);
344            }
345            // Another thread modified it, retry
346        }
347    }
348
349    /// Allocates N sequence numbers for optimistic parallel broadcast.
350    ///
351    /// Returns a vector of consecutive sequences that can be used concurrently.
352    /// The caller is responsible for handling partial failures by resyncing.
353    ///
354    /// # Arguments
355    ///
356    /// * `count` - Number of sequences to allocate
357    ///
358    /// # Errors
359    ///
360    /// Returns error if chain query fails during initialization.
361    ///
362    /// # Example
363    ///
364    /// ```ignore
365    /// let sequences = tx_manager.allocate_sequences(3).await?;
366    /// // sequences = [10, 11, 12] - three consecutive sequence numbers
367    /// ```
368    pub async fn allocate_sequences(&self, count: usize) -> Result<Vec<u64>, DydxError> {
369        if count == 0 {
370            return Ok(Vec::new());
371        }
372
373        loop {
374            let current = self.sequence_number.load(Ordering::SeqCst);
375            if current == SEQUENCE_UNINITIALIZED {
376                self.initialize_sequence_from_chain().await?;
377                continue;
378            }
379            let new_value = current + count as u64;
380
381            if self
382                .sequence_number
383                .compare_exchange(current, new_value, Ordering::SeqCst, Ordering::SeqCst)
384                .is_ok()
385            {
386                return Ok((current..new_value).collect());
387            }
388            // Another thread modified it, retry
389        }
390    }
391
392    /// Initializes the sequence counter from chain state.
393    ///
394    /// Only sets the value if it's still 0 (another thread might have set it).
395    async fn initialize_sequence_from_chain(&self) -> Result<(), DydxError> {
396        let mut grpc = self.grpc_client.clone();
397        let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
398            DydxError::Grpc(Box::new(tonic::Status::internal(format!(
399                "Failed to fetch account for sequence init: {e}"
400            ))))
401        })?;
402
403        let chain_seq = base_account.sequence;
404        // Only set if still uninitialized (another thread might have set it)
405        if self
406            .sequence_number
407            .compare_exchange(
408                SEQUENCE_UNINITIALIZED,
409                chain_seq,
410                Ordering::SeqCst,
411                Ordering::SeqCst,
412            )
413            .is_ok()
414        {
415            log::debug!("Initialized sequence from chain: {chain_seq}");
416        }
417        Ok(())
418    }
419
420    /// Resyncs the sequence counter from chain after a mismatch error.
421    ///
422    /// Called by the broadcaster's retry logic when a sequence mismatch is detected.
423    /// Unconditionally stores the chain's current sequence.
424    ///
425    /// # Errors
426    ///
427    /// Returns error if chain query fails.
428    pub async fn resync_sequence(&self) -> Result<(), DydxError> {
429        let mut grpc = self.grpc_client.clone();
430        let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
431            DydxError::Grpc(Box::new(tonic::Status::internal(format!(
432                "Failed to fetch account for resync: {e}"
433            ))))
434        })?;
435
436        let chain_seq = base_account.sequence;
437        self.sequence_number.store(chain_seq, Ordering::SeqCst);
438        log::info!("Resynced sequence from chain: {chain_seq}");
439        Ok(())
440    }
441
442    /// Returns the current sequence value without allocation.
443    ///
444    /// Useful for logging and debugging. Returns `SEQUENCE_UNINITIALIZED` if not yet initialized.
445    #[must_use]
446    pub fn current_sequence(&self) -> u64 {
447        self.sequence_number.load(Ordering::SeqCst)
448    }
449
450    /// Returns the cached sequence for short-term orders without incrementing.
451    ///
452    /// # Errors
453    ///
454    /// Returns error if chain query fails during initialization.
455    pub async fn get_cached_sequence(&self) -> Result<u64, DydxError> {
456        let current = self.sequence_number.load(Ordering::SeqCst);
457        if current == SEQUENCE_UNINITIALIZED {
458            self.initialize_sequence_from_chain().await?;
459            return Ok(self.sequence_number.load(Ordering::SeqCst));
460        }
461        Ok(current)
462    }
463
464    /// Builds and signs a transaction with the given messages and sequence.
465    ///
466    /// Uses cached account_number (fetched once from chain) to avoid repeated queries.
467    ///
468    /// # Arguments
469    ///
470    /// * `msgs` - Proto messages to include in transaction
471    /// * `sequence` - Pre-allocated sequence number
472    /// * `operation` - Human-readable name for logging
473    ///
474    /// # Errors
475    ///
476    /// Returns error if account lookup fails or transaction building fails.
477    ///
478    /// # Panics
479    ///
480    /// Panics if the internal `RwLock` is poisoned.
481    pub async fn build_transaction(
482        &self,
483        msgs: Vec<Any>,
484        sequence: u64,
485        operation: &str,
486    ) -> Result<PreparedTransaction, DydxError> {
487        // Derive account for signing (address/account_id are cached in wallet)
488        let mut account = self
489            .wallet
490            .account_offline()
491            .map_err(|e| DydxError::Wallet(format!("Failed to derive account: {e}")))?;
492
493        // Read authenticator IDs (resolved during connect if using permissioned keys)
494        let auth_ids_snapshot: Vec<u64> = {
495            let ids = self.authenticator_ids.read().expect("RwLock poisoned");
496            ids.clone()
497        };
498
499        if !auth_ids_snapshot.is_empty() {
500            log::debug!(
501                "Using permissioned key mode: signing with {} for main account {}",
502                account.address,
503                self.wallet_address
504            );
505        }
506
507        // Get or cache account number (it never changes for a given address)
508        let account_num = self.get_or_fetch_account_number().await?;
509
510        // Set account info for signing
511        account.set_account_info(account_num, sequence);
512
513        // Build transaction
514        let tx_builder =
515            TxBuilder::new(self.chain_id.clone(), FEE_DENOM.to_string()).map_err(|e| {
516                DydxError::Grpc(Box::new(tonic::Status::internal(format!(
517                    "TxBuilder init failed: {e}"
518                ))))
519            })?;
520
521        // For permissioned key trading, each message needs an authenticator ID.
522        // Repeat the configured authenticator ID(s) for each message in the batch.
523        let expanded_auth_ids: Vec<u64> = if auth_ids_snapshot.is_empty() {
524            Vec::new()
525        } else {
526            // For each message, use the first authenticator ID
527            // (typically there's only one configured for the trading key)
528            std::iter::repeat_n(auth_ids_snapshot[0], msgs.len()).collect()
529        };
530
531        let auth_ids = if expanded_auth_ids.is_empty() {
532            None
533        } else {
534            Some(expanded_auth_ids.as_slice())
535        };
536
537        let tx_raw = tx_builder
538            .build_transaction(&account, msgs, None, auth_ids)
539            .map_err(|e| {
540                DydxError::Grpc(Box::new(tonic::Status::internal(format!(
541                    "Failed to build tx: {e}"
542                ))))
543            })?;
544
545        let tx_bytes = tx_raw.to_bytes().map_err(|e| {
546            DydxError::Grpc(Box::new(tonic::Status::internal(format!(
547                "Failed to serialize tx: {e}"
548            ))))
549        })?;
550
551        log::debug!(
552            "Built {} with {} bytes, sequence={}",
553            operation,
554            tx_bytes.len(),
555            sequence
556        );
557
558        Ok(PreparedTransaction {
559            tx_bytes,
560            sequence,
561            operation: operation.to_string(),
562        })
563    }
564
565    /// Gets the cached account number, or fetches it from chain if not yet cached.
566    ///
567    /// Account numbers are immutable on-chain, so we only need to fetch once.
568    async fn get_or_fetch_account_number(&self) -> Result<u64, DydxError> {
569        let cached = self.account_number.load(Ordering::SeqCst);
570        if cached != 0 {
571            return Ok(cached);
572        }
573
574        // Fetch from chain
575        let mut grpc = self.grpc_client.clone();
576        let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
577            DydxError::Grpc(Box::new(tonic::Status::internal(format!(
578                "Failed to fetch account: {e}"
579            ))))
580        })?;
581
582        let account_num = base_account.account_number;
583
584        // Cache it (CAS to handle concurrent fetches)
585        let _ = self.account_number.compare_exchange(
586            0,
587            account_num,
588            Ordering::SeqCst,
589            Ordering::SeqCst,
590        );
591
592        log::debug!("Cached account_number from chain: {account_num}");
593        Ok(account_num)
594    }
595
596    /// Convenience method: allocate sequence, build, and return prepared transaction.
597    ///
598    /// This is the typical flow for single transaction submission.
599    ///
600    /// # Arguments
601    ///
602    /// * `msgs` - Proto messages to include in transaction
603    /// * `operation` - Human-readable name for logging
604    ///
605    /// # Errors
606    ///
607    /// Returns error if sequence allocation or transaction building fails.
608    pub async fn prepare_transaction(
609        &self,
610        msgs: Vec<Any>,
611        operation: &str,
612    ) -> Result<PreparedTransaction, DydxError> {
613        let sequence = self.allocate_sequence().await?;
614        self.build_transaction(msgs, sequence, operation).await
615    }
616
617    /// Returns the wallet address.
618    #[must_use]
619    pub fn wallet_address(&self) -> &str {
620        &self.wallet_address
621    }
622
623    /// Returns the chain ID.
624    #[must_use]
625    pub fn chain_id(&self) -> &ChainId {
626        &self.chain_id
627    }
628}