nautilus_dydx/execution/tx_manager.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Transaction manager for dYdX v4 protocol.
17//!
18//! This module provides centralized transaction management including:
19//! - Atomic sequence number tracking for stateful (long-term/conditional) orders
20//! - Transaction building and signing
21//! - Chain synchronization for sequence recovery
22//!
23//! # Sequence Management
24//!
25//! dYdX has two transaction types with different sequence behavior:
26//!
27//! - **Stateful orders** (long-term, conditional): Use Cosmos SDK sequences for replay
28//! protection. Each transaction requires a unique, incrementing sequence number.
29//! - **Short-term orders**: Use Good-Til-Block (GTB) for replay protection. The chain's
30//! `ClobDecorator` ante handler skips sequence checking, so sequences are not consumed.
31//! Use [`TransactionManager::get_cached_sequence`] for these — it returns the current value
32//! without incrementing.
33//!
34//! For stateful orders, this module provides:
35//! 1. `AtomicU64` for lock-free sequence allocation via [`TransactionManager::allocate_sequence`]
36//! 2. Lazy initialization from chain on first use
37//! 3. [`TransactionManager::resync_sequence`] for recovery after mismatch errors
38//! 4. Batch allocation via [`TransactionManager::allocate_sequences`] for parallel stateful
39//! broadcasts
40
41use std::sync::{
42 Arc, RwLock,
43 atomic::{AtomicU64, Ordering},
44};
45
46use cosmrs::Any;
47
48use super::{types::PreparedTransaction, wallet::Wallet};
49use crate::{
50 error::DydxError,
51 grpc::{DydxGrpcClient, TxBuilder, types::ChainId},
52 proto::AccountAuthenticator,
53};
54
55/// Sentinel value indicating sequence is uninitialized.
56pub const SEQUENCE_UNINITIALIZED: u64 = u64::MAX;
57
58/// Default fee denomination for dYdX transactions.
59const FEE_DENOM: &str = "adydx";
60
61/// Transaction manager responsible for wallet, sequence tracking, and transaction building.
62///
63/// This is the single source of truth for:
64/// - Wallet and signing operations
65/// - Sequence numbers (ensuring concurrent order operations don't race)
66/// - Authenticator resolution for permissioned key trading
67///
68/// # Thread Safety
69///
70/// All methods are safe to call from multiple tasks concurrently. Sequence
71/// allocation uses atomic compare-exchange operations for lock-free performance.
72#[derive(Debug)]
73pub struct TransactionManager {
74 /// gRPC client for chain queries.
75 grpc_client: DydxGrpcClient,
76 /// Wallet for transaction signing (created from private key).
77 wallet: Wallet,
78 /// Main account address (for account lookups).
79 /// May differ from wallet's signing address when using permissioned keys.
80 wallet_address: String,
81 /// Chain ID for transaction building.
82 chain_id: ChainId,
83 /// Authenticator IDs for permissioned key trading.
84 authenticator_ids: RwLock<Vec<u64>>,
85 /// Atomic sequence counter. Value `SEQUENCE_UNINITIALIZED` means uninitialized.
86 sequence_number: Arc<AtomicU64>,
87 /// Cached account number (never changes for a given address).
88 /// Value 0 means uninitialized.
89 account_number: AtomicU64,
90}
91
92impl TransactionManager {
93 /// Creates a new transaction manager.
94 ///
95 /// Creates wallet from private key internally. The sequence number is initialized
96 /// to `SEQUENCE_UNINITIALIZED` and will be fetched from chain on first use, or
97 /// can be proactively initialized by calling [`Self::initialize_sequence`].
98 ///
99 /// # Errors
100 ///
101 /// Returns error if wallet creation from private key fails.
102 pub fn new(
103 grpc_client: DydxGrpcClient,
104 private_key: &str,
105 wallet_address: String,
106 chain_id: ChainId,
107 ) -> Result<Self, DydxError> {
108 let wallet = Wallet::from_private_key(private_key)
109 .map_err(|e| DydxError::Wallet(format!("Failed to create wallet: {e}")))?;
110
111 Ok(Self {
112 grpc_client,
113 wallet,
114 wallet_address,
115 chain_id,
116 authenticator_ids: RwLock::new(Vec::new()),
117 sequence_number: Arc::new(AtomicU64::new(SEQUENCE_UNINITIALIZED)),
118 account_number: AtomicU64::new(0),
119 })
120 }
121
122 /// Proactively initializes the sequence number from chain.
123 ///
124 /// Call this during connect() to ensure orders can be submitted immediately
125 /// without first-transaction latency penalty. Also catches auth errors early.
126 ///
127 /// Returns the initialized sequence number.
128 ///
129 /// # Errors
130 ///
131 /// Returns error if chain query fails.
132 pub async fn initialize_sequence(&self) -> Result<u64, DydxError> {
133 let mut grpc = self.grpc_client.clone();
134 let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
135 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
136 "Failed to fetch account for sequence init: {e}"
137 ))))
138 })?;
139
140 let chain_seq = base_account.sequence;
141 self.sequence_number.store(chain_seq, Ordering::SeqCst);
142 log::debug!("Initialized sequence from chain: {chain_seq}");
143 Ok(chain_seq)
144 }
145
146 /// Resolves authenticator IDs if using permissioned keys (API wallet).
147 ///
148 /// Compares the wallet's signing address with the main account address.
149 /// If they differ, fetches authenticators from chain and finds the one
150 /// matching this wallet's public key.
151 ///
152 /// Call this during connect() after creating the TransactionManager.
153 ///
154 /// # Errors
155 ///
156 /// Returns error if:
157 /// - Using permissioned key but no authenticators found for main account
158 /// - No authenticator matches the wallet's public key
159 /// - gRPC query fails
160 ///
161 /// # Panics
162 ///
163 /// Panics if the internal `RwLock` is poisoned.
164 pub async fn resolve_authenticators(&self) -> Result<(), DydxError> {
165 // Check if we already have authenticator IDs configured
166 {
167 let ids = self.authenticator_ids.read().expect("RwLock poisoned");
168 if !ids.is_empty() {
169 log::debug!("Using pre-configured authenticator IDs: {:?}", *ids);
170 return Ok(());
171 }
172 }
173
174 // Get the wallet's address (derived from private key)
175 let account = self
176 .wallet
177 .account_offline()
178 .map_err(|e| DydxError::Wallet(format!("Failed to derive account: {e}")))?;
179 let signing_address = account.address.clone();
180 let signing_pubkey = account.public_key();
181
182 // Check if we're using an API wallet (signing address != main account)
183 if signing_address == self.wallet_address {
184 log::debug!(
185 "Signing wallet matches main account {}, no authenticator needed",
186 self.wallet_address
187 );
188 return Ok(());
189 }
190
191 log::info!(
192 "Detected permissioned key setup: signing with {} for main account {}",
193 signing_address,
194 self.wallet_address
195 );
196
197 // Fetch authenticators for the main account
198 let mut grpc = self.grpc_client.clone();
199 let authenticators = grpc
200 .get_authenticators(&self.wallet_address)
201 .await
202 .map_err(|e| {
203 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
204 "Failed to fetch authenticators from chain: {e}"
205 ))))
206 })?;
207
208 if authenticators.is_empty() {
209 return Err(DydxError::Config(format!(
210 "No authenticators found for {}. \
211 Please create an API Trading Key in the dYdX UI first.",
212 self.wallet_address
213 )));
214 }
215
216 log::debug!(
217 "Found {} authenticator(s) for {}",
218 authenticators.len(),
219 self.wallet_address
220 );
221
222 // Find authenticators matching the API wallet's public key
223 let signing_pubkey_bytes = signing_pubkey.to_bytes();
224 let signing_pubkey_b64 = base64::Engine::encode(
225 &base64::engine::general_purpose::STANDARD,
226 &signing_pubkey_bytes,
227 );
228
229 let mut matching_ids = Vec::new();
230
231 for auth in &authenticators {
232 if Self::authenticator_matches_pubkey(auth, &signing_pubkey_b64) {
233 matching_ids.push(auth.id);
234 log::info!("Found matching authenticator: id={}", auth.id);
235 }
236 }
237
238 if matching_ids.is_empty() {
239 return Err(DydxError::Config(format!(
240 "No authenticator matches the API wallet's public key. \
241 Ensure the API Trading Key was created for wallet {}. \
242 Available authenticators: {:?}",
243 signing_address,
244 authenticators.iter().map(|a| a.id).collect::<Vec<_>>()
245 )));
246 }
247
248 // Store the resolved authenticator IDs
249 {
250 let mut ids = self.authenticator_ids.write().expect("RwLock poisoned");
251 *ids = matching_ids.clone();
252 }
253 log::info!("Resolved authenticator IDs: {matching_ids:?}");
254
255 Ok(())
256 }
257
258 /// Checks if an authenticator contains a SignatureVerification matching the public key.
259 ///
260 /// Expected authenticator config format (JSON array of sub-authenticators):
261 /// ```json
262 /// [{"type": "SignatureVerification", "config": "<base64-pubkey>"}, ...]
263 /// ```
264 fn authenticator_matches_pubkey(auth: &AccountAuthenticator, pubkey_b64: &str) -> bool {
265 #[derive(serde::Deserialize)]
266 struct SubAuth {
267 #[serde(rename = "type")]
268 auth_type: String,
269 config: String,
270 }
271
272 // auth.config is raw bytes (Vec<u8>) containing JSON
273 let config_str = match String::from_utf8(auth.config.clone()) {
274 Ok(s) => s,
275 Err(e) => {
276 log::warn!(
277 "Authenticator id={} has invalid UTF-8 config (len={}): {}",
278 auth.id,
279 auth.config.len(),
280 e
281 );
282 return false;
283 }
284 };
285
286 log::debug!(
287 "Checking authenticator id={}, type={}, config={}",
288 auth.id,
289 auth.r#type,
290 config_str
291 );
292
293 match serde_json::from_str::<Vec<SubAuth>>(&config_str) {
294 Ok(sub_auths) => {
295 for sub in sub_auths {
296 log::debug!(
297 " Sub-authenticator: type={}, config={}",
298 sub.auth_type,
299 sub.config
300 );
301
302 if sub.auth_type == "SignatureVerification" && sub.config == pubkey_b64 {
303 log::debug!(" -> MATCH! pubkey_b64={pubkey_b64}");
304 return true;
305 }
306 }
307 }
308 Err(e) => {
309 log::warn!(
310 "Authenticator id={} config is not in expected JSON array format: {} (config={})",
311 auth.id,
312 e,
313 config_str
314 );
315 }
316 }
317
318 false
319 }
320
321 /// Allocates the next sequence number atomically.
322 ///
323 /// If the sequence is uninitialized (0), fetches from chain first.
324 /// Uses compare-exchange for lock-free concurrent access.
325 ///
326 /// # Errors
327 ///
328 /// Returns error if chain query fails during initialization.
329 pub async fn allocate_sequence(&self) -> Result<u64, DydxError> {
330 loop {
331 let current = self.sequence_number.load(Ordering::SeqCst);
332 if current == SEQUENCE_UNINITIALIZED {
333 // Initialize from chain
334 self.initialize_sequence_from_chain().await?;
335 continue;
336 }
337 // Atomic get-and-increment
338 if self
339 .sequence_number
340 .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
341 .is_ok()
342 {
343 return Ok(current);
344 }
345 // Another thread modified it, retry
346 }
347 }
348
349 /// Allocates N sequence numbers for optimistic parallel broadcast.
350 ///
351 /// Returns a vector of consecutive sequences that can be used concurrently.
352 /// The caller is responsible for handling partial failures by resyncing.
353 ///
354 /// # Arguments
355 ///
356 /// * `count` - Number of sequences to allocate
357 ///
358 /// # Errors
359 ///
360 /// Returns error if chain query fails during initialization.
361 ///
362 /// # Example
363 ///
364 /// ```ignore
365 /// let sequences = tx_manager.allocate_sequences(3).await?;
366 /// // sequences = [10, 11, 12] - three consecutive sequence numbers
367 /// ```
368 pub async fn allocate_sequences(&self, count: usize) -> Result<Vec<u64>, DydxError> {
369 if count == 0 {
370 return Ok(Vec::new());
371 }
372
373 loop {
374 let current = self.sequence_number.load(Ordering::SeqCst);
375 if current == SEQUENCE_UNINITIALIZED {
376 self.initialize_sequence_from_chain().await?;
377 continue;
378 }
379 let new_value = current + count as u64;
380
381 if self
382 .sequence_number
383 .compare_exchange(current, new_value, Ordering::SeqCst, Ordering::SeqCst)
384 .is_ok()
385 {
386 return Ok((current..new_value).collect());
387 }
388 // Another thread modified it, retry
389 }
390 }
391
392 /// Initializes the sequence counter from chain state.
393 ///
394 /// Only sets the value if it's still 0 (another thread might have set it).
395 async fn initialize_sequence_from_chain(&self) -> Result<(), DydxError> {
396 let mut grpc = self.grpc_client.clone();
397 let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
398 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
399 "Failed to fetch account for sequence init: {e}"
400 ))))
401 })?;
402
403 let chain_seq = base_account.sequence;
404 // Only set if still uninitialized (another thread might have set it)
405 if self
406 .sequence_number
407 .compare_exchange(
408 SEQUENCE_UNINITIALIZED,
409 chain_seq,
410 Ordering::SeqCst,
411 Ordering::SeqCst,
412 )
413 .is_ok()
414 {
415 log::debug!("Initialized sequence from chain: {chain_seq}");
416 }
417 Ok(())
418 }
419
420 /// Resyncs the sequence counter from chain after a mismatch error.
421 ///
422 /// Called by the broadcaster's retry logic when a sequence mismatch is detected.
423 /// Unconditionally stores the chain's current sequence.
424 ///
425 /// # Errors
426 ///
427 /// Returns error if chain query fails.
428 pub async fn resync_sequence(&self) -> Result<(), DydxError> {
429 let mut grpc = self.grpc_client.clone();
430 let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
431 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
432 "Failed to fetch account for resync: {e}"
433 ))))
434 })?;
435
436 let chain_seq = base_account.sequence;
437 self.sequence_number.store(chain_seq, Ordering::SeqCst);
438 log::info!("Resynced sequence from chain: {chain_seq}");
439 Ok(())
440 }
441
442 /// Returns the current sequence value without allocation.
443 ///
444 /// Useful for logging and debugging. Returns `SEQUENCE_UNINITIALIZED` if not yet initialized.
445 #[must_use]
446 pub fn current_sequence(&self) -> u64 {
447 self.sequence_number.load(Ordering::SeqCst)
448 }
449
450 /// Returns the cached sequence for short-term orders without incrementing.
451 ///
452 /// # Errors
453 ///
454 /// Returns error if chain query fails during initialization.
455 pub async fn get_cached_sequence(&self) -> Result<u64, DydxError> {
456 let current = self.sequence_number.load(Ordering::SeqCst);
457 if current == SEQUENCE_UNINITIALIZED {
458 self.initialize_sequence_from_chain().await?;
459 return Ok(self.sequence_number.load(Ordering::SeqCst));
460 }
461 Ok(current)
462 }
463
464 /// Builds and signs a transaction with the given messages and sequence.
465 ///
466 /// Uses cached account_number (fetched once from chain) to avoid repeated queries.
467 ///
468 /// # Arguments
469 ///
470 /// * `msgs` - Proto messages to include in transaction
471 /// * `sequence` - Pre-allocated sequence number
472 /// * `operation` - Human-readable name for logging
473 ///
474 /// # Errors
475 ///
476 /// Returns error if account lookup fails or transaction building fails.
477 ///
478 /// # Panics
479 ///
480 /// Panics if the internal `RwLock` is poisoned.
481 pub async fn build_transaction(
482 &self,
483 msgs: Vec<Any>,
484 sequence: u64,
485 operation: &str,
486 ) -> Result<PreparedTransaction, DydxError> {
487 // Derive account for signing (address/account_id are cached in wallet)
488 let mut account = self
489 .wallet
490 .account_offline()
491 .map_err(|e| DydxError::Wallet(format!("Failed to derive account: {e}")))?;
492
493 // Read authenticator IDs (resolved during connect if using permissioned keys)
494 let auth_ids_snapshot: Vec<u64> = {
495 let ids = self.authenticator_ids.read().expect("RwLock poisoned");
496 ids.clone()
497 };
498
499 if !auth_ids_snapshot.is_empty() {
500 log::debug!(
501 "Using permissioned key mode: signing with {} for main account {}",
502 account.address,
503 self.wallet_address
504 );
505 }
506
507 // Get or cache account number (it never changes for a given address)
508 let account_num = self.get_or_fetch_account_number().await?;
509
510 // Set account info for signing
511 account.set_account_info(account_num, sequence);
512
513 // Build transaction
514 let tx_builder =
515 TxBuilder::new(self.chain_id.clone(), FEE_DENOM.to_string()).map_err(|e| {
516 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
517 "TxBuilder init failed: {e}"
518 ))))
519 })?;
520
521 // For permissioned key trading, each message needs an authenticator ID.
522 // Repeat the configured authenticator ID(s) for each message in the batch.
523 let expanded_auth_ids: Vec<u64> = if auth_ids_snapshot.is_empty() {
524 Vec::new()
525 } else {
526 // For each message, use the first authenticator ID
527 // (typically there's only one configured for the trading key)
528 std::iter::repeat_n(auth_ids_snapshot[0], msgs.len()).collect()
529 };
530
531 let auth_ids = if expanded_auth_ids.is_empty() {
532 None
533 } else {
534 Some(expanded_auth_ids.as_slice())
535 };
536
537 let tx_raw = tx_builder
538 .build_transaction(&account, msgs, None, auth_ids)
539 .map_err(|e| {
540 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
541 "Failed to build tx: {e}"
542 ))))
543 })?;
544
545 let tx_bytes = tx_raw.to_bytes().map_err(|e| {
546 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
547 "Failed to serialize tx: {e}"
548 ))))
549 })?;
550
551 log::debug!(
552 "Built {} with {} bytes, sequence={}",
553 operation,
554 tx_bytes.len(),
555 sequence
556 );
557
558 Ok(PreparedTransaction {
559 tx_bytes,
560 sequence,
561 operation: operation.to_string(),
562 })
563 }
564
565 /// Gets the cached account number, or fetches it from chain if not yet cached.
566 ///
567 /// Account numbers are immutable on-chain, so we only need to fetch once.
568 async fn get_or_fetch_account_number(&self) -> Result<u64, DydxError> {
569 let cached = self.account_number.load(Ordering::SeqCst);
570 if cached != 0 {
571 return Ok(cached);
572 }
573
574 // Fetch from chain
575 let mut grpc = self.grpc_client.clone();
576 let base_account = grpc.get_account(&self.wallet_address).await.map_err(|e| {
577 DydxError::Grpc(Box::new(tonic::Status::internal(format!(
578 "Failed to fetch account: {e}"
579 ))))
580 })?;
581
582 let account_num = base_account.account_number;
583
584 // Cache it (CAS to handle concurrent fetches)
585 let _ = self.account_number.compare_exchange(
586 0,
587 account_num,
588 Ordering::SeqCst,
589 Ordering::SeqCst,
590 );
591
592 log::debug!("Cached account_number from chain: {account_num}");
593 Ok(account_num)
594 }
595
596 /// Convenience method: allocate sequence, build, and return prepared transaction.
597 ///
598 /// This is the typical flow for single transaction submission.
599 ///
600 /// # Arguments
601 ///
602 /// * `msgs` - Proto messages to include in transaction
603 /// * `operation` - Human-readable name for logging
604 ///
605 /// # Errors
606 ///
607 /// Returns error if sequence allocation or transaction building fails.
608 pub async fn prepare_transaction(
609 &self,
610 msgs: Vec<Any>,
611 operation: &str,
612 ) -> Result<PreparedTransaction, DydxError> {
613 let sequence = self.allocate_sequence().await?;
614 self.build_transaction(msgs, sequence, operation).await
615 }
616
617 /// Returns the wallet address.
618 #[must_use]
619 pub fn wallet_address(&self) -> &str {
620 &self.wallet_address
621 }
622
623 /// Returns the chain ID.
624 #[must_use]
625 pub fn chain_id(&self) -> &ChainId {
626 &self.chain_id
627 }
628}