Skip to main content

nautilus_dydx/execution/
submitter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Order submission facade for dYdX v4.
17//!
18//! This module provides [`OrderSubmitter`], a unified facade for submitting orders to dYdX.
19//! It internally uses the extracted components:
20//! - [`TransactionManager`]: Sequence tracking and transaction signing
21//! - [`TxBroadcaster`]: gRPC broadcast with retry logic
22//! - [`OrderMessageBuilder`]: Proto message construction
23//!
24//! The wallet is owned internally by `TransactionManager`, so method signatures
25//! don't require passing `&wallet` on each call.
26
27use std::sync::Arc;
28
29use nautilus_common::live::get_runtime;
30use nautilus_model::{
31    enums::{OrderSide, TimeInForce},
32    identifiers::InstrumentId,
33    types::{Price, Quantity},
34};
35use nautilus_network::ratelimiter::quota::Quota;
36
37use crate::{
38    error::DydxError,
39    execution::{
40        block_time::BlockTimeMonitor,
41        broadcaster::TxBroadcaster,
42        order_builder::OrderMessageBuilder,
43        tx_manager::TransactionManager,
44        types::{ConditionalOrderType, LimitOrderParams, OrderLifetime},
45    },
46    grpc::{DydxGrpcClient, types::ChainId},
47    http::client::DydxHttpClient,
48};
49
50/// Order submission facade for dYdX v4.
51///
52/// Provides a clean API for order submission, internally coordinating:
53/// - [`TransactionManager`]: Owns wallet, handles sequence + signing
54/// - [`TxBroadcaster`]: Handles gRPC broadcast with retry
55/// - [`OrderMessageBuilder`]: Constructs proto messages
56///
57/// # Wallet Ownership
58///
59/// The wallet is owned by `TransactionManager` (passed at construction via `private_key`).
60/// This eliminates the need to pass `&wallet` to every method.
61///
62/// # Block Time Monitor
63///
64/// `block_time_monitor` provides current block height and dynamic block time estimation.
65/// Updated externally by WebSocket, read by order methods.
66///
67/// # Thread Safety
68///
69/// All methods are safe to call from multiple tasks concurrently.
70#[derive(Debug)]
71pub struct OrderSubmitter {
72    /// Transaction manager - owns wallet, handles sequence and signing.
73    tx_manager: Arc<TransactionManager>,
74    /// Transaction broadcaster with retry logic.
75    broadcaster: Arc<TxBroadcaster>,
76    /// Order message builder for proto construction.
77    order_builder: Arc<OrderMessageBuilder>,
78    /// Block time monitor - provides current height and block time estimation.
79    block_time_monitor: Arc<BlockTimeMonitor>,
80}
81
82impl OrderSubmitter {
83    /// Creates a new order submitter with wallet owned internally.
84    ///
85    /// # Arguments
86    ///
87    /// * `grpc_client` - gRPC client for chain queries and broadcasting
88    /// * `http_client` - HTTP client (provides market params cache)
89    /// * `private_key` - Private key (hex-encoded) - wallet created internally
90    /// * `wallet_address` - Main account address (may differ from derived address for permissioned keys)
91    /// * `subaccount_number` - dYdX subaccount number (typically 0)
92    /// * `chain_id` - dYdX chain ID
93    /// * `block_time_monitor` - Block time monitor (provides current height and dynamic block time)
94    /// * `grpc_quota` - Optional rate limit quota for gRPC calls
95    ///
96    /// # Errors
97    ///
98    /// Returns error if wallet creation from private key fails.
99    #[expect(clippy::too_many_arguments)]
100    pub fn new(
101        grpc_client: DydxGrpcClient,
102        http_client: DydxHttpClient,
103        private_key: &str,
104        wallet_address: String,
105        subaccount_number: u32,
106        chain_id: ChainId,
107        block_time_monitor: Arc<BlockTimeMonitor>,
108        grpc_quota: Option<Quota>,
109    ) -> Result<Self, DydxError> {
110        // Create transaction manager (owns wallet and sequence management)
111        let tx_manager = Arc::new(TransactionManager::new(
112            grpc_client.clone(),
113            private_key,
114            wallet_address.clone(),
115            chain_id,
116        )?);
117
118        let broadcaster = Arc::new(TxBroadcaster::new(grpc_client, grpc_quota));
119
120        let order_builder = Arc::new(OrderMessageBuilder::new(
121            http_client,
122            wallet_address,
123            subaccount_number,
124            block_time_monitor.clone(),
125        ));
126
127        Ok(Self {
128            tx_manager,
129            broadcaster,
130            order_builder,
131            block_time_monitor,
132        })
133    }
134
135    /// Creates a new order submitter from pre-built components.
136    ///
137    /// Use this when you already have initialized components (e.g., from `DydxExecutionClient`).
138    pub fn from_components(
139        tx_manager: Arc<TransactionManager>,
140        broadcaster: Arc<TxBroadcaster>,
141        order_builder: Arc<OrderMessageBuilder>,
142        block_time_monitor: Arc<BlockTimeMonitor>,
143    ) -> Self {
144        Self {
145            tx_manager,
146            broadcaster,
147            order_builder,
148            block_time_monitor,
149        }
150    }
151
152    /// Returns the current block height.
153    #[must_use]
154    pub fn current_block_height(&self) -> u32 {
155        self.block_time_monitor.current_block_height() as u32
156    }
157
158    /// Returns a reference to the block time monitor.
159    #[must_use]
160    pub fn block_time_monitor(&self) -> &BlockTimeMonitor {
161        &self.block_time_monitor
162    }
163
164    /// Returns the wallet address.
165    #[must_use]
166    pub fn wallet_address(&self) -> &str {
167        self.tx_manager.wallet_address()
168    }
169
170    /// Returns a reference to the order builder.
171    #[must_use]
172    pub fn order_builder(&self) -> &OrderMessageBuilder {
173        &self.order_builder
174    }
175
176    /// Returns a reference to the transaction manager.
177    #[must_use]
178    pub fn tx_manager(&self) -> &TransactionManager {
179        &self.tx_manager
180    }
181
182    /// Submits a market order to dYdX via gRPC.
183    ///
184    /// Market orders execute immediately at the best available price.
185    /// Block height is read from the shared `block_height` state.
186    ///
187    /// # Returns
188    ///
189    /// The transaction hash on success.
190    ///
191    /// # Errors
192    ///
193    /// Returns `DydxError` if gRPC submission fails.
194    pub async fn submit_market_order(
195        &self,
196        instrument_id: InstrumentId,
197        client_order_id: u32,
198        client_metadata: u32,
199        side: OrderSide,
200        quantity: Quantity,
201    ) -> Result<String, DydxError> {
202        log::debug!(
203            "Submitting market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, quantity={quantity}"
204        );
205
206        let block_height = self.current_block_height();
207
208        // Build proto message
209        let msg = self.order_builder.build_market_order(
210            instrument_id,
211            client_order_id,
212            client_metadata,
213            side,
214            quantity,
215            block_height,
216        )?;
217
218        // Market orders are always short-term — use cached sequence (no increment)
219        let operation = format!("Submit market order {client_order_id}");
220        let tx_hash = self
221            .broadcaster
222            .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
223            .await?;
224
225        Ok(tx_hash)
226    }
227
228    /// Submits a limit order to dYdX via gRPC.
229    ///
230    /// Limit orders execute only at the specified price or better.
231    /// Block height is read from the shared `block_height` state.
232    ///
233    /// # Returns
234    ///
235    /// The transaction hash on success.
236    ///
237    /// # Errors
238    ///
239    /// Returns `DydxError` if gRPC submission fails.
240    #[expect(clippy::too_many_arguments)]
241    pub async fn submit_limit_order(
242        &self,
243        instrument_id: InstrumentId,
244        client_order_id: u32,
245        client_metadata: u32,
246        side: OrderSide,
247        price: Price,
248        quantity: Quantity,
249        time_in_force: TimeInForce,
250        post_only: bool,
251        reduce_only: bool,
252        expire_time: Option<i64>,
253    ) -> Result<String, DydxError> {
254        log::debug!(
255            "Submitting limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, price={price}, \
256             quantity={quantity}, tif={time_in_force:?}, post_only={post_only}, reduce_only={reduce_only}"
257        );
258
259        let block_height = self.current_block_height();
260
261        // Build proto message
262        let msg = self.order_builder.build_limit_order(
263            instrument_id,
264            client_order_id,
265            client_metadata,
266            side,
267            price,
268            quantity,
269            time_in_force,
270            post_only,
271            reduce_only,
272            block_height,
273            expire_time,
274        )?;
275
276        // Determine if short-term based on time_in_force and expire_time
277        let is_short_term = OrderLifetime::from_time_in_force(
278            time_in_force,
279            expire_time,
280            false,
281            self.order_builder.max_short_term_secs(),
282        )
283        .is_short_term();
284
285        // Short-term: cached sequence, no retry. Stateful: proper sequence management.
286        let operation = format!("Submit limit order {client_order_id}");
287        let tx_hash = if is_short_term {
288            self.broadcaster
289                .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
290                .await?
291        } else {
292            self.broadcaster
293                .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
294                .await?
295        };
296
297        Ok(tx_hash)
298    }
299
300    /// Submits a batch of limit orders.
301    ///
302    /// # Protocol Constraints
303    ///
304    /// - **Short-term orders cannot be batched**: If any order is short-term (IOC, FOK, or
305    ///   expire_time within 60s), each order is submitted in a separate transaction.
306    /// - **Long-term orders can be batched**: All orders in a single transaction.
307    ///
308    /// # Returns
309    ///
310    /// A vector of transaction hashes (one per transaction).
311    ///
312    /// # Errors
313    ///
314    /// Returns `DydxError` if any submission fails.
315    pub async fn submit_limit_orders_batch(
316        &self,
317        orders: Vec<LimitOrderParams>,
318    ) -> Result<Vec<String>, DydxError> {
319        if orders.is_empty() {
320            return Ok(Vec::new());
321        }
322
323        let block_height = self.current_block_height();
324
325        // Check if any orders are short-term (cannot be batched)
326        let has_short_term = orders
327            .iter()
328            .any(|params| self.order_builder.is_short_term_order(params));
329
330        if has_short_term {
331            // Short-term orders must be submitted individually.
332            // They don't consume Cosmos SDK sequences (GTB replay protection),
333            // so we use broadcast_short_term for concurrent submission.
334            log::debug!(
335                "Submitting {} short-term limit orders concurrently (sequence not consumed)",
336                orders.len()
337            );
338
339            let mut tx_hashes = Vec::with_capacity(orders.len());
340            let mut handles = Vec::with_capacity(orders.len());
341
342            for params in orders {
343                let tx_manager = Arc::clone(&self.tx_manager);
344                let broadcaster = Arc::clone(&self.broadcaster);
345                let order_builder = Arc::clone(&self.order_builder);
346
347                let handle = get_runtime().spawn(async move {
348                    let msg = order_builder.build_limit_order_from_params(&params, block_height)?;
349                    let operation = format!("Submit short-term order {}", params.client_order_id);
350                    broadcaster
351                        .broadcast_short_term(&tx_manager, vec![msg], &operation)
352                        .await
353                });
354
355                handles.push(handle);
356            }
357
358            // Collect results
359            for handle in handles {
360                match handle.await {
361                    Ok(Ok(tx_hash)) => tx_hashes.push(tx_hash),
362                    Ok(Err(e)) => return Err(e),
363                    Err(e) => {
364                        return Err(DydxError::Nautilus(anyhow::anyhow!("Task join error: {e}")));
365                    }
366                }
367            }
368
369            Ok(tx_hashes)
370        } else {
371            // Long-term orders can be batched in a single transaction
372            log::info!(
373                "Batch submitting {} long-term limit orders in single transaction",
374                orders.len()
375            );
376
377            let msgs = self
378                .order_builder
379                .build_limit_orders_batch(&orders, block_height)?;
380
381            let operation = format!("Submit batch of {} limit orders", msgs.len());
382            let tx_hash = self
383                .broadcaster
384                .broadcast_with_retry(&self.tx_manager, msgs, &operation)
385                .await?;
386
387            Ok(vec![tx_hash])
388        }
389    }
390
391    /// Cancels an order on dYdX via gRPC.
392    ///
393    /// Block height is read from the shared `block_height` state.
394    ///
395    /// # Returns
396    ///
397    /// The transaction hash on success.
398    ///
399    /// # Errors
400    ///
401    /// Returns `DydxError` if gRPC cancellation fails or market params not found.
402    pub async fn cancel_order(
403        &self,
404        instrument_id: InstrumentId,
405        client_order_id: u32,
406        time_in_force: TimeInForce,
407        expire_time_ns: Option<nautilus_core::UnixNanos>,
408    ) -> Result<String, DydxError> {
409        log::debug!("Cancelling order: client_id={client_order_id}, instrument={instrument_id}");
410
411        let block_height = self.current_block_height();
412
413        // Build cancel message
414        let msg = self.order_builder.build_cancel_order(
415            instrument_id,
416            client_order_id,
417            time_in_force,
418            expire_time_ns,
419            block_height,
420        )?;
421
422        // Determine if this is a short-term cancel
423        let is_short_term = self
424            .order_builder
425            .is_short_term_cancel(time_in_force, expire_time_ns);
426
427        // Short-term: cached sequence, no retry. Stateful: proper sequence management.
428        let operation = format!("Cancel order {client_order_id}");
429        let tx_hash = if is_short_term {
430            self.broadcaster
431                .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
432                .await?
433        } else {
434            self.broadcaster
435                .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
436                .await?
437        };
438
439        Ok(tx_hash)
440    }
441
442    /// Cancels multiple orders with optimal partitioned broadcasting.
443    ///
444    /// Partitions orders into short-term and long-term groups:
445    /// - Short-term orders: single `MsgBatchCancel` via `broadcast_short_term()`
446    /// - Long-term orders: batched `MsgCancelOrder` messages via `broadcast_with_retry()`
447    ///
448    /// # Arguments
449    ///
450    /// * `orders` - Slice of (instrument_id, client_order_id, time_in_force, expire_time_ns) tuples
451    ///
452    /// # Returns
453    ///
454    /// Comma-separated transaction hashes on success (one per partition).
455    ///
456    /// # Errors
457    ///
458    /// Returns `DydxError` if transaction broadcast fails or market params not found.
459    pub async fn cancel_orders_batch(
460        &self,
461        orders: &[(
462            InstrumentId,
463            u32,
464            TimeInForce,
465            Option<nautilus_core::UnixNanos>,
466        )],
467    ) -> Result<String, DydxError> {
468        if orders.is_empty() {
469            return Err(DydxError::Order("No orders to cancel".to_string()));
470        }
471
472        let block_height = self.current_block_height();
473
474        // Partition into short-term and long-term orders
475        let (short_term, long_term): (Vec<_>, Vec<_>) =
476            orders.iter().partition(|(_, _, tif, expire_ns)| {
477                self.order_builder.is_short_term_cancel(*tif, *expire_ns)
478            });
479
480        log::info!(
481            "Batch cancelling {} orders (short_term={}, long_term={})",
482            orders.len(),
483            short_term.len(),
484            long_term.len(),
485        );
486
487        let mut tx_hashes = Vec::new();
488
489        // Cancel short-term orders with MsgBatchCancel (single gRPC call)
490        if !short_term.is_empty() {
491            let st_pairs: Vec<_> = short_term
492                .iter()
493                .map(|(inst_id, client_id, _, _)| (*inst_id, *client_id))
494                .collect();
495
496            let msg = self
497                .order_builder
498                .build_batch_cancel_short_term(&st_pairs, block_height)?;
499
500            let operation = format!("BatchCancel {} short-term orders", st_pairs.len());
501            let tx_hash = self
502                .broadcaster
503                .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
504                .await?;
505            tx_hashes.push(tx_hash);
506        }
507
508        // Cancel long-term orders with batched MsgCancelOrder (single gRPC call)
509        if !long_term.is_empty() {
510            let lt_orders: Vec<_> = long_term
511                .iter()
512                .map(|(inst_id, client_id, tif, expire_ns)| {
513                    (*inst_id, *client_id, *tif, *expire_ns)
514                })
515                .collect();
516
517            let msgs = self
518                .order_builder
519                .build_cancel_orders_batch(&lt_orders, block_height)?;
520
521            let operation = format!("BatchCancel {} long-term orders", lt_orders.len());
522            let tx_hash = self
523                .broadcaster
524                .broadcast_with_retry(&self.tx_manager, msgs, &operation)
525                .await?;
526            tx_hashes.push(tx_hash);
527        }
528
529        Ok(tx_hashes.join(","))
530    }
531
532    /// Submits a stop market order to dYdX via gRPC.
533    ///
534    /// Stop market orders are triggered when the price reaches `trigger_price`.
535    ///
536    /// # Returns
537    ///
538    /// The transaction hash on success.
539    ///
540    /// # Errors
541    ///
542    /// Returns `DydxError` if gRPC submission fails.
543    #[expect(clippy::too_many_arguments)]
544    pub async fn submit_stop_market_order(
545        &self,
546        instrument_id: InstrumentId,
547        client_order_id: u32,
548        client_metadata: u32,
549        side: OrderSide,
550        trigger_price: Price,
551        quantity: Quantity,
552        reduce_only: bool,
553        expire_time: Option<i64>,
554    ) -> Result<String, DydxError> {
555        log::debug!(
556            "Submitting stop market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
557             trigger={trigger_price}, qty={quantity}"
558        );
559
560        // Build proto message
561        let msg = self.order_builder.build_stop_market_order(
562            instrument_id,
563            client_order_id,
564            client_metadata,
565            side,
566            trigger_price,
567            quantity,
568            reduce_only,
569            expire_time,
570        )?;
571
572        // Broadcast with retry
573        let operation = format!("Submit stop market order {client_order_id}");
574        let tx_hash = self
575            .broadcaster
576            .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
577            .await?;
578
579        Ok(tx_hash)
580    }
581
582    /// Submits a stop limit order to dYdX via gRPC.
583    ///
584    /// Stop limit orders are triggered when the price reaches `trigger_price`,
585    /// then placed as a limit order at `limit_price`.
586    ///
587    /// # Returns
588    ///
589    /// The transaction hash on success.
590    ///
591    /// # Errors
592    ///
593    /// Returns `DydxError` if gRPC submission fails.
594    #[expect(clippy::too_many_arguments)]
595    pub async fn submit_stop_limit_order(
596        &self,
597        instrument_id: InstrumentId,
598        client_order_id: u32,
599        client_metadata: u32,
600        side: OrderSide,
601        trigger_price: Price,
602        limit_price: Price,
603        quantity: Quantity,
604        time_in_force: TimeInForce,
605        post_only: bool,
606        reduce_only: bool,
607        expire_time: Option<i64>,
608    ) -> Result<String, DydxError> {
609        log::debug!(
610            "Submitting stop limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
611             trigger={trigger_price}, limit={limit_price}, qty={quantity}"
612        );
613
614        // Build proto message
615        let msg = self.order_builder.build_stop_limit_order(
616            instrument_id,
617            client_order_id,
618            client_metadata,
619            side,
620            trigger_price,
621            limit_price,
622            quantity,
623            time_in_force,
624            post_only,
625            reduce_only,
626            expire_time,
627        )?;
628
629        // Broadcast with retry
630        let operation = format!("Submit stop limit order {client_order_id}");
631        let tx_hash = self
632            .broadcaster
633            .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
634            .await?;
635
636        Ok(tx_hash)
637    }
638
639    /// Submits a take profit market order to dYdX via gRPC.
640    ///
641    /// Take profit market orders are triggered when the price reaches `trigger_price`,
642    /// then executed as a market order.
643    ///
644    /// # Returns
645    ///
646    /// The transaction hash on success.
647    ///
648    /// # Errors
649    ///
650    /// Returns `DydxError` if gRPC submission fails.
651    #[expect(clippy::too_many_arguments)]
652    pub async fn submit_take_profit_market_order(
653        &self,
654        instrument_id: InstrumentId,
655        client_order_id: u32,
656        client_metadata: u32,
657        side: OrderSide,
658        trigger_price: Price,
659        quantity: Quantity,
660        reduce_only: bool,
661        expire_time: Option<i64>,
662    ) -> Result<String, DydxError> {
663        log::debug!(
664            "Submitting take profit market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
665             trigger={trigger_price}, qty={quantity}"
666        );
667
668        // Build proto message
669        let msg = self.order_builder.build_take_profit_market_order(
670            instrument_id,
671            client_order_id,
672            client_metadata,
673            side,
674            trigger_price,
675            quantity,
676            reduce_only,
677            expire_time,
678        )?;
679
680        // Broadcast with retry
681        let operation = format!("Submit take profit market order {client_order_id}");
682        let tx_hash = self
683            .broadcaster
684            .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
685            .await?;
686
687        Ok(tx_hash)
688    }
689
690    /// Submits a take profit limit order to dYdX via gRPC.
691    ///
692    /// Take profit limit orders are triggered when the price reaches `trigger_price`,
693    /// then placed as a limit order at `limit_price`.
694    ///
695    /// # Returns
696    ///
697    /// The transaction hash on success.
698    ///
699    /// # Errors
700    ///
701    /// Returns `DydxError` if gRPC submission fails.
702    #[expect(clippy::too_many_arguments)]
703    pub async fn submit_take_profit_limit_order(
704        &self,
705        instrument_id: InstrumentId,
706        client_order_id: u32,
707        client_metadata: u32,
708        side: OrderSide,
709        trigger_price: Price,
710        limit_price: Price,
711        quantity: Quantity,
712        time_in_force: TimeInForce,
713        post_only: bool,
714        reduce_only: bool,
715        expire_time: Option<i64>,
716    ) -> Result<String, DydxError> {
717        log::debug!(
718            "Submitting take profit limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
719             trigger={trigger_price}, limit={limit_price}, qty={quantity}"
720        );
721
722        // Build proto message
723        let msg = self.order_builder.build_take_profit_limit_order(
724            instrument_id,
725            client_order_id,
726            client_metadata,
727            side,
728            trigger_price,
729            limit_price,
730            quantity,
731            time_in_force,
732            post_only,
733            reduce_only,
734            expire_time,
735        )?;
736
737        // Broadcast with retry
738        let operation = format!("Submit take profit limit order {client_order_id}");
739        let tx_hash = self
740            .broadcaster
741            .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
742            .await?;
743
744        Ok(tx_hash)
745    }
746
747    /// Submits a conditional order (generic interface).
748    ///
749    /// This method handles all conditional order types: StopMarket, StopLimit,
750    /// TakeProfitMarket, and TakeProfitLimit.
751    ///
752    /// # Returns
753    ///
754    /// The transaction hash on success.
755    ///
756    /// # Errors
757    ///
758    /// Returns `DydxError` if gRPC submission fails or `limit_price` is missing for limit orders.
759    #[expect(clippy::too_many_arguments)]
760    pub async fn submit_conditional_order(
761        &self,
762        instrument_id: InstrumentId,
763        client_order_id: u32,
764        client_metadata: u32,
765        order_type: ConditionalOrderType,
766        side: OrderSide,
767        trigger_price: Price,
768        limit_price: Option<Price>,
769        quantity: Quantity,
770        time_in_force: Option<TimeInForce>,
771        post_only: bool,
772        reduce_only: bool,
773        expire_time: Option<i64>,
774    ) -> Result<String, DydxError> {
775        // Build proto message
776        let msg = self.order_builder.build_conditional_order(
777            instrument_id,
778            client_order_id,
779            client_metadata,
780            order_type,
781            side,
782            trigger_price,
783            limit_price,
784            quantity,
785            time_in_force,
786            post_only,
787            reduce_only,
788            expire_time,
789        )?;
790
791        // Broadcast with retry
792        let operation = format!("Submit {order_type:?} order {client_order_id}");
793        let tx_hash = self
794            .broadcaster
795            .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
796            .await?;
797
798        Ok(tx_hash)
799    }
800}