1use std::{
19 collections::VecDeque,
20 future::Future,
21 str::FromStr,
22 sync::{Arc, Mutex},
23 time::{Duration, Instant},
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use async_trait::async_trait;
29use nautilus_common::{
30 clients::ExecutionClient,
31 live::{get_runtime, runner::get_exec_event_sender},
32 messages::execution::{
33 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
34 GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
35 GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
36 GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
37 },
38};
39use nautilus_core::{
40 MUTEX_POISONED, UnixNanos,
41 time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
44use nautilus_model::{
45 accounts::AccountAny,
46 enums::{AccountType, LiquiditySide, OmsType, OrderSide, OrderStatus, OrderType, TriggerType},
47 identifiers::{
48 AccountId, ClientId, ClientOrderId, InstrumentId, Symbol, TradeId, Venue, VenueOrderId,
49 },
50 instruments::{Instrument, InstrumentAny},
51 orders::Order,
52 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
53 types::{AccountBalance, MarginBalance, Money, Price, Quantity},
54};
55use nautilus_network::retry::RetryConfig;
56use rust_decimal::Decimal;
57use tokio::task::JoinHandle;
58use ustr::Ustr;
59
60use crate::{
61 common::{
62 consts::COINBASE_VENUE,
63 credential::CoinbaseCredential,
64 enums::{CoinbaseProductType, CoinbaseWsChannel},
65 },
66 config::CoinbaseExecClientConfig,
67 http::{
68 client::CoinbaseHttpClient,
69 parse::{parse_quantity, parse_ws_cfm_account_state},
70 },
71 websocket::{
72 client::CoinbaseWebSocketClient,
73 handler::{NautilusWsMessage, UserOrderUpdate},
74 messages::WsOrderUpdate,
75 parse::parse_ws_user_event_to_fill_report,
76 },
77};
78
/// Maximum number of venue order IDs sent in one cancel request; larger sets
/// are split into chunks of this size before being sent.
const BATCH_CANCEL_CHUNK: usize = 100;

/// Capacity passed to [`FillDedup::new`] for the client's fill de-duplication window.
const FILL_DEDUP_CAPACITY: usize = 10_000;

/// Capacity passed to [`CumulativeStateMap::with_capacity`] for per-order cumulative state.
const CUMULATIVE_STATE_CAPACITY: usize = 10_000;

/// Seconds `connect` waits for the account to appear in the cache before failing.
const ACCOUNT_REGISTERED_TIMEOUT_SECS: f64 = 30.0;
96
/// Bounded, arrival-ordered set of `(String, String)` keys used by the client
/// for fill de-duplication.
///
/// Keys are remembered in the order they first arrive. Once `capacity` keys
/// are held, the oldest key is forgotten to make room for each new one.
#[derive(Debug)]
struct FillDedup {
    /// Membership index over the keys currently remembered.
    seen: AHashMap<(String, String), ()>,
    /// The same keys in arrival order, oldest at the front.
    order: VecDeque<(String, String)>,
    /// Number of keys held before the oldest is evicted.
    capacity: usize,
}

impl FillDedup {
    /// Creates an empty window with storage pre-allocated for `capacity` keys.
    fn new(capacity: usize) -> Self {
        let seen = AHashMap::with_capacity(capacity);
        let order = VecDeque::with_capacity(capacity);
        Self {
            seen,
            order,
            capacity,
        }
    }

    /// Records `key` and reports whether it was new.
    ///
    /// Returns `false` (leaving the window untouched, without refreshing the
    /// key's age) when `key` is already remembered, `true` otherwise.
    fn insert(&mut self, key: (String, String)) -> bool {
        let already_seen = self.seen.contains_key(&key);
        if already_seen {
            return false;
        }

        // Make room before inserting so the window stays bounded.
        if self.order.len() >= self.capacity {
            if let Some(evicted) = self.order.pop_front() {
                self.seen.remove(&evicted);
            }
        }

        self.seen.insert(key.clone(), ());
        self.order.push_back(key);
        true
    }
}
129
// NOTE(review): the code that reads and updates these fields is outside this
// section; field descriptions below are inferred from names and types only —
// confirm against the user-order update handler.
/// Per-order running state stored as the value type of [`CumulativeStateMap`].
///
/// A fresh entry is created via `Default`, so every field starts as `None`
/// or zero.
#[derive(Debug, Default, Clone)]
struct OrderCumulativeState {
    /// Presumably the cumulative filled quantity last observed for the order.
    filled_qty: Option<Quantity>,
    /// Presumably the cumulative fees last observed for the order.
    total_fees: Decimal,
    /// Presumably the average fill price last observed for the order.
    avg_price: Decimal,
    /// Presumably the order's total quantity, when known.
    quantity: Option<Quantity>,
}
149
/// Order attributes remembered by the client, keyed by client order ID string
/// in the `order_contexts` / `external_order_contexts` maps.
///
/// `submit_order` records one entry per submitted order (and removes it again
/// if the submission is rejected or errors); `modify_order` refreshes `price`
/// and `trigger_price` after a successful edit.
#[derive(Debug, Default, Clone)]
struct OrderContext {
    /// Limit price captured from the order at submission, updated on successful modify.
    price: Option<Price>,
    /// Trigger price captured from the order at submission, updated on successful modify.
    trigger_price: Option<Price>,
    /// Trigger type captured from the order at submission.
    trigger_type: Option<TriggerType>,
    /// Whether the order was submitted as post-only.
    post_only: bool,
    /// Symbol of the instrument the order was submitted against
    /// (`instrument_id.symbol` at submit time); `None` by default.
    submitted_product_id: Option<Ustr>,
}
175
176#[derive(Debug)]
182struct CumulativeStateMap {
183 map: AHashMap<String, OrderCumulativeState>,
184 order: VecDeque<String>,
185 capacity: usize,
186}
187
188impl CumulativeStateMap {
189 fn with_capacity(capacity: usize) -> Self {
190 Self {
191 map: AHashMap::with_capacity(capacity),
192 order: VecDeque::with_capacity(capacity),
193 capacity,
194 }
195 }
196
197 fn entry_or_default(&mut self, key: &str) -> &mut OrderCumulativeState {
198 if self.map.contains_key(key) {
199 if let Some(pos) = self.order.iter().position(|k| k == key) {
204 self.order.remove(pos);
205 }
206 self.order.push_back(key.to_string());
207 } else {
208 self.evict_until_capacity_or_empty();
209 self.order.push_back(key.to_string());
210 self.map
211 .insert(key.to_string(), OrderCumulativeState::default());
212 }
213 self.map
214 .get_mut(key)
215 .expect("key was just inserted or confirmed present")
216 }
217
218 fn remove(&mut self, key: &str) {
219 if self.map.remove(key).is_some() {
220 self.order.retain(|k| k != key);
225 }
226 }
227
228 fn evict_until_capacity_or_empty(&mut self) {
229 while self.map.len() >= self.capacity {
233 match self.order.pop_front() {
234 Some(oldest) => {
235 self.map.remove(&oldest);
236 }
237 None => break,
238 }
239 }
240
241 if self.order.len() > 2 * self.capacity {
247 self.order.retain(|key| self.map.contains_key(key));
248 }
249 }
250
251 #[cfg(test)]
252 fn len(&self) -> usize {
253 self.map.len()
254 }
255
256 #[cfg(test)]
257 fn get(&self, key: &str) -> Option<&OrderCumulativeState> {
258 self.map.get(key)
259 }
260
261 #[cfg(test)]
262 fn clear(&mut self) {
263 self.map.clear();
264 self.order.clear();
265 }
266}
267
/// Live execution client for Coinbase.
///
/// Combines an HTTP client (order entry, queries, reports) with a user
/// WebSocket client whose messages are consumed by a background task spawned
/// in `connect`.
#[derive(Debug)]
pub struct CoinbaseExecutionClient {
    /// Engine-side core: identifiers, account type, lifecycle flags and cache access.
    core: ExecutionClientCore,
    /// Realtime clock used to timestamp emitted events.
    clock: &'static AtomicTime,
    /// Adapter configuration (credentials, URLs, retry and margin settings).
    config: CoinbaseExecClientConfig,
    /// Emits order, account and report events towards the execution engine.
    emitter: ExecutionEventEmitter,
    /// Authenticated REST client.
    http_client: CoinbaseHttpClient,
    /// Authenticated WebSocket client for the user (and futures balance) channels.
    ws_user: CoinbaseWebSocketClient,
    /// Handle of the task consuming WebSocket messages; `None` until `connect` spawns it.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of in-flight tasks started through `spawn_task`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Instruments loaded at bootstrap, keyed by symbol string.
    instruments_cache: Arc<AHashMap<String, InstrumentAny>>,
    /// Fill de-duplication window shared with the WebSocket message task.
    fill_dedup: Arc<Mutex<FillDedup>>,
    /// Per-order cumulative state shared with the WebSocket message task.
    cumulative_state: Arc<Mutex<CumulativeStateMap>>,
    /// Context for orders submitted by this client, keyed by client order ID string.
    order_contexts: Arc<Mutex<AHashMap<String, OrderContext>>>,
    /// Context map passed alongside `order_contexts` to the WebSocket message
    /// task. NOTE(review): presumably for orders not submitted by this
    /// client — confirm against the update handler.
    external_order_contexts: Arc<Mutex<AHashMap<String, OrderContext>>>,
}
291
292impl CoinbaseExecutionClient {
293 pub fn new(
300 core: ExecutionClientCore,
301 config: CoinbaseExecClientConfig,
302 ) -> anyhow::Result<Self> {
303 let credential =
304 CoinbaseCredential::resolve(config.api_key.as_deref(), config.api_secret.as_deref())
305 .ok_or_else(|| {
306 anyhow::anyhow!(
307 "Coinbase credentials not available; set COINBASE_API_KEY and COINBASE_API_SECRET or pass them in the config"
308 )
309 })?;
310
311 let retry_config = RetryConfig {
312 max_retries: config.max_retries,
313 initial_delay_ms: config.retry_delay_initial_ms,
314 max_delay_ms: config.retry_delay_max_ms,
315 backoff_factor: 2.0,
316 jitter_ms: 250,
317 operation_timeout_ms: Some(60_000),
318 immediate_first: false,
319 max_elapsed_ms: Some(180_000),
320 };
321
322 let http_client = CoinbaseHttpClient::with_credentials(
323 credential.clone(),
324 config.environment,
325 config.http_timeout_secs,
326 config.proxy_url.clone(),
327 Some(retry_config),
328 )
329 .map_err(|e| anyhow::anyhow!("Failed to create Coinbase HTTP client: {e}"))?;
330
331 if let Some(ref url) = config.base_url_rest {
332 http_client.set_base_url(url.clone());
333 }
334
335 let ws_url = config.ws_url();
336 let ws_user = CoinbaseWebSocketClient::with_credential(
337 &ws_url,
338 credential,
339 config.transport_backend,
340 config.proxy_url.clone(),
341 );
342
343 let clock = get_atomic_clock_realtime();
344 let emitter = ExecutionEventEmitter::new(
345 clock,
346 core.trader_id,
347 core.account_id,
348 core.account_type,
349 None,
350 );
351
352 Ok(Self {
353 core,
354 clock,
355 config,
356 emitter,
357 http_client,
358 ws_user,
359 ws_stream_handle: None,
360 pending_tasks: Mutex::new(Vec::new()),
361 instruments_cache: Arc::new(AHashMap::new()),
362 fill_dedup: Arc::new(Mutex::new(FillDedup::new(FILL_DEDUP_CAPACITY))),
363 cumulative_state: Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
364 CUMULATIVE_STATE_CAPACITY,
365 ))),
366 order_contexts: Arc::new(Mutex::new(AHashMap::new())),
367 external_order_contexts: Arc::new(Mutex::new(AHashMap::new())),
368 })
369 }
370
371 fn spawn_task<F>(&self, description: &'static str, fut: F)
372 where
373 F: Future<Output = anyhow::Result<()>> + Send + 'static,
374 {
375 let runtime = get_runtime();
376 let handle = runtime.spawn(async move {
377 if let Err(e) = fut.await {
378 log::warn!("{description} failed: {e:?}");
379 }
380 });
381
382 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
383 tasks.retain(|h| !h.is_finished());
384 tasks.push(handle);
385 }
386
387 fn abort_pending_tasks(&self) {
388 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
389 for handle in tasks.drain(..) {
390 handle.abort();
391 }
392 }
393
394 fn is_margin(&self) -> bool {
397 self.core.account_type == AccountType::Margin
398 }
399
400 fn is_instrument_cached(&self, instrument_id: &InstrumentId) -> bool {
404 self.instruments_cache
405 .contains_key(instrument_id.symbol.as_str())
406 }
407
408 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
410 let account_id = self.core.account_id;
411
412 if self.core.cache().account(&account_id).is_some() {
413 log::info!("Account {account_id} registered");
414 return Ok(());
415 }
416
417 let start = Instant::now();
418 let timeout = Duration::from_secs_f64(timeout_secs);
419 let interval = Duration::from_millis(10);
420
421 loop {
422 tokio::time::sleep(interval).await;
423
424 if self.core.cache().account(&account_id).is_some() {
425 log::info!("Account {account_id} registered");
426 return Ok(());
427 }
428
429 if start.elapsed() >= timeout {
430 anyhow::bail!(
431 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
432 );
433 }
434 }
435 }
436}
437
438fn unix_nanos_to_utc(ts: UnixNanos) -> anyhow::Result<chrono::DateTime<chrono::Utc>> {
441 let secs = (ts.as_u64() / 1_000_000_000) as i64;
442 let nanos = (ts.as_u64() % 1_000_000_000) as u32;
443 chrono::DateTime::<chrono::Utc>::from_timestamp(secs, nanos)
444 .ok_or_else(|| anyhow::anyhow!("UnixNanos {ts} is out of range for chrono::DateTime"))
445}
446
447#[async_trait(?Send)]
448impl ExecutionClient for CoinbaseExecutionClient {
    /// Returns whether the core currently reports this client as connected.
    fn is_connected(&self) -> bool {
        self.core.is_connected()
    }
452
    /// Returns the client identifier held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
456
    /// Returns the account identifier held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
460
    /// Returns the Coinbase venue constant.
    fn venue(&self) -> Venue {
        *COINBASE_VENUE
    }
464
    /// Returns the order management system type held by the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
468
    /// Returns a clone of this client's account from the cache, or `None` if
    /// the account is not in the cache.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.cache().account(&self.core.account_id).cloned()
    }
