Skip to main content

nautilus_coinbase/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing functions for converting Coinbase WebSocket messages to Nautilus domain types.
17
18use anyhow::Context;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    data::{Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
22    enums::{BookAction, LiquiditySide, OrderSide, OrderStatus, RecordFlag},
23    identifiers::{AccountId, ClientOrderId, InstrumentId, TradeId, VenueOrderId},
24    instruments::{Instrument, InstrumentAny},
25    reports::{FillReport, OrderStatusReport},
26    types::{Money, Price, Quantity},
27};
28
29use crate::{
30    http::parse::{
31        coinbase_side_to_aggressor, parse_epoch_secs_timestamp, parse_order_side,
32        parse_order_status, parse_order_type, parse_price, parse_quantity, parse_rfc3339_timestamp,
33        parse_time_in_force,
34    },
35    websocket::messages::{
36        WsBookSide, WsCandle, WsL2DataEvent, WsL2Update, WsOrderUpdate, WsTicker, WsTrade,
37    },
38};
39
40/// Parses a WebSocket trade into a [`TradeTick`].
41pub fn parse_ws_trade(
42    trade: &WsTrade,
43    instrument: &InstrumentAny,
44    ts_init: UnixNanos,
45) -> anyhow::Result<TradeTick> {
46    let price = parse_price(&trade.price, instrument.price_precision())?;
47    let size = parse_quantity(&trade.size, instrument.size_precision())?;
48    let aggressor_side = coinbase_side_to_aggressor(&trade.side);
49    let trade_id = TradeId::new(&trade.trade_id);
50    let ts_event = parse_rfc3339_timestamp(&trade.time)?;
51
52    TradeTick::new_checked(
53        instrument.id(),
54        price,
55        size,
56        aggressor_side,
57        trade_id,
58        ts_event,
59        ts_init,
60    )
61}
62
63/// Parses a WebSocket ticker into a [`QuoteTick`].
64pub fn parse_ws_ticker(
65    ticker: &WsTicker,
66    instrument: &InstrumentAny,
67    ts_event: UnixNanos,
68    ts_init: UnixNanos,
69) -> anyhow::Result<QuoteTick> {
70    let bid_price = parse_price(&ticker.best_bid, instrument.price_precision())?;
71    let ask_price = parse_price(&ticker.best_ask, instrument.price_precision())?;
72    let bid_size = parse_quantity(&ticker.best_bid_quantity, instrument.size_precision())?;
73    let ask_size = parse_quantity(&ticker.best_ask_quantity, instrument.size_precision())?;
74
75    QuoteTick::new_checked(
76        instrument.id(),
77        bid_price,
78        ask_price,
79        bid_size,
80        ask_size,
81        ts_event,
82        ts_init,
83    )
84}
85
86/// Parses a WebSocket candle into a [`Bar`].
87pub fn parse_ws_candle(
88    candle: &WsCandle,
89    bar_type: BarType,
90    instrument: &InstrumentAny,
91    ts_init: UnixNanos,
92) -> anyhow::Result<Bar> {
93    let open = parse_price(&candle.open, instrument.price_precision())?;
94    let high = parse_price(&candle.high, instrument.price_precision())?;
95    let low = parse_price(&candle.low, instrument.price_precision())?;
96    let close = parse_price(&candle.close, instrument.price_precision())?;
97    let volume = parse_quantity(&candle.volume, instrument.size_precision())?;
98    let ts_event = parse_epoch_secs_timestamp(&candle.start)?;
99
100    Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
101}
102
103/// Parses a WebSocket L2 snapshot event into [`OrderBookDeltas`].
104///
105/// All deltas in the batch share `ts_event`, which the caller derives from the
106/// message-level `timestamp`. Per-level `event_time` values are not monotonic
107/// across batches and would trigger out-of-order warnings in the managed book.
108pub fn parse_ws_l2_snapshot(
109    event: &WsL2DataEvent,
110    instrument: &InstrumentAny,
111    ts_event: UnixNanos,
112    ts_init: UnixNanos,
113) -> anyhow::Result<OrderBookDeltas> {
114    let instrument_id = instrument.id();
115
116    let total = event.updates.len();
117    let mut deltas = Vec::with_capacity(total + 1);
118
119    let mut clear = OrderBookDelta::clear(instrument_id, 0, ts_event, ts_init);
120
121    if total == 0 {
122        clear.flags |= RecordFlag::F_LAST as u8;
123    }
124    deltas.push(clear);
125
126    for (i, update) in event.updates.iter().enumerate() {
127        let is_last = i == total - 1;
128        let delta = parse_l2_delta(
129            update,
130            instrument_id,
131            instrument.price_precision(),
132            instrument.size_precision(),
133            is_last,
134            ts_event,
135            ts_init,
136        )?;
137        deltas.push(delta);
138    }
139
140    OrderBookDeltas::new_checked(instrument_id, deltas)
141}
142
143/// Parses a WebSocket L2 update event into [`OrderBookDeltas`].
144///
145/// All deltas in the batch share `ts_event`, which the caller derives from the
146/// message-level `timestamp`. Per-level `event_time` values are not monotonic
147/// across batches and would trigger out-of-order warnings in the managed book.
148pub fn parse_ws_l2_update(
149    event: &WsL2DataEvent,
150    instrument: &InstrumentAny,
151    ts_event: UnixNanos,
152    ts_init: UnixNanos,
153) -> anyhow::Result<OrderBookDeltas> {
154    let instrument_id = instrument.id();
155    let total = event.updates.len();
156    let mut deltas = Vec::with_capacity(total);
157
158    for (i, update) in event.updates.iter().enumerate() {
159        let is_last = i == total - 1;
160        let price = parse_price(&update.price_level, instrument.price_precision())?;
161        let size = parse_quantity(&update.new_quantity, instrument.size_precision())?;
162        let side = ws_book_side_to_order_side(update.side);
163
164        let action = if size == Quantity::zero(instrument.size_precision()) {
165            BookAction::Delete
166        } else {
167            BookAction::Update
168        };
169
170        let mut flags = RecordFlag::F_MBP as u8;
171
172        if is_last {
173            flags |= RecordFlag::F_LAST as u8;
174        }
175
176        let order = BookOrder::new(side, price, size, 0);
177        let delta =
178            OrderBookDelta::new_checked(instrument_id, action, order, flags, 0, ts_event, ts_init)?;
179        deltas.push(delta);
180    }
181
182    OrderBookDeltas::new_checked(instrument_id, deltas)
183}
184
185/// Parses a single L2 snapshot level into an [`OrderBookDelta`].
186fn parse_l2_delta(
187    update: &WsL2Update,
188    instrument_id: InstrumentId,
189    price_precision: u8,
190    size_precision: u8,
191    is_last: bool,
192    ts_event: UnixNanos,
193    ts_init: UnixNanos,
194) -> anyhow::Result<OrderBookDelta> {
195    let price = parse_price(&update.price_level, price_precision)?;
196    let size = parse_quantity(&update.new_quantity, size_precision)?;
197    let side = ws_book_side_to_order_side(update.side);
198
199    let mut flags = RecordFlag::F_MBP as u8;
200
201    if is_last {
202        flags |= RecordFlag::F_LAST as u8;
203    }
204
205    let order = BookOrder::new(side, price, size, 0);
206    OrderBookDelta::new_checked(
207        instrument_id,
208        BookAction::Add,
209        order,
210        flags,
211        0,
212        ts_event,
213        ts_init,
214    )
215}
216
217/// Converts a Coinbase WebSocket book side to a Nautilus order side.
218fn ws_book_side_to_order_side(side: WsBookSide) -> OrderSide {
219    match side {
220        WsBookSide::Bid => OrderSide::Buy,
221        WsBookSide::Offer => OrderSide::Sell,
222    }
223}
224
225/// Parses a Coinbase user channel [`WsOrderUpdate`] into an [`OrderStatusReport`].
226///
227/// Derives the total quantity as `cumulative_quantity + leaves_quantity` and
228/// promotes the `Accepted` status to `PartiallyFilled` when the cumulative
229/// fill is positive but below the total quantity, mirroring the REST parser.
230///
231/// # Errors
232///
233/// Returns an error when any numeric field cannot be parsed against the
234/// instrument precision.
235pub fn parse_ws_user_event_to_order_status_report(
236    update: &WsOrderUpdate,
237    instrument: &InstrumentAny,
238    account_id: AccountId,
239    ts_event: UnixNanos,
240    ts_init: UnixNanos,
241) -> anyhow::Result<OrderStatusReport> {
242    let instrument_id = instrument.id();
243    let size_precision = instrument.size_precision();
244
245    let order_side = parse_order_side(&update.order_side);
246    let order_type = parse_order_type(update.order_type);
247    let time_in_force = parse_time_in_force(Some(update.time_in_force));
248    let mut order_status = parse_order_status(update.status);
249
250    let venue_order_id = VenueOrderId::new(&update.order_id);
251    let client_order_id = if update.client_order_id.is_empty() {
252        None
253    } else {
254        Some(ClientOrderId::new(&update.client_order_id))
255    };
256
257    let filled_qty = if update.cumulative_quantity.is_empty() {
258        Quantity::zero(size_precision)
259    } else {
260        parse_quantity(&update.cumulative_quantity, size_precision)
261            .context("failed to parse cumulative_quantity")?
262    };
263    let leaves_qty = if update.leaves_quantity.is_empty() {
264        Quantity::zero(size_precision)
265    } else {
266        parse_quantity(&update.leaves_quantity, size_precision)
267            .context("failed to parse leaves_quantity")?
268    };
269
270    let quantity = filled_qty + leaves_qty;
271
272    if order_status == OrderStatus::Accepted && filled_qty.is_positive() && filled_qty < quantity {
273        order_status = OrderStatus::PartiallyFilled;
274    }
275
276    let ts_accepted = if update.creation_time.is_empty() {
277        ts_event
278    } else {
279        parse_rfc3339_timestamp(&update.creation_time).unwrap_or(ts_event)
280    };
281
282    let mut report = OrderStatusReport::new(
283        account_id,
284        instrument_id,
285        client_order_id,
286        venue_order_id,
287        order_side,
288        order_type,
289        time_in_force,
290        order_status,
291        quantity,
292        filled_qty,
293        ts_accepted,
294        ts_event,
295        ts_init,
296        None,
297    );
298
299    if !update.avg_price.is_empty()
300        && let Ok(avg_px) = update.avg_price.parse::<f64>()
301        && avg_px > 0.0
302    {
303        report = report.with_avg_px(avg_px)?;
304    }
305
306    Ok(report)
307}
308
309/// Parses a Coinbase user channel [`WsOrderUpdate`] into a [`FillReport`].
310///
311/// Coinbase's user channel reports cumulative totals rather than per-trade
312/// fills, so the caller must supply:
313/// - `last_qty`: the quantity delta since the previous cumulative state
314/// - `last_px`: the price of the new fill, derived by the caller from the
315///   cumulative notional delta (Coinbase's `avg_price` is the *cumulative*
316///   weighted average and is not safe to use as the new fill's price for
317///   multi-fill orders)
318/// - `commission`: the commission delta since the previous cumulative state
319/// - `trade_id`: synthesized from the order ID plus the new cumulative total
320#[allow(clippy::too_many_arguments)]
321pub fn parse_ws_user_event_to_fill_report(
322    update: &WsOrderUpdate,
323    last_qty: Quantity,
324    last_px: Price,
325    commission: Money,
326    trade_id: TradeId,
327    instrument: &InstrumentAny,
328    account_id: AccountId,
329    liquidity_side: LiquiditySide,
330    ts_event: UnixNanos,
331    ts_init: UnixNanos,
332) -> FillReport {
333    let instrument_id = instrument.id();
334
335    let venue_order_id = VenueOrderId::new(&update.order_id);
336    let client_order_id = if update.client_order_id.is_empty() {
337        None
338    } else {
339        Some(ClientOrderId::new(&update.client_order_id))
340    };
341    let order_side = parse_order_side(&update.order_side);
342
343    FillReport::new(
344        account_id,
345        instrument_id,
346        venue_order_id,
347        trade_id,
348        order_side,
349        last_qty,
350        last_px,
351        commission,
352        liquidity_side,
353        client_order_id,
354        None,
355        ts_event,
356        ts_init,
357        None,
358    )
359}
360
#[cfg(test)]
mod tests {
    //! Unit tests for the Coinbase WebSocket parsers.
    //!
    //! Market data tests deserialize JSON fixtures into [`CoinbaseWsMessage`]
    //! and run the relevant event through the parser under test. User channel
    //! tests either use the `ws_user.json` fixture or a hand-built
    //! [`WsOrderUpdate`].

