1use std::sync::Arc;
28
29use nautilus_common::live::get_runtime;
30use nautilus_model::{
31 enums::{OrderSide, TimeInForce},
32 identifiers::InstrumentId,
33 types::{Price, Quantity},
34};
35use nautilus_network::ratelimiter::quota::Quota;
36
37use crate::{
38 error::DydxError,
39 execution::{
40 block_time::BlockTimeMonitor,
41 broadcaster::TxBroadcaster,
42 order_builder::OrderMessageBuilder,
43 tx_manager::TransactionManager,
44 types::{ConditionalOrderType, LimitOrderParams, OrderLifetime},
45 },
46 grpc::{DydxGrpcClient, types::ChainId},
47 http::client::DydxHttpClient,
48};
49
50#[derive(Debug)]
71pub struct OrderSubmitter {
72 tx_manager: Arc<TransactionManager>,
74 broadcaster: Arc<TxBroadcaster>,
76 order_builder: Arc<OrderMessageBuilder>,
78 block_time_monitor: Arc<BlockTimeMonitor>,
80}
81
82impl OrderSubmitter {
83 #[expect(clippy::too_many_arguments)]
100 pub fn new(
101 grpc_client: DydxGrpcClient,
102 http_client: DydxHttpClient,
103 private_key: &str,
104 wallet_address: String,
105 subaccount_number: u32,
106 chain_id: ChainId,
107 block_time_monitor: Arc<BlockTimeMonitor>,
108 grpc_quota: Option<Quota>,
109 ) -> Result<Self, DydxError> {
110 let tx_manager = Arc::new(TransactionManager::new(
112 grpc_client.clone(),
113 private_key,
114 wallet_address.clone(),
115 chain_id,
116 )?);
117
118 let broadcaster = Arc::new(TxBroadcaster::new(grpc_client, grpc_quota));
119
120 let order_builder = Arc::new(OrderMessageBuilder::new(
121 http_client,
122 wallet_address,
123 subaccount_number,
124 block_time_monitor.clone(),
125 ));
126
127 Ok(Self {
128 tx_manager,
129 broadcaster,
130 order_builder,
131 block_time_monitor,
132 })
133 }
134
135 pub fn from_components(
139 tx_manager: Arc<TransactionManager>,
140 broadcaster: Arc<TxBroadcaster>,
141 order_builder: Arc<OrderMessageBuilder>,
142 block_time_monitor: Arc<BlockTimeMonitor>,
143 ) -> Self {
144 Self {
145 tx_manager,
146 broadcaster,
147 order_builder,
148 block_time_monitor,
149 }
150 }
151
152 #[must_use]
154 pub fn current_block_height(&self) -> u32 {
155 self.block_time_monitor.current_block_height() as u32
156 }
157
158 #[must_use]
160 pub fn block_time_monitor(&self) -> &BlockTimeMonitor {
161 &self.block_time_monitor
162 }
163
164 #[must_use]
166 pub fn wallet_address(&self) -> &str {
167 self.tx_manager.wallet_address()
168 }
169
170 #[must_use]
172 pub fn order_builder(&self) -> &OrderMessageBuilder {
173 &self.order_builder
174 }
175
176 #[must_use]
178 pub fn tx_manager(&self) -> &TransactionManager {
179 &self.tx_manager
180 }
181
182 pub async fn submit_market_order(
195 &self,
196 instrument_id: InstrumentId,
197 client_order_id: u32,
198 client_metadata: u32,
199 side: OrderSide,
200 quantity: Quantity,
201 ) -> Result<String, DydxError> {
202 log::debug!(
203 "Submitting market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, quantity={quantity}"
204 );
205
206 let block_height = self.current_block_height();
207
208 let msg = self.order_builder.build_market_order(
210 instrument_id,
211 client_order_id,
212 client_metadata,
213 side,
214 quantity,
215 block_height,
216 )?;
217
218 let operation = format!("Submit market order {client_order_id}");
220 let tx_hash = self
221 .broadcaster
222 .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
223 .await?;
224
225 Ok(tx_hash)
226 }
227
228 #[expect(clippy::too_many_arguments)]
241 pub async fn submit_limit_order(
242 &self,
243 instrument_id: InstrumentId,
244 client_order_id: u32,
245 client_metadata: u32,
246 side: OrderSide,
247 price: Price,
248 quantity: Quantity,
249 time_in_force: TimeInForce,
250 post_only: bool,
251 reduce_only: bool,
252 expire_time: Option<i64>,
253 ) -> Result<String, DydxError> {
254 log::debug!(
255 "Submitting limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, price={price}, \
256 quantity={quantity}, tif={time_in_force:?}, post_only={post_only}, reduce_only={reduce_only}"
257 );
258
259 let block_height = self.current_block_height();
260
261 let msg = self.order_builder.build_limit_order(
263 instrument_id,
264 client_order_id,
265 client_metadata,
266 side,
267 price,
268 quantity,
269 time_in_force,
270 post_only,
271 reduce_only,
272 block_height,
273 expire_time,
274 )?;
275
276 let is_short_term = OrderLifetime::from_time_in_force(
278 time_in_force,
279 expire_time,
280 false,
281 self.order_builder.max_short_term_secs(),
282 )
283 .is_short_term();
284
285 let operation = format!("Submit limit order {client_order_id}");
287 let tx_hash = if is_short_term {
288 self.broadcaster
289 .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
290 .await?
291 } else {
292 self.broadcaster
293 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
294 .await?
295 };
296
297 Ok(tx_hash)
298 }
299
300 pub async fn submit_limit_orders_batch(
316 &self,
317 orders: Vec<LimitOrderParams>,
318 ) -> Result<Vec<String>, DydxError> {
319 if orders.is_empty() {
320 return Ok(Vec::new());
321 }
322
323 let block_height = self.current_block_height();
324
325 let has_short_term = orders
327 .iter()
328 .any(|params| self.order_builder.is_short_term_order(params));
329
330 if has_short_term {
331 log::debug!(
335 "Submitting {} short-term limit orders concurrently (sequence not consumed)",
336 orders.len()
337 );
338
339 let mut tx_hashes = Vec::with_capacity(orders.len());
340 let mut handles = Vec::with_capacity(orders.len());
341
342 for params in orders {
343 let tx_manager = Arc::clone(&self.tx_manager);
344 let broadcaster = Arc::clone(&self.broadcaster);
345 let order_builder = Arc::clone(&self.order_builder);
346
347 let handle = get_runtime().spawn(async move {
348 let msg = order_builder.build_limit_order_from_params(¶ms, block_height)?;
349 let operation = format!("Submit short-term order {}", params.client_order_id);
350 broadcaster
351 .broadcast_short_term(&tx_manager, vec![msg], &operation)
352 .await
353 });
354
355 handles.push(handle);
356 }
357
358 for handle in handles {
360 match handle.await {
361 Ok(Ok(tx_hash)) => tx_hashes.push(tx_hash),
362 Ok(Err(e)) => return Err(e),
363 Err(e) => {
364 return Err(DydxError::Nautilus(anyhow::anyhow!("Task join error: {e}")));
365 }
366 }
367 }
368
369 Ok(tx_hashes)
370 } else {
371 log::info!(
373 "Batch submitting {} long-term limit orders in single transaction",
374 orders.len()
375 );
376
377 let msgs = self
378 .order_builder
379 .build_limit_orders_batch(&orders, block_height)?;
380
381 let operation = format!("Submit batch of {} limit orders", msgs.len());
382 let tx_hash = self
383 .broadcaster
384 .broadcast_with_retry(&self.tx_manager, msgs, &operation)
385 .await?;
386
387 Ok(vec![tx_hash])
388 }
389 }
390
391 pub async fn cancel_order(
403 &self,
404 instrument_id: InstrumentId,
405 client_order_id: u32,
406 time_in_force: TimeInForce,
407 expire_time_ns: Option<nautilus_core::UnixNanos>,
408 ) -> Result<String, DydxError> {
409 log::debug!("Cancelling order: client_id={client_order_id}, instrument={instrument_id}");
410
411 let block_height = self.current_block_height();
412
413 let msg = self.order_builder.build_cancel_order(
415 instrument_id,
416 client_order_id,
417 time_in_force,
418 expire_time_ns,
419 block_height,
420 )?;
421
422 let is_short_term = self
424 .order_builder
425 .is_short_term_cancel(time_in_force, expire_time_ns);
426
427 let operation = format!("Cancel order {client_order_id}");
429 let tx_hash = if is_short_term {
430 self.broadcaster
431 .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
432 .await?
433 } else {
434 self.broadcaster
435 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
436 .await?
437 };
438
439 Ok(tx_hash)
440 }
441
442 pub async fn cancel_orders_batch(
460 &self,
461 orders: &[(
462 InstrumentId,
463 u32,
464 TimeInForce,
465 Option<nautilus_core::UnixNanos>,
466 )],
467 ) -> Result<String, DydxError> {
468 if orders.is_empty() {
469 return Err(DydxError::Order("No orders to cancel".to_string()));
470 }
471
472 let block_height = self.current_block_height();
473
474 let (short_term, long_term): (Vec<_>, Vec<_>) =
476 orders.iter().partition(|(_, _, tif, expire_ns)| {
477 self.order_builder.is_short_term_cancel(*tif, *expire_ns)
478 });
479
480 log::info!(
481 "Batch cancelling {} orders (short_term={}, long_term={})",
482 orders.len(),
483 short_term.len(),
484 long_term.len(),
485 );
486
487 let mut tx_hashes = Vec::new();
488
489 if !short_term.is_empty() {
491 let st_pairs: Vec<_> = short_term
492 .iter()
493 .map(|(inst_id, client_id, _, _)| (*inst_id, *client_id))
494 .collect();
495
496 let msg = self
497 .order_builder
498 .build_batch_cancel_short_term(&st_pairs, block_height)?;
499
500 let operation = format!("BatchCancel {} short-term orders", st_pairs.len());
501 let tx_hash = self
502 .broadcaster
503 .broadcast_short_term(&self.tx_manager, vec![msg], &operation)
504 .await?;
505 tx_hashes.push(tx_hash);
506 }
507
508 if !long_term.is_empty() {
510 let lt_orders: Vec<_> = long_term
511 .iter()
512 .map(|(inst_id, client_id, tif, expire_ns)| {
513 (*inst_id, *client_id, *tif, *expire_ns)
514 })
515 .collect();
516
517 let msgs = self
518 .order_builder
519 .build_cancel_orders_batch(<_orders, block_height)?;
520
521 let operation = format!("BatchCancel {} long-term orders", lt_orders.len());
522 let tx_hash = self
523 .broadcaster
524 .broadcast_with_retry(&self.tx_manager, msgs, &operation)
525 .await?;
526 tx_hashes.push(tx_hash);
527 }
528
529 Ok(tx_hashes.join(","))
530 }
531
532 #[expect(clippy::too_many_arguments)]
544 pub async fn submit_stop_market_order(
545 &self,
546 instrument_id: InstrumentId,
547 client_order_id: u32,
548 client_metadata: u32,
549 side: OrderSide,
550 trigger_price: Price,
551 quantity: Quantity,
552 reduce_only: bool,
553 expire_time: Option<i64>,
554 ) -> Result<String, DydxError> {
555 log::debug!(
556 "Submitting stop market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
557 trigger={trigger_price}, qty={quantity}"
558 );
559
560 let msg = self.order_builder.build_stop_market_order(
562 instrument_id,
563 client_order_id,
564 client_metadata,
565 side,
566 trigger_price,
567 quantity,
568 reduce_only,
569 expire_time,
570 )?;
571
572 let operation = format!("Submit stop market order {client_order_id}");
574 let tx_hash = self
575 .broadcaster
576 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
577 .await?;
578
579 Ok(tx_hash)
580 }
581
582 #[expect(clippy::too_many_arguments)]
595 pub async fn submit_stop_limit_order(
596 &self,
597 instrument_id: InstrumentId,
598 client_order_id: u32,
599 client_metadata: u32,
600 side: OrderSide,
601 trigger_price: Price,
602 limit_price: Price,
603 quantity: Quantity,
604 time_in_force: TimeInForce,
605 post_only: bool,
606 reduce_only: bool,
607 expire_time: Option<i64>,
608 ) -> Result<String, DydxError> {
609 log::debug!(
610 "Submitting stop limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
611 trigger={trigger_price}, limit={limit_price}, qty={quantity}"
612 );
613
614 let msg = self.order_builder.build_stop_limit_order(
616 instrument_id,
617 client_order_id,
618 client_metadata,
619 side,
620 trigger_price,
621 limit_price,
622 quantity,
623 time_in_force,
624 post_only,
625 reduce_only,
626 expire_time,
627 )?;
628
629 let operation = format!("Submit stop limit order {client_order_id}");
631 let tx_hash = self
632 .broadcaster
633 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
634 .await?;
635
636 Ok(tx_hash)
637 }
638
639 #[expect(clippy::too_many_arguments)]
652 pub async fn submit_take_profit_market_order(
653 &self,
654 instrument_id: InstrumentId,
655 client_order_id: u32,
656 client_metadata: u32,
657 side: OrderSide,
658 trigger_price: Price,
659 quantity: Quantity,
660 reduce_only: bool,
661 expire_time: Option<i64>,
662 ) -> Result<String, DydxError> {
663 log::debug!(
664 "Submitting take profit market order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
665 trigger={trigger_price}, qty={quantity}"
666 );
667
668 let msg = self.order_builder.build_take_profit_market_order(
670 instrument_id,
671 client_order_id,
672 client_metadata,
673 side,
674 trigger_price,
675 quantity,
676 reduce_only,
677 expire_time,
678 )?;
679
680 let operation = format!("Submit take profit market order {client_order_id}");
682 let tx_hash = self
683 .broadcaster
684 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
685 .await?;
686
687 Ok(tx_hash)
688 }
689
690 #[expect(clippy::too_many_arguments)]
703 pub async fn submit_take_profit_limit_order(
704 &self,
705 instrument_id: InstrumentId,
706 client_order_id: u32,
707 client_metadata: u32,
708 side: OrderSide,
709 trigger_price: Price,
710 limit_price: Price,
711 quantity: Quantity,
712 time_in_force: TimeInForce,
713 post_only: bool,
714 reduce_only: bool,
715 expire_time: Option<i64>,
716 ) -> Result<String, DydxError> {
717 log::debug!(
718 "Submitting take profit limit order: client_id={client_order_id}, meta={client_metadata:#x}, side={side:?}, \
719 trigger={trigger_price}, limit={limit_price}, qty={quantity}"
720 );
721
722 let msg = self.order_builder.build_take_profit_limit_order(
724 instrument_id,
725 client_order_id,
726 client_metadata,
727 side,
728 trigger_price,
729 limit_price,
730 quantity,
731 time_in_force,
732 post_only,
733 reduce_only,
734 expire_time,
735 )?;
736
737 let operation = format!("Submit take profit limit order {client_order_id}");
739 let tx_hash = self
740 .broadcaster
741 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
742 .await?;
743
744 Ok(tx_hash)
745 }
746
747 #[expect(clippy::too_many_arguments)]
760 pub async fn submit_conditional_order(
761 &self,
762 instrument_id: InstrumentId,
763 client_order_id: u32,
764 client_metadata: u32,
765 order_type: ConditionalOrderType,
766 side: OrderSide,
767 trigger_price: Price,
768 limit_price: Option<Price>,
769 quantity: Quantity,
770 time_in_force: Option<TimeInForce>,
771 post_only: bool,
772 reduce_only: bool,
773 expire_time: Option<i64>,
774 ) -> Result<String, DydxError> {
775 let msg = self.order_builder.build_conditional_order(
777 instrument_id,
778 client_order_id,
779 client_metadata,
780 order_type,
781 side,
782 trigger_price,
783 limit_price,
784 quantity,
785 time_in_force,
786 post_only,
787 reduce_only,
788 expire_time,
789 )?;
790
791 let operation = format!("Submit {order_type:?} order {client_order_id}");
793 let tx_hash = self
794 .broadcaster
795 .broadcast_with_retry(&self.tx_manager, vec![msg], &operation)
796 .await?;
797
798 Ok(tx_hash)
799 }
800}