Skip to main content

nautilus_coinbase/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Feed handler for parsing Coinbase WebSocket messages into Nautilus types.
17
18use std::{fmt::Debug, sync::Arc};
19
20use ahash::AHashMap;
21use nautilus_core::{
22    AtomicMap, UnixNanos,
23    time::{AtomicTime, get_atomic_clock_realtime},
24};
25use nautilus_model::{
26    data::{Bar, BarType, OrderBookDeltas, QuoteTick, TradeTick},
27    identifiers::{AccountId, InstrumentId},
28    instruments::{Instrument, InstrumentAny},
29    reports::OrderStatusReport,
30};
31use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
32use tokio_tungstenite::tungstenite::Message;
33use ustr::Ustr;
34
35use crate::{
36    common::consts::COINBASE,
37    websocket::{
38        client::COINBASE_WS_SUBSCRIPTION_KEYS,
39        messages::{CoinbaseWsMessage, CoinbaseWsSubscription, WsEventType, WsOrderUpdate},
40        parse::{
41            parse_ws_candle, parse_ws_l2_snapshot, parse_ws_l2_update, parse_ws_ticker,
42            parse_ws_trade, parse_ws_user_event_to_order_status_report,
43        },
44    },
45};
46
47fn instrument_id_from_product(product_id: &Ustr) -> InstrumentId {
48    InstrumentId::from(format!("{product_id}.{COINBASE}").as_str())
49}
50
51fn resolve_instrument_id(aliases: &AtomicMap<Ustr, Ustr>, product_id: &Ustr) -> InstrumentId {
52    let resolved = aliases.get_cloned(product_id).unwrap_or(*product_id);
53    instrument_id_from_product(&resolved)
54}
55
56/// Commands sent from [`super::client::CoinbaseWebSocketClient`] to the feed handler.
57pub enum HandlerCommand {
58    /// Provides the network-level WebSocket client.
59    SetClient(WebSocketClient),
60    /// Subscribes to a channel for the given product IDs.
61    Subscribe(CoinbaseWsSubscription),
62    /// Unsubscribes from a channel.
63    Unsubscribe(CoinbaseWsSubscription),
64    /// Disconnects the WebSocket.
65    Disconnect,
66    /// Caches instruments for precision lookups during parsing.
67    InitializeInstruments(Vec<InstrumentAny>),
68    /// Updates a single instrument in the cache.
69    UpdateInstrument(Box<InstrumentAny>),
70    /// Registers a bar type for candle parsing.
71    AddBarType { key: String, bar_type: BarType },
72    /// Removes a bar type registration.
73    RemoveBarType { key: String },
74    /// Sets the account ID used when emitting user-channel execution reports.
75    SetAccountId(AccountId),
76}
77
78impl Debug for HandlerCommand {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        match self {
81            Self::SetClient(_) => f.write_str("SetClient"),
82            Self::Subscribe(s) => write!(f, "Subscribe({:?})", s.channel),
83            Self::Unsubscribe(s) => write!(f, "Unsubscribe({:?})", s.channel),
84            Self::Disconnect => f.write_str("Disconnect"),
85            Self::InitializeInstruments(v) => write!(f, "InitializeInstruments({})", v.len()),
86            Self::UpdateInstrument(i) => write!(f, "UpdateInstrument({})", i.id()),
87            Self::AddBarType { key, .. } => write!(f, "AddBarType({key})"),
88            Self::RemoveBarType { key } => write!(f, "RemoveBarType({key})"),
89            Self::SetAccountId(id) => write!(f, "SetAccountId({id})"),
90        }
91    }
92}
93
94/// Carrier for a single user-channel order update.
95///
96/// Pairs the parsed [`OrderStatusReport`] with the resolved instrument and
97/// the raw venue payload so downstream consumers (e.g. the execution client)
98/// can diff cumulative quantity and fees against their own tracked state.
99///
100/// `is_snapshot` is true when the wrapping `WsUserEvent` was a `snapshot`
101/// type. Snapshots restate the current cumulative state of every open order
102/// and must NOT be interpreted as fresh fills, otherwise a cold start (or
103/// any state-clearing reconnect) would synthesize phantom fills covering the
104/// entire pre-existing cumulative quantity.
105#[derive(Debug, Clone)]
106pub struct UserOrderUpdate {
107    pub report: Box<OrderStatusReport>,
108    pub update: Box<WsOrderUpdate>,
109    pub instrument: InstrumentAny,
110    pub is_snapshot: bool,
111    pub ts_event: UnixNanos,
112    pub ts_init: UnixNanos,
113}
114
115/// Nautilus-typed messages produced by the feed handler.
116#[derive(Debug, Clone)]
117pub enum NautilusWsMessage {
118    /// Trade tick from market_trades channel.
119    Trade(TradeTick),
120    /// Quote tick from ticker channel.
121    Quote(QuoteTick),
122    /// Order book deltas from l2_data channel.
123    Deltas(OrderBookDeltas),
124    /// Bar from candles channel.
125    Bar(Bar),
126    /// Order status update from the user channel.
127    UserOrder(Box<UserOrderUpdate>),
128    /// Futures balance summary snapshot from the
129    /// `futures_balance_summary` channel.
130    FuturesBalanceSummary(Box<crate::websocket::messages::WsFcmBalanceSummary>),
131    /// The connection was re-established after a drop.
132    Reconnected,
133    /// An error occurred during message processing.
134    Error(String),
135}
136
137/// Processes raw WebSocket messages into Nautilus domain types.
138#[derive(Debug)]
139pub struct FeedHandler {
140    clock: &'static AtomicTime,
141    signal: Arc<std::sync::atomic::AtomicBool>,
142    client: Option<WebSocketClient>,
143    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
144    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
145    instruments: AHashMap<InstrumentId, InstrumentAny>,
146    /// Shared with [`super::client::CoinbaseWebSocketClient`]; consulted in
147    /// `resolve_instrument_id` to re-key inbound messages whose wire `product_id`
148    /// is the canonical alias of a subscribed/submitted product.
149    subscription_aliases: Arc<AtomicMap<Ustr, Ustr>>,
150    bar_types: AHashMap<String, BarType>,
151    account_id: Option<AccountId>,
152    buffer: Vec<NautilusWsMessage>,
153}
154
155impl FeedHandler {
156    /// Creates a new [`FeedHandler`] instance.
157    pub fn new(
158        signal: Arc<std::sync::atomic::AtomicBool>,
159        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
160        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
161        subscription_aliases: Arc<AtomicMap<Ustr, Ustr>>,
162    ) -> Self {
163        Self {
164            clock: get_atomic_clock_realtime(),
165            signal,
166            client: None,
167            cmd_rx,
168            raw_rx,
169            instruments: AHashMap::new(),
170            subscription_aliases,
171            bar_types: AHashMap::new(),
172            account_id: None,
173            buffer: Vec::new(),
174        }
175    }
176
177    fn resolve_instrument_id(&self, product_id: &Ustr) -> InstrumentId {
178        resolve_instrument_id(&self.subscription_aliases, product_id)
179    }
180
181    /// Sets the account ID used to stamp user-channel execution reports.
182    pub fn set_account_id(&mut self, account_id: AccountId) {
183        self.account_id = Some(account_id);
184    }
185
186    /// Polls for the next output message, processing commands and raw messages.
187    ///
188    /// Returns `None` when the handler should shut down.
189    pub async fn next(&mut self) -> Option<NautilusWsMessage> {
190        // Check signal before draining buffer so disconnect takes
191        // priority over pending buffered messages
192        if self.signal.load(std::sync::atomic::Ordering::Acquire) {
193            self.buffer.clear();
194            return None;
195        }
196
197        if let Some(msg) = self.buffer.pop() {
198            return Some(msg);
199        }
200
201        loop {
202            if self.signal.load(std::sync::atomic::Ordering::Acquire) {
203                return None;
204            }
205
206            tokio::select! {
207                Some(cmd) = self.cmd_rx.recv() => {
208                    match cmd {
209                        HandlerCommand::SetClient(client) => {
210                            self.client = Some(client);
211                        }
212                        HandlerCommand::Subscribe(sub) => {
213                            self.send_subscription(&sub).await;
214                        }
215                        HandlerCommand::Unsubscribe(sub) => {
216                            self.send_subscription(&sub).await;
217                        }
218                        HandlerCommand::Disconnect => {
219                            if let Some(client) = self.client.take() {
220                                // Transition to CLOSED immediately without waiting
221                                // for ACTIVE (avoids blocking during reconnect)
222                                client.notify_closed();
223                            }
224                            return None;
225                        }
226                        HandlerCommand::InitializeInstruments(instruments) => {
227                            for inst in instruments {
228                                self.instruments.insert(inst.id(), inst);
229                            }
230                        }
231                        HandlerCommand::UpdateInstrument(inst) => {
232                            self.instruments.insert(inst.id(), *inst);
233                        }
234                        HandlerCommand::AddBarType { key, bar_type } => {
235                            self.bar_types.insert(key, bar_type);
236                        }
237                        HandlerCommand::RemoveBarType { key } => {
238                            self.bar_types.remove(&key);
239                        }
240                        HandlerCommand::SetAccountId(account_id) => {
241                            self.account_id = Some(account_id);
242                        }
243                    }
244                }
245                Some(raw) = self.raw_rx.recv() => {
246                    match raw {
247                        Message::Text(text) => {
248                            if let Some(msg) = self.handle_text(&text) {
249                                return Some(msg);
250                            }
251                        }
252                        Message::Ping(data) => {
253                            if let Some(client) = &self.client
254                                && let Err(e) = client.send_pong(data.to_vec()).await
255                            {
256                                log::error!("Failed to send pong: {e}");
257                            }
258                        }
259                        Message::Close(_) => return None,
260                        _ => {}
261                    }
262                }
263                else => return None,
264            }
265        }
266    }
267
268    async fn send_subscription(&self, sub: &CoinbaseWsSubscription) {
269        let Some(client) = &self.client else {
270            log::warn!("Cannot send subscription, no WebSocket client set");
271            return;
272        };
273
274        match serde_json::to_string(sub) {
275            Ok(json) => {
276                if let Err(e) = client
277                    .send_text(json, Some(COINBASE_WS_SUBSCRIPTION_KEYS.as_slice()))
278                    .await
279                {
280                    log::error!("Failed to send subscription: {e}");
281                }
282            }
283            Err(e) => log::error!("Failed to serialize subscription: {e}"),
284        }
285    }
286
287    fn handle_text(&mut self, text: &str) -> Option<NautilusWsMessage> {
288        if text == RECONNECTED {
289            return Some(NautilusWsMessage::Reconnected);
290        }
291
292        let ts_init = self.clock.get_time_ns();
293
294        let msg: CoinbaseWsMessage = match serde_json::from_str(text) {
295            Ok(m) => m,
296            Err(e) => {
297                log::warn!("Failed to parse WS message: {e}");
298                return None;
299            }
300        };
301
302        match msg {
303            CoinbaseWsMessage::L2Data {
304                timestamp, events, ..
305            } => self.handle_l2_events(&events, &timestamp, ts_init),
306            CoinbaseWsMessage::MarketTrades { events, .. } => {
307                self.handle_market_trades(&events, ts_init)
308            }
309            CoinbaseWsMessage::Ticker {
310                timestamp, events, ..
311            }
312            | CoinbaseWsMessage::TickerBatch {
313                timestamp, events, ..
314            } => self.handle_ticker(&events, &timestamp, ts_init),
315            CoinbaseWsMessage::Candles { events, .. } => self.handle_candles(&events, ts_init),
316            CoinbaseWsMessage::Heartbeats { .. } => None,
317            CoinbaseWsMessage::Subscriptions { events, .. } => {
318                // Coinbase emits this after every subscribe and unsubscribe
319                // with the full current subscription set, so it's noisy at
320                // INFO and not strictly a "confirmation" of the latest action.
321                log::debug!("Subscription state: {events:?}");
322                None
323            }
324            CoinbaseWsMessage::User {
325                timestamp, events, ..
326            } => self.handle_user_events(&events, &timestamp, ts_init),
327            CoinbaseWsMessage::FuturesBalanceSummary { events, .. } => {
328                self.handle_futures_balance_summary(events)
329            }
330            CoinbaseWsMessage::Status { events, .. } => {
331                log::debug!(
332                    "Ignoring {} status events until venue status handling lands",
333                    events.len()
334                );
335                None
336            }
337        }
338    }
339
340    fn handle_l2_events(
341        &mut self,
342        events: &[crate::websocket::messages::WsL2DataEvent],
343        timestamp: &str,
344        ts_init: UnixNanos,
345    ) -> Option<NautilusWsMessage> {
346        let ts_event = match crate::http::parse::parse_rfc3339_timestamp(timestamp) {
347            Ok(ts) => ts,
348            Err(e) => {
349                log::warn!("Failed to parse L2 message timestamp {timestamp}: {e}");
350                ts_init
351            }
352        };
353
354        let mut first: Option<NautilusWsMessage> = None;
355
356        for event in events {
357            let instrument_id = self.resolve_instrument_id(&event.product_id);
358
359            let instrument = match self.instruments.get(&instrument_id) {
360                Some(inst) => inst,
361                None => {
362                    log::warn!("No instrument cached for {instrument_id}");
363                    continue;
364                }
365            };
366
367            let result = match event.event_type {
368                WsEventType::Snapshot => parse_ws_l2_snapshot(event, instrument, ts_event, ts_init),
369                WsEventType::Update => parse_ws_l2_update(event, instrument, ts_event, ts_init),
370            };
371
372            match result {
373                Ok(deltas) => {
374                    let msg = NautilusWsMessage::Deltas(deltas);
375
376                    if first.is_none() {
377                        first = Some(msg);
378                    } else {
379                        self.buffer.push(msg);
380                    }
381                }
382                Err(e) => log::warn!("Failed to parse L2 event: {e}"),
383            }
384        }
385
386        if first.is_some() {
387            self.buffer.reverse();
388        }
389        first
390    }
391
392    fn handle_market_trades(
393        &mut self,
394        events: &[crate::websocket::messages::WsMarketTradesEvent],
395        ts_init: UnixNanos,
396    ) -> Option<NautilusWsMessage> {
397        for event in events {
398            for trade in &event.trades {
399                let instrument_id = self.resolve_instrument_id(&trade.product_id);
400
401                let instrument = match self.instruments.get(&instrument_id) {
402                    Some(inst) => inst,
403                    None => {
404                        log::warn!("No instrument cached for {instrument_id}");
405                        continue;
406                    }
407                };
408
409                match parse_ws_trade(trade, instrument, ts_init) {
410                    Ok(tick) => {
411                        self.buffer_remaining_trades(events, event, trade, ts_init);
412                        // Reverse so pop() drains in exchange order
413                        self.buffer.reverse();
414                        return Some(NautilusWsMessage::Trade(tick));
415                    }
416                    Err(e) => log::warn!("Failed to parse trade: {e}"),
417                }
418            }
419        }
420        None
421    }
422
423    fn buffer_remaining_trades(
424        &mut self,
425        events: &[crate::websocket::messages::WsMarketTradesEvent],
426        current_event: &crate::websocket::messages::WsMarketTradesEvent,
427        current_trade: &crate::websocket::messages::WsTrade,
428        ts_init: UnixNanos,
429    ) {
430        let mut found_current = false;
431
432        for event in events {
433            let is_current_event = std::ptr::eq(event, current_event);
434
435            for trade in &event.trades {
436                if !found_current {
437                    if is_current_event && std::ptr::eq(trade, current_trade) {
438                        found_current = true;
439                    }
440                    continue;
441                }
442
443                let instrument_id = self.resolve_instrument_id(&trade.product_id);
444
445                if let Some(instrument) = self.instruments.get(&instrument_id)
446                    && let Ok(tick) = parse_ws_trade(trade, instrument, ts_init)
447                {
448                    self.buffer.push(NautilusWsMessage::Trade(tick));
449                }
450            }
451        }
452    }
453
454    fn handle_ticker(
455        &mut self,
456        events: &[crate::websocket::messages::WsTickerEvent],
457        timestamp: &str,
458        ts_init: UnixNanos,
459    ) -> Option<NautilusWsMessage> {
460        let ts_event = crate::http::parse::parse_rfc3339_timestamp(timestamp).unwrap_or(ts_init);
461
462        let mut first: Option<NautilusWsMessage> = None;
463
464        for event in events {
465            for ticker in &event.tickers {
466                let instrument_id = self.resolve_instrument_id(&ticker.product_id);
467
468                let instrument = match self.instruments.get(&instrument_id) {
469                    Some(inst) => inst,
470                    None => {
471                        log::warn!("No instrument cached for {instrument_id}");
472                        continue;
473                    }
474                };
475
476                match parse_ws_ticker(ticker, instrument, ts_event, ts_init) {
477                    Ok(quote) => {
478                        let msg = NautilusWsMessage::Quote(quote);
479
480                        if first.is_none() {
481                            first = Some(msg);
482                        } else {
483                            self.buffer.push(msg);
484                        }
485                    }
486                    Err(e) => log::warn!("Failed to parse ticker: {e}"),
487                }
488            }
489        }
490
491        if first.is_some() {
492            self.buffer.reverse();
493        }
494        first
495    }
496
497    fn handle_user_events(
498        &mut self,
499        events: &[crate::websocket::messages::WsUserEvent],
500        timestamp: &str,
501        ts_init: UnixNanos,
502    ) -> Option<NautilusWsMessage> {
503        let Some(account_id) = self.account_id else {
504            log::debug!(
505                "Dropping user event: account_id not set (call SetAccountId after connect)"
506            );
507            return None;
508        };
509
510        let ts_event = match crate::http::parse::parse_rfc3339_timestamp(timestamp) {
511            Ok(ts) => ts,
512            Err(e) => {
513                log::warn!("Failed to parse user message timestamp {timestamp}: {e}");
514                ts_init
515            }
516        };
517
518        let mut first: Option<NautilusWsMessage> = None;
519
520        for event in events {
521            let is_snapshot = matches!(event.event_type, WsEventType::Snapshot);
522
523            for order in &event.orders {
524                let instrument_id = self.resolve_instrument_id(&order.product_id);
525                let instrument = match self.instruments.get(&instrument_id).cloned() {
526                    Some(inst) => inst,
527                    None => {
528                        log::warn!("No instrument cached for {instrument_id}");
529                        continue;
530                    }
531                };
532
533                self.emit_user_event_messages(
534                    order,
535                    &instrument,
536                    account_id,
537                    is_snapshot,
538                    ts_event,
539                    ts_init,
540                    &mut first,
541                );
542            }
543        }
544
545        if first.is_some() {
546            self.buffer.reverse();
547        }
548        first
549    }
550
551    #[allow(clippy::too_many_arguments)]
552    fn emit_user_event_messages(
553        &mut self,
554        order: &WsOrderUpdate,
555        instrument: &InstrumentAny,
556        account_id: AccountId,
557        is_snapshot: bool,
558        ts_event: UnixNanos,
559        ts_init: UnixNanos,
560        first: &mut Option<NautilusWsMessage>,
561    ) {
562        let report = match parse_ws_user_event_to_order_status_report(
563            order, instrument, account_id, ts_event, ts_init,
564        ) {
565            Ok(r) => r,
566            Err(e) => {
567                log::warn!("Failed to parse user order update: {e}");
568                return;
569            }
570        };
571
572        let msg = NautilusWsMessage::UserOrder(Box::new(UserOrderUpdate {
573            report: Box::new(report),
574            update: Box::new(order.clone()),
575            instrument: instrument.clone(),
576            is_snapshot,
577            ts_event,
578            ts_init,
579        }));
580
581        if first.is_none() {
582            *first = Some(msg);
583        } else {
584            self.buffer.push(msg);
585        }
586    }
587
588    fn handle_futures_balance_summary(
589        &mut self,
590        events: Vec<crate::websocket::messages::WsFuturesBalanceSummaryEvent>,
591    ) -> Option<NautilusWsMessage> {
592        let mut first: Option<NautilusWsMessage> = None;
593
594        for event in events {
595            let msg = NautilusWsMessage::FuturesBalanceSummary(Box::new(event.fcm_balance_summary));
596
597            if first.is_none() {
598                first = Some(msg);
599            } else {
600                self.buffer.push(msg);
601            }
602        }
603
604        if first.is_some() {
605            self.buffer.reverse();
606        }
607        first
608    }
609
610    fn handle_candles(
611        &mut self,
612        events: &[crate::websocket::messages::WsCandlesEvent],
613        ts_init: UnixNanos,
614    ) -> Option<NautilusWsMessage> {
615        let mut first: Option<NautilusWsMessage> = None;
616
617        for event in events {
618            for candle in &event.candles {
619                let key = candle.product_id.as_str();
620
621                let bar_type = match self.bar_types.get(key) {
622                    Some(bt) => *bt,
623                    None => {
624                        log::debug!("No bar type registered for {key}");
625                        continue;
626                    }
627                };
628
629                let instrument_id = self.resolve_instrument_id(&candle.product_id);
630
631                let instrument = match self.instruments.get(&instrument_id) {
632                    Some(inst) => inst,
633                    None => {
634                        log::warn!("No instrument cached for {instrument_id}");
635                        continue;
636                    }
637                };
638
639                match parse_ws_candle(candle, bar_type, instrument, ts_init) {
640                    Ok(bar) => {
641                        let msg = NautilusWsMessage::Bar(bar);
642
643                        if first.is_none() {
644                            first = Some(msg);
645                        } else {
646                            self.buffer.push(msg);
647                        }
648                    }
649                    Err(e) => log::warn!("Failed to parse candle: {e}"),
650                }
651            }
652        }
653
654        if first.is_some() {
655            self.buffer.reverse();
656        }
657        first
658    }
659}
660
661#[cfg(test)]
662mod tests {
663    use std::sync::{Arc, atomic::AtomicBool};
664
665    use nautilus_model::{
666        identifiers::{Symbol, Venue},
667        instruments::CurrencyPair,
668        types::{Currency, Price, Quantity},
669    };
670    use rstest::rstest;
671
672    use super::*;
673    use crate::common::testing::load_test_fixture;
674
675    fn test_handler() -> FeedHandler {
676        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
677        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
678        FeedHandler::new(
679            Arc::new(AtomicBool::new(false)),
680            cmd_rx,
681            raw_rx,
682            Arc::new(AtomicMap::new()),
683        )
684    }
685
686    fn btc_usd_instrument() -> InstrumentAny {
687        let instrument_id =
688            InstrumentId::new(Symbol::new("BTC-USD"), Venue::new(Ustr::from("COINBASE")));
689        InstrumentAny::CurrencyPair(CurrencyPair::new(
690            instrument_id,
691            Symbol::new("BTC-USD"),
692            Currency::get_or_create_crypto("BTC"),
693            Currency::get_or_create_crypto("USD"),
694            2,
695            8,
696            Price::from("0.01"),
697            Quantity::from("0.00000001"),
698            None,
699            None,
700            None,
701            Some(Quantity::from("0.00000001")),
702            None,
703            None,
704            None,
705            None,
706            None,
707            None,
708            None,
709            None,
710            None,
711            UnixNanos::default(),
712            UnixNanos::default(),
713        ))
714    }
715
716    #[rstest]
717    fn test_handle_text_drops_user_channel_when_account_id_unset() {
718        let json = load_test_fixture("ws_user.json");
719        let mut handler = test_handler();
720
721        // account_id is intentionally left unset; events should be dropped
722        assert!(handler.handle_text(&json).is_none());
723        assert!(handler.buffer.is_empty());
724    }
725
726    #[rstest]
727    fn test_handle_user_event_emits_user_order_update() {
728        use nautilus_model::{
729            enums::{OrderSide, OrderStatus},
730            identifiers::AccountId,
731            types::Quantity,
732        };
733
734        use crate::common::enums::CoinbaseProductType;
735
736        let json = load_test_fixture("ws_user.json");
737        let mut handler = test_handler();
738        handler.set_account_id(AccountId::new("COINBASE-001"));
739        handler
740            .instruments
741            .insert(btc_usd_instrument().id(), btc_usd_instrument());
742
743        let msg = handler
744            .handle_text(&json)
745            .expect("handler should emit a user-channel update");
746
747        match msg {
748            NautilusWsMessage::UserOrder(carrier) => {
749                // Status report fields.
750                assert_eq!(carrier.report.account_id.as_str(), "COINBASE-001");
751                assert_eq!(carrier.report.instrument_id, btc_usd_instrument().id());
752                assert_eq!(
753                    carrier.report.venue_order_id.as_str(),
754                    "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
755                );
756                assert_eq!(
757                    carrier.report.client_order_id.unwrap().as_str(),
758                    "11111-000000-000001"
759                );
760                assert_eq!(carrier.report.order_side, OrderSide::Buy);
761                assert_eq!(carrier.report.order_status, OrderStatus::Accepted);
762                assert_eq!(carrier.report.filled_qty, Quantity::from("0.00000000"));
763                assert_eq!(carrier.report.quantity, Quantity::from("0.00100000"));
764
765                // Raw venue update fields.
766                assert_eq!(carrier.update.product_id, "BTC-USD");
767                assert_eq!(carrier.update.product_type, CoinbaseProductType::Spot);
768                assert_eq!(carrier.update.cumulative_quantity, "0");
769                assert_eq!(carrier.update.leaves_quantity, "0.001");
770
771                // Carrier metadata.
772                assert_eq!(carrier.instrument.id(), btc_usd_instrument().id());
773                assert!(carrier.ts_event.as_u64() > 0);
774            }
775            other => panic!("expected UserOrder, was {other:?}"),
776        }
777    }
778
779    #[rstest]
780    fn test_handle_text_ignores_status_channel() {
781        let json = r#"{
782          "channel": "status",
783          "client_id": "",
784          "timestamp": "2023-02-09T20:29:49.753424311Z",
785          "sequence_num": 0,
786          "events": [
787            {
788              "type": "snapshot",
789              "products": [
790                {
791                  "product_type": "SPOT",
792                  "id": "BTC-USD",
793                  "base_currency": "BTC",
794                  "quote_currency": "USD",
795                  "base_increment": "0.00000001",
796                  "quote_increment": "0.01",
797                  "display_name": "BTC/USD",
798                  "status": "online",
799                  "status_message": "",
800                  "min_market_funds": "1"
801                }
802              ]
803            }
804          ]
805        }"#;
806        let mut handler = test_handler();
807
808        assert!(handler.handle_text(json).is_none());
809        assert!(handler.buffer.is_empty());
810    }
811
812    #[rstest]
813    fn test_handle_l2_update_uses_batch_timestamp_for_all_deltas() {
814        let json = load_test_fixture("ws_l2_data_update.json");
815        let mut handler = test_handler();
816        handler
817            .instruments
818            .insert(btc_usd_instrument().id(), btc_usd_instrument());
819
820        let msg = handler
821            .handle_text(&json)
822            .expect("handler should emit deltas for a valid L2 update");
823
824        let deltas = match msg {
825            NautilusWsMessage::Deltas(d) => d,
826            other => panic!("expected Deltas, was {other:?}"),
827        };
828
829        assert!(!deltas.deltas.is_empty());
830        let expected_ts = deltas.deltas[0].ts_event;
831        for delta in &deltas.deltas {
832            assert_eq!(
833                delta.ts_event, expected_ts,
834                "all deltas in a batch must share ts_event"
835            );
836        }
837    }
838
839    #[rstest]
840    fn test_handle_l2_update_malformed_timestamp_falls_back_to_ts_init() {
841        let json = load_test_fixture("ws_l2_data_update.json")
842            .replace("2026-04-07T14:30:01.456789Z", "not-a-valid-timestamp");
843        let mut handler = test_handler();
844        handler
845            .instruments
846            .insert(btc_usd_instrument().id(), btc_usd_instrument());
847
848        let msg = handler
849            .handle_text(&json)
850            .expect("handler should still emit deltas when timestamp is malformed");
851
852        let deltas = match msg {
853            NautilusWsMessage::Deltas(d) => d,
854            other => panic!("expected Deltas, was {other:?}"),
855        };
856
857        assert!(!deltas.deltas.is_empty());
858        for delta in &deltas.deltas {
859            assert_eq!(
860                delta.ts_event, delta.ts_init,
861                "malformed timestamp must fall back to ts_init"
862            );
863        }
864    }
865
866    #[rstest]
867    fn test_handle_text_emits_futures_balance_summary_snapshot() {
868        use rust_decimal::Decimal;
869
870        let json = r#"{
871          "channel": "futures_balance_summary",
872          "client_id": "",
873          "timestamp": "2023-02-09T20:33:57.609931463Z",
874          "sequence_num": 0,
875          "events": [
876            {
877              "type": "snapshot",
878              "fcm_balance_summary": {
879                "futures_buying_power": "100.00",
880                "total_usd_balance": "200.00",
881                "cbi_usd_balance": "300.00",
882                "cfm_usd_balance": "400.00",
883                "total_open_orders_hold_amount": "500.00",
884                "unrealized_pnl": "600.00",
885                "daily_realized_pnl": "0",
886                "initial_margin": "700.00",
887                "available_margin": "800.00",
888                "liquidation_threshold": "900.00",
889                "liquidation_buffer_amount": "1000.00",
890                "liquidation_buffer_percentage": "1000",
891                "intraday_margin_window_measure": {
892                  "margin_window_type": "FCM_MARGIN_WINDOW_TYPE_INTRADAY",
893                  "margin_level": "MARGIN_LEVEL_TYPE_BASE",
894                  "initial_margin": "100.00",
895                  "maintenance_margin": "200.00",
896                  "liquidation_buffer_percentage": "1000",
897                  "total_hold": "100.00",
898                  "futures_buying_power": "400.00"
899                },
900                "overnight_margin_window_measure": {
901                  "margin_window_type": "FCM_MARGIN_WINDOW_TYPE_OVERNIGHT",
902                  "margin_level": "MARGIN_LEVEL_TYPE_BASE",
903                  "initial_margin": "300.00",
904                  "maintenance_margin": "200.00",
905                  "liquidation_buffer_percentage": "1000",
906                  "total_hold": "-30.00",
907                  "futures_buying_power": "2000.00"
908                }
909              }
910            }
911          ]
912        }"#;
913        let mut handler = test_handler();
914
915        let msg = handler
916            .handle_text(json)
917            .expect("handler should emit a futures balance summary");
918        match msg {
919            NautilusWsMessage::FuturesBalanceSummary(summary) => {
920                assert_eq!(summary.futures_buying_power, Decimal::from(100));
921                assert_eq!(summary.total_usd_balance, Decimal::from(200));
922                assert_eq!(summary.total_open_orders_hold_amount, Decimal::from(500));
923                assert_eq!(summary.available_margin, Decimal::from(800));
924                let intraday = &summary.intraday_margin_window_measure;
925                assert_eq!(intraday.initial_margin, Decimal::from(100));
926                assert_eq!(intraday.maintenance_margin, Decimal::from(200));
927                let overnight = &summary.overnight_margin_window_measure;
928                assert_eq!(overnight.initial_margin, Decimal::from(300));
929                assert_eq!(overnight.maintenance_margin, Decimal::from(200));
930                // `total_hold` carries negative values on the wire; ensure
931                // the signed decimal survives the round trip.
932                assert_eq!(overnight.total_hold, "-30".parse::<Decimal>().unwrap());
933            }
934            other => panic!("expected FuturesBalanceSummary, was {other:?}"),
935        }
936    }
937
938    #[rstest]
939    fn test_handle_text_routes_reconnected_sentinel() {
940        let mut handler = test_handler();
941        let result = handler.handle_text(RECONNECTED);
942        assert!(matches!(result, Some(NautilusWsMessage::Reconnected)));
943    }
944
945    #[rstest]
946    fn test_signal_release_acquire_exits_handler_loop() {
947        use std::sync::atomic::Ordering;
948
949        let signal = Arc::new(AtomicBool::new(false));
950        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
951        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
952        let mut handler =
953            FeedHandler::new(signal.clone(), cmd_rx, raw_rx, Arc::new(AtomicMap::new()));
954
955        signal.store(true, Ordering::Release);
956
957        let runtime = tokio::runtime::Builder::new_current_thread()
958            .enable_all()
959            .build()
960            .unwrap();
961        let result = runtime.block_on(async { handler.next().await });
962        assert!(result.is_none(), "{result:?}");
963    }
964}