Skip to main content

nautilus_coinbase/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Coinbase Advanced Trade adapter.
17
18use std::{
19    collections::VecDeque,
20    future::Future,
21    str::FromStr,
22    sync::{Arc, Mutex},
23    time::{Duration, Instant},
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use async_trait::async_trait;
29use nautilus_common::{
30    clients::ExecutionClient,
31    live::{get_runtime, runner::get_exec_event_sender},
32    messages::execution::{
33        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
34        GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
35        GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
36        GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
37    },
38};
39use nautilus_core::{
40    MUTEX_POISONED, UnixNanos,
41    time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
44use nautilus_model::{
45    accounts::AccountAny,
46    enums::{AccountType, LiquiditySide, OmsType, OrderSide, OrderStatus, OrderType, TriggerType},
47    identifiers::{
48        AccountId, ClientId, ClientOrderId, InstrumentId, Symbol, TradeId, Venue, VenueOrderId,
49    },
50    instruments::{Instrument, InstrumentAny},
51    orders::Order,
52    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
53    types::{AccountBalance, MarginBalance, Money, Price, Quantity},
54};
55use nautilus_network::retry::RetryConfig;
56use rust_decimal::Decimal;
57use tokio::task::JoinHandle;
58use ustr::Ustr;
59
60use crate::{
61    common::{
62        consts::COINBASE_VENUE,
63        credential::CoinbaseCredential,
64        enums::{CoinbaseProductType, CoinbaseWsChannel},
65    },
66    config::CoinbaseExecClientConfig,
67    http::{
68        client::CoinbaseHttpClient,
69        parse::{parse_quantity, parse_ws_cfm_account_state},
70    },
71    websocket::{
72        client::CoinbaseWebSocketClient,
73        handler::{NautilusWsMessage, UserOrderUpdate},
74        messages::WsOrderUpdate,
75        parse::parse_ws_user_event_to_fill_report,
76    },
77};
78
79// Coinbase does not publish a formal max for batch_cancel; conservative chunk
80// size mirrors the 100 used by other adapters and keeps request bodies small.
81const BATCH_CANCEL_CHUNK: usize = 100;
82
83// Bounded LRU to drop replayed fills after reconnect. Size follows the
84// pattern used elsewhere; keyed by (venue_order_id, trade_id) as owned strings
85// so the global Ustr arena is not polluted with unique trade IDs.
86const FILL_DEDUP_CAPACITY: usize = 10_000;
87
88// Bounded LRU for per-order cumulative tracking. Terminal events drop entries
89// eagerly; this cap also protects against orders that this client never
90// observes a terminal status for (e.g. cancelled out-of-band).
91const CUMULATIVE_STATE_CAPACITY: usize = 10_000;
92
93// Coinbase spot account is ready as soon as the REST account state lands, but
94// the engine registers it asynchronously; wait up to 30s for that to happen.
95const ACCOUNT_REGISTERED_TIMEOUT_SECS: f64 = 30.0;
96
97#[derive(Debug)]
98struct FillDedup {
99    seen: AHashMap<(String, String), ()>,
100    order: VecDeque<(String, String)>,
101    capacity: usize,
102}
103
104impl FillDedup {
105    fn new(capacity: usize) -> Self {
106        Self {
107            seen: AHashMap::with_capacity(capacity),
108            order: VecDeque::with_capacity(capacity),
109            capacity,
110        }
111    }
112
113    // Returns true if the key is new (and inserts it); false when already seen.
114    fn insert(&mut self, key: (String, String)) -> bool {
115        if self.seen.contains_key(&key) {
116            return false;
117        }
118
119        if self.order.len() >= self.capacity
120            && let Some(oldest) = self.order.pop_front()
121        {
122            self.seen.remove(&oldest);
123        }
124        self.order.push_back(key.clone());
125        self.seen.insert(key, ());
126        true
127    }
128}
129
130// Per-order cumulative state tracked across WS reconnects so that delta-based
131// fill synthesis remains correct even when the feed handler is recreated.
132// `avg_price` is Coinbase's cumulative weighted-average fill price; the exec
133// client derives the per-fill price from the notional delta between successive
134// cumulative states.
135//
136// `quantity` records the largest `cumulative_quantity + leaves_quantity` ever
137// observed for the order. Coinbase zeroes `leaves_quantity` on terminal updates
138// (REJECTED / CANCELLED / EXPIRED), so the OSR's quantity computed from
139// cum+leaves on those events would collapse to filled_qty (or zero). Holding
140// the max-observed total lets us restore the original order quantity before
141// emitting the terminal report.
142#[derive(Debug, Default, Clone)]
143struct OrderCumulativeState {
144    filled_qty: Option<Quantity>,
145    total_fees: Decimal,
146    avg_price: Decimal,
147    quantity: Option<Quantity>,
148}
149
150// Captures the limit / trigger metadata of a submitted order, keyed by
151// `client_order_id` so it survives the venue-id-keyed cumulative state being
152// dropped on terminal user-channel events. Coinbase's user channel does not
153// echo `price`, `stop_price`, or `trigger_type`, so without these locally
154// cached values the engine reconciler would clear the local price the moment
155// a post-fill or cancel update lands.
156#[derive(Debug, Default, Clone)]
157struct OrderContext {
158    price: Option<Price>,
159    trigger_price: Option<Price>,
160    trigger_type: Option<TriggerType>,
161    // `post_only` order fills are guaranteed `Maker` (the venue rejects an
162    // immediate match outright). The Coinbase user channel does not echo
163    // this flag, so we cache it at submit time and pass it through to the
164    // synthesized FillReport's `liquidity_side`.
165    post_only: bool,
166    // The `product_id` the order was submitted with. Coinbase rewrites
167    // aliased products to the canonical id on the user channel, so
168    // `update.product_id` always reads as the canonical (e.g. `BTC-USD`)
169    // even for an order placed on the alias side (`BTC-USDC`). Looking the
170    // submitted id up by `client_order_id` lets us re-key user-channel
171    // echoes back to the caller's id without rewriting *every* canonical
172    // event globally.
173    submitted_product_id: Option<Ustr>,
174}
175
176// Bounded map for per-order cumulative tracking. Insertions track LRU order;
177// when the live entry count reaches `capacity`, the oldest non-stale entry is
178// evicted. Terminal events call `remove()` which clears the map entry; the
179// matching deque slot becomes stale and is reclaimed during the next eviction
180// pass (the deque is also trimmed if it grows beyond `2 * capacity`).
181#[derive(Debug)]
182struct CumulativeStateMap {
183    map: AHashMap<String, OrderCumulativeState>,
184    order: VecDeque<String>,
185    capacity: usize,
186}
187
188impl CumulativeStateMap {
189    fn with_capacity(capacity: usize) -> Self {
190        Self {
191            map: AHashMap::with_capacity(capacity),
192            order: VecDeque::with_capacity(capacity),
193            capacity,
194        }
195    }
196
197    fn entry_or_default(&mut self, key: &str) -> &mut OrderCumulativeState {
198        if self.map.contains_key(key) {
199            // Hit: refresh recency so a long-lived order receiving updates
200            // is not evicted by churn on other orders. O(n) lookup and
201            // shift; tolerated because user-channel update volume is small
202            // relative to capacity
203            if let Some(pos) = self.order.iter().position(|k| k == key) {
204                self.order.remove(pos);
205            }
206            self.order.push_back(key.to_string());
207        } else {
208            self.evict_until_capacity_or_empty();
209            self.order.push_back(key.to_string());
210            self.map
211                .insert(key.to_string(), OrderCumulativeState::default());
212        }
213        self.map
214            .get_mut(key)
215            .expect("key was just inserted or confirmed present")
216    }
217
218    fn remove(&mut self, key: &str) {
219        if self.map.remove(key).is_some() {
220            // Drop the matching deque slot too. Without this, a later
221            // re-insert of the same key would leave a stale slot ahead of
222            // the new live one, and the eviction loop would pop the stale
223            // slot and remove the live entry from the map
224            self.order.retain(|k| k != key);
225        }
226    }
227
228    fn evict_until_capacity_or_empty(&mut self) {
229        // Evict the oldest live entries until we're under capacity. Stale
230        // deque entries (already removed from the map) are skipped naturally
231        // because removing a missing key is a no-op
232        while self.map.len() >= self.capacity {
233            match self.order.pop_front() {
234                Some(oldest) => {
235                    self.map.remove(&oldest);
236                }
237                None => break,
238            }
239        }
240
241        // When the deque accumulates many stale entries (e.g. a long-lived
242        // order at the front while later orders churn through terminal
243        // events), compact in place: keep live entries in their original
244        // order and drop the rest. Bounds memory without ever evicting live
245        // state
246        if self.order.len() > 2 * self.capacity {
247            self.order.retain(|key| self.map.contains_key(key));
248        }
249    }
250
251    #[cfg(test)]
252    fn len(&self) -> usize {
253        self.map.len()
254    }
255
256    #[cfg(test)]
257    fn get(&self, key: &str) -> Option<&OrderCumulativeState> {
258        self.map.get(key)
259    }
260
261    #[cfg(test)]
262    fn clear(&mut self) {
263        self.map.clear();
264        self.order.clear();
265    }
266}
267
268/// Live execution client for Coinbase Advanced Trade.
269#[derive(Debug)]
270pub struct CoinbaseExecutionClient {
271    core: ExecutionClientCore,
272    clock: &'static AtomicTime,
273    config: CoinbaseExecClientConfig,
274    emitter: ExecutionEventEmitter,
275    http_client: CoinbaseHttpClient,
276    ws_user: CoinbaseWebSocketClient,
277    ws_stream_handle: Option<JoinHandle<()>>,
278    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
279    instruments_cache: Arc<AHashMap<String, InstrumentAny>>,
280    fill_dedup: Arc<Mutex<FillDedup>>,
281    cumulative_state: Arc<Mutex<CumulativeStateMap>>,
282    order_contexts: Arc<Mutex<AHashMap<String, OrderContext>>>,
283    // Caches REST-derived metadata for orders this client did not submit
284    // (keyed by `venue_order_id`). Populated lazily when the user-channel
285    // handler encounters an unknown order whose `OrderStatusReport` would
286    // otherwise lack `price` / `trigger_price` / `trigger_type` and panic
287    // the engine's reconstruction path. Separate from `order_contexts`
288    // because external orders may carry a `client_order_id` we never set.
289    external_order_contexts: Arc<Mutex<AHashMap<String, OrderContext>>>,
290}
291
292impl CoinbaseExecutionClient {
293    /// Creates a new [`CoinbaseExecutionClient`].
294    ///
295    /// # Errors
296    ///
297    /// Returns an error if credentials cannot be resolved or the underlying
298    /// HTTP / WebSocket client cannot be constructed.
299    pub fn new(
300        core: ExecutionClientCore,
301        config: CoinbaseExecClientConfig,
302    ) -> anyhow::Result<Self> {
303        let credential =
304            CoinbaseCredential::resolve(config.api_key.as_deref(), config.api_secret.as_deref())
305                .ok_or_else(|| {
306                    anyhow::anyhow!(
307                        "Coinbase credentials not available; set COINBASE_API_KEY and COINBASE_API_SECRET or pass them in the config"
308                    )
309                })?;
310
311        let retry_config = RetryConfig {
312            max_retries: config.max_retries,
313            initial_delay_ms: config.retry_delay_initial_ms,
314            max_delay_ms: config.retry_delay_max_ms,
315            backoff_factor: 2.0,
316            jitter_ms: 250,
317            operation_timeout_ms: Some(60_000),
318            immediate_first: false,
319            max_elapsed_ms: Some(180_000),
320        };
321
322        let http_client = CoinbaseHttpClient::with_credentials(
323            credential.clone(),
324            config.environment,
325            config.http_timeout_secs,
326            config.proxy_url.clone(),
327            Some(retry_config),
328        )
329        .map_err(|e| anyhow::anyhow!("Failed to create Coinbase HTTP client: {e}"))?;
330
331        if let Some(ref url) = config.base_url_rest {
332            http_client.set_base_url(url.clone());
333        }
334
335        let ws_url = config.ws_url();
336        let ws_user = CoinbaseWebSocketClient::with_credential(
337            &ws_url,
338            credential,
339            config.transport_backend,
340            config.proxy_url.clone(),
341        );
342
343        let clock = get_atomic_clock_realtime();
344        let emitter = ExecutionEventEmitter::new(
345            clock,
346            core.trader_id,
347            core.account_id,
348            core.account_type,
349            None,
350        );
351
352        Ok(Self {
353            core,
354            clock,
355            config,
356            emitter,
357            http_client,
358            ws_user,
359            ws_stream_handle: None,
360            pending_tasks: Mutex::new(Vec::new()),
361            instruments_cache: Arc::new(AHashMap::new()),
362            fill_dedup: Arc::new(Mutex::new(FillDedup::new(FILL_DEDUP_CAPACITY))),
363            cumulative_state: Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
364                CUMULATIVE_STATE_CAPACITY,
365            ))),
366            order_contexts: Arc::new(Mutex::new(AHashMap::new())),
367            external_order_contexts: Arc::new(Mutex::new(AHashMap::new())),
368        })
369    }
370
371    fn spawn_task<F>(&self, description: &'static str, fut: F)
372    where
373        F: Future<Output = anyhow::Result<()>> + Send + 'static,
374    {
375        let runtime = get_runtime();
376        let handle = runtime.spawn(async move {
377            if let Err(e) = fut.await {
378                log::warn!("{description} failed: {e:?}");
379            }
380        });
381
382        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
383        tasks.retain(|h| !h.is_finished());
384        tasks.push(handle);
385    }
386
387    fn abort_pending_tasks(&self) {
388        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
389        for handle in tasks.drain(..) {
390            handle.abort();
391        }
392    }
393
394    // Returns true when the exec client was created with a Margin account,
395    // indicating it should handle CFM-backed derivatives traffic.
396    fn is_margin(&self) -> bool {
397        self.core.account_type == AccountType::Margin
398    }
399
400    // Returns true when the instrument resides in the connect-time bootstrap
401    // cache. For the Cash (spot) factory this gates spot-only traffic; for the
402    // Margin factory the cache contains CFM perp + future products.
403    fn is_instrument_cached(&self, instrument_id: &InstrumentId) -> bool {
404        self.instruments_cache
405            .contains_key(instrument_id.symbol.as_str())
406    }
407
408    // Polls the cache until the account is registered or the timeout is hit.
409    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
410        let account_id = self.core.account_id;
411
412        if self.core.cache().account(&account_id).is_some() {
413            log::info!("Account {account_id} registered");
414            return Ok(());
415        }
416
417        let start = Instant::now();
418        let timeout = Duration::from_secs_f64(timeout_secs);
419        let interval = Duration::from_millis(10);
420
421        loop {
422            tokio::time::sleep(interval).await;
423
424            if self.core.cache().account(&account_id).is_some() {
425                log::info!("Account {account_id} registered");
426                return Ok(());
427            }
428
429            if start.elapsed() >= timeout {
430                anyhow::bail!(
431                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
432                );
433            }
434        }
435    }
436}
437
438// Converts a UnixNanos to a UTC chrono::DateTime; returns an error when the
439// nanosecond value is out of range.
440fn unix_nanos_to_utc(ts: UnixNanos) -> anyhow::Result<chrono::DateTime<chrono::Utc>> {
441    let secs = (ts.as_u64() / 1_000_000_000) as i64;
442    let nanos = (ts.as_u64() % 1_000_000_000) as u32;
443    chrono::DateTime::<chrono::Utc>::from_timestamp(secs, nanos)
444        .ok_or_else(|| anyhow::anyhow!("UnixNanos {ts} is out of range for chrono::DateTime"))
445}
446
447#[async_trait(?Send)]
448impl ExecutionClient for CoinbaseExecutionClient {
449    fn is_connected(&self) -> bool {
450        self.core.is_connected()
451    }
452
453    fn client_id(&self) -> ClientId {
454        self.core.client_id
455    }
456
457    fn account_id(&self) -> AccountId {
458        self.core.account_id
459    }
460
461    fn venue(&self) -> Venue {
462        *COINBASE_VENUE
463    }
464
465    fn oms_type(&self) -> OmsType {
466        self.core.oms_type
467    }
468
469    fn get_account(&self) -> Option<AccountAny> {
470        self.core.cache().account(&self.core.account_id).cloned()
471    }
472
473    async fn connect(&mut self) -> anyhow::Result<()> {
474        if self.core.is_connected() {
475            return Ok(());
476        }
477
478        // If the underlying WS is still alive from a prior stop() that did not
479        // explicitly disconnect, tear it down before reconnecting. The
480        // in-handler signal path can race with the Disconnect command, leaving
481        // the inner connection_mode stale even after disconnect().await, so
482        // we rebuild the client outright to guarantee clean cmd_tx/out_rx
483        // pairs and a fresh signal.
484        if self.ws_user.is_active() || self.ws_user.is_reconnecting() {
485            log::info!("Tearing down stale user WS before reconnect");
486            self.ws_user.disconnect().await;
487            // Abort any prior consumer task; the rebuilt ws_user gets a fresh
488            // out_rx so the previous task is otherwise leaked.
489            if let Some(handle) = self.ws_stream_handle.take() {
490                handle.abort();
491            }
492            let credential = CoinbaseCredential::resolve(
493                self.config.api_key.as_deref(),
494                self.config.api_secret.as_deref(),
495            )
496            .ok_or_else(|| anyhow::anyhow!("Coinbase credentials unavailable for WS reset"))?;
497            self.ws_user = CoinbaseWebSocketClient::with_credential(
498                &self.config.ws_url(),
499                credential,
500                self.config.transport_backend,
501                self.config.proxy_url.clone(),
502            );
503        }
504
505        if self.core.instruments_initialized() {
506            // Instruments were loaded externally; still propagate the cached
507            // set to the WS client on reconnect scenarios.
508            let cached: Vec<InstrumentAny> = self.instruments_cache.values().cloned().collect();
509            if !cached.is_empty() {
510                self.ws_user.initialize_instruments(cached).await;
511            }
512        } else {
513            // The Cash (spot) factory loads only spot products; the Margin
514            // (derivatives) factory loads the futures universe so CFM perps
515            // and dated futures can be reconciled. Mixing the two through a
516            // single client is intentionally unsupported, so each factory
517            // picks one branch.
518            let instruments = if self.is_margin() {
519                self.http_client
520                    .request_instruments(Some(CoinbaseProductType::Future))
521                    .await
522                    .context("failed to load Coinbase futures instruments")?
523            } else {
524                self.http_client
525                    .request_instruments(Some(CoinbaseProductType::Spot))
526                    .await
527                    .context("failed to load Coinbase instruments")?
528            };
529
530            let product_kind = if self.is_margin() { "futures" } else { "spot" };
531
532            if instruments.is_empty() {
533                log::warn!("Coinbase instrument bootstrap returned no {product_kind} instruments");
534            } else {
535                log::info!(
536                    "Coinbase exec client loaded {} {product_kind} instruments",
537                    instruments.len()
538                );
539            }
540
541            let mut map: AHashMap<String, InstrumentAny> =
542                AHashMap::with_capacity(instruments.len());
543            for inst in &instruments {
544                map.insert(inst.id().symbol.as_str().to_string(), inst.clone());
545            }
546            self.instruments_cache = Arc::new(map);
547
548            // Propagate to the WS client so the feed handler can resolve
549            // user-channel product IDs to cached instruments.
550            self.ws_user.initialize_instruments(instruments).await;
551
552            self.core.set_instruments_initialized();
553        }
554
555        self.ws_user.set_account_id(self.core.account_id).await;
556        self.ws_user.connect().await?;
557
558        // Subscribe to the user channel (product-agnostic). User channel with
559        // an empty product list returns events for all products.
560        self.ws_user
561            .subscribe(CoinbaseWsChannel::User, &[])
562            .await
563            .context("failed to subscribe to Coinbase user channel")?;
564
565        if self.is_margin() {
566            self.ws_user
567                .subscribe(CoinbaseWsChannel::FuturesBalanceSummary, &[])
568                .await
569                .context("failed to subscribe to Coinbase futures_balance_summary channel")?;
570        }
571
572        if let Some(mut rx) = self.ws_user.take_out_rx() {
573            let fill_dedup = Arc::clone(&self.fill_dedup);
574            let cumulative_state = Arc::clone(&self.cumulative_state);
575            let order_contexts = Arc::clone(&self.order_contexts);
576            let external_order_contexts = Arc::clone(&self.external_order_contexts);
577            let emitter = self.emitter.clone();
578            let http_client = self.http_client.clone();
579            let account_id = self.core.account_id;
580            let clock = self.clock;
581            let is_margin = self.is_margin();
582
583            let handle = get_runtime().spawn(async move {
584                while let Some(message) = rx.recv().await {
585                    match message {
586                        NautilusWsMessage::UserOrder(carrier) => {
587                            handle_user_order_update(
588                                *carrier,
589                                &emitter,
590                                &fill_dedup,
591                                &cumulative_state,
592                                &order_contexts,
593                                &external_order_contexts,
594                                &http_client,
595                                account_id,
596                            )
597                            .await;
598                        }
599                        NautilusWsMessage::FuturesBalanceSummary(summary) => {
600                            let ts = clock.get_time_ns();
601                            match parse_ws_cfm_account_state(&summary, account_id, ts, ts) {
602                                Ok(state) => emitter.send_account_state(state),
603                                Err(e) => log::warn!(
604                                    "Failed to parse futures_balance_summary into AccountState: {e}"
605                                ),
606                            }
607                        }
608                        NautilusWsMessage::Reconnected => {
609                            log::info!("Coinbase user WebSocket reconnected");
610                            // Re-fetch account state so any balance change
611                            // during the disconnect window is picked up. The
612                            // margin flavor targets the CFM summary so the
613                            // account type matches the registered Margin
614                            // account.
615                            let refresh = if is_margin {
616                                http_client.request_cfm_account_state(account_id).await
617                            } else {
618                                http_client.request_account_state(account_id).await
619                            };
620
621                            match refresh {
622                                Ok(state) => emitter.send_account_state(state),
623                                Err(e) => {
624                                    log::warn!("Failed to refresh account state on reconnect: {e}");
625                                }
626                            }
627                        }
628                        NautilusWsMessage::Error(err) => {
629                            log::warn!("Coinbase user WebSocket error: {err}");
630                        }
631                        _ => {}
632                    }
633                }
634            });
635            self.ws_stream_handle = Some(handle);
636        }
637
638        let account_state = if self.is_margin() {
639            self.http_client
640                .request_cfm_account_state(self.core.account_id)
641                .await
642                .context("failed to request Coinbase CFM account state")?
643        } else {
644            self.http_client
645                .request_account_state(self.core.account_id)
646                .await
647                .context("failed to request Coinbase account state")?
648        };
649
650        if !account_state.balances.is_empty() {
651            log::info!(
652                "Received account state with {} balance(s)",
653                account_state.balances.len()
654            );
655        }
656        self.emitter.send_account_state(account_state);
657
658        self.await_account_registered(ACCOUNT_REGISTERED_TIMEOUT_SECS)
659            .await?;
660
661        self.core.set_connected();
662        log::info!("Connected: client_id={}", self.core.client_id);
663        Ok(())
664    }
665
666    async fn disconnect(&mut self) -> anyhow::Result<()> {
667        if self.core.is_disconnected() {
668            return Ok(());
669        }
670
671        self.abort_pending_tasks();
672        self.ws_user.disconnect().await;
673
674        if let Some(handle) = self.ws_stream_handle.take() {
675            handle.abort();
676        }
677
678        self.core.set_disconnected();
679        log::info!("Disconnected: client_id={}", self.core.client_id);
680        Ok(())
681    }
682
683    fn start(&mut self) -> anyhow::Result<()> {
684        if self.core.is_started() {
685            return Ok(());
686        }
687
688        let sender = get_exec_event_sender();
689        self.emitter.set_sender(sender);
690        self.core.set_started();
691
692        log::info!(
693            "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}",
694            self.core.client_id,
695            self.core.account_id,
696            self.core.account_type,
697            self.config.environment,
698        );
699        Ok(())
700    }
701
702    fn stop(&mut self) -> anyhow::Result<()> {
703        if self.core.is_stopped() {
704            return Ok(());
705        }
706
707        self.core.set_stopped();
708        self.core.set_disconnected();
709
710        if let Some(handle) = self.ws_stream_handle.take() {
711            handle.abort();
712        }
713        self.abort_pending_tasks();
714        log::info!("Stopped: client_id={}", self.core.client_id);
715        Ok(())
716    }
717
718    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
719        let http_client = self.http_client.clone();
720        let account_id = self.core.account_id;
721        let emitter = self.emitter.clone();
722        let is_margin = self.is_margin();
723
724        self.spawn_task("query_account", async move {
725            let account_state = if is_margin {
726                http_client
727                    .request_cfm_account_state(account_id)
728                    .await
729                    .context("failed to request Coinbase CFM account state")?
730            } else {
731                http_client
732                    .request_account_state(account_id)
733                    .await
734                    .context("failed to request Coinbase account state")?
735            };
736            emitter.send_account_state(account_state);
737            Ok(())
738        });
739        Ok(())
740    }
741
742    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
743        let http_client = self.http_client.clone();
744        let account_id = self.core.account_id;
745        let emitter = self.emitter.clone();
746        let client_order_id = Some(cmd.client_order_id);
747        let venue_order_id = cmd.venue_order_id;
748
749        self.spawn_task("query_order", async move {
750            match http_client
751                .request_order_status_report(account_id, client_order_id, venue_order_id)
752                .await
753            {
754                Ok(report) => emitter.send_order_status_report(report),
755                Err(e) => log::warn!("Failed to query order: {e}"),
756            }
757            Ok(())
758        });
759
760        Ok(())
761    }
762
763    fn generate_account_state(
764        &self,
765        balances: Vec<AccountBalance>,
766        margins: Vec<MarginBalance>,
767        reported: bool,
768        ts_event: UnixNanos,
769    ) -> anyhow::Result<()> {
770        self.emitter
771            .emit_account_state(balances, margins, reported, ts_event);
772        Ok(())
773    }
774
775    async fn generate_order_status_report(
776        &self,
777        cmd: &GenerateOrderStatusReport,
778    ) -> anyhow::Result<Option<OrderStatusReport>> {
779        let report = self
780            .http_client
781            .request_order_status_report(
782                self.core.account_id,
783                cmd.client_order_id,
784                cmd.venue_order_id,
785            )
786            .await
787            .ok();
788
789        // Filter reports to instruments this client bootstrapped. A Cash
790        // client drops derivatives reports (and vice-versa) so mixed activity
791        // on the same venue account does not poison the engine state
792        // associated with either exec client.
793        Ok(report.filter(|r| self.is_instrument_cached(&r.instrument_id)))
794    }
795
796    async fn generate_order_status_reports(
797        &self,
798        cmd: &GenerateOrderStatusReports,
799    ) -> anyhow::Result<Vec<OrderStatusReport>> {
800        let start = cmd.start.map(unix_nanos_to_utc).transpose()?;
801        let end = cmd.end.map(unix_nanos_to_utc).transpose()?;
802
803        let mut reports = self
804            .http_client
805            .request_order_status_reports(
806                self.core.account_id,
807                cmd.instrument_id,
808                cmd.open_only,
809                start,
810                end,
811                None,
812            )
813            .await?;
814
815        let before = reports.len();
816        reports.retain(|r| self.is_instrument_cached(&r.instrument_id));
817        if reports.len() != before {
818            let scope = if self.is_margin() {
819                "non-futures"
820            } else {
821                "non-spot"
822            };
823            log::debug!("Filtered {} {scope} order reports", before - reports.len());
824        }
825        Ok(reports)
826    }
827
828    async fn generate_fill_reports(
829        &self,
830        cmd: GenerateFillReports,
831    ) -> anyhow::Result<Vec<FillReport>> {
832        let start = cmd.start.map(unix_nanos_to_utc).transpose()?;
833        let end = cmd.end.map(unix_nanos_to_utc).transpose()?;
834
835        let mut reports = self
836            .http_client
837            .request_fill_reports(
838                self.core.account_id,
839                cmd.instrument_id,
840                cmd.venue_order_id,
841                start,
842                end,
843                None,
844            )
845            .await?;
846
847        let before = reports.len();
848        reports.retain(|r| self.is_instrument_cached(&r.instrument_id));
849        if reports.len() != before {
850            let scope = if self.is_margin() {
851                "non-futures"
852            } else {
853                "non-spot"
854            };
855            log::debug!("Filtered {} {scope} fill reports", before - reports.len());
856        }
857        Ok(reports)
858    }
859
860    async fn generate_position_status_reports(
861        &self,
862        cmd: &GeneratePositionStatusReports,
863    ) -> anyhow::Result<Vec<PositionStatusReport>> {
864        // Coinbase spot has no positions.
865        if !self.is_margin() {
866            return Ok(Vec::new());
867        }
868
869        // Errors propagate (matching `generate_order_status_reports` /
870        // `generate_fill_reports`) so `generate_mass_status` and the live
871        // manager's reconciliation path see venue failures rather than
872        // receive a silently-empty report set.
873        if let Some(instrument_id) = cmd.instrument_id {
874            let report = self
875                .http_client
876                .request_position_status_report(self.core.account_id, instrument_id)
877                .await
878                .with_context(|| format!("failed to request CFM position for {instrument_id}"))?;
879            Ok(report.map(|r| vec![r]).unwrap_or_default())
880        } else {
881            self.http_client
882                .request_position_status_reports(self.core.account_id)
883                .await
884                .context("failed to request CFM positions")
885        }
886    }
887
888    async fn generate_mass_status(
889        &self,
890        lookback_mins: Option<u64>,
891    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
892        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
893
894        let ts_now = self.clock.get_time_ns();
895        let start = lookback_mins.map(|mins| {
896            let lookback_ns = mins * 60 * 1_000_000_000;
897            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
898        });
899
900        let order_cmd = GenerateOrderStatusReportsBuilder::default()
901            .ts_init(ts_now)
902            .open_only(false)
903            .start(start)
904            .build()
905            .map_err(|e| anyhow::anyhow!("{e}"))?;
906        let fill_cmd = GenerateFillReportsBuilder::default()
907            .ts_init(ts_now)
908            .start(start)
909            .build()
910            .map_err(|e| anyhow::anyhow!("{e}"))?;
911        let position_cmd = GeneratePositionStatusReportsBuilder::default()
912            .ts_init(ts_now)
913            .build()
914            .map_err(|e| anyhow::anyhow!("{e}"))?;
915
916        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
917            self.generate_order_status_reports(&order_cmd),
918            self.generate_fill_reports(fill_cmd),
919            self.generate_position_status_reports(&position_cmd),
920        )?;
921
922        log::info!("Received {} OrderStatusReports", order_reports.len());
923        log::info!("Received {} FillReports", fill_reports.len());
924        log::info!("Received {} PositionReports", position_reports.len());
925
926        let mut mass_status = ExecutionMassStatus::new(
927            self.core.client_id,
928            self.core.account_id,
929            *COINBASE_VENUE,
930            ts_now,
931            None,
932        );
933
934        mass_status.add_order_reports(order_reports);
935        mass_status.add_fill_reports(fill_reports);
936        mass_status.add_position_reports(position_reports);
937
938        Ok(Some(mass_status))
939    }
940
941    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
942        let order = {
943            let cache = self.core.cache();
944            let order = cache
945                .order(&cmd.client_order_id)
946                .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
947
948            if order.is_closed() {
949                log::warn!("Cannot submit closed order {}", order.client_order_id());
950                return Ok(());
951            }
952
953            order.clone()
954        };
955
956        // The connect-time bootstrap caches only the product family this
957        // client was configured for (Cash -> spot, Margin -> futures). An
958        // instrument outside that family is either not loaded yet or lives on
959        // the other venue scope, so deny instead of forwarding to the venue
960        // where the account type cannot reconcile the order's state.
961        let instrument_id = order.instrument_id();
962        let symbol_key = instrument_id.symbol.as_str();
963        if !self.instruments_cache.contains_key(symbol_key) {
964            let scope = if self.is_margin() {
965                "a Coinbase futures / perpetual product"
966            } else {
967                "a Coinbase spot product"
968            };
969            self.emitter.emit_order_denied(
970                &order,
971                &format!(
972                    "Instrument {} is not {scope} in this client's bootstrap cache",
973                    order.instrument_id()
974                ),
975            );
976            return Ok(());
977        }
978
979        // The user channel does not need a product-wide alias registration:
980        // `order_contexts` (keyed by `client_order_id`) records the
981        // submitted `product_id` and `handle_user_order_update` rewrites the
982        // report's instrument id from there. A product-wide map would
983        // misroute external or canonical-side orders that share the same
984        // wire `product_id`.
985
986        log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
987        self.emitter.emit_order_submitted(&order);
988
989        let http_client = self.http_client.clone();
990        let emitter = self.emitter.clone();
991        let order_contexts = Arc::clone(&self.order_contexts);
992        let clock = self.clock;
993        let strategy_id = order.strategy_id();
994        let client_order_id = order.client_order_id();
995        let side = order.order_side();
996        let order_type = order.order_type();
997        let quantity = order.quantity();
998        let time_in_force = order.time_in_force();
999        let price = order.price();
1000        let trigger_price = order.trigger_price();
1001        let trigger_type = order.trigger_type();
1002        let expire_time = order.expire_time();
1003        let post_only = order.is_post_only();
1004        let is_quote_quantity = order.is_quote_quantity();
1005        let reduce_only = order.is_reduce_only();
1006
1007        // Cache limit/trigger metadata under `client_order_id` synchronously
1008        // before the spawn so user-channel updates that race the REST submit
1009        // response can still patch their reports. Coinbase's user channel does
1010        // not echo `price`, `stop_price`, `trigger_type`, or whether the order
1011        // is `post_only`, so without this the engine reconciler would clear
1012        // the local price and synthesized fills would lack `LiquiditySide`.
1013        {
1014            let mut map = self.order_contexts.lock().expect(MUTEX_POISONED);
1015            map.insert(
1016                client_order_id.to_string(),
1017                OrderContext {
1018                    price,
1019                    trigger_price,
1020                    trigger_type,
1021                    post_only,
1022                    submitted_product_id: Some(instrument_id.symbol.inner()),
1023                },
1024            );
1025        }
1026        let (leverage, margin_type) = if self.core.account_type == AccountType::Margin {
1027            (
1028                self.config.default_leverage,
1029                self.config.default_margin_type,
1030            )
1031        } else {
1032            (None, None)
1033        };
1034        let retail_portfolio_id = self.config.retail_portfolio_id.clone();
1035
1036        self.spawn_task("submit_order", async move {
1037            let result = http_client
1038                .submit_order(
1039                    client_order_id,
1040                    instrument_id,
1041                    side,
1042                    order_type,
1043                    quantity,
1044                    time_in_force,
1045                    price,
1046                    trigger_price,
1047                    expire_time,
1048                    post_only,
1049                    is_quote_quantity,
1050                    leverage,
1051                    margin_type,
1052                    reduce_only,
1053                    retail_portfolio_id,
1054                )
1055                .await;
1056
1057            match result {
1058                Ok(response) => {
1059                    if response.success {
1060                        let venue_id = response
1061                            .success_response
1062                            .as_ref()
1063                            .map(|s| s.order_id.clone())
1064                            .unwrap_or(response.order_id);
1065
1066                        if venue_id.is_empty() {
1067                            log::warn!(
1068                                "Submit succeeded but no order_id returned for {client_order_id}"
1069                            );
1070                        } else {
1071                            let venue_order_id = VenueOrderId::new(&venue_id);
1072                            let ts_event = clock.get_time_ns();
1073                            emitter.emit_order_accepted(&order, venue_order_id, ts_event);
1074                        }
1075                    } else {
1076                        let reason = response.error_response.as_ref().map_or_else(
1077                            || response.failure_reason.clone(),
1078                            |e| format!("{}: {}", e.error, e.message),
1079                        );
1080                        // `INVALID_LIMIT_PRICE_POST_ONLY` is Coinbase's reject
1081                        // code when a `post_only` order would have crossed
1082                        // the spread by the time it reached the matching
1083                        // engine. Mark the rejection so strategies can react
1084                        // (typically: re-quote at the new TOB).
1085                        let due_post_only = reason.contains("INVALID_LIMIT_PRICE_POST_ONLY")
1086                            || response.error_response.as_ref().is_some_and(|e| {
1087                                e.preview_failure_reason == "PREVIEW_INVALID_LIMIT_PRICE_POSTONLY"
1088                                    || e.new_order_failure_reason == "INVALID_LIMIT_PRICE_POST_ONLY"
1089                            });
1090                        // Order never made it to the venue: drop the cached
1091                        // metadata so the map does not grow unbounded with
1092                        // dead entries.
1093                        order_contexts
1094                            .lock()
1095                            .expect(MUTEX_POISONED)
1096                            .remove(client_order_id.as_str());
1097                        let ts_event = clock.get_time_ns();
1098                        emitter.emit_order_rejected_event(
1099                            strategy_id,
1100                            instrument_id,
1101                            client_order_id,
1102                            &format!("submit-order-rejected: {reason}"),
1103                            ts_event,
1104                            due_post_only,
1105                        );
1106                    }
1107                }
1108                Err(e) => {
1109                    order_contexts
1110                        .lock()
1111                        .expect(MUTEX_POISONED)
1112                        .remove(client_order_id.as_str());
1113                    let ts_event = clock.get_time_ns();
1114                    emitter.emit_order_rejected_event(
1115                        strategy_id,
1116                        instrument_id,
1117                        client_order_id,
1118                        &format!("submit-order-error: {e}"),
1119                        ts_event,
1120                        false,
1121                    );
1122                    anyhow::bail!("submit order failed: {e}");
1123                }
1124            }
1125            Ok(())
1126        });
1127
1128        Ok(())
1129    }
1130
1131    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1132        let ts_event = self.clock.get_time_ns();
1133
1134        let Some(venue_order_id) = cmd.venue_order_id else {
1135            self.emitter.emit_order_modify_rejected_event(
1136                cmd.strategy_id,
1137                cmd.instrument_id,
1138                cmd.client_order_id,
1139                None,
1140                "modify-order requires venue_order_id",
1141                ts_event,
1142            );
1143            return Ok(());
1144        };
1145
1146        if cmd.price.is_none() && cmd.quantity.is_none() && cmd.trigger_price.is_none() {
1147            self.emitter.emit_order_modify_rejected_event(
1148                cmd.strategy_id,
1149                cmd.instrument_id,
1150                cmd.client_order_id,
1151                Some(venue_order_id),
1152                "modify-order requires price, quantity, or trigger_price",
1153                ts_event,
1154            );
1155            return Ok(());
1156        }
1157
1158        // Coinbase's `/orders/edit` requires both `price` and `size` to be
1159        // present in the request even when only one is changing; omitting
1160        // `size` is interpreted as 0 and rejected with `INVALID_EDITED_SIZE` /
1161        // `CANNOT_EDIT_TO_BELOW_FILLED_SIZE`. Auto-fill missing fields from
1162        // the cached order so strategies can call `modify_order(price=...)`
1163        // without having to look up the current quantity themselves.
1164        let (auto_price, auto_quantity) = {
1165            let cache = self.core.cache();
1166            let order = cache.order(&cmd.client_order_id);
1167            (
1168                cmd.price.or_else(|| order.and_then(|o| o.price())),
1169                cmd.quantity.or_else(|| order.map(|o| o.quantity())),
1170            )
1171        };
1172
1173        let http_client = self.http_client.clone();
1174        let emitter = self.emitter.clone();
1175        let order_contexts = Arc::clone(&self.order_contexts);
1176        let clock = self.clock;
1177        let strategy_id = cmd.strategy_id;
1178        let instrument_id = cmd.instrument_id;
1179        let client_order_id = cmd.client_order_id;
1180        let price = auto_price;
1181        let quantity = auto_quantity;
1182        let trigger_price = cmd.trigger_price;
1183
1184        self.spawn_task("modify_order", async move {
1185            let result = http_client
1186                .modify_order(venue_order_id, price, quantity, trigger_price)
1187                .await;
1188
1189            match result {
1190                Ok(resp) => {
1191                    if resp.success {
1192                        // Refresh the submit-time metadata cache so subsequent
1193                        // user-channel updates patch with the new price /
1194                        // trigger_price (Coinbase user channel does not echo
1195                        // these fields, so a stale cache would let the
1196                        // reconciler revert the local order to the pre-edit
1197                        // values).
1198                        let mut map = order_contexts.lock().expect(MUTEX_POISONED);
1199                        if let Some(meta) = map.get_mut(client_order_id.as_str()) {
1200                            if price.is_some() {
1201                                meta.price = price;
1202                            }
1203
1204                            if trigger_price.is_some() {
1205                                meta.trigger_price = trigger_price;
1206                            }
1207                        }
1208                    } else {
1209                        let reason = resp
1210                            .errors
1211                            .iter()
1212                            .map(|e| {
1213                                if e.edit_failure_reason.is_empty() {
1214                                    e.preview_failure_reason.clone()
1215                                } else {
1216                                    e.edit_failure_reason.clone()
1217                                }
1218                            })
1219                            .collect::<Vec<_>>()
1220                            .join(",");
1221                        let ts_event = clock.get_time_ns();
1222                        emitter.emit_order_modify_rejected_event(
1223                            strategy_id,
1224                            instrument_id,
1225                            client_order_id,
1226                            Some(venue_order_id),
1227                            &format!("modify-order-rejected: {reason}"),
1228                            ts_event,
1229                        );
1230                    }
1231                }
1232                Err(e) => {
1233                    let ts_event = clock.get_time_ns();
1234                    emitter.emit_order_modify_rejected_event(
1235                        strategy_id,
1236                        instrument_id,
1237                        client_order_id,
1238                        Some(venue_order_id),
1239                        &format!("modify-order-error: {e}"),
1240                        ts_event,
1241                    );
1242                    anyhow::bail!("modify order failed: {e}");
1243                }
1244            }
1245
1246            Ok(())
1247        });
1248
1249        Ok(())
1250    }
1251
1252    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1253        let ts_event = self.clock.get_time_ns();
1254
1255        let Some(venue_order_id) = cmd.venue_order_id else {
1256            self.emitter.emit_order_cancel_rejected_event(
1257                cmd.strategy_id,
1258                cmd.instrument_id,
1259                cmd.client_order_id,
1260                None,
1261                "cancel-order requires venue_order_id",
1262                ts_event,
1263            );
1264            return Ok(());
1265        };
1266
1267        let http_client = self.http_client.clone();
1268        let emitter = self.emitter.clone();
1269        let clock = self.clock;
1270        let strategy_id = cmd.strategy_id;
1271        let instrument_id = cmd.instrument_id;
1272        let client_order_id = cmd.client_order_id;
1273
1274        self.spawn_task("cancel_order", async move {
1275            match http_client.cancel_orders(&[venue_order_id]).await {
1276                Ok(resp) => {
1277                    if let Some(result) = resp.results.first()
1278                        && !result.success
1279                    {
1280                        let ts_event = clock.get_time_ns();
1281                        emitter.emit_order_cancel_rejected_event(
1282                            strategy_id,
1283                            instrument_id,
1284                            client_order_id,
1285                            Some(venue_order_id),
1286                            &format!("cancel-order-rejected: {}", result.failure_reason),
1287                            ts_event,
1288                        );
1289                    }
1290                }
1291                Err(e) => {
1292                    let ts_event = clock.get_time_ns();
1293                    emitter.emit_order_cancel_rejected_event(
1294                        strategy_id,
1295                        instrument_id,
1296                        client_order_id,
1297                        Some(venue_order_id),
1298                        &format!("cancel-order-error: {e}"),
1299                        ts_event,
1300                    );
1301                    anyhow::bail!("cancel order failed: {e}");
1302                }
1303            }
1304            Ok(())
1305        });
1306
1307        Ok(())
1308    }
1309
1310    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1311        let http_client = self.http_client.clone();
1312        let account_id = self.core.account_id;
1313        let instrument_id = cmd.instrument_id;
1314        let side_filter = cmd.order_side;
1315        let emitter = self.emitter.clone();
1316        let clock = self.clock;
1317        let strategy_id = cmd.strategy_id;
1318
1319        self.spawn_task("cancel_all_orders", async move {
1320            // Coinbase's `order_status=OPEN` filter excludes PENDING / QUEUED
1321            // orders that were submitted very recently and are still cancelable.
1322            // Fetch all reports and filter to any open status locally so a cancel-
1323            // all issued right after submission does not leave working orders behind.
1324            let reports = http_client
1325                .request_order_status_reports(
1326                    account_id,
1327                    Some(instrument_id),
1328                    false,
1329                    None,
1330                    None,
1331                    None,
1332                )
1333                .await
1334                .context("failed to list orders for cancel_all")?;
1335
1336            // Filter to statuses that are safe to cancel and to the requested
1337            // side since Coinbase's batch-cancel endpoint has no side parameter.
1338            //
1339            // Coinbase's `PENDING` / `QUEUED` / `OPEN` all map to `Accepted`
1340            // and are cancelable. We can't use `OrderStatus::is_open()` because
1341            // it includes `PendingCancel`, and re-cancelling a `CANCEL_QUEUED`
1342            // order risks `CancelRejected` flipping the order back to its prior
1343            // working status.
1344            let filtered: Vec<(Option<ClientOrderId>, VenueOrderId)> = reports
1345                .into_iter()
1346                .filter(|r| {
1347                    matches!(
1348                        r.order_status,
1349                        OrderStatus::Accepted
1350                            | OrderStatus::Triggered
1351                            | OrderStatus::PendingUpdate
1352                            | OrderStatus::PartiallyFilled
1353                    )
1354                })
1355                .filter(|r| side_filter == OrderSide::NoOrderSide || r.order_side == side_filter)
1356                .map(|r| (r.client_order_id, r.venue_order_id))
1357                .collect();
1358
1359            if filtered.is_empty() {
1360                return Ok(());
1361            }
1362
1363            for chunk in filtered.chunks(BATCH_CANCEL_CHUNK) {
1364                let venue_ids: Vec<VenueOrderId> = chunk.iter().map(|(_, v)| *v).collect();
1365                match http_client.cancel_orders(&venue_ids).await {
1366                    Ok(resp) => {
1367                        for result in &resp.results {
1368                            if result.success {
1369                                continue;
1370                            }
1371                            let matching = chunk
1372                                .iter()
1373                                .find(|(_, vid)| vid.as_str() == result.order_id);
1374                            if let Some((cid_opt, vid)) = matching
1375                                && let Some(cid) = cid_opt
1376                            {
1377                                let ts_event = clock.get_time_ns();
1378                                emitter.emit_order_cancel_rejected_event(
1379                                    strategy_id,
1380                                    instrument_id,
1381                                    *cid,
1382                                    Some(*vid),
1383                                    &format!("cancel-all-rejected: {}", result.failure_reason),
1384                                    ts_event,
1385                                );
1386                            }
1387                        }
1388                    }
1389                    Err(e) => {
1390                        log::error!("Failed to cancel chunk for {instrument_id}: {e}");
1391                        let ts_event = clock.get_time_ns();
1392
1393                        for (cid_opt, vid) in chunk {
1394                            if let Some(cid) = cid_opt {
1395                                emitter.emit_order_cancel_rejected_event(
1396                                    strategy_id,
1397                                    instrument_id,
1398                                    *cid,
1399                                    Some(*vid),
1400                                    &format!("cancel-all-error: {e}"),
1401                                    ts_event,
1402                                );
1403                            }
1404                        }
1405                    }
1406                }
1407            }
1408            Ok(())
1409        });
1410
1411        Ok(())
1412    }
1413
1414    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1415        if cmd.cancels.is_empty() {
1416            return Ok(());
1417        }
1418
1419        let http_client = self.http_client.clone();
1420        let emitter = self.emitter.clone();
1421        let clock = self.clock;
1422        let strategy_id = cmd.strategy_id;
1423        let instrument_id = cmd.instrument_id;
1424
1425        // Build parallel vectors so we can report per-order failures.
1426        let entries: Vec<(ClientOrderId, Option<VenueOrderId>)> = cmd
1427            .cancels
1428            .iter()
1429            .map(|c| (c.client_order_id, c.venue_order_id))
1430            .collect();
1431
1432        self.spawn_task("batch_cancel_orders", async move {
1433            let venue_order_ids: Vec<VenueOrderId> =
1434                entries.iter().filter_map(|(_, v)| *v).collect();
1435
1436            for (cid, vid_opt) in &entries {
1437                if vid_opt.is_none() {
1438                    let ts_event = clock.get_time_ns();
1439                    emitter.emit_order_cancel_rejected_event(
1440                        strategy_id,
1441                        instrument_id,
1442                        *cid,
1443                        None,
1444                        "batch-cancel requires venue_order_id",
1445                        ts_event,
1446                    );
1447                }
1448            }
1449
1450            for chunk in venue_order_ids.chunks(BATCH_CANCEL_CHUNK) {
1451                match http_client.cancel_orders(chunk).await {
1452                    Ok(resp) => {
1453                        for result in &resp.results {
1454                            if !result.success {
1455                                let vid = VenueOrderId::new(&result.order_id);
1456                                let matching = entries
1457                                    .iter()
1458                                    .find(|(_, v)| {
1459                                        v.is_some_and(|id| id.as_str() == result.order_id)
1460                                    })
1461                                    .map(|(cid, _)| *cid);
1462                                if let Some(cid) = matching {
1463                                    let ts_event = clock.get_time_ns();
1464                                    emitter.emit_order_cancel_rejected_event(
1465                                        strategy_id,
1466                                        instrument_id,
1467                                        cid,
1468                                        Some(vid),
1469                                        &format!(
1470                                            "batch-cancel-rejected: {}",
1471                                            result.failure_reason
1472                                        ),
1473                                        ts_event,
1474                                    );
1475                                }
1476                            }
1477                        }
1478                    }
1479                    Err(e) => {
1480                        log::error!("batch_cancel chunk failed: {e}");
1481                        let ts_event = clock.get_time_ns();
1482
1483                        for vid in chunk {
1484                            let matching = entries
1485                                .iter()
1486                                .find(|(_, v)| v.is_some_and(|id| id == *vid))
1487                                .map(|(cid, _)| *cid);
1488                            if let Some(cid) = matching {
1489                                emitter.emit_order_cancel_rejected_event(
1490                                    strategy_id,
1491                                    instrument_id,
1492                                    cid,
1493                                    Some(*vid),
1494                                    &format!("batch-cancel-error: {e}"),
1495                                    ts_event,
1496                                );
1497                            }
1498                        }
1499                    }
1500                }
1501            }
1502            Ok(())
1503        });
1504
1505        Ok(())
1506    }
1507}
1508
1509// Processes a single user-channel order update: emits the status report,
1510// synthesizes a FillReport from the cumulative delta, and deduplicates
1511// replayed fills by (venue_order_id, trade_id).
1512#[allow(clippy::too_many_arguments)]
1513async fn handle_user_order_update(
1514    carrier: UserOrderUpdate,
1515    emitter: &ExecutionEventEmitter,
1516    fill_dedup: &Arc<Mutex<FillDedup>>,
1517    cumulative_state: &Arc<Mutex<CumulativeStateMap>>,
1518    order_contexts: &Arc<Mutex<AHashMap<String, OrderContext>>>,
1519    external_order_contexts: &Arc<Mutex<AHashMap<String, OrderContext>>>,
1520    http_client: &CoinbaseHttpClient,
1521    account_id: AccountId,
1522) {
1523    // Coinbase's user channel does not echo `price`, `stop_price`,
1524    // `trigger_type`, or `post_only`. Resolve an `OrderContext` (cached
1525    // from `submit_order` for orders this client placed, or fetched from
1526    // REST and cached for external orders) so the report can be patched
1527    // before reaching the engine reconciler.
1528    let context = resolve_order_context(
1529        &carrier.update,
1530        carrier.report.order_type,
1531        carrier.report.price.is_none(),
1532        order_contexts,
1533        external_order_contexts,
1534        http_client,
1535        account_id,
1536    )
1537    .await;
1538
1539    let is_terminal = carrier.update.status.is_terminal();
1540    let client_order_id = carrier.update.client_order_id.clone();
1541    let venue_order_id = carrier.update.order_id.clone();
1542
1543    process_user_order_update(
1544        carrier,
1545        context,
1546        emitter,
1547        fill_dedup,
1548        cumulative_state,
1549        Some(http_client),
1550    );
1551
1552    // Drop submit-time / enrichment metadata once the order reaches a
1553    // terminal state so long-running clients do not accumulate one entry
1554    // per order. Mirrors the cumulative-state cleanup in
1555    // `process_user_order_update`.
1556    if is_terminal {
1557        if !client_order_id.is_empty() {
1558            order_contexts
1559                .lock()
1560                .expect(MUTEX_POISONED)
1561                .remove(&client_order_id);
1562        }
1563        external_order_contexts
1564            .lock()
1565            .expect(MUTEX_POISONED)
1566            .remove(&venue_order_id);
1567    }
1568}
1569
1570// Sync portion of the user-channel update handler. Split from
1571// `handle_user_order_update` so tests can drive it without a tokio runtime;
1572// the only async dependency is REST enrichment in `resolve_order_context`.
1573fn process_user_order_update(
1574    carrier: UserOrderUpdate,
1575    context: Option<OrderContext>,
1576    emitter: &ExecutionEventEmitter,
1577    fill_dedup: &Arc<Mutex<FillDedup>>,
1578    cumulative_state: &Arc<Mutex<CumulativeStateMap>>,
1579    http_client: Option<&CoinbaseHttpClient>,
1580) {
1581    let UserOrderUpdate {
1582        mut report,
1583        update,
1584        mut instrument,
1585        is_snapshot,
1586        ts_event,
1587        ts_init,
1588    } = carrier;
1589
1590    let mut fill_liquidity_side = LiquiditySide::NoLiquiditySide;
1591    let have_order_contexts = context.is_some();
1592    let mut publish_instrument_id: Option<InstrumentId> = None;
1593
1594    if let Some(meta) = context {
1595        if report.price.is_none() && meta.price.is_some() {
1596            report.price = meta.price;
1597        }
1598
1599        if report.trigger_price.is_none() && meta.trigger_price.is_some() {
1600            report.trigger_price = meta.trigger_price;
1601        }
1602
1603        if report.trigger_type.is_none() && meta.trigger_type.is_some() {
1604            report.trigger_type = meta.trigger_type;
1605        }
1606
1607        if meta.post_only {
1608            // `post_only` orders are guaranteed `Maker`. Non-post-only
1609            // orders cannot be classified from the user channel alone so
1610            // they keep `NoLiquiditySide` until the fill is reconciled
1611            // against the REST `/orders/historical/fills` endpoint.
1612            fill_liquidity_side = LiquiditySide::Maker;
1613            // The user channel does not echo `post_only`, so propagate the
1614            // cached flag to the OSR to preserve maker-only semantics for
1615            // any downstream order reconstruction.
1616            report.post_only = true;
1617        }
1618
1619        if let Some(submitted) = meta.submitted_product_id
1620            && submitted != update.product_id
1621        {
1622            let submitted_id = InstrumentId::new(Symbol::new(submitted), *COINBASE_VENUE);
1623            report.instrument_id = submitted_id;
1624            publish_instrument_id = Some(submitted_id);
1625            // Replace the carrier's instrument with the submitted-side one
1626            // (looked up from the http client's bootstrapped cache) so the
1627            // FillReport's commission currency, price/size precision, and
1628            // any other instrument-derived field reflect the actual order's
1629            // instrument rather than the canonical wire alias.
1630            if let Some(http) = http_client
1631                && let Some(submitted_instrument) = http.instruments().get_cloned(&submitted_id)
1632            {
1633                instrument = submitted_instrument;
1634            }
1635        }
1636    }
1637
1638    let size_precision = instrument.size_precision();
1639
1640    let cumulative_qty = if update.cumulative_quantity.is_empty() {
1641        Quantity::zero(size_precision)
1642    } else {
1643        match parse_quantity(&update.cumulative_quantity, size_precision) {
1644            Ok(q) => q,
1645            Err(e) => {
1646                log::warn!(
1647                    "Failed to parse cumulative_quantity for order {}: {e}",
1648                    update.order_id
1649                );
1650                return;
1651            }
1652        }
1653    };
1654
1655    let cumulative_fees = if update.total_fees.is_empty() {
1656        Decimal::ZERO
1657    } else {
1658        match Decimal::from_str(&update.total_fees) {
1659            Ok(d) => d,
1660            Err(e) => {
1661                log::warn!(
1662                    "Failed to parse total_fees for order {}: {e}",
1663                    update.order_id
1664                );
1665                return;
1666            }
1667        }
1668    };
1669
1670    let cumulative_avg = if update.avg_price.is_empty() {
1671        Decimal::ZERO
1672    } else {
1673        match Decimal::from_str(&update.avg_price) {
1674            Ok(d) => d,
1675            Err(e) => {
1676                log::warn!(
1677                    "Failed to parse avg_price for order {}: {e}",
1678                    update.order_id
1679                );
1680                return;
1681            }
1682        }
1683    };
1684    let order_id = update.order_id.clone();
1685
1686    let is_terminal = update.status.is_terminal();
1687
1688    // Snapshot previous state under lock; update immediately to avoid races
1689    // between concurrent handler tasks for the same order.
1690    let (delta_qty, delta_fees, last_fill_price_decimal, restored_quantity) = {
1691        let mut state = cumulative_state.lock().expect(MUTEX_POISONED);
1692        let entry = state.entry_or_default(&order_id);
1693        let prev_qty = entry
1694            .filled_qty
1695            .unwrap_or_else(|| Quantity::zero(size_precision));
1696        let prev_fees = entry.total_fees;
1697        let prev_avg = entry.avg_price;
1698
1699        // Track the max-observed total quantity. The freshly-built report has
1700        // quantity = cum+leaves which is correct for working orders; on
1701        // terminal events Coinbase zeroes leaves_quantity, so we use the
1702        // stored max instead.
1703        let observed_quantity = report.quantity;
1704        let stored_quantity = match entry.quantity {
1705            Some(q) if q >= observed_quantity => q,
1706            _ => observed_quantity,
1707        };
1708        entry.quantity = Some(stored_quantity);
1709
1710        // Snapshots restate the cumulative state of pre-existing open orders.
1711        // Treat them as the new baseline (so subsequent updates compute correct
1712        // deltas) but never synthesize a fill from them.
1713        if is_snapshot {
1714            entry.filled_qty = Some(cumulative_qty);
1715            entry.total_fees = cumulative_fees;
1716            entry.avg_price = cumulative_avg;
1717
1718            if is_terminal {
1719                state.remove(&order_id);
1720            }
1721            (
1722                Quantity::zero(size_precision),
1723                Decimal::ZERO,
1724                Decimal::ZERO,
1725                stored_quantity,
1726            )
1727        } else {
1728            let delta_qty = if cumulative_qty > prev_qty {
1729                cumulative_qty - prev_qty
1730            } else {
1731                Quantity::zero(size_precision)
1732            };
1733            let delta_fees = cumulative_fees - prev_fees;
1734
1735            // Derive per-fill price from the cumulative notional delta:
1736            //   last_px = (avg_now * qty_now - avg_prev * qty_prev) / delta_qty
1737            // Falls back to the cumulative avg on the first fill (where
1738            // delta_qty equals qty_now and prev_notional is zero).
1739            let last_fill_price_decimal = if delta_qty.is_positive() {
1740                let now_notional = cumulative_avg * cumulative_qty.as_decimal();
1741                let prev_notional = prev_avg * prev_qty.as_decimal();
1742                let delta_notional = now_notional - prev_notional;
1743                let delta_qty_dec = delta_qty.as_decimal();
1744                if delta_qty_dec.is_zero() {
1745                    cumulative_avg
1746                } else {
1747                    delta_notional / delta_qty_dec
1748                }
1749            } else {
1750                Decimal::ZERO
1751            };
1752
1753            entry.filled_qty = Some(cumulative_qty);
1754            entry.total_fees = cumulative_fees;
1755            entry.avg_price = cumulative_avg;
1756
1757            if is_terminal {
1758                state.remove(&order_id);
1759            }
1760
1761            (
1762                delta_qty,
1763                delta_fees,
1764                last_fill_price_decimal,
1765                stored_quantity,
1766            )
1767        }
1768    };
1769
1770    // Restore the original order quantity on terminal events when the venue's
1771    // zeroed leaves_quantity would otherwise collapse the report to filled_qty.
1772    if is_terminal && report.quantity < restored_quantity {
1773        report.quantity = restored_quantity;
1774    }
1775
1776    // Emit the synthesized FillReport before the OrderStatusReport when there
1777    // is one. The engine's reconciler treats an OrderStatusReport with status
1778    // `Filled` / `PartiallyFilled` as authoritative for `filled_qty` and will
1779    // *infer* a synthetic fill when the local order is behind the report. If
1780    // the OrderStatusReport landed first, that inferred fill would race ours
1781    // and ours would then be rejected as an overfill.
1782    let synthesized_fill = if delta_qty.is_positive()
1783        && last_fill_price_decimal.is_sign_positive()
1784        && !last_fill_price_decimal.is_zero()
1785    {
1786        let price_precision = instrument.price_precision();
1787        match Price::from_decimal_dp(last_fill_price_decimal, price_precision) {
1788            Ok(last_px) => {
1789                // Coinbase's user channel reports cumulative state and does
1790                // not assign a per-fill trade id, so we synthesize one.
1791                // `TradeId` is a 36-char stack string; a full venue UUID
1792                // (36 chars) plus the cumulative_qty would overflow. Use the
1793                // first 8 chars of the venue UUID (already random hex) as a
1794                // stable per-order discriminator.
1795                let order_id_short = &update.order_id[..update.order_id.len().min(8)];
1796                let trade_id = TradeId::new(format!("{order_id_short}-{cumulative_qty}"));
1797                let trade_id_str = trade_id.as_str().to_string();
1798
1799                let is_new = {
1800                    let mut dedup = fill_dedup.lock().expect(MUTEX_POISONED);
1801                    dedup.insert((update.order_id.clone(), trade_id_str))
1802                };
1803
1804                if is_new {
1805                    let commission_currency = instrument.quote_currency();
1806                    match Money::from_decimal(delta_fees, commission_currency) {
1807                        Ok(commission) => Some(parse_ws_user_event_to_fill_report(
1808                            &update,
1809                            delta_qty,
1810                            last_px,
1811                            commission,
1812                            trade_id,
1813                            &instrument,
1814                            emitter.account_id(),
1815                            fill_liquidity_side,
1816                            ts_event,
1817                            ts_init,
1818                        )),
1819                        Err(e) => {
1820                            log::warn!(
1821                                "Failed to build commission Money for order {}: {e}",
1822                                update.order_id
1823                            );
1824                            None
1825                        }
1826                    }
1827                } else {
1828                    log::debug!(
1829                        "Dropping duplicate fill venue_order_id={}, trade_id={}",
1830                        update.order_id,
1831                        trade_id,
1832                    );
1833                    None
1834                }
1835            }
1836            Err(e) => {
1837                log::warn!(
1838                    "Failed to build Price from derived last_fill={last_fill_price_decimal} at precision {price_precision} for order {}: {e}",
1839                    update.order_id
1840                );
1841                None
1842            }
1843        }
1844    } else {
1845        None
1846    };
1847
1848    if let Some(mut fill_report) = synthesized_fill {
1849        if let Some(id) = publish_instrument_id {
1850            fill_report.instrument_id = id;
1851        }
1852        emitter.send_fill_report(fill_report);
1853    }
1854
1855    // OSR emission policy:
1856    // - For order types that carry a price (LIMIT / STOP_LIMIT) or trigger
1857    //   (STOP_MARKET / *_IF_TOUCHED), the report must include the relevant
1858    //   field before reaching the engine reconciler; otherwise the order
1859    //   reconstruction path panics with a missing-field error. Patching
1860    //   above pulls these from the OrderContext when one is available, but
1861    //   if enrichment was needed and unavailable (REST fetch failed for an
1862    //   external order) the report is still missing the field and is unsafe
1863    //   to emit.
1864    // - Snapshots emit only when we have submit-time metadata; the
1865    //   user-channel snapshot omits these fields entirely. With metadata,
1866    //   the report has been patched above and is safe to emit (this
1867    //   preserves reconnect-time partial-fill recovery for orders submitted
1868    //   by this process). For unknown orders, the REST mass-status path
1869    //   called from `LiveNode` startup is the canonical source.
1870    let report_safe_for_type = match report.order_type {
1871        OrderType::Limit | OrderType::LimitIfTouched => report.price.is_some(),
1872        OrderType::StopLimit => report.price.is_some() && report.trigger_price.is_some(),
1873        OrderType::StopMarket | OrderType::MarketIfTouched => report.trigger_price.is_some(),
1874        _ => true,
1875    };
1876    let should_emit = (!is_snapshot || have_order_contexts) && report_safe_for_type;
1877    if should_emit {
1878        emitter.send_order_status_report(*report);
1879    } else if !report_safe_for_type {
1880        log::warn!(
1881            "Suppressed unsafe OrderStatusReport for {} {}: missing price/trigger after enrichment",
1882            report.order_type,
1883            update.order_id,
1884        );
1885    }
1886}
1887
1888// Returns the submit-time / enriched metadata for `update`, fetching from
1889// REST and populating the enrichment cache the first time an external order
1890// is seen. `order_contexts` (keyed by `client_order_id`) covers orders this
1891// client placed; `external_order_contexts` (keyed by venue `order_id`) covers
1892// external orders whose `OrderStatusReport` would otherwise be unsafe to
1893// reconstruct (LIMIT / STOP_LIMIT with `price = None`).
1894async fn resolve_order_context(
1895    update: &WsOrderUpdate,
1896    order_type: OrderType,
1897    report_price_missing: bool,
1898    order_contexts: &Arc<Mutex<AHashMap<String, OrderContext>>>,
1899    external_order_contexts: &Arc<Mutex<AHashMap<String, OrderContext>>>,
1900    http_client: &CoinbaseHttpClient,
1901    account_id: AccountId,
1902) -> Option<OrderContext> {
1903    if !update.client_order_id.is_empty() {
1904        let map = order_contexts.lock().expect(MUTEX_POISONED);
1905        if let Some(meta) = map.get(&update.client_order_id) {
1906            return Some(meta.clone());
1907        }
1908    }
1909
1910    if let Some(meta) = external_order_contexts
1911        .lock()
1912        .expect(MUTEX_POISONED)
1913        .get(&update.order_id)
1914    {
1915        return Some(meta.clone());
1916    }
1917
1918    let needs_enrichment = report_price_missing
1919        && matches!(
1920            order_type,
1921            OrderType::Limit
1922                | OrderType::StopLimit
1923                | OrderType::LimitIfTouched
1924                | OrderType::StopMarket
1925                | OrderType::MarketIfTouched
1926        );
1927
1928    if !needs_enrichment {
1929        return None;
1930    }
1931
1932    let venue_order_id = VenueOrderId::new(update.order_id.as_str());
1933    match http_client
1934        .request_order_status_report(account_id, None, Some(venue_order_id))
1935        .await
1936    {
1937        Ok(rest_report) => {
1938            let post_only_from_rest = matches!(order_type, OrderType::Limit | OrderType::StopLimit)
1939                && rest_report.post_only;
1940            let meta = OrderContext {
1941                price: rest_report.price,
1942                trigger_price: rest_report.trigger_price,
1943                trigger_type: rest_report.trigger_type,
1944                post_only: post_only_from_rest,
1945                submitted_product_id: None,
1946            };
1947            external_order_contexts
1948                .lock()
1949                .expect(MUTEX_POISONED)
1950                .insert(update.order_id.clone(), meta.clone());
1951            Some(meta)
1952        }
1953        Err(e) => {
1954            log::warn!(
1955                "Failed to enrich external order {} via REST: {e}",
1956                update.order_id
1957            );
1958            None
1959        }
1960    }
1961}
1962
1963#[cfg(test)]
1964mod tests {
1965    use nautilus_common::messages::{ExecutionEvent, ExecutionReport};
1966    use nautilus_model::{
1967        enums::AccountType,
1968        identifiers::{Symbol, TraderId},
1969        instruments::CurrencyPair,
1970        types::Currency,
1971    };
1972    use rstest::rstest;
1973    use ustr::Ustr;
1974
1975    use super::*;
1976    use crate::{
1977        common::enums::{
1978            CoinbaseContractExpiryType, CoinbaseOrderSide as CbSide,
1979            CoinbaseOrderStatus as CbStatus, CoinbaseOrderType as CbType,
1980            CoinbaseProductType as CbProductType, CoinbaseRiskManagedBy,
1981            CoinbaseTimeInForce as CbTif, CoinbaseTriggerStatus,
1982        },
1983        websocket::messages::WsOrderUpdate,
1984    };
1985
1986    #[rstest]
1987    fn test_fill_dedup_rejects_duplicates() {
1988        let mut dedup = FillDedup::new(4);
1989        let key = ("venue-1".to_string(), "trade-1".to_string());
1990        assert!(dedup.insert(key.clone()));
1991        assert!(!dedup.insert(key));
1992    }
1993
1994    #[rstest]
1995    fn test_fill_dedup_evicts_oldest_when_full() {
1996        let mut dedup = FillDedup::new(2);
1997        assert!(dedup.insert(("v".to_string(), "t1".to_string())));
1998        assert!(dedup.insert(("v".to_string(), "t2".to_string())));
1999        // Insert a third; oldest (t1) should be evicted so re-insertion succeeds.
2000        assert!(dedup.insert(("v".to_string(), "t3".to_string())));
2001        assert!(dedup.insert(("v".to_string(), "t1".to_string())));
2002    }
2003
2004    #[rstest]
2005    fn test_cumulative_state_evicts_oldest_at_capacity() {
2006        let mut state = CumulativeStateMap::with_capacity(2);
2007        state.entry_or_default("a");
2008        state.entry_or_default("b");
2009        // Capacity reached; inserting a third evicts "a"
2010        state.entry_or_default("c");
2011        assert_eq!(state.len(), 2);
2012        assert!(state.map.contains_key("b"));
2013        assert!(state.map.contains_key("c"));
2014        assert!(!state.map.contains_key("a"));
2015    }
2016
2017    #[rstest]
2018    fn test_cumulative_state_remove_drops_entry_and_allows_reinsert() {
2019        let mut state = CumulativeStateMap::with_capacity(2);
2020        state.entry_or_default("a");
2021        state.entry_or_default("b");
2022        state.remove("a");
2023        // After remove, the next insert should fit without evicting "b"
2024        state.entry_or_default("c");
2025        assert_eq!(state.len(), 2);
2026        assert!(state.map.contains_key("b"));
2027        assert!(state.map.contains_key("c"));
2028    }
2029
2030    #[rstest]
2031    fn test_cumulative_state_remove_and_reinsert_does_not_evict_live_state() {
2032        // Codex repro: remove() must purge stale deque slots so a later
2033        // re-insert of the same key cannot have the eviction loop pop the
2034        // stale slot and remove the now-live entry.
2035        let mut state = CumulativeStateMap::with_capacity(2);
2036        state.entry_or_default("a");
2037        state.remove("a");
2038        state.entry_or_default("b");
2039        state.entry_or_default("a");
2040        // With the bug, inserting "c" pops the stale "a" slot at the front
2041        // and removes the live "a" entry from the map; the live "b" should
2042        // be evicted instead because it is now the oldest live entry.
2043        state.entry_or_default("c");
2044        assert_eq!(state.len(), 2);
2045        assert!(
2046            state.map.contains_key("a"),
2047            "re-inserted live key must survive"
2048        );
2049        assert!(state.map.contains_key("c"));
2050        assert!(!state.map.contains_key("b"));
2051    }
2052
2053    #[rstest]
2054    fn test_cumulative_state_hit_refreshes_lru_recency() {
2055        // A repeat access to an existing key must move it to the back of the
2056        // eviction queue so a hot order receiving many updates is not evicted
2057        // by churn on other orders.
2058        let mut state = CumulativeStateMap::with_capacity(2);
2059        state.entry_or_default("a");
2060        state.entry_or_default("b");
2061        // Re-access "a": without the LRU refresh this is a no-op and the
2062        // next insert evicts "a"; with the refresh it should evict "b".
2063        state.entry_or_default("a");
2064        state.entry_or_default("c");
2065        assert_eq!(state.len(), 2);
2066        assert!(
2067            state.map.contains_key("a"),
2068            "recently-accessed key must survive eviction"
2069        );
2070        assert!(state.map.contains_key("c"));
2071        assert!(!state.map.contains_key("b"));
2072    }
2073
2074    #[rstest]
2075    fn test_cumulative_state_preserves_live_entry_when_trimming_stale() {
2076        // A long-lived order at the front of the deque must survive any number
2077        // of terminal events on later orders, and the deque must stay bounded
2078        // (compacted) so memory does not grow without bound under high churn.
2079        let mut state = CumulativeStateMap::with_capacity(2);
2080        state.entry_or_default("live");
2081        // Churn far beyond 2*capacity to force the deque-compaction path.
2082        for i in 0..50 {
2083            let key = format!("t{i}");
2084            state.entry_or_default(&key);
2085            state.remove(&key);
2086        }
2087        assert!(
2088            state.map.contains_key("live"),
2089            "live entry must survive stale-trim cycles"
2090        );
2091        assert_eq!(state.len(), 1);
2092        assert!(
2093            state.order.len() <= 2 * state.capacity,
2094            "deque must remain bounded after compaction (was {})",
2095            state.order.len(),
2096        );
2097        // The live key must remain reachable through the deque so future
2098        // eviction can find and (correctly) evict it. A bug that drops live
2099        // keys from the deque would let the map grow past capacity on the
2100        // next series of inserts.
2101        assert!(
2102            state.order.iter().any(|k| k == "live"),
2103            "live key must remain in the deque, was: {:?}",
2104            state.order,
2105        );
2106        // Drive eviction past capacity to confirm the live key still
2107        // participates in LRU. With capacity=2, "live" plus two new keys
2108        // means the next insert must evict the next-oldest live key
2109        // ("live"), not silently grow the map.
2110        state.entry_or_default("a");
2111        state.entry_or_default("b");
2112        state.entry_or_default("c");
2113        assert_eq!(state.len(), state.capacity);
2114        assert!(
2115            !state.map.contains_key("live"),
2116            "live key should have been evicted in LRU order once capacity demanded it"
2117        );
2118    }
2119
2120    fn test_instrument() -> InstrumentAny {
2121        let instrument_id =
2122            InstrumentId::new(Symbol::new("BTC-USD"), Venue::new(Ustr::from("COINBASE")));
2123        InstrumentAny::CurrencyPair(CurrencyPair::new(
2124            instrument_id,
2125            Symbol::new("BTC-USD"),
2126            Currency::get_or_create_crypto("BTC"),
2127            Currency::get_or_create_crypto("USD"),
2128            2,
2129            8,
2130            Price::from("0.01"),
2131            Quantity::from("0.00000001"),
2132            None,
2133            None,
2134            None,
2135            Some(Quantity::from("0.00000001")),
2136            None,
2137            None,
2138            None,
2139            None,
2140            None,
2141            None,
2142            None,
2143            None,
2144            None,
2145            UnixNanos::default(),
2146            UnixNanos::default(),
2147        ))
2148    }
2149
2150    fn make_emitter() -> (
2151        ExecutionEventEmitter,
2152        tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2153    ) {
2154        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2155        let mut emitter = ExecutionEventEmitter::new(
2156            get_atomic_clock_realtime(),
2157            TraderId::from("TRADER-001"),
2158            AccountId::new("COINBASE-001"),
2159            AccountType::Cash,
2160            None,
2161        );
2162        emitter.set_sender(tx);
2163        (emitter, rx)
2164    }
2165
2166    fn make_user_order_update(
2167        cumulative: &str,
2168        leaves: &str,
2169        avg_price: &str,
2170        total_fees: &str,
2171        status: CbStatus,
2172    ) -> WsOrderUpdate {
2173        WsOrderUpdate {
2174            order_id: "venue-1".to_string(),
2175            client_order_id: "client-1".to_string(),
2176            contract_expiry_type: CoinbaseContractExpiryType::Unknown,
2177            cumulative_quantity: cumulative.to_string(),
2178            leaves_quantity: leaves.to_string(),
2179            avg_price: avg_price.to_string(),
2180            total_fees: total_fees.to_string(),
2181            status,
2182            product_id: Ustr::from("BTC-USD"),
2183            product_type: CbProductType::Spot,
2184            creation_time: String::new(),
2185            order_side: CbSide::Buy,
2186            order_type: CbType::Limit,
2187            risk_managed_by: CoinbaseRiskManagedBy::Unknown,
2188            time_in_force: CbTif::GoodUntilCancelled,
2189            trigger_status: CoinbaseTriggerStatus::InvalidOrderType,
2190            cancel_reason: String::new(),
2191            reject_reason: String::new(),
2192            total_value_after_fees: String::new(),
2193        }
2194    }
2195
2196    fn make_carrier(update: WsOrderUpdate) -> UserOrderUpdate {
2197        make_carrier_with_kind(update, false)
2198    }
2199
2200    // Stub OrderContext with `price` populated so process_user_order_update's
2201    // safe-emission gate accepts a LIMIT report. Mirrors what `submit_order`
2202    // would have cached under production flow.
2203    fn make_limit_context() -> OrderContext {
2204        OrderContext {
2205            price: Some(Price::from("100.00")),
2206            ..OrderContext::default()
2207        }
2208    }
2209
2210    fn make_carrier_with_kind(update: WsOrderUpdate, is_snapshot: bool) -> UserOrderUpdate {
2211        let instrument = test_instrument();
2212        let report = crate::websocket::parse::parse_ws_user_event_to_order_status_report(
2213            &update,
2214            &instrument,
2215            AccountId::new("COINBASE-001"),
2216            UnixNanos::default(),
2217            UnixNanos::default(),
2218        )
2219        .unwrap();
2220        UserOrderUpdate {
2221            report: Box::new(report),
2222            update: Box::new(update),
2223            instrument,
2224            is_snapshot,
2225            ts_event: UnixNanos::default(),
2226            ts_init: UnixNanos::default(),
2227        }
2228    }
2229
2230    fn drain_fill_reports(
2231        rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2232    ) -> Vec<FillReport> {
2233        let mut reports = Vec::new();
2234
2235        while let Ok(event) = rx.try_recv() {
2236            if let ExecutionEvent::Report(ExecutionReport::Fill(report)) = event {
2237                reports.push(*report);
2238            }
2239        }
2240        reports
2241    }
2242
2243    // Drains both `OrderStatusReport`s and `FillReport`s from `rx` in a single
2244    // pass. Tests that need both must use this rather than calling
2245    // `drain_status_reports` and `drain_fill_reports` sequentially, since each
2246    // consumes the channel and discards non-matching events.
2247    fn drain_all_reports(
2248        rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2249    ) -> (Vec<OrderStatusReport>, Vec<FillReport>) {
2250        let mut orders = Vec::new();
2251        let mut fills = Vec::new();
2252
2253        while let Ok(event) = rx.try_recv() {
2254            match event {
2255                ExecutionEvent::Report(ExecutionReport::Order(r)) => orders.push(*r),
2256                ExecutionEvent::Report(ExecutionReport::Fill(r)) => fills.push(*r),
2257                _ => {}
2258            }
2259        }
2260        (orders, fills)
2261    }
2262
2263    fn drain_status_reports(
2264        rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
2265    ) -> Vec<OrderStatusReport> {
2266        let mut reports = Vec::new();
2267
2268        while let Ok(event) = rx.try_recv() {
2269            if let ExecutionEvent::Report(ExecutionReport::Order(report)) = event {
2270                reports.push(*report);
2271            }
2272        }
2273        reports
2274    }
2275
2276    fn make_dedup_state_pair() -> (Arc<Mutex<FillDedup>>, Arc<Mutex<CumulativeStateMap>>) {
2277        (
2278            Arc::new(Mutex::new(FillDedup::new(64))),
2279            Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2280                CUMULATIVE_STATE_CAPACITY,
2281            ))),
2282        )
2283    }
2284
2285    #[rstest]
2286    fn test_handle_user_order_update_emits_status_report_and_no_fill_when_zero_filled() {
2287        let (emitter, mut rx) = make_emitter();
2288        let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2289        let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2290            CUMULATIVE_STATE_CAPACITY,
2291        )));
2292
2293        // Open with no fills yet.
2294        let update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2295        process_user_order_update(
2296            make_carrier(update),
2297            Some(make_limit_context()),
2298            &emitter,
2299            &dedup,
2300            &state,
2301            None,
2302        );
2303
2304        // Status report emitted, no fill report.
2305        let mut got_status = false;
2306        let mut got_fill = false;
2307
2308        while let Ok(event) = rx.try_recv() {
2309            match event {
2310                ExecutionEvent::Report(ExecutionReport::Order(_)) => got_status = true,
2311                ExecutionEvent::Report(ExecutionReport::Fill(_)) => got_fill = true,
2312                _ => {}
2313            }
2314        }
2315        assert!(got_status);
2316        assert!(!got_fill);
2317    }
2318
2319    #[rstest]
2320    fn test_handle_user_order_update_synthesizes_per_fill_price_from_notional_delta() {
2321        let (emitter, mut rx) = make_emitter();
2322        let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2323        let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2324            CUMULATIVE_STATE_CAPACITY,
2325        )));
2326
2327        // First partial: 0.5 @ 100, total_fees=0.05.
2328        let update_1 = make_user_order_update("0.5", "0.5", "100.00", "0.05", CbStatus::Open);
2329        process_user_order_update(make_carrier(update_1), None, &emitter, &dedup, &state, None);
2330
2331        // Second partial: cumulative 1.0 @ 110, total_fees=0.15.
2332        // delta_qty = 0.5; per_fill_px = (110*1.0 - 100*0.5) / 0.5 = 120.
2333        // delta_fees = 0.10.
2334        let update_2 = make_user_order_update("1.0", "0", "110.00", "0.15", CbStatus::Filled);
2335        process_user_order_update(make_carrier(update_2), None, &emitter, &dedup, &state, None);
2336
2337        let fills = drain_fill_reports(&mut rx);
2338        assert_eq!(fills.len(), 2);
2339
2340        // First synthesized fill mirrors the first partial.
2341        assert_eq!(fills[0].last_qty, Quantity::from("0.50000000"));
2342        assert_eq!(fills[0].last_px, Price::from("100.00"));
2343        assert_eq!(fills[0].commission.as_decimal().to_string(), "0.05");
2344
2345        // Second synthesized fill is per-fill price (120), not cumulative avg (110).
2346        assert_eq!(fills[1].last_qty, Quantity::from("0.50000000"));
2347        assert_eq!(fills[1].last_px, Price::from("120.00"));
2348        assert_eq!(fills[1].commission.as_decimal().to_string(), "0.10");
2349    }
2350
2351    #[rstest]
2352    fn test_handle_user_order_update_drops_replayed_fills() {
2353        let (emitter, mut rx) = make_emitter();
2354        let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2355        let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2356            CUMULATIVE_STATE_CAPACITY,
2357        )));
2358
2359        let update = make_user_order_update("0.5", "0.5", "100.00", "0.05", CbStatus::Open);
2360        process_user_order_update(
2361            make_carrier(update.clone()),
2362            None,
2363            &emitter,
2364            &dedup,
2365            &state,
2366            None,
2367        );
2368
2369        // Simulate a WS reconnect that wipes the cumulative state, then replays
2370        // the same cumulative=0.5 snapshot. The fill_dedup must drop the
2371        // synthesized fill because the trade_id matches the prior emission.
2372        {
2373            let mut s = state.lock().unwrap();
2374            s.clear();
2375        }
2376        process_user_order_update(make_carrier(update), None, &emitter, &dedup, &state, None);
2377
2378        let fills = drain_fill_reports(&mut rx);
2379        assert_eq!(fills.len(), 1, "replay should be deduplicated");
2380    }
2381
2382    #[rstest]
2383    fn test_handle_user_order_update_clears_state_on_terminal_status() {
2384        let (emitter, mut rx) = make_emitter();
2385        let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2386        let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2387            CUMULATIVE_STATE_CAPACITY,
2388        )));
2389
2390        let update = make_user_order_update("1.0", "0", "100.00", "0.10", CbStatus::Filled);
2391        process_user_order_update(make_carrier(update), None, &emitter, &dedup, &state, None);
2392
2393        // Drain emitted events.
2394        let _ = drain_fill_reports(&mut rx);
2395
2396        let s = state.lock().unwrap();
2397        assert!(
2398            s.get("venue-1").is_none(),
2399            "terminal status should remove cumulative state entry"
2400        );
2401    }
2402
2403    #[rstest]
2404    fn test_handle_user_order_update_skips_when_avg_price_nonpositive() {
2405        let (emitter, mut rx) = make_emitter();
2406        let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2407        let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2408            CUMULATIVE_STATE_CAPACITY,
2409        )));
2410
2411        // cumulative_quantity > 0 but avg_price = 0 (defensive: should not emit fill).
2412        let update = make_user_order_update("0.5", "0.5", "0", "0", CbStatus::Open);
2413        process_user_order_update(make_carrier(update), None, &emitter, &dedup, &state, None);
2414
2415        let fills = drain_fill_reports(&mut rx);
2416        assert!(
2417            fills.is_empty(),
2418            "non-positive avg_price should not emit a fill"
2419        );
2420    }
2421
2422    #[rstest]
2423    fn test_handle_user_order_update_snapshot_does_not_synthesize_fill() {
2424        let (emitter, mut rx) = make_emitter();
2425        let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2426        let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2427            CUMULATIVE_STATE_CAPACITY,
2428        )));
2429
2430        // Cold-start snapshot: order was already partially filled before we
2431        // subscribed. Cumulative_quantity > 0 with positive avg_price would
2432        // normally synthesize a fill, but the snapshot flag must suppress it.
2433        let update = make_user_order_update("0.5", "0.5", "100.00", "0.05", CbStatus::Open);
2434        process_user_order_update(
2435            make_carrier_with_kind(update, true),
2436            None,
2437            &emitter,
2438            &dedup,
2439            &state,
2440            None,
2441        );
2442
2443        let fills = drain_fill_reports(&mut rx);
2444        assert!(
2445            fills.is_empty(),
2446            "snapshot must not synthesize a fill from pre-existing cumulative state"
2447        );
2448
2449        // The snapshot must seed cumulative_state so that the next live update
2450        // computes a correct delta.
2451        let s = state.lock().unwrap();
2452        let entry = s.get("venue-1").expect("snapshot should seed state");
2453        assert_eq!(entry.filled_qty.unwrap(), Quantity::from("0.50000000"));
2454    }
2455
2456    #[rstest]
2457    fn test_handle_user_order_update_snapshot_then_update_synthesizes_only_delta() {
2458        let (emitter, mut rx) = make_emitter();
2459        let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2460        let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2461            CUMULATIVE_STATE_CAPACITY,
2462        )));
2463
2464        // Cold-start snapshot at cumulative=0.5.
2465        let snap = make_user_order_update("0.5", "0.5", "100.00", "0.05", CbStatus::Open);
2466        process_user_order_update(
2467            make_carrier_with_kind(snap, true),
2468            None,
2469            &emitter,
2470            &dedup,
2471            &state,
2472            None,
2473        );
2474
2475        // Subsequent live update at cumulative=1.0 should emit a single fill
2476        // for the 0.5 delta only, not the full cumulative.
2477        let live = make_user_order_update("1.0", "0", "110.00", "0.15", CbStatus::Filled);
2478        process_user_order_update(make_carrier(live), None, &emitter, &dedup, &state, None);
2479
2480        let fills = drain_fill_reports(&mut rx);
2481        assert_eq!(fills.len(), 1);
2482        assert_eq!(fills[0].last_qty, Quantity::from("0.50000000"));
2483        // Per-fill price derived from notional delta: (110*1.0 - 100*0.5) / 0.5 = 120.
2484        assert_eq!(fills[0].last_px, Price::from("120.00"));
2485        // delta_fees = 0.10.
2486        assert_eq!(fills[0].commission.as_decimal().to_string(), "0.10");
2487    }
2488
2489    #[rstest]
2490    fn test_handle_user_order_update_terminal_restores_original_quantity() {
2491        use nautilus_common::messages::{ExecutionEvent, ExecutionReport};
2492
2493        let (emitter, mut rx) = make_emitter();
2494        let dedup = Arc::new(Mutex::new(FillDedup::new(64)));
2495        let state = Arc::new(Mutex::new(CumulativeStateMap::with_capacity(
2496            CUMULATIVE_STATE_CAPACITY,
2497        )));
2498
2499        // Live partial: cumulative=0, leaves=1.0 (full size 1.0 working).
2500        let working = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2501        process_user_order_update(
2502            make_carrier(working),
2503            Some(make_limit_context()),
2504            &emitter,
2505            &dedup,
2506            &state,
2507            None,
2508        );
2509        // Drain the open report.
2510        while rx.try_recv().is_ok() {}
2511
2512        // Cancellation: venue zeroes leaves_quantity. cum+leaves would be 0,
2513        // but the report's quantity must stay 1.0 (the original order size).
2514        let cancelled = make_user_order_update("0", "0", "0", "0", CbStatus::Cancelled);
2515        process_user_order_update(
2516            make_carrier(cancelled),
2517            Some(make_limit_context()),
2518            &emitter,
2519            &dedup,
2520            &state,
2521            None,
2522        );
2523
2524        let mut got_terminal_report: Option<OrderStatusReport> = None;
2525
2526        while let Ok(event) = rx.try_recv() {
2527            if let ExecutionEvent::Report(ExecutionReport::Order(r)) = event {
2528                got_terminal_report = Some(*r);
2529            }
2530        }
2531        let report = got_terminal_report.expect("terminal report emitted");
2532        assert_eq!(
2533            report.quantity,
2534            Quantity::from("1.00000000"),
2535            "terminal report must restore the original order quantity"
2536        );
2537    }
2538
2539    #[rstest]
2540    fn test_process_user_order_update_suppresses_snapshot_without_context() {
2541        // Snapshot for an order we don't have context for must be suppressed
2542        // so the engine reconciler does not panic reconstructing a LIMIT
2543        // order from `report.price = None`.
2544        let (emitter, mut rx) = make_emitter();
2545        let (dedup, state) = make_dedup_state_pair();
2546
2547        let update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2548        process_user_order_update(
2549            make_carrier_with_kind(update, true),
2550            None,
2551            &emitter,
2552            &dedup,
2553            &state,
2554            None,
2555        );
2556
2557        assert!(drain_status_reports(&mut rx).is_empty());
2558        assert!(drain_fill_reports(&mut rx).is_empty());
2559    }
2560
2561    #[rstest]
2562    fn test_process_user_order_update_emits_snapshot_when_context_present() {
2563        // With a known OrderContext the snapshot OSR is safe to emit and
2564        // the patched price reaches the engine.
2565        let (emitter, mut rx) = make_emitter();
2566        let (dedup, state) = make_dedup_state_pair();
2567        let context = OrderContext {
2568            price: Some(Price::from("100.00")),
2569            ..Default::default()
2570        };
2571
2572        let update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2573        process_user_order_update(
2574            make_carrier_with_kind(update, true),
2575            Some(context),
2576            &emitter,
2577            &dedup,
2578            &state,
2579            None,
2580        );
2581
2582        let osrs = drain_status_reports(&mut rx);
2583        assert_eq!(osrs.len(), 1);
2584        assert_eq!(osrs[0].price, Some(Price::from("100.00")));
2585    }
2586
2587    #[rstest]
2588    fn test_process_user_order_update_patches_price_and_trigger_from_context() {
2589        // The user channel does not echo `price` / `stop_price` /
2590        // `trigger_type`. Patching from context is what stops the engine
2591        // reconciler clearing the local price.
2592        let (emitter, mut rx) = make_emitter();
2593        let (dedup, state) = make_dedup_state_pair();
2594        let context = OrderContext {
2595            price: Some(Price::from("100.50")),
2596            trigger_price: Some(Price::from("99.00")),
2597            trigger_type: Some(TriggerType::LastPrice),
2598            ..Default::default()
2599        };
2600
2601        let update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2602        process_user_order_update(
2603            make_carrier(update),
2604            Some(context),
2605            &emitter,
2606            &dedup,
2607            &state,
2608            None,
2609        );
2610
2611        let osrs = drain_status_reports(&mut rx);
2612        assert_eq!(osrs[0].price, Some(Price::from("100.50")));
2613        assert_eq!(osrs[0].trigger_price, Some(Price::from("99.00")));
2614        assert_eq!(osrs[0].trigger_type, Some(TriggerType::LastPrice));
2615    }
2616
2617    #[rstest]
2618    fn test_process_user_order_update_rekeys_to_submitted_product_id() {
2619        // Wire `product_id` is `BTC-USD` (canonical) but the order was
2620        // submitted on the alias side `BTC-USDC`. Both the OSR and the
2621        // synthesized FillReport must surface the submitted id.
2622        let (emitter, mut rx) = make_emitter();
2623        let (dedup, state) = make_dedup_state_pair();
2624        let context = OrderContext {
2625            price: Some(Price::from("100.00")),
2626            submitted_product_id: Some(Ustr::from("BTC-USDC")),
2627            ..Default::default()
2628        };
2629
2630        let update = make_user_order_update("1.0", "0", "100.00", "0.05", CbStatus::Filled);
2631        process_user_order_update(
2632            make_carrier(update),
2633            Some(context),
2634            &emitter,
2635            &dedup,
2636            &state,
2637            None,
2638        );
2639
2640        let (osrs, fills) = drain_all_reports(&mut rx);
2641        assert_eq!(osrs.len(), 1);
2642        assert_eq!(
2643            osrs[0].instrument_id,
2644            InstrumentId::from("BTC-USDC.COINBASE")
2645        );
2646        assert_eq!(fills.len(), 1);
2647        assert_eq!(
2648            fills[0].instrument_id,
2649            InstrumentId::from("BTC-USDC.COINBASE")
2650        );
2651    }
2652
2653    #[rstest]
2654    #[case(true, LiquiditySide::Maker)]
2655    #[case(false, LiquiditySide::NoLiquiditySide)]
2656    fn test_process_user_order_update_stamps_liquidity_side_from_post_only(
2657        #[case] post_only: bool,
2658        #[case] expected: LiquiditySide,
2659    ) {
2660        let (emitter, mut rx) = make_emitter();
2661        let (dedup, state) = make_dedup_state_pair();
2662        let context = OrderContext {
2663            price: Some(Price::from("100.00")),
2664            post_only,
2665            ..Default::default()
2666        };
2667
2668        let update = make_user_order_update("1.0", "0", "100.00", "0.05", CbStatus::Filled);
2669        process_user_order_update(
2670            make_carrier(update),
2671            Some(context),
2672            &emitter,
2673            &dedup,
2674            &state,
2675            None,
2676        );
2677
2678        let fills = drain_fill_reports(&mut rx);
2679        assert_eq!(fills.len(), 1);
2680        assert_eq!(fills[0].liquidity_side, expected);
2681    }
2682
2683    #[rstest]
2684    fn test_process_user_order_update_propagates_post_only_to_status_report() {
2685        // Coinbase's user channel does not echo `post_only`; downstream
2686        // reconstruction would lose maker-only semantics if we did not
2687        // propagate the cached flag to the OrderStatusReport.
2688        let (emitter, mut rx) = make_emitter();
2689        let (dedup, state) = make_dedup_state_pair();
2690        let context = OrderContext {
2691            price: Some(Price::from("100.00")),
2692            post_only: true,
2693            ..Default::default()
2694        };
2695
2696        let update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2697        process_user_order_update(
2698            make_carrier(update),
2699            Some(context),
2700            &emitter,
2701            &dedup,
2702            &state,
2703            None,
2704        );
2705
2706        let osrs = drain_status_reports(&mut rx);
2707        assert_eq!(osrs.len(), 1);
2708        assert!(osrs[0].post_only);
2709    }
2710
2711    #[rstest]
2712    #[case(OrderType::Limit)]
2713    #[case(OrderType::StopLimit)]
2714    fn test_process_user_order_update_suppresses_unsafe_report_when_enrichment_unavailable(
2715        #[case] order_type: OrderType,
2716    ) {
2717        // For LIMIT / STOP_LIMIT orders, missing `price` (or `trigger_price`)
2718        // would panic the engine reconciler. When enrichment is unavailable
2719        // the OSR must be suppressed rather than emitted with `None` fields.
2720        let (emitter, mut rx) = make_emitter();
2721        let (dedup, state) = make_dedup_state_pair();
2722        let mut update = make_user_order_update("0", "1.0", "0", "0", CbStatus::Open);
2723        update.order_type = match order_type {
2724            OrderType::Limit => CbType::Limit,
2725            OrderType::StopLimit => CbType::StopLimit,
2726            _ => CbType::Limit,
2727        };
2728
2729        process_user_order_update(make_carrier(update), None, &emitter, &dedup, &state, None);
2730
2731        assert!(drain_status_reports(&mut rx).is_empty());
2732    }
2733
2734    #[rstest]
2735    fn test_process_user_order_update_trade_id_fits_stack_str() {
2736        // A full Coinbase venue UUID is 36 characters; concatenating the
2737        // cumulative qty would overflow `TradeId`'s 36-char stack string,
2738        // so the synthesized id is `{order_id_prefix_8}-{cumulative_qty}`.
2739        let (emitter, mut rx) = make_emitter();
2740        let (dedup, state) = make_dedup_state_pair();
2741        let mut update = make_user_order_update("1.0", "0", "100.00", "0.05", CbStatus::Filled);
2742        update.order_id = "11d357f0-155e-4ed4-b87c-1cf966f65d10".to_string();
2743
2744        process_user_order_update(make_carrier(update), None, &emitter, &dedup, &state, None);
2745
2746        let fills = drain_fill_reports(&mut rx);
2747        assert_eq!(fills.len(), 1);
2748        let trade_id = fills[0].trade_id.as_str();
2749        assert!(
2750            trade_id.len() <= 36,
2751            "trade_id was {} chars",
2752            trade_id.len()
2753        );
2754        assert!(
2755            trade_id.starts_with("11d357f0-"),
2756            "trade_id should start with the 8-char prefix, was {trade_id}",
2757        );
2758    }
2759}