    use std::str::FromStr;

    use nautilus_model::{
        data::bar::BarSpecification,
        enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
        identifiers::{Symbol, Venue},
        instruments::CurrencyPair,
        types::{Currency, Price},
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::{
            enums::{
                CoinbaseContractExpiryType, CoinbaseOrderSide, CoinbaseOrderStatus,
                CoinbaseOrderType, CoinbaseProductType, CoinbaseRiskManagedBy,
                CoinbaseTimeInForce, CoinbaseTriggerStatus,
            },
            testing::load_test_fixture,
        },
        websocket::messages::{CoinbaseWsMessage, WsEventType},
    };

    /// Builds a BTC-USD currency pair with price precision 2 and size precision 8.
    fn test_instrument() -> InstrumentAny {
        let instrument_id =
            InstrumentId::new(Symbol::new("BTC-USD"), Venue::new(Ustr::from("COINBASE")));
        let raw_symbol = Symbol::new("BTC-USD");
        let base_currency = Currency::get_or_create_crypto("BTC");
        let quote_currency = Currency::get_or_create_crypto("USD");

        InstrumentAny::CurrencyPair(CurrencyPair::new(
            instrument_id,
            raw_symbol,
            base_currency,
            quote_currency,
            2,
            8,
            Price::from("0.01"),
            Quantity::from("0.00000001"),
            None,
            None,
            None,
            Some(Quantity::from("0.00000001")),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        ))
    }

