Skip to main content

nautilus_dydx/execution/
broadcaster.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Transaction broadcaster for dYdX v4 protocol.
//!
//! This module handles gRPC transmission of transactions with automatic retry
//! on sequence mismatch and transient errors. It works in conjunction with
//! `TransactionManager` to provide reliable transaction delivery.
//!
//! # Retry Logic
//!
//! Uses the battle-tested [`RetryManager`] from `nautilus-network` with exponential
//! backoff. When a stateful transaction fails with "account sequence mismatch"
//! (Cosmos SDK error code 32) or a transient error such as a timeout, the broadcaster:
//!
//! 1. Waits with exponential backoff (500ms → 1s → 2s → 4s, capped at 4s, plus jitter)
//! 2. Resyncs the sequence counter from chain
//! 3. Rebuilds the transaction with a newly allocated sequence and broadcasts it again
//! 4. Gives up after 5 retries or a 10 second total budget, whichever limit is hit first
//!
//! This handles the case where multiple transactions are submitted in parallel
//! and one succeeds before the other, invalidating the sequence.
35
36use std::sync::{
37    Arc,
38    atomic::{AtomicBool, Ordering},
39};
40
41use cosmrs::Any;
42use nautilus_network::{
43    ratelimiter::{RateLimiter, clock::MonotonicClock, quota::Quota},
44    retry::{RetryConfig, RetryManager},
45};
46
47use super::{tx_manager::TransactionManager, types::PreparedTransaction};
48use crate::{error::DydxError, grpc::DydxGrpcClient};
49
/// Upper bound on how many times a stateful broadcast is retried after a
/// sequence mismatch or transient failure.
pub const MAX_SEQUENCE_RETRIES: u32 = 5;

// Backoff schedule, in milliseconds. The first retry waits `INITIAL_RETRY_DELAY_MS`;
// each later delay doubles (500 → 1000 → 2000 → 4000) until it reaches the
// `MAX_RETRY_DELAY_MS` ceiling.
const INITIAL_RETRY_DELAY_MS: u64 = 500;
const MAX_RETRY_DELAY_MS: u64 = 4_000;