472
    /// Connects the client: bootstraps instruments, opens the user WebSocket,
    /// starts the message-handling task and publishes the initial account state.
    ///
    /// Does nothing when the core already reports the client as connected.
    /// The core is only marked connected after the account has been observed
    /// in the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if credentials cannot be re-resolved for a WebSocket
    /// reset, if instruments or account state cannot be requested, if the
    /// WebSocket connect or a subscription fails, or if the account is not
    /// registered within `ACCOUNT_REGISTERED_TIMEOUT_SECS`.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.core.is_connected() {
            return Ok(());
        }

        // A WebSocket client that is still active or reconnecting while the
        // core is not connected is a leftover: disconnect it, abort its
        // message task, and replace it with a freshly built client.
        if self.ws_user.is_active() || self.ws_user.is_reconnecting() {
            log::info!("Tearing down stale user WS before reconnect");
            self.ws_user.disconnect().await;
            if let Some(handle) = self.ws_stream_handle.take() {
                handle.abort();
            }
            let credential = CoinbaseCredential::resolve(
                self.config.api_key.as_deref(),
                self.config.api_secret.as_deref(),
            )
            .ok_or_else(|| anyhow::anyhow!("Coinbase credentials unavailable for WS reset"))?;
            self.ws_user = CoinbaseWebSocketClient::with_credential(
                &self.config.ws_url(),
                credential,
                self.config.transport_backend,
                self.config.proxy_url.clone(),
            );
        }

        if self.core.instruments_initialized() {
            // Instruments were loaded by an earlier connect: replay the cached
            // set into the (possibly new) WebSocket client.
            let cached: Vec<InstrumentAny> = self.instruments_cache.values().cloned().collect();
            if !cached.is_empty() {
                self.ws_user.initialize_instruments(cached).await;
            }
        } else {
            // First connect: margin accounts load futures products, all other
            // account types load spot products.
            let instruments = if self.is_margin() {
                self.http_client
                    .request_instruments(Some(CoinbaseProductType::Future))
                    .await
                    .context("failed to load Coinbase futures instruments")?
            } else {
                self.http_client
                    .request_instruments(Some(CoinbaseProductType::Spot))
                    .await
                    .context("failed to load Coinbase instruments")?
            };

            let product_kind = if self.is_margin() { "futures" } else { "spot" };

            if instruments.is_empty() {
                log::warn!("Coinbase instrument bootstrap returned no {product_kind} instruments");
            } else {
                log::info!(
                    "Coinbase exec client loaded {} {product_kind} instruments",
                    instruments.len()
                );
            }

            // Rebuild the symbol-keyed cache consulted by `is_instrument_cached`
            // and `submit_order`.
            let mut map: AHashMap<String, InstrumentAny> =
                AHashMap::with_capacity(instruments.len());
            for inst in &instruments {
                map.insert(inst.id().symbol.as_str().to_string(), inst.clone());
            }
            self.instruments_cache = Arc::new(map);

            self.ws_user.initialize_instruments(instruments).await;

            self.core.set_instruments_initialized();
        }

        self.ws_user.set_account_id(self.core.account_id).await;
        self.ws_user.connect().await?;

        self.ws_user
            .subscribe(CoinbaseWsChannel::User, &[])
            .await
            .context("failed to subscribe to Coinbase user channel")?;

        // Margin accounts additionally stream futures balance summaries.
        if self.is_margin() {
            self.ws_user
                .subscribe(CoinbaseWsChannel::FuturesBalanceSummary, &[])
                .await
                .context("failed to subscribe to Coinbase futures_balance_summary channel")?;
        }

        // Take the WebSocket output receiver (if available) and consume it on
        // a background task, which needs owned handles to the shared state.
        if let Some(mut rx) = self.ws_user.take_out_rx() {
            let fill_dedup = Arc::clone(&self.fill_dedup);
            let cumulative_state = Arc::clone(&self.cumulative_state);
            let order_contexts = Arc::clone(&self.order_contexts);
            let external_order_contexts = Arc::clone(&self.external_order_contexts);
            let emitter = self.emitter.clone();
            let http_client = self.http_client.clone();
            let account_id = self.core.account_id;
            let clock = self.clock;
            let is_margin = self.is_margin();

            let handle = get_runtime().spawn(async move {
                // The loop ends when the channel yields `None`.
                while let Some(message) = rx.recv().await {
                    match message {
                        NautilusWsMessage::UserOrder(carrier) => {
                            handle_user_order_update(
                                *carrier,
                                &emitter,
                                &fill_dedup,
                                &cumulative_state,
                                &order_contexts,
                                &external_order_contexts,
                                &http_client,
                                account_id,
                            )
                            .await;
                        }
                        NautilusWsMessage::FuturesBalanceSummary(summary) => {
                            // The same timestamp is passed for both timestamp arguments.
                            let ts = clock.get_time_ns();
                            match parse_ws_cfm_account_state(&summary, account_id, ts, ts) {
                                Ok(state) => emitter.send_account_state(state),
                                Err(e) => log::warn!(
                                    "Failed to parse futures_balance_summary into AccountState: {e}"
                                ),
                            }
                        }
                        NautilusWsMessage::Reconnected => {
                            log::info!("Coinbase user WebSocket reconnected");
                            // Re-request account state over HTTP after a reconnect.
                            let refresh = if is_margin {
                                http_client.request_cfm_account_state(account_id).await
                            } else {
                                http_client.request_account_state(account_id).await
                            };

                            match refresh {
                                Ok(state) => emitter.send_account_state(state),
                                Err(e) => {
                                    log::warn!("Failed to refresh account state on reconnect: {e}");
                                }
                            }
                        }
                        NautilusWsMessage::Error(err) => {
                            log::warn!("Coinbase user WebSocket error: {err}");
                        }
                        // Any other message kind is ignored by this client.
                        _ => {}
                    }
                }
            });
            self.ws_stream_handle = Some(handle);
        }

        // Publish the initial account state, then wait for the account to be
        // registered in the cache before reporting the client as connected.
        let account_state = if self.is_margin() {
            self.http_client
                .request_cfm_account_state(self.core.account_id)
                .await
                .context("failed to request Coinbase CFM account state")?
        } else {
            self.http_client
                .request_account_state(self.core.account_id)
                .await
                .context("failed to request Coinbase account state")?
        };

        if !account_state.balances.is_empty() {
            log::info!(
                "Received account state with {} balance(s)",
                account_state.balances.len()
            );
        }
        self.emitter.send_account_state(account_state);

        self.await_account_registered(ACCOUNT_REGISTERED_TIMEOUT_SECS)
            .await?;

        self.core.set_connected();
        log::info!("Connected: client_id={}", self.core.client_id);
        Ok(())
    }