    /// Loads the named JSON fixture and deserializes it into a [`CoinbaseWsMessage`].
    fn load_ws_message(filename: &str) -> CoinbaseWsMessage {
        let json = load_test_fixture(filename);
        serde_json::from_str(&json).unwrap()
    }

    /// Builds an open limit order update on BTC-USD for the given side, with
    /// `cumulative_quantity` and `leaves_quantity` both set to `"0.5"` and an
    /// empty `creation_time`.
    fn half_filled_order_update(order_side: CoinbaseOrderSide) -> WsOrderUpdate {
        WsOrderUpdate {
            order_id: "venue-1".to_string(),
            client_order_id: "client-1".to_string(),
            contract_expiry_type: CoinbaseContractExpiryType::Unknown,
            cumulative_quantity: "0.5".to_string(),
            leaves_quantity: "0.5".to_string(),
            avg_price: "100.00".to_string(),
            total_fees: "0.05".to_string(),
            status: CoinbaseOrderStatus::Open,
            product_id: Ustr::from("BTC-USD"),
            product_type: CoinbaseProductType::Spot,
            creation_time: String::new(),
            order_side,
            order_type: CoinbaseOrderType::Limit,
            risk_managed_by: CoinbaseRiskManagedBy::Unknown,
            time_in_force: CoinbaseTimeInForce::GoodUntilCancelled,
            trigger_status: CoinbaseTriggerStatus::InvalidOrderType,
            cancel_reason: String::new(),
            reject_reason: String::new(),
            total_value_after_fees: String::new(),
        }
    }

