nautilus_dydx/execution/broadcaster.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Transaction broadcaster for dYdX v4 protocol.
17//!
18//! This module handles gRPC transmission of transactions with automatic retry
19//! on sequence mismatch errors. It works in conjunction with `TransactionManager`
20//! to provide reliable transaction delivery.
21//!
22//! # Retry Logic
23//!
//! Uses the battle-tested [`RetryManager`] from `nautilus-network` with exponential
//! backoff. When a stateful transaction fails with "account sequence mismatch" (Cosmos SDK
//! error code 32) or with a transient error (e.g. timeout, unavailable), the broadcaster:
//!
//! 1. Applies exponential backoff (500ms → 1s → 2s → 4s, capped at 4s)
//! 2. Resyncs the sequence counter from chain
//! 3. Rebuilds the transaction with the fresh sequence and broadcasts it again
//! 4. Retries up to 5 times, within a 10 second total time budget
//!
//! This handles the case where multiple transactions are submitted in parallel
//! and one succeeds before the other, invalidating the sequence.
35
36use std::sync::{
37 Arc,
38 atomic::{AtomicBool, Ordering},
39};
40
41use cosmrs::Any;
42use nautilus_network::{
43 ratelimiter::{RateLimiter, clock::MonotonicClock, quota::Quota},
44 retry::{RetryConfig, RetryManager},
45};
46
47use super::{tx_manager::TransactionManager, types::PreparedTransaction};
48use crate::{error::DydxError, grpc::DydxGrpcClient};
49
/// Maximum number of retries for a stateful broadcast.
///
/// Named for sequence-mismatch errors, but the same cap applies to transient
/// errors, since both are retried by the same retry manager.
pub const MAX_SEQUENCE_RETRIES: u32 = 5;

// Delay before the first retry, in milliseconds. A backoff factor of 2 doubles it
// on every attempt: 500 → 1000 → 2000 → 4000ms.
const INITIAL_RETRY_DELAY_MS: u64 = 500;

// Ceiling for any single retry delay, in milliseconds (4 seconds).
const MAX_RETRY_DELAY_MS: u64 = 4 * 1_000;