665
666 async fn disconnect(&mut self) -> anyhow::Result<()> {
667 if self.core.is_disconnected() {
668 return Ok(());
669 }
670
671 self.abort_pending_tasks();
672 self.ws_user.disconnect().await;
673
674 if let Some(handle) = self.ws_stream_handle.take() {
675 handle.abort();
676 }
677
678 self.core.set_disconnected();
679 log::info!("Disconnected: client_id={}", self.core.client_id);
680 Ok(())
681 }
682
683 fn start(&mut self) -> anyhow::Result<()> {
684 if self.core.is_started() {
685 return Ok(());
686 }
687
688 let sender = get_exec_event_sender();
689 self.emitter.set_sender(sender);
690 self.core.set_started();
691
692 log::info!(
693 "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}",
694 self.core.client_id,
695 self.core.account_id,
696 self.core.account_type,
697 self.config.environment,
698 );
699 Ok(())
700 }
701
702 fn stop(&mut self) -> anyhow::Result<()> {
703 if self.core.is_stopped() {
704 return Ok(());
705 }
706
707 self.core.set_stopped();
708 self.core.set_disconnected();
709
710 if let Some(handle) = self.ws_stream_handle.take() {
711 handle.abort();
712 }
713 self.abort_pending_tasks();
714 log::info!("Stopped: client_id={}", self.core.client_id);
715 Ok(())
716 }
717
718 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
719 let http_client = self.http_client.clone();
720 let account_id = self.core.account_id;
721 let emitter = self.emitter.clone();
722 let is_margin = self.is_margin();
723
724 self.spawn_task("query_account", async move {
725 let account_state = if is_margin {
726 http_client
727 .request_cfm_account_state(account_id)
728 .await
729 .context("failed to request Coinbase CFM account state")?
730 } else {
731 http_client
732 .request_account_state(account_id)
733 .await
734 .context("failed to request Coinbase account state")?
735 };
736 emitter.send_account_state(account_state);
737 Ok(())
738 });
739 Ok(())
740 }
741
742 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
743 let http_client = self.http_client.clone();
744 let account_id = self.core.account_id;
745 let emitter = self.emitter.clone();
746 let client_order_id = Some(cmd.client_order_id);
747 let venue_order_id = cmd.venue_order_id;
748
749 self.spawn_task("query_order", async move {
750 match http_client
751 .request_order_status_report(account_id, client_order_id, venue_order_id)
752 .await
753 {
754 Ok(report) => emitter.send_order_status_report(report),
755 Err(e) => log::warn!("Failed to query order: {e}"),
756 }
757 Ok(())
758 });
759
760 Ok(())
761 }
762
    /// Emits an account state built from the given balances and margins.
    ///
    /// All arguments are forwarded unchanged to the emitter; this method
    /// always returns `Ok(())`.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.emitter
            .emit_account_state(balances, margins, reported, ts_event);
        Ok(())
    }
774
775 async fn generate_order_status_report(
776 &self,
777 cmd: &GenerateOrderStatusReport,
778 ) -> anyhow::Result<Option<OrderStatusReport>> {
779 let report = self
780 .http_client
781 .request_order_status_report(
782 self.core.account_id,
783 cmd.client_order_id,
784 cmd.venue_order_id,
785 )
786 .await
787 .ok();
788
789 Ok(report.filter(|r| self.is_instrument_cached(&r.instrument_id)))
794 }
795
796 async fn generate_order_status_reports(
797 &self,
798 cmd: &GenerateOrderStatusReports,
799 ) -> anyhow::Result<Vec<OrderStatusReport>> {
800 let start = cmd.start.map(unix_nanos_to_utc).transpose()?;
801 let end = cmd.end.map(unix_nanos_to_utc).transpose()?;
802
803 let mut reports = self
804 .http_client
805 .request_order_status_reports(
806 self.core.account_id,
807 cmd.instrument_id,
808 cmd.open_only,
809 start,
810 end,
811 None,
812 )
813 .await?;
814
815 let before = reports.len();
816 reports.retain(|r| self.is_instrument_cached(&r.instrument_id));
817 if reports.len() != before {
818 let scope = if self.is_margin() {
819 "non-futures"
820 } else {
821 "non-spot"
822 };
823 log::debug!("Filtered {} {scope} order reports", before - reports.len());
824 }
825 Ok(reports)
826 }
827
828 async fn generate_fill_reports(
829 &self,
830 cmd: GenerateFillReports,
831 ) -> anyhow::Result<Vec<FillReport>> {
832 let start = cmd.start.map(unix_nanos_to_utc).transpose()?;
833 let end = cmd.end.map(unix_nanos_to_utc).transpose()?;
834
835 let mut reports = self
836 .http_client
837 .request_fill_reports(
838 self.core.account_id,
839 cmd.instrument_id,
840 cmd.venue_order_id,
841 start,
842 end,
843 None,
844 )
845 .await?;
846
847 let before = reports.len();
848 reports.retain(|r| self.is_instrument_cached(&r.instrument_id));
849 if reports.len() != before {
850 let scope = if self.is_margin() {
851 "non-futures"
852 } else {
853 "non-spot"
854 };
855 log::debug!("Filtered {} {scope} fill reports", before - reports.len());
856 }
857 Ok(reports)
858 }
859
860 async fn generate_position_status_reports(
861 &self,
862 cmd: &GeneratePositionStatusReports,
863 ) -> anyhow::Result<Vec<PositionStatusReport>> {
864 if !self.is_margin() {
866 return Ok(Vec::new());
867 }
868
869 if let Some(instrument_id) = cmd.instrument_id {
874 let report = self
875 .http_client
876 .request_position_status_report(self.core.account_id, instrument_id)
877 .await
878 .with_context(|| format!("failed to request CFM position for {instrument_id}"))?;
879 Ok(report.map(|r| vec![r]).unwrap_or_default())
880 } else {
881 self.http_client
882 .request_position_status_reports(self.core.account_id)
883 .await
884 .context("failed to request CFM positions")
885 }
886 }
887
888 async fn generate_mass_status(
889 &self,
890 lookback_mins: Option<u64>,
891 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
892 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
893
894 let ts_now = self.clock.get_time_ns();
895 let start = lookback_mins.map(|mins| {
896 let lookback_ns = mins * 60 * 1_000_000_000;
897 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
898 });
899
900 let order_cmd = GenerateOrderStatusReportsBuilder::default()
901 .ts_init(ts_now)
902 .open_only(false)
903 .start(start)
904 .build()
905 .map_err(|e| anyhow::anyhow!("{e}"))?;
906 let fill_cmd = GenerateFillReportsBuilder::default()
907 .ts_init(ts_now)
908 .start(start)
909 .build()
910 .map_err(|e| anyhow::anyhow!("{e}"))?;
911 let position_cmd = GeneratePositionStatusReportsBuilder::default()
912 .ts_init(ts_now)
913 .build()
914 .map_err(|e| anyhow::anyhow!("{e}"))?;
915
916 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
917 self.generate_order_status_reports(&order_cmd),
918 self.generate_fill_reports(fill_cmd),
919 self.generate_position_status_reports(&position_cmd),
920 )?;
921
922 log::info!("Received {} OrderStatusReports", order_reports.len());
923 log::info!("Received {} FillReports", fill_reports.len());
924 log::info!("Received {} PositionReports", position_reports.len());
925
926 let mut mass_status = ExecutionMassStatus::new(
927 self.core.client_id,
928 self.core.account_id,
929 *COINBASE_VENUE,
930 ts_now,
931 None,
932 );
933
934 mass_status.add_order_reports(order_reports);
935 mass_status.add_fill_reports(fill_reports);
936 mass_status.add_position_reports(position_reports);
937
938 Ok(Some(mass_status))
939 }
940
    /// Submits the order referenced by `cmd` to Coinbase.
    ///
    /// The order is read from the cache. A closed order is skipped with a
    /// warning, and an order whose instrument is not in the instruments cache
    /// is denied. Otherwise `OrderSubmitted` is emitted synchronously and the
    /// HTTP submission runs on a background task, which emits `OrderAccepted`
    /// or `OrderRejected` depending on the venue response.
    ///
    /// # Errors
    ///
    /// Returns an error only if the order is not found in the cache; venue
    /// and transport failures are reported through emitted events instead.
    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
        // Clone the order out of the cache so the cache handle is released
        // before any events are emitted.
        let order = {
            let cache = self.core.cache();
            let order = cache
                .order(&cmd.client_order_id)
                .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;

            if order.is_closed() {
                log::warn!("Cannot submit closed order {}", order.client_order_id());
                return Ok(());
            }

            order.clone()
        };

        // Deny orders for instruments this client did not bootstrap (futures
        // for margin accounts, spot otherwise).
        let instrument_id = order.instrument_id();
        let symbol_key = instrument_id.symbol.as_str();
        if !self.instruments_cache.contains_key(symbol_key) {
            let scope = if self.is_margin() {
                "a Coinbase futures / perpetual product"
            } else {
                "a Coinbase spot product"
            };
            self.emitter.emit_order_denied(
                &order,
                &format!(
                    "Instrument {} is not {scope} in this client's bootstrap cache",
                    order.instrument_id()
                ),
            );
            return Ok(());
        }

        log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
        self.emitter.emit_order_submitted(&order);

        // Copy everything the background task needs out of the order.
        let http_client = self.http_client.clone();
        let emitter = self.emitter.clone();
        let order_contexts = Arc::clone(&self.order_contexts);
        let clock = self.clock;
        let strategy_id = order.strategy_id();
        let client_order_id = order.client_order_id();
        let side = order.order_side();
        let order_type = order.order_type();
        let quantity = order.quantity();
        let time_in_force = order.time_in_force();
        let price = order.price();
        let trigger_price = order.trigger_price();
        let trigger_type = order.trigger_type();
        let expire_time = order.expire_time();
        let post_only = order.is_post_only();
        let is_quote_quantity = order.is_quote_quantity();
        let reduce_only = order.is_reduce_only();

        {
            // Record the order context before the request is sent; it is
            // removed again below if the submission is rejected or errors.
            let mut map = self.order_contexts.lock().expect(MUTEX_POISONED);
            map.insert(
                client_order_id.to_string(),
                OrderContext {
                    price,
                    trigger_price,
                    trigger_type,
                    post_only,
                    submitted_product_id: Some(instrument_id.symbol.inner()),
                },
            );
        }
        // Leverage and margin type are only sent for margin accounts.
        let (leverage, margin_type) = if self.core.account_type == AccountType::Margin {
            (
                self.config.default_leverage,
                self.config.default_margin_type,
            )
        } else {
            (None, None)
        };
        let retail_portfolio_id = self.config.retail_portfolio_id.clone();

        self.spawn_task("submit_order", async move {
            let result = http_client
                .submit_order(
                    client_order_id,
                    instrument_id,
                    side,
                    order_type,
                    quantity,
                    time_in_force,
                    price,
                    trigger_price,
                    expire_time,
                    post_only,
                    is_quote_quantity,
                    leverage,
                    margin_type,
                    reduce_only,
                    retail_portfolio_id,
                )
                .await;

            match result {
                Ok(response) => {
                    if response.success {
                        // Prefer the order ID from the success payload, falling
                        // back to the top-level one.
                        let venue_id = response
                            .success_response
                            .as_ref()
                            .map(|s| s.order_id.clone())
                            .unwrap_or(response.order_id);

                        if venue_id.is_empty() {
                            // No venue order ID: no accepted event is emitted here.
                            log::warn!(
                                "Submit succeeded but no order_id returned for {client_order_id}"
                            );
                        } else {
                            let venue_order_id = VenueOrderId::new(&venue_id);
                            let ts_event = clock.get_time_ns();
                            emitter.emit_order_accepted(&order, venue_order_id, ts_event);
                        }
                    } else {
                        // Prefer the structured error; otherwise use the raw failure reason.
                        let reason = response.error_response.as_ref().map_or_else(
                            || response.failure_reason.clone(),
                            |e| format!("{}: {}", e.error, e.message),
                        );
                        // Flag rejections caused by a post-only price violation.
                        let due_post_only = reason.contains("INVALID_LIMIT_PRICE_POST_ONLY")
                            || response.error_response.as_ref().is_some_and(|e| {
                                e.preview_failure_reason == "PREVIEW_INVALID_LIMIT_PRICE_POSTONLY"
                                    || e.new_order_failure_reason == "INVALID_LIMIT_PRICE_POST_ONLY"
                            });
                        // Drop the context recorded before submission.
                        order_contexts
                            .lock()
                            .expect(MUTEX_POISONED)
                            .remove(client_order_id.as_str());
                        let ts_event = clock.get_time_ns();
                        emitter.emit_order_rejected_event(
                            strategy_id,
                            instrument_id,
                            client_order_id,
                            &format!("submit-order-rejected: {reason}"),
                            ts_event,
                            due_post_only,
                        );
                    }
                }
                Err(e) => {
                    // Request failed: drop the context, reject the order, and
                    // return the error so `spawn_task` logs it.
                    order_contexts
                        .lock()
                        .expect(MUTEX_POISONED)
                        .remove(client_order_id.as_str());
                    let ts_event = clock.get_time_ns();
                    emitter.emit_order_rejected_event(
                        strategy_id,
                        instrument_id,
                        client_order_id,
                        &format!("submit-order-error: {e}"),
                        ts_event,
                        false,
                    );
                    anyhow::bail!("submit order failed: {e}");
                }
            }
            Ok(())
        });

        Ok(())
    }