    #[rstest]
    fn test_parse_ws_trade() {
        let instrument = test_instrument();
        let ts_init = UnixNanos::default();

        let CoinbaseWsMessage::MarketTrades { events, .. } =
            load_ws_message("ws_market_trades.json")
        else {
            panic!("Expected MarketTrades");
        };

        let trade_data = &events[0].trades[0];
        let tick = parse_ws_trade(trade_data, &instrument, ts_init).unwrap();

        assert_eq!(tick.instrument_id, instrument.id());
        assert_eq!(tick.price, Price::from("68900.50"));
        assert_eq!(tick.size, Quantity::from("0.00150000"));
        assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
        assert_eq!(tick.trade_id.as_str(), "995098700");
        assert!(tick.ts_event.as_u64() > 0);
    }

    #[rstest]
    fn test_parse_ws_trade_sell_side() {
        let instrument = test_instrument();
        let ts_init = UnixNanos::default();

        let CoinbaseWsMessage::MarketTrades { events, .. } =
            load_ws_message("ws_market_trades.json")
        else {
            panic!("Expected MarketTrades");
        };

        let trade_data = &events[0].trades[1];
        let tick = parse_ws_trade(trade_data, &instrument, ts_init).unwrap();

        assert_eq!(tick.aggressor_side, AggressorSide::Seller);
        assert_eq!(tick.price, Price::from("68900.00"));
        assert_eq!(tick.size, Quantity::from("0.05000000"));
    }