// Wall-clock budget for the whole retry sequence, in milliseconds (10 seconds).
// Keeps chain congestion from causing an indefinite retry loop.
const MAX_ELAPSED_MS: u64 = 10 * 1_000;
63
64/// Creates a retry manager configured for blockchain transaction broadcasting.
65///
66/// Configuration optimized for Cosmos SDK sequence management:
67/// - 5 retries with exponential backoff (500ms → 4s max)
68/// - Small jitter (100ms) to avoid thundering herd
69/// - No operation timeout (chain responses can be slow)
70/// - 10 second total budget to prevent indefinite waits
71#[must_use]
72pub fn create_tx_retry_manager() -> RetryManager<DydxError> {
73 let config = RetryConfig {
74 max_retries: MAX_SEQUENCE_RETRIES,
75 initial_delay_ms: INITIAL_RETRY_DELAY_MS,
76 max_delay_ms: MAX_RETRY_DELAY_MS,
77 backoff_factor: 2.0,
78 jitter_ms: 100,
79 operation_timeout_ms: None, // Blockchain responses can be slow
80 immediate_first: false, // Always wait before retry (block needs time)
81 max_elapsed_ms: Some(MAX_ELAPSED_MS),
82 };
83 RetryManager::new(config)
84}
85
// Single rate-limiter key shared by every gRPC broadcast path.
const GRPC_RATE_LIMIT_KEY: &str = "grpc";
88
89/// Transaction broadcaster responsible for gRPC transmission with retry logic.
90///
91/// Works with `TransactionManager` to handle sequence mismatch errors gracefully.
92/// Uses [`RetryManager`] with exponential backoff for reliable delivery.
93///
94/// # Broadcast Modes
95///
96/// ## Stateful Orders (long-term/conditional): `broadcast_with_retry`
97///
98/// Serialized through a semaphore to prevent sequence races. Cosmos SDK requires
99/// stateful transactions to have unique, incrementing sequence numbers. The semaphore
100/// ensures allocate → build → broadcast happens atomically for each operation.
101///
102/// On sequence mismatch (Cosmos SDK error code 32 or dYdX code 104):
103/// 1. The `should_retry` callback sets a flag indicating resync is needed
104/// 2. The `RetryManager` applies exponential backoff
105/// 3. On next attempt, the operation checks the flag and resyncs sequence from chain
106/// 4. A new transaction is built with the fresh sequence and broadcast
107///
108/// ## Short-term Orders: `broadcast_short_term`
109///
110/// dYdX short-term orders use Good-Til-Block (GTB) for replay protection instead of
111/// Cosmos SDK sequences. The chain's `ClobDecorator` ante handler skips sequence
112/// checking for short-term messages. This means:
113/// - No semaphore needed (fully concurrent)
114/// - Cached sequence used (no increment, no allocation)
115/// - No sequence-based retry logic needed
116#[derive(Debug)]
117pub struct TxBroadcaster {
118 /// gRPC client for broadcasting transactions.
119 grpc_client: DydxGrpcClient,
120 /// Retry manager for handling transient failures.
121 retry_manager: RetryManager<DydxError>,
122 /// Semaphore for serializing broadcasts (permits=1 acts as mutex).
123 /// Ensures sequence allocation → build → broadcast are atomic.
124 broadcast_semaphore: Arc<tokio::sync::Semaphore>,
125 /// Rate limiter for gRPC broadcast calls.
126 rate_limiter: Arc<RateLimiter<&'static str, MonotonicClock>>,
127}
128
129impl TxBroadcaster {
130 /// Creates a new transaction broadcaster.
131 #[must_use]
132 pub fn new(grpc_client: DydxGrpcClient, grpc_quota: Option<Quota>) -> Self {
133 let rate_limiter = Arc::new(RateLimiter::new_with_quota(grpc_quota, vec![]));
134 Self {
135 grpc_client,
136 retry_manager: create_tx_retry_manager(),
137 broadcast_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
138 rate_limiter,
139 }
140 }
141
142 async fn wait_for_rate_limit(&self) {
143 self.rate_limiter
144 .until_key_ready(&GRPC_RATE_LIMIT_KEY)
145 .await;
146 }
147
148 /// Broadcasts a prepared transaction with automatic retry on sequence mismatch.
149 ///
150 /// **Serialization**: Acquires a semaphore permit before allocating sequence,
151 /// building, and broadcasting. This ensures transactions are broadcast in
152 /// sequence order, preventing "sequence mismatch" errors from concurrent calls.
153 ///
154 /// On sequence mismatch (code=32), resyncs from chain, allocates new sequence,
155 /// rebuilds the transaction, and retries with exponential backoff.
156 ///
157 /// # Arguments
158 ///
159 /// * `tx_manager` - Transaction manager for sequence resync and rebuilding
160 /// * `msgs` - Original messages to rebuild on retry
161 /// * `operation` - Human-readable operation name for logging
162 ///
163 /// # Returns
164 ///
165 /// The transaction hash on success.
166 ///
167 /// # Errors
168 ///
169 /// Returns error if all retries are exhausted or a non-retryable error occurs.
170 pub async fn broadcast_with_retry(
171 &self,
172 tx_manager: &TransactionManager,
173 msgs: Vec<Any>,
174 operation_name: &str,
175 ) -> Result<String, DydxError> {
176 // Acquire semaphore to serialize broadcasts.
177 // This ensures sequence N is fully broadcast before sequence N+1 is allocated.
178 let _permit =
179 self.broadcast_semaphore.acquire().await.map_err(|e| {
180 DydxError::Nautilus(anyhow::anyhow!("Broadcast semaphore closed: {e}"))
181 })?;
182
183 log::debug!("Acquired broadcast permit for {operation_name}");
184
185 // Flag to track if we need to resync sequence before the next attempt.
186 // Set by should_retry when a sequence mismatch is detected.
187 let needs_resync = Arc::new(AtomicBool::new(false));
188 let needs_resync_for_retry = Arc::clone(&needs_resync);
189
190 // Clone values that need to be moved into closures
191 let grpc_client = self.grpc_client.clone();
192 let rate_limiter = Arc::clone(&self.rate_limiter);
193 let op_name = operation_name.to_string();
194
195 let operation = || {
196 // Clone captures for the async block
197 let needs_resync = Arc::clone(&needs_resync);
198 let grpc_client = grpc_client.clone();
199 let rate_limiter = Arc::clone(&rate_limiter);
200 let msgs = msgs.clone();
201 let op_name = op_name.clone();
202
203 async move {
204 // Resync sequence if previous attempt failed with sequence mismatch
205 if needs_resync.swap(false, Ordering::SeqCst) {
206 log::debug!("Resyncing sequence from chain before retry");
207 tx_manager.resync_sequence().await?;
208 }
209
210 // Prepare transaction (allocates new sequence)
211 let prepared = tx_manager.prepare_transaction(msgs, &op_name).await?;
212
213 // Wait for rate limiter before gRPC call
214 rate_limiter.until_key_ready(&GRPC_RATE_LIMIT_KEY).await;
215
216 // Broadcast
217 let mut grpc = grpc_client;
218 let tx_hash = grpc.broadcast_tx(prepared.tx_bytes).await.map_err(|e| {
219 log::error!("gRPC broadcast failed for {op_name}: {e}");
220 DydxError::Nautilus(e)
221 })?;
222
223 log::debug!("{op_name} successfully: tx_hash={tx_hash}");
224 Ok(tx_hash)
225 }
226 };
227
228 let should_retry = move |e: &DydxError| -> bool {
229 if e.is_sequence_mismatch() {
230 // Set flag so next attempt will resync
231 needs_resync_for_retry.store(true, Ordering::SeqCst);
232 log::warn!("Sequence mismatch detected, will resync and retry");
233 true
234 } else if e.is_transient() {
235 // Also resync on transient errors (timeout, unavailable).
236 // Without this, each retry allocates a NEW sequence, causing drift
237 // (e.g., timeout → alloc 314, timeout → alloc 315, then sequence mismatch).
238 needs_resync_for_retry.store(true, Ordering::SeqCst);
239 log::warn!("Transient error detected, will resync and retry: {e}");
240 true
241 } else {
242 false
243 }
244 };
245
246 let create_error = |msg: String| -> DydxError { DydxError::Nautilus(anyhow::anyhow!(msg)) };
247
248 // Permit is held throughout retry loop, released when _permit drops
249 self.retry_manager
250 .execute_with_retry(operation_name, operation, should_retry, create_error)
251 .await
252 }
253
254 /// Broadcasts a short-term order transaction without sequence management.
255 ///
256 /// Short-term orders use Good-Til-Block (GTB) for replay protection, so sequence
257 /// numbers are not incremented. All short-term broadcasts use a cached sequence.
258 ///
259 /// Benign cancel errors are treated as success (the cancel is already handled):
260 /// - code=19: Transaction already in mempool cache (duplicate tx)
261 /// - code=9: Cancel already exists in memclob with >= GoodTilBlock
262 /// - code=3006: Order to cancel does not exist (already filled/expired/cancelled)
263 ///
264 /// # Errors
265 ///
266 /// Returns error if building or broadcasting fails (excluding benign cancel errors).
267 pub async fn broadcast_short_term(
268 &self,
269 tx_manager: &TransactionManager,
270 msgs: Vec<Any>,
271 operation_name: &str,
272 ) -> Result<String, DydxError> {
273 let cached_sequence = tx_manager.get_cached_sequence().await?;
274 let prepared = tx_manager
275 .build_transaction(msgs, cached_sequence, operation_name)
276 .await?;
277
278 // Wait for rate limiter before gRPC call
279 self.wait_for_rate_limit().await;
280
281 let mut grpc = self.grpc_client.clone();
282 match grpc.broadcast_tx(prepared.tx_bytes).await {
283 Ok(tx_hash) => {
284 log::debug!("{operation_name} successfully: tx_hash={tx_hash}");
285 Ok(tx_hash)
286 }
287 Err(e) => {
288 let dydx_err = DydxError::Nautilus(e);
289 if dydx_err.is_benign_cancel_error() {
290 log::debug!(
291 "{operation_name}: benign cancel error, treating as success: {dydx_err}"
292 );
293 Ok(String::new())
294 } else {
295 log::error!("gRPC broadcast failed for {operation_name}: {dydx_err}");
296 Err(dydx_err)
297 }
298 }
299 }
300 }
301
302 /// Broadcasts a prepared transaction without retry.
303 ///
304 /// Use this for optimistic batching where you handle failures externally,
305 /// or when you've already prepared a transaction and want direct control.
306 ///
307 /// # Returns
308 ///
309 /// The transaction hash on success.
310 ///
311 /// # Errors
312 ///
313 /// Returns error if the gRPC broadcast fails.
314 pub async fn broadcast_once(
315 &self,
316 prepared: &PreparedTransaction,
317 ) -> Result<String, DydxError> {
318 // Wait for rate limiter before gRPC call
319 self.wait_for_rate_limit().await;
320
321 let mut grpc = self.grpc_client.clone();
322 let operation = &prepared.operation;
323
324 let tx_hash = grpc
325 .broadcast_tx(prepared.tx_bytes.clone())
326 .await
327 .map_err(|e| {
328 log::error!("gRPC broadcast failed for {operation}: {e}");
329 DydxError::Nautilus(e)
330 })?;
331
332 log::debug!("{operation} successfully: tx_hash={tx_hash}");
333 Ok(tx_hash)
334 }
335}