1130
1131 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1132 let ts_event = self.clock.get_time_ns();
1133
1134 let Some(venue_order_id) = cmd.venue_order_id else {
1135 self.emitter.emit_order_modify_rejected_event(
1136 cmd.strategy_id,
1137 cmd.instrument_id,
1138 cmd.client_order_id,
1139 None,
1140 "modify-order requires venue_order_id",
1141 ts_event,
1142 );
1143 return Ok(());
1144 };
1145
1146 if cmd.price.is_none() && cmd.quantity.is_none() && cmd.trigger_price.is_none() {
1147 self.emitter.emit_order_modify_rejected_event(
1148 cmd.strategy_id,
1149 cmd.instrument_id,
1150 cmd.client_order_id,
1151 Some(venue_order_id),
1152 "modify-order requires price, quantity, or trigger_price",
1153 ts_event,
1154 );
1155 return Ok(());
1156 }
1157
1158 let (auto_price, auto_quantity) = {
1165 let cache = self.core.cache();
1166 let order = cache.order(&cmd.client_order_id);
1167 (
1168 cmd.price.or_else(|| order.and_then(|o| o.price())),
1169 cmd.quantity.or_else(|| order.map(|o| o.quantity())),
1170 )
1171 };
1172
1173 let http_client = self.http_client.clone();
1174 let emitter = self.emitter.clone();
1175 let order_contexts = Arc::clone(&self.order_contexts);
1176 let clock = self.clock;
1177 let strategy_id = cmd.strategy_id;
1178 let instrument_id = cmd.instrument_id;
1179 let client_order_id = cmd.client_order_id;
1180 let price = auto_price;
1181 let quantity = auto_quantity;
1182 let trigger_price = cmd.trigger_price;
1183
1184 self.spawn_task("modify_order", async move {
1185 let result = http_client
1186 .modify_order(venue_order_id, price, quantity, trigger_price)
1187 .await;
1188
1189 match result {
1190 Ok(resp) => {
1191 if resp.success {
1192 let mut map = order_contexts.lock().expect(MUTEX_POISONED);
1199 if let Some(meta) = map.get_mut(client_order_id.as_str()) {
1200 if price.is_some() {
1201 meta.price = price;
1202 }
1203
1204 if trigger_price.is_some() {
1205 meta.trigger_price = trigger_price;
1206 }
1207 }
1208 } else {
1209 let reason = resp
1210 .errors
1211 .iter()
1212 .map(|e| {
1213 if e.edit_failure_reason.is_empty() {
1214 e.preview_failure_reason.clone()
1215 } else {
1216 e.edit_failure_reason.clone()
1217 }
1218 })
1219 .collect::<Vec<_>>()
1220 .join(",");
1221 let ts_event = clock.get_time_ns();
1222 emitter.emit_order_modify_rejected_event(
1223 strategy_id,
1224 instrument_id,
1225 client_order_id,
1226 Some(venue_order_id),
1227 &format!("modify-order-rejected: {reason}"),
1228 ts_event,
1229 );
1230 }
1231 }
1232 Err(e) => {
1233 let ts_event = clock.get_time_ns();
1234 emitter.emit_order_modify_rejected_event(
1235 strategy_id,
1236 instrument_id,
1237 client_order_id,
1238 Some(venue_order_id),
1239 &format!("modify-order-error: {e}"),
1240 ts_event,
1241 );
1242 anyhow::bail!("modify order failed: {e}");
1243 }
1244 }
1245
1246 Ok(())
1247 });
1248
1249 Ok(())
1250 }
1251
1252 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1253 let ts_event = self.clock.get_time_ns();
1254
1255 let Some(venue_order_id) = cmd.venue_order_id else {
1256 self.emitter.emit_order_cancel_rejected_event(
1257 cmd.strategy_id,
1258 cmd.instrument_id,
1259 cmd.client_order_id,
1260 None,
1261 "cancel-order requires venue_order_id",
1262 ts_event,
1263 );
1264 return Ok(());
1265 };
1266
1267 let http_client = self.http_client.clone();
1268 let emitter = self.emitter.clone();
1269 let clock = self.clock;
1270 let strategy_id = cmd.strategy_id;
1271 let instrument_id = cmd.instrument_id;
1272 let client_order_id = cmd.client_order_id;
1273
1274 self.spawn_task("cancel_order", async move {
1275 match http_client.cancel_orders(&[venue_order_id]).await {
1276 Ok(resp) => {
1277 if let Some(result) = resp.results.first()
1278 && !result.success
1279 {
1280 let ts_event = clock.get_time_ns();
1281 emitter.emit_order_cancel_rejected_event(
1282 strategy_id,
1283 instrument_id,
1284 client_order_id,
1285 Some(venue_order_id),
1286 &format!("cancel-order-rejected: {}", result.failure_reason),
1287 ts_event,
1288 );
1289 }
1290 }
1291 Err(e) => {
1292 let ts_event = clock.get_time_ns();
1293 emitter.emit_order_cancel_rejected_event(
1294 strategy_id,
1295 instrument_id,
1296 client_order_id,
1297 Some(venue_order_id),
1298 &format!("cancel-order-error: {e}"),
1299 ts_event,
1300 );
1301 anyhow::bail!("cancel order failed: {e}");
1302 }
1303 }
1304 Ok(())
1305 });
1306
1307 Ok(())
1308 }
1309
1310 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1311 let http_client = self.http_client.clone();
1312 let account_id = self.core.account_id;
1313 let instrument_id = cmd.instrument_id;
1314 let side_filter = cmd.order_side;
1315 let emitter = self.emitter.clone();
1316 let clock = self.clock;
1317 let strategy_id = cmd.strategy_id;
1318
1319 self.spawn_task("cancel_all_orders", async move {
1320 let reports = http_client
1325 .request_order_status_reports(
1326 account_id,
1327 Some(instrument_id),
1328 false,
1329 None,
1330 None,
1331 None,
1332 )
1333 .await
1334 .context("failed to list orders for cancel_all")?;
1335
1336 let filtered: Vec<(Option<ClientOrderId>, VenueOrderId)> = reports
1345 .into_iter()
1346 .filter(|r| {
1347 matches!(
1348 r.order_status,
1349 OrderStatus::Accepted
1350 | OrderStatus::Triggered
1351 | OrderStatus::PendingUpdate
1352 | OrderStatus::PartiallyFilled
1353 )
1354 })
1355 .filter(|r| side_filter == OrderSide::NoOrderSide || r.order_side == side_filter)
1356 .map(|r| (r.client_order_id, r.venue_order_id))
1357 .collect();
1358
1359 if filtered.is_empty() {
1360 return Ok(());
1361 }
1362
1363 for chunk in filtered.chunks(BATCH_CANCEL_CHUNK) {
1364 let venue_ids: Vec<VenueOrderId> = chunk.iter().map(|(_, v)| *v).collect();
1365 match http_client.cancel_orders(&venue_ids).await {
1366 Ok(resp) => {
1367 for result in &resp.results {
1368 if result.success {
1369 continue;
1370 }
1371 let matching = chunk
1372 .iter()
1373 .find(|(_, vid)| vid.as_str() == result.order_id);
1374 if let Some((cid_opt, vid)) = matching
1375 && let Some(cid) = cid_opt
1376 {
1377 let ts_event = clock.get_time_ns();
1378 emitter.emit_order_cancel_rejected_event(
1379 strategy_id,
1380 instrument_id,
1381 *cid,
1382 Some(*vid),
1383 &format!("cancel-all-rejected: {}", result.failure_reason),
1384 ts_event,
1385 );
1386 }
1387 }
1388 }
1389 Err(e) => {
1390 log::error!("Failed to cancel chunk for {instrument_id}: {e}");
1391 let ts_event = clock.get_time_ns();
1392
1393 for (cid_opt, vid) in chunk {
1394 if let Some(cid) = cid_opt {
1395 emitter.emit_order_cancel_rejected_event(
1396 strategy_id,
1397 instrument_id,
1398 *cid,
1399 Some(*vid),
1400 &format!("cancel-all-error: {e}"),
1401 ts_event,
1402 );
1403 }
1404 }
1405 }
1406 }
1407 }
1408 Ok(())
1409 });
1410
1411 Ok(())
1412 }
1413
1414 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1415 if cmd.cancels.is_empty() {
1416 return Ok(());
1417 }
1418
1419 let http_client = self.http_client.clone();
1420 let emitter = self.emitter.clone();
1421 let clock = self.clock;
1422 let strategy_id = cmd.strategy_id;
1423 let instrument_id = cmd.instrument_id;
1424
1425 let entries: Vec<(ClientOrderId, Option<VenueOrderId>)> = cmd
1427 .cancels
1428 .iter()
1429 .map(|c| (c.client_order_id, c.venue_order_id))
1430 .collect();
1431
1432 self.spawn_task("batch_cancel_orders", async move {
1433 let venue_order_ids: Vec<VenueOrderId> =
1434 entries.iter().filter_map(|(_, v)| *v).collect();
1435
1436 for (cid, vid_opt) in &entries {
1437 if vid_opt.is_none() {
1438 let ts_event = clock.get_time_ns();
1439 emitter.emit_order_cancel_rejected_event(
1440 strategy_id,
1441 instrument_id,
1442 *cid,
1443 None,
1444 "batch-cancel requires venue_order_id",
1445 ts_event,
1446 );
1447 }
1448 }
1449
1450 for chunk in venue_order_ids.chunks(BATCH_CANCEL_CHUNK) {
1451 match http_client.cancel_orders(chunk).await {
1452 Ok(resp) => {
1453 for result in &resp.results {
1454 if !result.success {
1455 let vid = VenueOrderId::new(&result.order_id);
1456 let matching = entries
1457 .iter()
1458 .find(|(_, v)| {
1459 v.is_some_and(|id| id.as_str() == result.order_id)
1460 })
1461 .map(|(cid, _)| *cid);
1462 if let Some(cid) = matching {
1463 let ts_event = clock.get_time_ns();
1464 emitter.emit_order_cancel_rejected_event(
1465 strategy_id,
1466 instrument_id,
1467 cid,
1468 Some(vid),
1469 &format!(
1470 "batch-cancel-rejected: {}",
1471 result.failure_reason
1472 ),
1473 ts_event,
1474 );
1475 }
1476 }
1477 }
1478 }
1479 Err(e) => {
1480 log::error!("batch_cancel chunk failed: {e}");
1481 let ts_event = clock.get_time_ns();
1482
1483 for vid in chunk {
1484 let matching = entries
1485 .iter()
1486 .find(|(_, v)| v.is_some_and(|id| id == *vid))
1487 .map(|(cid, _)| *cid);
1488 if let Some(cid) = matching {
1489 emitter.emit_order_cancel_rejected_event(
1490 strategy_id,
1491 instrument_id,
1492 cid,
1493 Some(*vid),
1494 &format!("batch-cancel-error: {e}"),
1495 ts_event,
1496 );
1497 }
1498 }
1499 }
1500 }
1501 }
1502 Ok(())
1503 });
1504
1505 Ok(())
1506 }
1507}
1508
1509#[allow(clippy::too_many_arguments)]
1513async fn handle_user_order_update(
1514 carrier: UserOrderUpdate,
1515 emitter: &ExecutionEventEmitter,
1516 fill_dedup: &Arc<Mutex<FillDedup>>,
1517 cumulative_state: &Arc<Mutex<CumulativeStateMap>>,
1518 order_contexts: &Arc<Mutex<AHashMap<String, OrderContext>>>,
1519 external_order_contexts: &Arc<Mutex<AHashMap<String, OrderContext>>>,
1520 http_client: &CoinbaseHttpClient,
1521 account_id: AccountId,
1522) {
1523 let context = resolve_order_context(
1529 &carrier.update,
1530 carrier.report.order_type,
1531 carrier.report.price.is_none(),
1532 order_contexts,
1533 external_order_contexts,
1534 http_client,
1535 account_id,
1536 )
1537 .await;
1538
1539 let is_terminal = carrier.update.status.is_terminal();
1540 let client_order_id = carrier.update.client_order_id.clone();
1541 let venue_order_id = carrier.update.order_id.clone();
1542
1543 process_user_order_update(
1544 carrier,
1545 context,
1546 emitter,
1547 fill_dedup,
1548 cumulative_state,
1549 Some(http_client),
1550 );
1551
1552 if is_terminal {
1557 if !client_order_id.is_empty() {
1558 order_contexts
1559 .lock()
1560 .expect(MUTEX_POISONED)
1561 .remove(&client_order_id);
1562 }
1563 external_order_contexts
1564 .lock()
1565 .expect(MUTEX_POISONED)
1566 .remove(&venue_order_id);
1567 }
1568}
1569
1570fn process_user_order_update(
1574 carrier: UserOrderUpdate,
1575 context: Option<OrderContext>,
1576 emitter: &ExecutionEventEmitter,
1577 fill_dedup: &Arc<Mutex<FillDedup>>,
1578 cumulative_state: &Arc<Mutex<CumulativeStateMap>>,
1579 http_client: Option<&CoinbaseHttpClient>,
1580) {
1581 let UserOrderUpdate {
1582 mut report,
1583 update,
1584 mut instrument,
1585 is_snapshot,
1586 ts_event,
1587 ts_init,
1588 } = carrier;
1589
1590 let mut fill_liquidity_side = LiquiditySide::NoLiquiditySide;
1591 let have_order_contexts = context.is_some();
1592 let mut publish_instrument_id: Option<InstrumentId> = None;
1593
1594 if let Some(meta) = context {
1595 if report.price.is_none() && meta.price.is_some() {
1596 report.price = meta.price;
1597 }
1598
1599 if report.trigger_price.is_none() && meta.trigger_price.is_some() {
1600 report.trigger_price = meta.trigger_price;
1601 }
1602
1603 if report.trigger_type.is_none() && meta.trigger_type.is_some() {
1604 report.trigger_type = meta.trigger_type;
1605 }
1606
1607 if meta.post_only {
1608 fill_liquidity_side = LiquiditySide::Maker;
1613 report.post_only = true;
1617 }
1618
1619 if let Some(submitted) = meta.submitted_product_id
1620 && submitted != update.product_id
1621 {
1622 let submitted_id = InstrumentId::new(Symbol::new(submitted), *COINBASE_VENUE);
1623 report.instrument_id = submitted_id;
1624 publish_instrument_id = Some(submitted_id);
1625 if let Some(http) = http_client
1631 && let Some(submitted_instrument) = http.instruments().get_cloned(&submitted_id)
1632 {
1633 instrument = submitted_instrument;
1634 }
1635 }
1636 }
1637
1638 let size_precision = instrument.size_precision();
1639
1640 let cumulative_qty = if update.cumulative_quantity.is_empty() {
1641 Quantity::zero(size_precision)
1642 } else {
1643 match parse_quantity(&update.cumulative_quantity, size_precision) {
1644 Ok(q) => q,
1645 Err(e) => {
1646 log::warn!(
1647 "Failed to parse cumulative_quantity for order {}: {e}",
1648 update.order_id
1649 );
1650 return;
1651 }
1652 }
1653 };
1654
1655 let cumulative_fees = if update.total_fees.is_empty() {
1656 Decimal::ZERO
1657 } else {
1658 match Decimal::from_str(&update.total_fees) {
1659 Ok(d) => d,
1660 Err(e) => {
1661 log::warn!(
1662 "Failed to parse total_fees for order {}: {e}",
1663 update.order_id
1664 );
1665 return;
1666 }
1667 }
1668 };
1669
1670 let cumulative_avg = if update.avg_price.is_empty() {
1671 Decimal::ZERO
1672 } else {
1673 match Decimal::from_str(&update.avg_price) {
1674 Ok(d) => d,
1675 Err(e) => {
1676 log::warn!(
1677 "Failed to parse avg_price for order {}: {e}",
1678 update.order_id
1679 );
1680 return;
1681 }
1682 }
1683 };
1684 let order_id = update.order_id.clone();
1685
1686 let is_terminal = update.status.is_terminal();
1687
1688 let (delta_qty, delta_fees, last_fill_price_decimal, restored_quantity) = {
1691 let mut state = cumulative_state.lock().expect(MUTEX_POISONED);
1692 let entry = state.entry_or_default(&order_id);
1693 let prev_qty = entry
1694 .filled_qty
1695 .unwrap_or_else(|| Quantity::zero(size_precision));
1696 let prev_fees = entry.total_fees;
1697 let prev_avg = entry.avg_price;
1698
1699 let observed_quantity = report.quantity;
1704 let stored_quantity = match entry.quantity {
1705 Some(q) if q >= observed_quantity => q,
1706 _ => observed_quantity,
1707 };
1708 entry.quantity = Some(stored_quantity);
1709
1710 if is_snapshot {
1714 entry.filled_qty = Some(cumulative_qty);
1715 entry.total_fees = cumulative_fees;
1716 entry.avg_price = cumulative_avg;
1717
1718 if is_terminal {
1719 state.remove(&order_id);
1720 }
1721 (
1722 Quantity::zero(size_precision),
1723 Decimal::ZERO,
1724 Decimal::ZERO,
1725 stored_quantity,
1726 )
1727 } else {
1728 let delta_qty = if cumulative_qty > prev_qty {
1729 cumulative_qty - prev_qty
1730 } else {
1731 Quantity::zero(size_precision)
1732 };
1733 let delta_fees = cumulative_fees - prev_fees;
1734
1735 let last_fill_price_decimal = if delta_qty.is_positive() {
1740 let now_notional = cumulative_avg * cumulative_qty.as_decimal();
1741 let prev_notional = prev_avg * prev_qty.as_decimal();
1742 let delta_notional = now_notional - prev_notional;
1743 let delta_qty_dec = delta_qty.as_decimal();
1744 if delta_qty_dec.is_zero() {
1745 cumulative_avg
1746 } else {
1747 delta_notional / delta_qty_dec
1748 }
1749 } else {
1750 Decimal::ZERO
1751 };
1752
1753 entry.filled_qty = Some(cumulative_qty);
1754 entry.total_fees = cumulative_fees;
1755 entry.avg_price = cumulative_avg;
1756
1757 if is_terminal {
1758 state.remove(&order_id);
1759 }
1760
1761 (
1762 delta_qty,
1763 delta_fees,
1764 last_fill_price_decimal,
1765 stored_quantity,
1766 )
1767 }
1768 };
1769
1770 if is_terminal && report.quantity < restored_quantity {
1773 report.quantity = restored_quantity;
1774 }
1775
1776 let synthesized_fill = if delta_qty.is_positive()
1783 && last_fill_price_decimal.is_sign_positive()
1784 && !last_fill_price_decimal.is_zero()
1785 {
1786 let price_precision = instrument.price_precision();
1787 match Price::from_decimal_dp(last_fill_price_decimal, price_precision) {
1788 Ok(last_px) => {
1789 let order_id_short = &update.order_id[..update.order_id.len().min(8)];
1796 let trade_id = TradeId::new(format!("{order_id_short}-{cumulative_qty}"));
1797 let trade_id_str = trade_id.as_str().to_string();
1798
1799 let is_new = {
1800 let mut dedup = fill_dedup.lock().expect(MUTEX_POISONED);
1801 dedup.insert((update.order_id.clone(), trade_id_str))
1802 };
1803
1804 if is_new {
1805 let commission_currency = instrument.quote_currency();
1806 match Money::from_decimal(delta_fees, commission_currency) {
1807 Ok(commission) => Some(parse_ws_user_event_to_fill_report(
1808 &update,
1809 delta_qty,
1810 last_px,
1811 commission,
1812 trade_id,
1813 &instrument,
1814 emitter.account_id(),
1815 fill_liquidity_side,
1816 ts_event,
1817 ts_init,
1818 )),
1819 Err(e) => {
1820 log::warn!(
1821 "Failed to build commission Money for order {}: {e}",
1822 update.order_id
1823 );
1824 None
1825 }
1826 }
1827 } else {
1828 log::debug!(
1829 "Dropping duplicate fill venue_order_id={}, trade_id={}",
1830 update.order_id,
1831 trade_id,
1832 );
1833 None
1834 }
1835 }
1836 Err(e) => {
1837 log::warn!(
1838 "Failed to build Price from derived last_fill={last_fill_price_decimal} at precision {price_precision} for order {}: {e}",
1839 update.order_id
1840 );
1841 None
1842 }
1843 }
1844 } else {
1845 None
1846 };
1847
1848 if let Some(mut fill_report) = synthesized_fill {
1849 if let Some(id) = publish_instrument_id {
1850 fill_report.instrument_id = id;
1851 }
1852 emitter.send_fill_report(fill_report);
1853 }
1854
1855 let report_safe_for_type = match report.order_type {
1871 OrderType::Limit | OrderType::LimitIfTouched => report.price.is_some(),
1872 OrderType::StopLimit => report.price.is_some() && report.trigger_price.is_some(),
1873 OrderType::StopMarket | OrderType::MarketIfTouched => report.trigger_price.is_some(),
1874 _ => true,
1875 };
1876 let should_emit = (!is_snapshot || have_order_contexts) && report_safe_for_type;
1877 if should_emit {
1878 emitter.send_order_status_report(*report);
1879 } else if !report_safe_for_type {
1880 log::warn!(
1881 "Suppressed unsafe OrderStatusReport for {} {}: missing price/trigger after enrichment",
1882 report.order_type,
1883 update.order_id,
1884 );
1885 }
1886}
1887
1888async fn resolve_order_context(
1895 update: &WsOrderUpdate,
1896 order_type: OrderType,
1897 report_price_missing: bool,
1898 order_contexts: &Arc<Mutex<AHashMap<String, OrderContext>>>,
1899 external_order_contexts: &Arc<Mutex<AHashMap<String, OrderContext>>>,
1900 http_client: &CoinbaseHttpClient,
1901 account_id: AccountId,
1902) -> Option<OrderContext> {
1903 if !update.client_order_id.is_empty() {
1904 let map = order_contexts.lock().expect(MUTEX_POISONED);
1905 if let Some(meta) = map.get(&update.client_order_id) {
1906 return Some(meta.clone());
1907 }
1908 }
1909
1910 if let Some(meta) = external_order_contexts
1911 .lock()
1912 .expect(MUTEX_POISONED)
1913 .get(&update.order_id)
1914 {
1915 return Some(meta.clone());
1916 }
1917
1918 let needs_enrichment = report_price_missing
1919 && matches!(
1920 order_type,
1921 OrderType::Limit
1922 | OrderType::StopLimit
1923 | OrderType::LimitIfTouched
1924 | OrderType::StopMarket
1925 | OrderType::MarketIfTouched
1926 );
1927
1928 if !needs_enrichment {
1929 return None;
1930 }
1931
1932 let venue_order_id = VenueOrderId::new(update.order_id.as_str());
1933 match http_client
1934 .request_order_status_report(account_id, None, Some(venue_order_id))
1935 .await
1936 {
1937 Ok(rest_report) => {
1938 let post_only_from_rest = matches!(order_type, OrderType::Limit | OrderType::StopLimit)
1939 && rest_report.post_only;
1940 let meta = OrderContext {
1941 price: rest_report.price,
1942 trigger_price: rest_report.trigger_price,
1943 trigger_type: rest_report.trigger_type,
1944 post_only: post_only_from_rest,
1945 submitted_product_id: None,
1946 };
1947 external_order_contexts
1948 .lock()
1949 .expect(MUTEX_POISONED)
1950 .insert(update.order_id.clone(), meta.clone());
1951 Some(meta)
1952 }
1953 Err(e) => {
1954 log::warn!(
1955 "Failed to enrich external order {} via REST: {e}",
1956 update.order_id
1957 );
1958 None
1959 }
1960 }
1961}
1962
1963#[cfg(test)]
1964mod tests {
1965 use nautilus_common::messages::{ExecutionEvent, ExecutionReport};
1966 use nautilus_model::{
1967 enums::AccountType,
1968 identifiers::{Symbol, TraderId},
1969 instruments::CurrencyPair,
1970 types::Currency,
1971 };
1972 use rstest::rstest;
1973 use ustr::Ustr;
1974
1975 use super::*;
1976 use crate::{
1977 common::enums::{
1978 CoinbaseContractExpiryType, CoinbaseOrderSide as CbSide,
1979 CoinbaseOrderStatus as CbStatus, CoinbaseOrderType as CbType,
1980 CoinbaseProductType as CbProductType, CoinbaseRiskManagedBy,
1981 CoinbaseTimeInForce as CbTif, CoinbaseTriggerStatus,
1982 },
1983 websocket::messages::WsOrderUpdate,
1984 };
1985
1986 #[rstest]
1987 fn test_fill_dedup_rejects_duplicates() {
1988 let mut dedup = FillDedup::new(4);
1989 let key = ("venue-1".to_string(), "trade-1".to_string());
1990 assert!(dedup.insert(key.clone()));
1991 assert!(!dedup.insert(key));
1992 }
1993
1994 #[rstest]
1995 fn test_fill_dedup_evicts_oldest_when_full() {
1996 let mut dedup = FillDedup::new(2);
1997 assert!(dedup.insert(("v".to_string(), "t1".to_string())));
1998 assert!(dedup.insert(("v".to_string(), "t2".to_string())));
1999 assert!(dedup.insert(("v".to_string(), "t3".to_string())));
2001 assert!(dedup.insert(("v".to_string(), "t1".to_string())));
2002 }
2003
2004 #[rstest]
2005 fn test_cumulative_state_evicts_oldest_at_capacity() {
2006 let mut state = CumulativeStateMap::with_capacity(2);
2007 state.entry_or_default("a");
2008 state.entry_or_default("b");
2009 state.entry_or_default("c");
2011 assert_eq!(state.len(), 2);
2012 assert!(state.map.contains_key("b"));
2013 assert!(state.map.contains_key("c"));
2014 assert!(!state.map.contains_key("a"));
2015 }
2016
2017 #[rstest]
2018 fn test_cumulative_state_remove_drops_entry_and_allows_reinsert() {
2019 let mut state = CumulativeStateMap::with_capacity(2);
2020 state.entry_or_default("a");
2021 state.entry_or_default("b");
2022 state.remove("a");
2023 state.entry_or_default("c");
2025 assert_eq!(state.len(), 2);
2026 assert!(state.map.contains_key("b"));
2027 assert!(state.map.contains_key("c"));
2028 }
2029
2030 #[rstest]
2031 fn test_cumulative_state_remove_and_reinsert_does_not_evict_live_state() {
2032 let mut state = CumulativeStateMap::with_capacity(2);
2036 state.entry_or_default("a");
2037 state.remove("a");
2038 state.entry_or_default("b");
2039 state.entry_or_default("a");
2040 state.entry_or_default("c");
2044 assert_eq!(state.len(), 2);
2045 assert!(
2046 state.map.contains_key("a"),
2047 "re-inserted live key must survive"
2048 );
2049 assert!(state.map.contains_key("c"));
2050 assert!(!state.map.contains_key("b"));
2051 }
2052
2053 #[rstest]
2054 fn test_cumulative_state_hit_refreshes_lru_recency() {
2055 let mut state = CumulativeStateMap::with_capacity(2);
2059 state.entry_or_default("a");
2060 state.entry_or_default("b");
2061 state.entry_or_default("a");
2064 state.entry_or_default("c");
2065 assert_eq!(state.len(), 2);
2066 assert!(
2067 state.map.contains_key("a"),
2068 "recently-accessed key must survive eviction"
2069 );
2070 assert!(state.map.contains_key("c"));
2071 assert!(!state.map.contains_key("b"));
2072 }
2073
2074 #[rstest]
2075 fn test_cumulative_state_preserves_live_entry_when_trimming_stale() {
2076 let mut state = CumulativeStateMap::with_capacity(2);
2080 state.entry_or_default("live");
2081 for i in 0..50 {
2083 let key = format!("t{i}");
2084 state.entry_or_default(&key);
2085 state.remove(&key);
2086 }
2087 assert!(
2088 state.map.contains_key("live"),
2089 "live entry must survive stale-trim cycles"
2090 );
2091 assert_eq!(state.len(), 1);
2092 assert!(
2093 state.order.len() <= 2 * state.capacity,
2094 "deque must remain bounded after compaction (was {})",
2095 state.order.len(),
2096 );
2097 assert!(
2102 state.order.iter().any(|k| k == "live"),
2103 "live key must remain in the deque, was: {:?}",
2104 state.order,
2105 );
2106 state.entry_or_default("a");
2111 state.entry_or_default("b");
2112 state.entry_or_default("c");
2113 assert_eq!(state.len(), state.capacity);
2114 assert!(
2115 !state.map.contains_key("live"),
2116 "live key should have been evicted in LRU order once capacity demanded it"
2117 );
2118 }
2119
2120 fn test_instrument() -> InstrumentAny {
2121 let instrument_id =
2122 InstrumentId::new(Symbol::new("BTC-USD"), Venue::new(Ustr::from("COINBASE")));
2123 InstrumentAny::CurrencyPair(CurrencyPair::new(
2124 instrument_id,
2125 Symbol::new("BTC-USD"),
2126 Currency::get_or_create_crypto("BTC"),
2127 Currency::get_or_create_crypto("USD"),
2128 2,
2129 8,
2130 Price::from("0.01"),
2131 Quantity::from("0.00000001"),
2132 None,
2133 None,
2134 None,
2135 Some(Quantity::from("0.00000001")),
2136 None,
2137 None,
2138 None,
2139 None,
2140 None,
2141 None,
2142 None,
2143 None,
2144 None,
2145 UnixNanos::default(),
2146 UnixNanos::default(),
2147 ))
2148 }
2149
2150 fn make_emitter() -> (
2151 ExecutionEventEmitter,
2152 tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2153 ) {
2154 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2155 let mut emitter = ExecutionEventEmitter::new(
2156 get_atomic_clock_realtime(),
2157 TraderId::from("TRADER-001"),
2158 AccountId::new("COINBASE-001"),
2159 AccountType::Cash,
2160 None,
2161 );
2162 emitter.set_sender(tx);
2163 (emitter, rx)
2164 }
2165
2166 fn make_user_order_update(
2167 cumulative: &str,
2168 leaves: &str,
2169 avg_price: &str,
2170 total_fees: &str,
2171 status: CbStatus,
2172 ) -> WsOrderUpdate {
2173 WsOrderUpdate {
2174 order_id: "venue-1".to_string(),
2175 client_order_id: "client-1".to_string(),
2176 contract_expiry_type: CoinbaseContractExpiryType::Unknown,
2177 cumulative_quantity: cumulative.to_string(),
2178 leaves_quantity: leaves.to_string(),
2179 avg_price: avg_price.to_string(),
2180 total_fees: total_fees.to_string(),
2181 status,
2182 product_id: Ustr::from("BTC-USD"),
2183 product_type: CbProductType::Spot,
2184 creation_time: String::new(),
2185 order_side: CbSide::Buy,
2186 order_type: CbType::Limit,
2187 risk_managed_by: CoinbaseRiskManagedBy::Unknown,
2188 time_in_force: CbTif::GoodUntilCancelled,
2189 trigger_status: CoinbaseTriggerStatus::InvalidOrderType,
2190 cancel_reason: String::new(),
2191 reject_reason: String::new(),
2192 total_value_after_fees: String::new(),
2193 }
2194 }
2195
2196 fn make_carrier(update: WsOrderUpdate) -> UserOrderUpdate {
2197 make_carrier_with_kind(update, false)
2198 }
2199
2200 fn make_limit_context() -> OrderContext {
2204 OrderContext {
2205 price: Some(Price::from("100.00")),
2206 ..OrderContext::default()
2207 }
2208 }
2209
2210 fn make_carrier_with_kind(update: WsOrderUpdate, is_snapshot: bool) -> UserOrderUpdate {
2211 let instrument = test_instrument();
2212 let report = crate::websocket::parse::parse_ws_user_event_to_order_status_report(
2213 &update,
2214 &instrument,
2215 AccountId::new("COINBASE-001"),
2216 UnixNanos::default(),
2217 UnixNanos::default(),
2218 )
2219 .unwrap();
2220 UserOrderUpdate {
2221 report: Box::new(report),
2222 update: Box::new(update),
2223 instrument,
2224 is_snapshot,
2225 ts_event: UnixNanos::default(),
2226 ts_init: UnixNanos::default(),
2227 }
2228 }
2229
2230 fn drain_fill_reports(
2231 rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2232 ) -> Vec<FillReport> {
2233 let mut reports = Vec::new();
2234
2235 while let Ok(event) = rx.try_recv() {
2236 if let ExecutionEvent::Report(ExecutionReport::Fill(report)) = event {
2237 reports.push(*report);
2238 }
2239 }
2240 reports
2241 }
2242
2243 fn drain_all_reports(
2248 rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2249 ) -> (Vec<OrderStatusReport>, Vec<FillReport>) {
2250 let mut orders = Vec::new();
2251 let mut fills = Vec::new();
2252
2253 while let Ok(event) = rx.try_recv() {
2254 match event {
2255 ExecutionEvent::Report(ExecutionReport::Order(r)) => orders.push(*r),
2256 ExecutionEvent::Report(ExecutionReport::Fill(r)) => fills.push(*r),
2257 _ => {}
2258 }
2259 }
2260 (orders, fills)
2261 }
2262
2263 fn drain_status_reports(
2264 rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2265 ) -> Vec<OrderStatusReport> {
2266 let mut reports = Vec::new();
2267
2268 while let Ok(event) = rx.try_recv() {
2269 if let ExecutionEvent::Report(ExecutionReport::Order(report)) = event {
2270 reports.push(*report);
2271 }
2272 }
2273 reports
2274 }
2275
2276 fn make_dedup_state_pair() -> (Arc<Mutex<FillDedup>>, Arc<Mutex<CumulativeStateMap>>) {
2277 (
2278 Arc::new(Mutex::new(FillDedup::new(64))),
2279 Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2280 CUMULATIVE_STATE_CAPACITY,
2281 ))),
2282 )
2283 }
2284
2285 #[rstest]
2286 fn test_handle_user_order_update_emits_status_report_and_no_fill_when_zero_filled() {
2287 let (emitter, mut rx) = make_emitter();
2288 let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2289 let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2290 CUMULATIVE_STATE_CAPACITY,
2291 )));
2292
2293 let update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2295 process_user_order_update(
2296 make_carrier(update),
2297 Some(make_limit_context()),
2298 &emitter,
2299 &dedup,
2300 &state,
2301 None,
2302 );
2303
2304 let mut got_status = false;
2306 let mut got_fill = false;
2307
2308 while let Ok(event) = rx.try_recv() {
2309 match event {
2310 ExecutionEvent::Report(ExecutionReport::Order(_)) => got_status = true,
2311 ExecutionEvent::Report(ExecutionReport::Fill(_)) => got_fill = true,
2312 _ => {}
2313 }
2314 }
2315 assert!(got_status);
2316 assert!(!got_fill);
2317 }
2318
2319 #[rstest]
2320 fn test_handle_user_order_update_synthesizes_per_fill_price_from_notional_delta() {
2321 let (emitter, mut rx) = make_emitter();
2322 let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2323 let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2324 CUMULATIVE_STATE_CAPACITY,
2325 )));
2326
2327 let update_1 = make_user_order_update("0.5", "0.5", "100.00", "0.05", CbStatus::Open);
2329 process_user_order_update(make_carrier(update_1), None, &emitter, &dedup, &state, None);
2330
2331 let update_2 = make_user_order_update("1.0", "0", "110.00", "0.15", CbStatus::Filled);
2335 process_user_order_update(make_carrier(update_2), None, &emitter, &dedup, &state, None);
2336
2337 let fills = drain_fill_reports(&mut rx);
2338 assert_eq!(fills.len(), 2);
2339
2340 assert_eq!(fills[0].last_qty, Quantity::from("0.50000000"));
2342 assert_eq!(fills[0].last_px, Price::from("100.00"));
2343 assert_eq!(fills[0].commission.as_decimal().to_string(), "0.05");
2344
2345 assert_eq!(fills[1].last_qty, Quantity::from("0.50000000"));
2347 assert_eq!(fills[1].last_px, Price::from("120.00"));
2348 assert_eq!(fills[1].commission.as_decimal().to_string(), "0.10");
2349 }
2350
2351 #[rstest]
2352 fn test_handle_user_order_update_drops_replayed_fills() {
2353 let (emitter, mut rx) = make_emitter();
2354 let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2355 let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2356 CUMULATIVE_STATE_CAPACITY,
2357 )));
2358
2359 let update = make_user_order_update("0.5", "0.5", "100.00", "0.05", CbStatus::Open);
2360 process_user_order_update(
2361 make_carrier(update.clone()),
2362 None,
2363 &emitter,
2364 &dedup,
2365 &state,
2366 None,
2367 );
2368
2369 {
2373 let mut s = state.lock().unwrap();
2374 s.clear();
2375 }
2376 process_user_order_update(make_carrier(update), None, &emitter, &dedup, &state, None);
2377
2378 let fills = drain_fill_reports(&mut rx);
2379 assert_eq!(fills.len(), 1, "replay should be deduplicated");
2380 }
2381
2382 #[rstest]
2383 fn test_handle_user_order_update_clears_state_on_terminal_status() {
2384 let (emitter, mut rx) = make_emitter();
2385 let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2386 let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2387 CUMULATIVE_STATE_CAPACITY,
2388 )));
2389
2390 let update = make_user_order_update("1.0", "0", "100.00", "0.10", CbStatus::Filled);
2391 process_user_order_update(make_carrier(update), None, &emitter, &dedup, &state, None);
2392
2393 let _ = drain_fill_reports(&mut rx);
2395
2396 let s = state.lock().unwrap();
2397 assert!(
2398 s.get("venue-1").is_none(),
2399 "terminal status should remove cumulative state entry"
2400 );
2401 }
2402
2403 #[rstest]
2404 fn test_handle_user_order_update_skips_when_avg_price_nonpositive() {
2405 let (emitter, mut rx) = make_emitter();
2406 let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2407 let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2408 CUMULATIVE_STATE_CAPACITY,
2409 )));
2410
2411 let update = make_user_order_update("0.5", "0.5", "0", "0", CbStatus::Open);
2413 process_user_order_update(make_carrier(update), None, &emitter, &dedup, &state, None);
2414
2415 let fills = drain_fill_reports(&mut rx);
2416 assert!(
2417 fills.is_empty(),
2418 "non-positive avg_price should not emit a fill"
2419 );
2420 }
2421
2422 #[rstest]
2423 fn test_handle_user_order_update_snapshot_does_not_synthesize_fill() {
2424 let (emitter, mut rx) = make_emitter();
2425 let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2426 let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2427 CUMULATIVE_STATE_CAPACITY,
2428 )));
2429
2430 let update = make_user_order_update("0.5", "0.5", "100.00", "0.05", CbStatus::Open);
2434 process_user_order_update(
2435 make_carrier_with_kind(update, true),
2436 None,
2437 &emitter,
2438 &dedup,
2439 &state,
2440 None,
2441 );
2442
2443 let fills = drain_fill_reports(&mut rx);
2444 assert!(
2445 fills.is_empty(),
2446 "snapshot must not synthesize a fill from pre-existing cumulative state"
2447 );
2448
2449 let s = state.lock().unwrap();
2452 let entry = s.get("venue-1").expect("snapshot should seed state");
2453 assert_eq!(entry.filled_qty.unwrap(), Quantity::from("0.50000000"));
2454 }
2455
2456 #[rstest]
2457 fn test_handle_user_order_update_snapshot_then_update_synthesizes_only_delta() {
2458 let (emitter, mut rx) = make_emitter();
2459 let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2460 let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2461 CUMULATIVE_STATE_CAPACITY,
2462 )));
2463
2464 let snap = make_user_order_update("0.5", "0.5", "100.00", "0.05", CbStatus::Open);
2466 process_user_order_update(
2467 make_carrier_with_kind(snap, true),
2468 None,
2469 &emitter,
2470 &dedup,
2471 &state,
2472 None,
2473 );
2474
2475 let live = make_user_order_update("1.0", "0", "110.00", "0.15", CbStatus::Filled);
2478 process_user_order_update(make_carrier(live), None, &emitter, &dedup, &state, None);
2479
2480 let fills = drain_fill_reports(&mut rx);
2481 assert_eq!(fills.len(), 1);
2482 assert_eq!(fills[0].last_qty, Quantity::from("0.50000000"));
2483 assert_eq!(fills[0].last_px, Price::from("120.00"));
2485 assert_eq!(fills[0].commission.as_decimal().to_string(), "0.10");
2487 }
2488
2489 #[rstest]
2490 fn test_handle_user_order_update_terminal_restores_original_quantity() {
2491 use nautilus_common::messages::{ExecutionEvent, ExecutionReport};
2492
2493 let (emitter, mut rx) = make_emitter();
2494 let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2495 let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2496 CUMULATIVE_STATE_CAPACITY,
2497 )));
2498
2499 let working = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2501 process_user_order_update(
2502 make_carrier(working),
2503 Some(make_limit_context()),
2504 &emitter,
2505 &dedup,
2506 &state,
2507 None,
2508 );
2509 while rx.try_recv().is_ok() {}
2511
2512 let cancelled = make_user_order_update("0", "0", "0", "0", CbStatus::Cancelled);
2515 process_user_order_update(
2516 make_carrier(cancelled),
2517 Some(make_limit_context()),
2518 &emitter,
2519 &dedup,
2520 &state,
2521 None,
2522 );
2523
2524 let mut got_terminal_report: Option<OrderStatusReport> = None;
2525
2526 while let Ok(event) = rx.try_recv() {
2527 if let ExecutionEvent::Report(ExecutionReport::Order(r)) = event {
2528 got_terminal_report = Some(*r);
2529 }
2530 }
2531 let report = got_terminal_report.expect("terminal report emitted");
2532 assert_eq!(
2533 report.quantity,
2534 Quantity::from("1.00000000"),
2535 "terminal report must restore the original order quantity"
2536 );
2537 }
2538
2539 #[rstest]
2540 fn test_process_user_order_update_suppresses_snapshot_without_context() {
2541 let (emitter, mut rx) = make_emitter();
2545 let (dedup, state) = make_dedup_state_pair();
2546
2547 let update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2548 process_user_order_update(
2549 make_carrier_with_kind(update, true),
2550 None,
2551 &emitter,
2552 &dedup,
2553 &state,
2554 None,
2555 );
2556
2557 assert!(drain_status_reports(&mut rx).is_empty());
2558 assert!(drain_fill_reports(&mut rx).is_empty());
2559 }
2560
2561 #[rstest]
2562 fn test_process_user_order_update_emits_snapshot_when_context_present() {
2563 let (emitter, mut rx) = make_emitter();
2566 let (dedup, state) = make_dedup_state_pair();
2567 let context = OrderContext {
2568 price: Some(Price::from("100.00")),
2569 ..Default::default()
2570 };
2571
2572 let update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2573 process_user_order_update(
2574 make_carrier_with_kind(update, true),
2575 Some(context),
2576 &emitter,
2577 &dedup,
2578 &state,
2579 None,
2580 );
2581
2582 let osrs = drain_status_reports(&mut rx);
2583 assert_eq!(osrs.len(), 1);
2584 assert_eq!(osrs[0].price, Some(Price::from("100.00")));
2585 }
2586
2587 #[rstest]
2588 fn test_process_user_order_update_patches_price_and_trigger_from_context() {
2589 let (emitter, mut rx) = make_emitter();
2593 let (dedup, state) = make_dedup_state_pair();
2594 let context = OrderContext {
2595 price: Some(Price::from("100.50")),
2596 trigger_price: Some(Price::from("99.00")),
2597 trigger_type: Some(TriggerType::LastPrice),
2598 ..Default::default()
2599 };
2600
2601 let update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2602 process_user_order_update(
2603 make_carrier(update),
2604 Some(context),
2605 &emitter,
2606 &dedup,
2607 &state,
2608 None,
2609 );
2610
2611 let osrs = drain_status_reports(&mut rx);
2612 assert_eq!(osrs[0].price, Some(Price::from("100.50")));
2613 assert_eq!(osrs[0].trigger_price, Some(Price::from("99.00")));
2614 assert_eq!(osrs[0].trigger_type, Some(TriggerType::LastPrice));
2615 }
2616
2617 #[rstest]
2618 fn test_process_user_order_update_rekeys_to_submitted_product_id() {
2619 let (emitter, mut rx) = make_emitter();
2623 let (dedup, state) = make_dedup_state_pair();
2624 let context = OrderContext {
2625 price: Some(Price::from("100.00")),
2626 submitted_product_id: Some(Ustr::from("BTC-USDC")),
2627 ..Default::default()
2628 };
2629
2630 let update = make_user_order_update("1.0", "0", "100.00", "0.05", CbStatus::Filled);
2631 process_user_order_update(
2632 make_carrier(update),
2633 Some(context),
2634 &emitter,
2635 &dedup,
2636 &state,
2637 None,
2638 );
2639
2640 let (osrs, fills) = drain_all_reports(&mut rx);
2641 assert_eq!(osrs.len(), 1);
2642 assert_eq!(
2643 osrs[0].instrument_id,
2644 InstrumentId::from("BTC-USDC.COINBASE")
2645 );
2646 assert_eq!(fills.len(), 1);
2647 assert_eq!(
2648 fills[0].instrument_id,
2649 InstrumentId::from("BTC-USDC.COINBASE")
2650 );
2651 }
2652
2653 #[rstest]
2654 #[case(true, LiquiditySide::Maker)]
2655 #[case(false, LiquiditySide::NoLiquiditySide)]
2656 fn test_process_user_order_update_stamps_liquidity_side_from_post_only(
2657 #[case] post_only: bool,
2658 #[case] expected: LiquiditySide,
2659 ) {
2660 let (emitter, mut rx) = make_emitter();
2661 let (dedup, state) = make_dedup_state_pair();
2662 let context = OrderContext {
2663 price: Some(Price::from("100.00")),
2664 post_only,
2665 ..Default::default()
2666 };
2667
2668 let update = make_user_order_update("1.0", "0", "100.00", "0.05", CbStatus::Filled);
2669 process_user_order_update(
2670 make_carrier(update),
2671 Some(context),
2672 &emitter,
2673 &dedup,
2674 &state,
2675 None,
2676 );
2677
2678 let fills = drain_fill_reports(&mut rx);
2679 assert_eq!(fills.len(), 1);
2680 assert_eq!(fills[0].liquidity_side, expected);
2681 }
2682
2683 #[rstest]
2684 fn test_process_user_order_update_propagates_post_only_to_status_report() {
2685 let (emitter, mut rx) = make_emitter();
2689 let (dedup, state) = make_dedup_state_pair();
2690 let context = OrderContext {
2691 price: Some(Price::from("100.00")),
2692 post_only: true,
2693 ..Default::default()
2694 };
2695
2696 let update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2697 process_user_order_update(
2698 make_carrier(update),
2699 Some(context),
2700 &emitter,
2701 &dedup,
2702 &state,
2703 None,
2704 );
2705
2706 let osrs = drain_status_reports(&mut rx);
2707 assert_eq!(osrs.len(), 1);
2708 assert!(osrs[0].post_only);
2709 }
2710
2711 #[rstest]
2712 #[case(OrderType::Limit)]
2713 #[case(OrderType::StopLimit)]
2714 fn test_process_user_order_update_suppresses_unsafe_report_when_enrichment_unavailable(
2715 #[case] order_type: OrderType,
2716 ) {
2717 let (emitter, mut rx) = make_emitter();
2721 let (dedup, state) = make_dedup_state_pair();
2722 let mut update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2723 update.order_type = match order_type {
2724 OrderType::Limit => CbType::Limit,
2725 OrderType::StopLimit => CbType::StopLimit,
2726 _ => CbType::Limit,
2727 };
2728
2729 process_user_order_update(make_carrier(update), None, &emitter, &dedup, &state, None);
2730
2731 assert!(drain_status_reports(&mut rx).is_empty());
2732 }
2733
2734 #[rstest]
2735 fn test_process_user_order_update_trade_id_fits_stack_str() {
2736 let (emitter, mut rx) = make_emitter();
2740 let (dedup, state) = make_dedup_state_pair();
2741 let mut update = make_user_order_update("1.0", "0", "100.00", "0.05", CbStatus::Filled);
2742 update.order_id = "11d357f0-155e-4ed4-b87c-1cf966f65d10".to_string();
2743
2744 process_user_order_update(make_carrier(update), None, &emitter, &dedup, &state, None);
2745
2746 let fills = drain_fill_reports(&mut rx);
2747 assert_eq!(fills.len(), 1);
2748 let trade_id = fills[0].trade_id.as_str();
2749 assert!(
2750 trade_id.len() <= 36,
2751 "trade_id was {} chars",
2752 trade_id.len()
2753 );
2754 assert!(
2755 trade_id.starts_with("11d357f0-"),
2756 "trade_id should start with the 8-char prefix, was {trade_id}",
2757 );
2758 }
2759}