    #[rstest]
    fn test_parse_ws_ticker() {
        let instrument = test_instrument();
        let ts_init = UnixNanos::default();

        let CoinbaseWsMessage::Ticker {
            timestamp, events, ..
        } = load_ws_message("ws_ticker.json")
        else {
            panic!("Expected Ticker");
        };

        let ticker_data = &events[0].tickers[0];
        let ts_event = parse_rfc3339_timestamp(&timestamp).unwrap();
        let quote = parse_ws_ticker(ticker_data, &instrument, ts_event, ts_init).unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, Price::from("68900.00"));
        assert_eq!(quote.ask_price, Price::from("68901.00"));
        assert_eq!(quote.bid_size, Quantity::from("1.50000000"));
        assert_eq!(quote.ask_size, Quantity::from("0.50000000"));
    }

    #[rstest]
    fn test_parse_ws_candle() {
        let instrument = test_instrument();
        let ts_init = UnixNanos::default();

        let bar_spec = BarSpecification::new(5, BarAggregation::Minute, PriceType::Last);
        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::External);

        let CoinbaseWsMessage::Candles { events, .. } = load_ws_message("ws_candles.json") else {
            panic!("Expected Candles");
        };

        let candle_data = &events[0].candles[0];
        let bar = parse_ws_candle(candle_data, bar_type, &instrument, ts_init).unwrap();

        assert_eq!(bar.bar_type, bar_type);
        assert_eq!(bar.open, Price::from("68900.00"));
        assert_eq!(bar.high, Price::from("68950.00"));
        assert_eq!(bar.low, Price::from("68850.00"));
        assert_eq!(bar.close, Price::from("68920.50"));
        assert_eq!(bar.volume, Quantity::from("42.15000000"));
        assert_eq!(bar.ts_event.as_u64(), 1_775_521_800_000_000_000);
    }

    #[rstest]
    fn test_parse_ws_l2_snapshot() {
        let instrument = test_instrument();
        let ts_init = UnixNanos::default();

        let CoinbaseWsMessage::L2Data {
            timestamp, events, ..
        } = load_ws_message("ws_l2_data_snapshot.json")
        else {
            panic!("Expected L2Data");
        };

        let event = &events[0];
        assert_eq!(event.event_type, WsEventType::Snapshot);
        let ts_event = parse_rfc3339_timestamp(&timestamp).unwrap();

        let deltas = parse_ws_l2_snapshot(event, &instrument, ts_event, ts_init).unwrap();
        assert_eq!(deltas.instrument_id, instrument.id());
        for delta in &deltas.deltas {
            assert_eq!(delta.ts_event, ts_event);
        }

        // 6 levels + 1 clear = 7 deltas
        assert_eq!(deltas.deltas.len(), 7);

        // First delta is clear
        assert_eq!(deltas.deltas[0].action, BookAction::Clear);

        // Bids
        assert_eq!(deltas.deltas[1].order.side, OrderSide::Buy);
        assert_eq!(deltas.deltas[1].order.price, Price::from("68900.00"));
        assert_eq!(deltas.deltas[1].order.size, Quantity::from("1.50000000"));

        // Asks
        assert_eq!(deltas.deltas[4].order.side, OrderSide::Sell);
        assert_eq!(deltas.deltas[4].order.price, Price::from("68901.00"));

        // Last delta has F_LAST flag
        let last = deltas.deltas.last().unwrap();
        assert_ne!(last.flags & RecordFlag::F_LAST as u8, 0);
    }

    #[rstest]
    fn test_parse_ws_l2_update() {
        let instrument = test_instrument();
        let ts_init = UnixNanos::default();

        let CoinbaseWsMessage::L2Data {
            timestamp, events, ..
        } = load_ws_message("ws_l2_data_update.json")
        else {
            panic!("Expected L2Data");
        };

        let event = &events[0];
        assert_eq!(event.event_type, WsEventType::Update);
        let ts_event = parse_rfc3339_timestamp(&timestamp).unwrap();

        let deltas = parse_ws_l2_update(event, &instrument, ts_event, ts_init).unwrap();
        assert_eq!(deltas.deltas.len(), 2);
        for delta in &deltas.deltas {
            assert_eq!(delta.ts_event, ts_event);
        }

        // First update: bid at 68900.00, qty 2.0 -> Update action
        assert_eq!(deltas.deltas[0].order.side, OrderSide::Buy);
        assert_eq!(deltas.deltas[0].order.price, Price::from("68900.00"));
        assert_eq!(deltas.deltas[0].order.size, Quantity::from("2.00000000"));
        assert_eq!(deltas.deltas[0].action, BookAction::Update);

        // Second update: offer at 68901.00, qty 0.0 -> Delete action
        assert_eq!(deltas.deltas[1].order.side, OrderSide::Sell);
        assert_eq!(deltas.deltas[1].action, BookAction::Delete);
        assert_eq!(deltas.deltas[1].order.size, Quantity::from("0.00000000"));

        // Last delta has F_LAST flag
        assert_ne!(deltas.deltas[1].flags & RecordFlag::F_LAST as u8, 0);
    }

    #[rstest]
    fn test_parse_ws_l2_update_zero_quantity_is_delete() {
        let instrument = test_instrument();
        let ts_init = UnixNanos::default();

        let CoinbaseWsMessage::L2Data {
            timestamp, events, ..
        } = load_ws_message("ws_l2_data_update.json")
        else {
            panic!("Expected L2Data");
        };

        let event = &events[0];
        let ts_event = parse_rfc3339_timestamp(&timestamp).unwrap();
        let deltas = parse_ws_l2_update(event, &instrument, ts_event, ts_init).unwrap();

        // The offer with new_quantity "0.00000000" should be a Delete
        let delete_delta = deltas
            .deltas
            .iter()
            .find(|d| d.action == BookAction::Delete)
            .expect("should have a delete action for zero quantity");
        assert_eq!(delete_delta.order.side, OrderSide::Sell);
        assert_eq!(delete_delta.ts_event, ts_event);
    }

    #[rstest]
    fn test_ws_book_side_conversion() {
        assert_eq!(ws_book_side_to_order_side(WsBookSide::Bid), OrderSide::Buy);
        assert_eq!(
            ws_book_side_to_order_side(WsBookSide::Offer),
            OrderSide::Sell
        );
    }

    #[rstest]
    fn test_parse_ws_user_event_to_order_status_report_open() {
        let instrument = test_instrument();
        let account_id = AccountId::new("COINBASE-001");
        let ts_event = UnixNanos::from(1_705_314_600_000_000_000u64);
        let ts_init = UnixNanos::from(1_705_314_700_000_000_000u64);

        let order = match load_ws_message("ws_user.json") {
            CoinbaseWsMessage::User { events, .. } => events[0].orders[0].clone(),
            other => panic!("expected User, was {other:?}"),
        };

        let report = parse_ws_user_event_to_order_status_report(
            &order,
            &instrument,
            account_id,
            ts_event,
            ts_init,
        )
        .unwrap();

        assert_eq!(report.account_id, account_id);
        assert_eq!(report.instrument_id, instrument.id());
        assert_eq!(
            report.venue_order_id.as_str(),
            "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
        );
        assert_eq!(
            report.client_order_id.unwrap().as_str(),
            "11111-000000-000001"
        );
        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.order_status, OrderStatus::Accepted);
        assert_eq!(report.filled_qty, Quantity::from("0.00000000"));
        assert_eq!(report.quantity, Quantity::from("0.00100000"));
        assert_eq!(report.ts_init, ts_init);
    }

    #[rstest]
    fn test_parse_ws_user_event_to_order_status_report_promotes_partial_fill() {
        let update = half_filled_order_update(CoinbaseOrderSide::Buy);

        let instrument = test_instrument();
        let report = parse_ws_user_event_to_order_status_report(
            &update,
            &instrument,
            AccountId::new("COINBASE-001"),
            UnixNanos::default(),
            UnixNanos::default(),
        )
        .unwrap();

        // Coinbase Open + positive cumulative + leaves > 0 should promote to PartiallyFilled.
        assert_eq!(report.order_status, OrderStatus::PartiallyFilled);
        assert_eq!(report.filled_qty, Quantity::from("0.50000000"));
        assert_eq!(report.quantity, Quantity::from("1.00000000"));
    }

    #[rstest]
    fn test_parse_ws_user_event_to_fill_report_uses_supplied_last_px_and_commission() {
        let update = half_filled_order_update(CoinbaseOrderSide::Sell);

        let instrument = test_instrument();
        let usd = Currency::USD();
        let last_px = Price::from("120.00");
        let commission =
            Money::from_decimal(rust_decimal::Decimal::from_str("0.10").unwrap(), usd).unwrap();
        let trade_id = TradeId::new("venue-1-0.5");

        let report = parse_ws_user_event_to_fill_report(
            &update,
            Quantity::from("0.50000000"),
            last_px,
            commission,
            trade_id,
            &instrument,
            AccountId::new("COINBASE-001"),
            LiquiditySide::Maker,
            UnixNanos::default(),
            UnixNanos::default(),
        );

        assert_eq!(report.venue_order_id.as_str(), "venue-1");
        assert_eq!(report.client_order_id.unwrap().as_str(), "client-1");
        assert_eq!(report.order_side, OrderSide::Sell);
        assert_eq!(report.last_qty, Quantity::from("0.50000000"));
        assert_eq!(report.last_px, Price::from("120.00"));
        assert_eq!(report.commission, commission);
        assert_eq!(report.liquidity_side, LiquiditySide::Maker);
        assert_eq!(report.trade_id, trade_id);
    }
}