// Overall ceiling (10 seconds) on the time spent across every retry attempt, so a
// congested chain cannot keep a single broadcast retrying indefinitely.
const MAX_ELAPSED_MS: u64 = 10 * 1_000;
63
64/// Creates a retry manager configured for blockchain transaction broadcasting.
65///
66/// Configuration optimized for Cosmos SDK sequence management:
67/// - 5 retries with exponential backoff (500ms → 4s max)
68/// - Small jitter (100ms) to avoid thundering herd
69/// - No operation timeout (chain responses can be slow)
70/// - 10 second total budget to prevent indefinite waits
71#[must_use]
72pub fn create_tx_retry_manager() -> RetryManager<DydxError> {
73    let config = RetryConfig {
74        max_retries: MAX_SEQUENCE_RETRIES,
75        initial_delay_ms: INITIAL_RETRY_DELAY_MS,
76        max_delay_ms: MAX_RETRY_DELAY_MS,
77        backoff_factor: 2.0,
78        jitter_ms: 100,
79        operation_timeout_ms: None, // Blockchain responses can be slow
80        immediate_first: false,     // Always wait before retry (block needs time)
81        max_elapsed_ms: Some(MAX_ELAPSED_MS),
82    };
83    RetryManager::new(config)
84}
85
86// Rate limiter key for gRPC broadcast calls
87const GRPC_RATE_LIMIT_KEY: &str = "grpc";
88
89/// Transaction broadcaster responsible for gRPC transmission with retry logic.
90///
91/// Works with `TransactionManager` to handle sequence mismatch errors gracefully.
92/// Uses [`RetryManager`] with exponential backoff for reliable delivery.
93///
94/// # Broadcast Modes
95///
96/// ## Stateful Orders (long-term/conditional): `broadcast_with_retry`
97///
98/// Serialized through a semaphore to prevent sequence races. Cosmos SDK requires
99/// stateful transactions to have unique, incrementing sequence numbers. The semaphore
100/// ensures allocate → build → broadcast happens atomically for each operation.
101///
102/// On sequence mismatch (Cosmos SDK error code 32 or dYdX code 104):
103/// 1. The `should_retry` callback sets a flag indicating resync is needed
104/// 2. The `RetryManager` applies exponential backoff
105/// 3. On next attempt, the operation checks the flag and resyncs sequence from chain
106/// 4. A new transaction is built with the fresh sequence and broadcast
107///
108/// ## Short-term Orders: `broadcast_short_term`
109///
110/// dYdX short-term orders use Good-Til-Block (GTB) for replay protection instead of
111/// Cosmos SDK sequences. The chain's `ClobDecorator` ante handler skips sequence
112/// checking for short-term messages. This means:
113/// - No semaphore needed (fully concurrent)
114/// - Cached sequence used (no increment, no allocation)
115/// - No sequence-based retry logic needed
116#[derive(Debug)]
117pub struct TxBroadcaster {
118    /// gRPC client for broadcasting transactions.
119    grpc_client: DydxGrpcClient,
120    /// Retry manager for handling transient failures.
121    retry_manager: RetryManager<DydxError>,
122    /// Semaphore for serializing broadcasts (permits=1 acts as mutex).
123    /// Ensures sequence allocation → build → broadcast are atomic.
124    broadcast_semaphore: Arc<tokio::sync::Semaphore>,
125    /// Rate limiter for gRPC broadcast calls.
126    rate_limiter: Arc<RateLimiter<&'static str, MonotonicClock>>,
127}
128
129impl TxBroadcaster {
130    /// Creates a new transaction broadcaster.
131    #[must_use]
132    pub fn new(grpc_client: DydxGrpcClient, grpc_quota: Option<Quota>) -> Self {
133        let rate_limiter = Arc::new(RateLimiter::new_with_quota(grpc_quota, vec![]));
134        Self {
135            grpc_client,
136            retry_manager: create_tx_retry_manager(),
137            broadcast_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
138            rate_limiter,
139        }
140    }
141
142    async fn wait_for_rate_limit(&self) {
143        self.rate_limiter
144            .until_key_ready(&GRPC_RATE_LIMIT_KEY)
145            .await;
146    }
147
148    /// Broadcasts a prepared transaction with automatic retry on sequence mismatch.
149    ///
150    /// **Serialization**: Acquires a semaphore permit before allocating sequence,
151    /// building, and broadcasting. This ensures transactions are broadcast in
152    /// sequence order, preventing "sequence mismatch" errors from concurrent calls.
153    ///
154    /// On sequence mismatch (code=32), resyncs from chain, allocates new sequence,
155    /// rebuilds the transaction, and retries with exponential backoff.
156    ///
157    /// # Arguments
158    ///
159    /// * `tx_manager` - Transaction manager for sequence resync and rebuilding
160    /// * `msgs` - Original messages to rebuild on retry
161    /// * `operation` - Human-readable operation name for logging
162    ///
163    /// # Returns
164    ///
165    /// The transaction hash on success.
166    ///
167    /// # Errors
168    ///
169    /// Returns error if all retries are exhausted or a non-retryable error occurs.
170    pub async fn broadcast_with_retry(
171        &self,
172        tx_manager: &TransactionManager,
173        msgs: Vec<Any>,
174        operation_name: &str,
175    ) -> Result<String, DydxError> {
176        // Acquire semaphore to serialize broadcasts.
177        // This ensures sequence N is fully broadcast before sequence N+1 is allocated.
178        let _permit =
179            self.broadcast_semaphore.acquire().await.map_err(|e| {
180                DydxError::Nautilus(anyhow::anyhow!("Broadcast semaphore closed: {e}"))
181            })?;
182
183        log::debug!("Acquired broadcast permit for {operation_name}");
184
185        // Flag to track if we need to resync sequence before the next attempt.
186        // Set by should_retry when a sequence mismatch is detected.
187        let needs_resync = Arc::new(AtomicBool::new(false));
188        let needs_resync_for_retry = Arc::clone(&needs_resync);
189
190        // Clone values that need to be moved into closures
191        let grpc_client = self.grpc_client.clone();
192        let rate_limiter = Arc::clone(&self.rate_limiter);
193        let op_name = operation_name.to_string();
194
195        let operation = || {
196            // Clone captures for the async block
197            let needs_resync = Arc::clone(&needs_resync);
198            let grpc_client = grpc_client.clone();
199            let rate_limiter = Arc::clone(&rate_limiter);
200            let msgs = msgs.clone();
201            let op_name = op_name.clone();
202
203            async move {
204                // Resync sequence if previous attempt failed with sequence mismatch
205                if needs_resync.swap(false, Ordering::SeqCst) {
206                    log::debug!("Resyncing sequence from chain before retry");
207                    tx_manager.resync_sequence().await?;
208                }
209
210                // Prepare transaction (allocates new sequence)
211                let prepared = tx_manager.prepare_transaction(msgs, &op_name).await?;
212
213                // Wait for rate limiter before gRPC call
214                rate_limiter.until_key_ready(&GRPC_RATE_LIMIT_KEY).await;
215
216                // Broadcast
217                let mut grpc = grpc_client;
218                let tx_hash = grpc.broadcast_tx(prepared.tx_bytes).await.map_err(|e| {
219                    log::error!("gRPC broadcast failed for {op_name}: {e}");
220                    DydxError::Nautilus(e)
221                })?;
222
223                log::debug!("{op_name} successfully: tx_hash={tx_hash}");
224                Ok(tx_hash)
225            }
226        };
227
228        let should_retry = move |e: &DydxError| -> bool {
229            if e.is_sequence_mismatch() {
230                // Set flag so next attempt will resync
231                needs_resync_for_retry.store(true, Ordering::SeqCst);
232                log::warn!("Sequence mismatch detected, will resync and retry");
233                true
234            } else if e.is_transient() {
235                // Also resync on transient errors (timeout, unavailable).
236                // Without this, each retry allocates a NEW sequence, causing drift
237                // (e.g., timeout → alloc 314, timeout → alloc 315, then sequence mismatch).
238                needs_resync_for_retry.store(true, Ordering::SeqCst);
239                log::warn!("Transient error detected, will resync and retry: {e}");
240                true
241            } else {
242                false
243            }
244        };
245
246        let create_error = |msg: String| -> DydxError { DydxError::Nautilus(anyhow::anyhow!(msg)) };
247
248        // Permit is held throughout retry loop, released when _permit drops
249        self.retry_manager
250            .execute_with_retry(operation_name, operation, should_retry, create_error)
251            .await
252    }
253
254    /// Broadcasts a short-term order transaction without sequence management.
255    ///
256    /// Short-term orders use Good-Til-Block (GTB) for replay protection, so sequence
257    /// numbers are not incremented. All short-term broadcasts use a cached sequence.
258    ///
259    /// Benign cancel errors are treated as success (the cancel is already handled):
260    /// - code=19: Transaction already in mempool cache (duplicate tx)
261    /// - code=9: Cancel already exists in memclob with >= GoodTilBlock
262    /// - code=3006: Order to cancel does not exist (already filled/expired/cancelled)
263    ///
264    /// # Errors
265    ///
266    /// Returns error if building or broadcasting fails (excluding benign cancel errors).
267    pub async fn broadcast_short_term(
268        &self,
269        tx_manager: &TransactionManager,
270        msgs: Vec<Any>,
271        operation_name: &str,
272    ) -> Result<String, DydxError> {
273        let cached_sequence = tx_manager.get_cached_sequence().await?;
274        let prepared = tx_manager
275            .build_transaction(msgs, cached_sequence, operation_name)
276            .await?;
277
278        // Wait for rate limiter before gRPC call
279        self.wait_for_rate_limit().await;
280
281        let mut grpc = self.grpc_client.clone();
282        match grpc.broadcast_tx(prepared.tx_bytes).await {
283            Ok(tx_hash) => {
284                log::debug!("{operation_name} successfully: tx_hash={tx_hash}");
285                Ok(tx_hash)
286            }
287            Err(e) => {
288                let dydx_err = DydxError::Nautilus(e);
289                if dydx_err.is_benign_cancel_error() {
290                    log::debug!(
291                        "{operation_name}: benign cancel error, treating as success: {dydx_err}"
292                    );
293                    Ok(String::new())
294                } else {
295                    log::error!("gRPC broadcast failed for {operation_name}: {dydx_err}");
296                    Err(dydx_err)
297                }
298            }
299        }
300    }
301
302    /// Broadcasts a prepared transaction without retry.
303    ///
304    /// Use this for optimistic batching where you handle failures externally,
305    /// or when you've already prepared a transaction and want direct control.
306    ///
307    /// # Returns
308    ///
309    /// The transaction hash on success.
310    ///
311    /// # Errors
312    ///
313    /// Returns error if the gRPC broadcast fails.
314    pub async fn broadcast_once(
315        &self,
316        prepared: &PreparedTransaction,
317    ) -> Result<String, DydxError> {
318        // Wait for rate limiter before gRPC call
319        self.wait_for_rate_limit().await;
320
321        let mut grpc = self.grpc_client.clone();
322        let operation = &prepared.operation;
323
324        let tx_hash = grpc
325            .broadcast_tx(prepared.tx_bytes.clone())
326            .await
327            .map_err(|e| {
328                log::error!("gRPC broadcast failed for {operation}: {e}");
329                DydxError::Nautilus(e)
330            })?;
331
332        log::debug!("{operation} successfully: tx_hash={tx_hash}");
333        Ok(tx_hash)
334    }
335}