Skip to main content

nautilus_dydx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing utilities for dYdX WebSocket messages.
17//!
18//! Converts WebSocket-specific message formats into Nautilus domain types
19//! by transforming them into HTTP-equivalent structures and delegating to
20//! the HTTP parser for consistency.
21
22use std::str::FromStr;
23
24use anyhow::Context;
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29    data::{Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick},
30    enums::{AggressorSide, BookAction, OrderSide, OrderStatus, RecordFlag},
31    identifiers::{AccountId, InstrumentId, TradeId},
32    instruments::{Instrument, InstrumentAny},
33    reports::{FillReport, OrderStatusReport, PositionStatusReport},
34    types::{Price, Quantity},
35};
36use rust_decimal::Decimal;
37
38use super::{DydxWsError, DydxWsResult};
39use crate::{
40    common::{
41        enums::{DydxOrderStatus, DydxTickerType},
42        instrument_cache::InstrumentCache,
43    },
44    execution::{encoder::ClientOrderIdEncoder, types::OrderContext},
45    http::{
46        models::{Fill, Order, PerpetualPosition},
47        parse::{parse_fill_report, parse_order_status_report, parse_position_status_report},
48    },
49    websocket::messages::{
50        DydxCandle, DydxOrderbookContents, DydxOrderbookSnapshotContents, DydxPerpetualPosition,
51        DydxTradeContents, DydxWsFillSubaccountMessageContents,
52        DydxWsOrderSubaccountMessageContents,
53    },
54};
55
56/// Parses a WebSocket order update into an OrderStatusReport.
57///
58/// Converts the WebSocket order format to the HTTP Order format, then delegates
59/// to the existing HTTP parser for consistency.
60///
61/// # Arguments
62///
63/// * `ws_order` - The WebSocket order message to parse
64/// * `instrument_cache` - Cache for looking up instruments by clob_pair_id
65/// * `order_contexts` - Map of dYdX u32 client IDs to order contexts
66/// * `encoder` - Bidirectional encoder for ClientOrderId ↔ u32 mapping
67/// * `account_id` - Account ID for the report
68/// * `ts_init` - Timestamp for initialization
69///
70/// # Errors
71///
72/// Returns an error if:
73/// - clob_pair_id cannot be parsed from string.
74/// - Instrument lookup fails for the clob_pair_id.
75/// - Field parsing fails (price, size, etc.).
76/// - HTTP parser fails.
77pub fn parse_ws_order_report(
78    ws_order: &DydxWsOrderSubaccountMessageContents,
79    instrument_cache: &InstrumentCache,
80    order_contexts: &DashMap<u32, OrderContext>,
81    encoder: &ClientOrderIdEncoder,
82    account_id: AccountId,
83    ts_init: UnixNanos,
84) -> anyhow::Result<OrderStatusReport> {
85    let clob_pair_id: u32 = ws_order.clob_pair_id.parse().context(format!(
86        "Failed to parse clob_pair_id '{}'",
87        ws_order.clob_pair_id
88    ))?;
89
90    let instrument = instrument_cache
91        .get_by_clob_id(clob_pair_id)
92        .ok_or_else(|| {
93            instrument_cache.log_missing_clob_pair_id(clob_pair_id);
94            anyhow::anyhow!("No instrument cached for clob_pair_id {clob_pair_id}")
95        })?;
96
97    let http_order = convert_ws_order_to_http(ws_order)?;
98    let mut report = parse_order_status_report(&http_order, &instrument, account_id, ts_init)?;
99
100    let dydx_client_id = ws_order.client_id.parse::<u32>().ok();
101    let dydx_client_metadata = ws_order
102        .client_metadata
103        .as_ref()
104        .and_then(|s| s.parse::<u32>().ok())
105        .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
106
107    log::debug!(
108        "[WS_ORDER_RECV] dYdX client_id='{}' meta={:#x} (parsed u32={:?}) | status={:?} | clob_pair={} | side={:?} | size={} | filled={}",
109        ws_order.client_id,
110        dydx_client_metadata,
111        dydx_client_id,
112        ws_order.status,
113        ws_order.clob_pair_id,
114        ws_order.side,
115        ws_order.size,
116        ws_order.total_filled.as_deref().unwrap_or("?")
117    );
118
119    // Look up the original Nautilus client_order_id from the order context first,
120    // then fall back to encoder.decode_if_known() if not found in context
121    if let Some(client_id) = dydx_client_id {
122        if let Some(ctx) = order_contexts.get(&client_id) {
123            log::debug!(
124                "[WS_ORDER_RECV] DECODE via order_contexts: dYdX u32={} -> Nautilus '{}'",
125                client_id,
126                ctx.client_order_id
127            );
128            report.client_order_id = Some(ctx.client_order_id);
129        } else if let Some(client_order_id) =
130            encoder.decode_if_known(client_id, dydx_client_metadata)
131        {
132            log::debug!(
133                "[WS_ORDER_RECV] DECODE via encoder fallback: dYdX u32={client_id} meta={dydx_client_metadata:#x} -> Nautilus '{client_order_id}'"
134            );
135            report.client_order_id = Some(client_order_id);
136        } else {
137            log::debug!(
138                "[WS_ORDER_RECV] Unknown order: dYdX u32={client_id} meta={dydx_client_metadata:#x} (external or previous session)"
139            );
140        }
141    } else {
142        log::warn!(
143            "[WS_ORDER_RECV] Could not parse client_id '{}' as u32",
144            ws_order.client_id
145        );
146    }
147
148    // For untriggered conditional orders with an explicit trigger price we
149    // surface `PendingUpdate` to match Nautilus semantics and existing dYdX
150    // enum mapping.
151    if matches!(ws_order.status, DydxOrderStatus::Untriggered) && ws_order.trigger_price.is_some() {
152        report.order_status = OrderStatus::PendingUpdate;
153    }
154
155    Ok(report)
156}
157
158/// Converts a WebSocket order message to HTTP Order format.
159///
160/// # Errors
161///
162/// Returns an error if any field parsing fails.
163fn convert_ws_order_to_http(
164    ws_order: &DydxWsOrderSubaccountMessageContents,
165) -> anyhow::Result<Order> {
166    let clob_pair_id: u32 = ws_order
167        .clob_pair_id
168        .parse()
169        .context("Failed to parse clob_pair_id")?;
170
171    let size: Decimal = ws_order.size.parse().context("Failed to parse size")?;
172
173    let total_filled: Decimal = ws_order
174        .total_filled
175        .as_ref()
176        .map(|s| s.parse())
177        .transpose()
178        .context("Failed to parse total_filled")?
179        .unwrap_or(Decimal::ZERO);
180
181    // Saturate to zero if total_filled exceeds size (edge case: rounding or partial fills)
182    let remaining_size = (size - total_filled).max(Decimal::ZERO);
183
184    let price: Decimal = ws_order.price.parse().context("Failed to parse price")?;
185
186    let created_at_height: u64 = ws_order
187        .created_at_height
188        .as_ref()
189        .map(|s| s.parse())
190        .transpose()
191        .context("Failed to parse created_at_height")?
192        .unwrap_or(0);
193
194    let client_metadata: u32 = ws_order
195        .client_metadata
196        .as_ref()
197        .ok_or_else(|| anyhow::anyhow!("Missing required field: client_metadata"))?
198        .parse()
199        .context("Failed to parse client_metadata")?;
200
201    let order_flags: u32 = ws_order
202        .order_flags
203        .parse()
204        .context("Failed to parse order_flags")?;
205
206    let good_til_block = ws_order
207        .good_til_block
208        .as_ref()
209        .and_then(|s| s.parse::<u64>().ok());
210
211    let good_til_block_time = ws_order
212        .good_til_block_time
213        .as_ref()
214        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
215        .map(|dt| dt.with_timezone(&Utc));
216
217    let trigger_price = ws_order
218        .trigger_price
219        .as_ref()
220        .and_then(|s| Decimal::from_str(s).ok());
221
222    // Parse updated_at (optional for BEST_EFFORT_OPENED orders)
223    let updated_at = ws_order
224        .updated_at
225        .as_ref()
226        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
227        .map(|dt| dt.with_timezone(&Utc));
228
229    // Parse updated_at_height (optional for BEST_EFFORT_OPENED orders)
230    let updated_at_height = ws_order
231        .updated_at_height
232        .as_ref()
233        .and_then(|s| s.parse::<u64>().ok());
234
235    let total_filled = size.checked_sub(remaining_size).unwrap_or(Decimal::ZERO);
236
237    Ok(Order {
238        id: ws_order.id.clone(),
239        subaccount_id: ws_order.subaccount_id.clone(),
240        client_id: ws_order.client_id.clone(),
241        clob_pair_id,
242        side: ws_order.side,
243        size,
244        total_filled,
245        price,
246        status: ws_order.status,
247        order_type: ws_order.order_type,
248        time_in_force: ws_order.time_in_force,
249        reduce_only: ws_order.reduce_only,
250        post_only: ws_order.post_only,
251        order_flags,
252        good_til_block,
253        good_til_block_time,
254        created_at_height: Some(created_at_height),
255        client_metadata,
256        trigger_price,
257        condition_type: None, // Not provided in WebSocket messages
258        conditional_order_trigger_subticks: None, // Not provided in WebSocket messages
259        execution: None,      // Inferred from post_only flag by HTTP parser
260        updated_at,
261        updated_at_height,
262        ticker: None,               // Not provided in WebSocket messages
263        subaccount_number: 0,       // Default to 0 for WebSocket messages
264        order_router_address: None, // Not provided in WebSocket messages
265    })
266}
267
268/// Parses a WebSocket fill update into a FillReport.
269///
270/// Converts the WebSocket fill format to the HTTP Fill format, then delegates
271/// to the existing HTTP parser for consistency. Correlates the fill back to the
272/// originating order using the `order_id_map` (built from WS order updates).
273///
274/// # Errors
275///
276/// Returns an error if:
277/// - Instrument lookup fails for the market symbol.
278/// - Field parsing fails (price, size, fee, etc.).
279/// - HTTP parser fails.
280pub fn parse_ws_fill_report(
281    ws_fill: &DydxWsFillSubaccountMessageContents,
282    instrument_cache: &InstrumentCache,
283    order_id_map: &DashMap<String, (u32, u32)>,
284    order_contexts: &DashMap<u32, OrderContext>,
285    encoder: &ClientOrderIdEncoder,
286    account_id: AccountId,
287    ts_init: UnixNanos,
288) -> anyhow::Result<FillReport> {
289    let instrument = instrument_cache
290        .get_by_market(&ws_fill.market)
291        .ok_or_else(|| {
292            let available: Vec<String> = instrument_cache
293                .all_instruments()
294                .into_iter()
295                .map(|inst| inst.id().symbol.to_string())
296                .collect();
297            anyhow::anyhow!(
298                "No instrument cached for market '{}'. Available: {:?}",
299                ws_fill.market,
300                available
301            )
302        })?;
303
304    let http_fill = convert_ws_fill_to_http(ws_fill)?;
305    let mut report = parse_fill_report(&http_fill, &instrument, account_id, ts_init)?;
306
307    // Correlate fill to order via order_id → (client_id, client_metadata) → client_order_id
308    if let Some(ref order_id) = ws_fill.order_id {
309        if let Some(entry) = order_id_map.get(order_id) {
310            let (client_id, client_metadata) = *entry.value();
311            if let Some(ctx) = order_contexts.get(&client_id) {
312                report.client_order_id = Some(ctx.client_order_id);
313            } else if let Some(client_order_id) =
314                encoder.decode_if_known(client_id, client_metadata)
315            {
316                report.client_order_id = Some(client_order_id);
317            } else {
318                log::debug!(
319                    "[WS_FILL_RECV] Unknown order: order_id={order_id} -> client_id={client_id} meta={client_metadata:#x} (external or previous session)",
320                );
321            }
322        } else {
323            log::warn!(
324                "[WS_FILL_RECV] No order_id mapping for '{order_id}', fill cannot be correlated",
325            );
326        }
327    }
328
329    Ok(report)
330}
331
332/// Converts a WebSocket fill message to HTTP Fill format.
333///
334/// # Errors
335///
336/// Returns an error if any field parsing fails.
337fn convert_ws_fill_to_http(ws_fill: &DydxWsFillSubaccountMessageContents) -> anyhow::Result<Fill> {
338    let price: Decimal = ws_fill.price.parse().context("Failed to parse price")?;
339    let size: Decimal = ws_fill.size.parse().context("Failed to parse size")?;
340    let fee: Decimal = ws_fill.fee.parse().context("Failed to parse fee")?;
341
342    let created_at_height: u64 = ws_fill
343        .created_at_height
344        .as_ref()
345        .map(|s| s.parse())
346        .transpose()
347        .context("Failed to parse created_at_height")?
348        .unwrap_or(0);
349
350    let client_metadata: u32 = ws_fill
351        .client_metadata
352        .as_ref()
353        .ok_or_else(|| anyhow::anyhow!("Missing required field: client_metadata"))?
354        .parse()
355        .context("Failed to parse client_metadata")?;
356
357    let order_id = ws_fill
358        .order_id
359        .clone()
360        .ok_or_else(|| anyhow::anyhow!("Missing required field: order_id"))?;
361
362    let created_at = DateTime::parse_from_rfc3339(&ws_fill.created_at)
363        .context("Failed to parse created_at")?
364        .with_timezone(&Utc);
365
366    Ok(Fill {
367        id: ws_fill.id.clone(),
368        side: ws_fill.side,
369        liquidity: ws_fill.liquidity,
370        fill_type: ws_fill.fill_type,
371        market: ws_fill.market,
372        market_type: ws_fill.market_type.unwrap_or(DydxTickerType::Perpetual),
373        price,
374        size,
375        fee,
376        created_at,
377        created_at_height,
378        order_id,
379        client_metadata,
380    })
381}
382
383/// Parses a WebSocket position into a PositionStatusReport.
384///
385/// Converts the WebSocket position format to the HTTP PerpetualPosition format,
386/// then delegates to the existing HTTP parser for consistency.
387///
388/// # Errors
389///
390/// Returns an error if:
391/// - Instrument lookup fails for the market symbol.
392/// - Field parsing fails (size, prices, etc.).
393/// - HTTP parser fails.
394pub fn parse_ws_position_report(
395    ws_position: &DydxPerpetualPosition,
396    instrument_cache: &InstrumentCache,
397    account_id: AccountId,
398    ts_init: UnixNanos,
399) -> anyhow::Result<PositionStatusReport> {
400    let instrument = instrument_cache
401        .get_by_market(&ws_position.market)
402        .ok_or_else(|| {
403            let available: Vec<String> = instrument_cache
404                .all_instruments()
405                .into_iter()
406                .map(|inst| inst.id().symbol.to_string())
407                .collect();
408            anyhow::anyhow!(
409                "No instrument cached for market '{}'. Available: {:?}",
410                ws_position.market,
411                available
412            )
413        })?;
414
415    let http_position = convert_ws_position_to_http(ws_position)?;
416    parse_position_status_report(&http_position, &instrument, account_id, ts_init)
417}
418
419/// Converts a WebSocket position to HTTP PerpetualPosition format.
420///
421/// # Errors
422///
423/// Returns an error if any field parsing fails.
424fn convert_ws_position_to_http(
425    ws_position: &DydxPerpetualPosition,
426) -> anyhow::Result<PerpetualPosition> {
427    let size: Decimal = ws_position.size.parse().context("Failed to parse size")?;
428
429    let max_size: Decimal = ws_position
430        .max_size
431        .parse()
432        .context("Failed to parse max_size")?;
433
434    let entry_price: Decimal = ws_position
435        .entry_price
436        .parse()
437        .context("Failed to parse entry_price")?;
438
439    let exit_price: Option<Decimal> = ws_position
440        .exit_price
441        .as_ref()
442        .map(|s| s.parse())
443        .transpose()
444        .context("Failed to parse exit_price")?;
445
446    let realized_pnl: Decimal = ws_position
447        .realized_pnl
448        .parse()
449        .context("Failed to parse realized_pnl")?;
450
451    let unrealized_pnl: Decimal = ws_position
452        .unrealized_pnl
453        .parse()
454        .context("Failed to parse unrealized_pnl")?;
455
456    let sum_open: Decimal = ws_position
457        .sum_open
458        .parse()
459        .context("Failed to parse sum_open")?;
460
461    let sum_close: Decimal = ws_position
462        .sum_close
463        .parse()
464        .context("Failed to parse sum_close")?;
465
466    let net_funding: Decimal = ws_position
467        .net_funding
468        .parse()
469        .context("Failed to parse net_funding")?;
470
471    let created_at = DateTime::parse_from_rfc3339(&ws_position.created_at)
472        .context("Failed to parse created_at")?
473        .with_timezone(&Utc);
474
475    let closed_at = ws_position
476        .closed_at
477        .as_ref()
478        .map(|s| DateTime::parse_from_rfc3339(s))
479        .transpose()
480        .context("Failed to parse closed_at")?
481        .map(|dt| dt.with_timezone(&Utc));
482
483    // Preserve the venue-supplied side; only derive from size sign when side is absent
484    // (the WS schema always provides it, but this keeps the behavior explicit).
485    let side = ws_position.side;
486
487    Ok(PerpetualPosition {
488        market: ws_position.market,
489        status: ws_position.status,
490        side,
491        size,
492        max_size,
493        entry_price,
494        exit_price,
495        realized_pnl,
496        created_at_height: 0, // Not provided in WebSocket messages
497        created_at,
498        sum_open,
499        sum_close,
500        net_funding,
501        unrealized_pnl,
502        closed_at,
503    })
504}
505
506// ---------------------------------------------------------------------------
507//  Market data parsing functions
508// ---------------------------------------------------------------------------
509
510/// Parses an orderbook snapshot into [`OrderBookDeltas`].
511///
512/// # Errors
513///
514/// Returns an error if price/size parsing fails.
515pub fn parse_orderbook_snapshot(
516    instrument_id: &InstrumentId,
517    contents: &DydxOrderbookSnapshotContents,
518    price_precision: u8,
519    size_precision: u8,
520    ts_init: UnixNanos,
521) -> DydxWsResult<OrderBookDeltas> {
522    let bids = contents.bids.as_deref().unwrap_or(&[]);
523    let asks = contents.asks.as_deref().unwrap_or(&[]);
524
525    let mut deltas = Vec::with_capacity(1 + bids.len() + asks.len());
526    let snapshot_flag = RecordFlag::F_SNAPSHOT as u8;
527
528    // Empty book snapshot: Clear alone must carry F_SNAPSHOT | F_LAST
529    if bids.is_empty() && asks.is_empty() {
530        let clear_flags = snapshot_flag | RecordFlag::F_LAST as u8;
531        let mut clear_delta = OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init);
532        clear_delta.flags = clear_flags;
533        deltas.push(clear_delta);
534        return Ok(OrderBookDeltas::new(*instrument_id, deltas));
535    }
536
537    // Non-empty: Clear carries F_SNAPSHOT (not last)
538    let mut clear_delta = OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init);
539    clear_delta.flags = snapshot_flag;
540    deltas.push(clear_delta);
541
542    let bids_len = bids.len();
543    let asks_len = asks.len();
544
545    for (idx, bid) in bids.iter().enumerate() {
546        let is_last = idx == bids_len - 1 && asks_len == 0;
547        let flags = if is_last {
548            snapshot_flag | RecordFlag::F_LAST as u8
549        } else {
550            snapshot_flag
551        };
552
553        let price = Decimal::from_str(&bid.price)
554            .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
555
556        let size = Decimal::from_str(&bid.size)
557            .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
558
559        let order = BookOrder::new(
560            OrderSide::Buy,
561            Price::from_decimal_dp(price, price_precision).map_err(|e| {
562                DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
563            })?,
564            Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
565                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
566            })?,
567            0,
568        );
569
570        deltas.push(OrderBookDelta::new(
571            *instrument_id,
572            BookAction::Add,
573            order,
574            flags,
575            0,
576            ts_init,
577            ts_init,
578        ));
579    }
580
581    for (idx, ask) in asks.iter().enumerate() {
582        let is_last = idx == asks_len - 1;
583        let flags = if is_last {
584            snapshot_flag | RecordFlag::F_LAST as u8
585        } else {
586            snapshot_flag
587        };
588
589        let price = Decimal::from_str(&ask.price)
590            .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
591
592        let size = Decimal::from_str(&ask.size)
593            .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
594
595        let order = BookOrder::new(
596            OrderSide::Sell,
597            Price::from_decimal_dp(price, price_precision).map_err(|e| {
598                DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
599            })?,
600            Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
601                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
602            })?,
603            0,
604        );
605
606        deltas.push(OrderBookDelta::new(
607            *instrument_id,
608            BookAction::Add,
609            order,
610            flags,
611            0,
612            ts_init,
613            ts_init,
614        ));
615    }
616
617    Ok(OrderBookDeltas::new(*instrument_id, deltas))
618}
619
620/// Parses orderbook deltas (marks as last message by default).
621///
622/// # Errors
623///
624/// Returns an error if price/size parsing fails.
625pub fn parse_orderbook_deltas(
626    instrument_id: &InstrumentId,
627    contents: &DydxOrderbookContents,
628    price_precision: u8,
629    size_precision: u8,
630    ts_init: UnixNanos,
631) -> DydxWsResult<OrderBookDeltas> {
632    let deltas = parse_orderbook_deltas_with_flag(
633        instrument_id,
634        contents,
635        price_precision,
636        size_precision,
637        ts_init,
638        true,
639    )?;
640    Ok(OrderBookDeltas::new(*instrument_id, deltas))
641}
642
643/// Parses orderbook deltas with explicit last-message flag for batch processing.
644///
645/// # Errors
646///
647/// Returns an error if price/size parsing fails.
648pub fn parse_orderbook_deltas_with_flag(
649    instrument_id: &InstrumentId,
650    contents: &DydxOrderbookContents,
651    price_precision: u8,
652    size_precision: u8,
653    ts_init: UnixNanos,
654    is_last_message: bool,
655) -> DydxWsResult<Vec<OrderBookDelta>> {
656    let mut deltas = Vec::new();
657
658    let bids = contents.bids.as_deref().unwrap_or(&[]);
659    let asks = contents.asks.as_deref().unwrap_or(&[]);
660
661    let bids_len = bids.len();
662    let asks_len = asks.len();
663
664    for (idx, (price_str, size_str)) in bids.iter().enumerate() {
665        let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
666        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
667
668        let price = Decimal::from_str(price_str)
669            .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
670
671        let size = Decimal::from_str(size_str)
672            .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
673
674        let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
675            DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
676        })?;
677        let action = if qty.is_zero() {
678            BookAction::Delete
679        } else {
680            BookAction::Update
681        };
682
683        let order = BookOrder::new(
684            OrderSide::Buy,
685            Price::from_decimal_dp(price, price_precision).map_err(|e| {
686                DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
687            })?,
688            qty,
689            0,
690        );
691
692        deltas.push(OrderBookDelta::new(
693            *instrument_id,
694            action,
695            order,
696            flags,
697            0,
698            ts_init,
699            ts_init,
700        ));
701    }
702
703    for (idx, (price_str, size_str)) in asks.iter().enumerate() {
704        let is_last = is_last_message && idx == asks_len - 1;
705        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
706
707        let price = Decimal::from_str(price_str)
708            .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
709
710        let size = Decimal::from_str(size_str)
711            .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
712
713        let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
714            DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
715        })?;
716        let action = if qty.is_zero() {
717            BookAction::Delete
718        } else {
719            BookAction::Update
720        };
721
722        let order = BookOrder::new(
723            OrderSide::Sell,
724            Price::from_decimal_dp(price, price_precision).map_err(|e| {
725                DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
726            })?,
727            qty,
728            0,
729        );
730
731        deltas.push(OrderBookDelta::new(
732            *instrument_id,
733            action,
734            order,
735            flags,
736            0,
737            ts_init,
738            ts_init,
739        ));
740    }
741
742    Ok(deltas)
743}
744
745/// Parses trade ticks from trade contents.
746///
747/// # Errors
748///
749/// Returns an error if price/size/timestamp parsing fails.
750pub fn parse_trade_ticks(
751    instrument_id: InstrumentId,
752    instrument: &InstrumentAny,
753    contents: &DydxTradeContents,
754    ts_init: UnixNanos,
755) -> DydxWsResult<Vec<Data>> {
756    let mut ticks = Vec::new();
757
758    for trade in &contents.trades {
759        let aggressor_side = match trade.side {
760            OrderSide::Buy => AggressorSide::Buyer,
761            OrderSide::Sell => AggressorSide::Seller,
762            _ => continue,
763        };
764
765        let price = Decimal::from_str(&trade.price)
766            .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
767
768        let size = Decimal::from_str(&trade.size)
769            .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
770
771        let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
772            DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
773        })?;
774
775        let tick = TradeTick::new(
776            instrument_id,
777            Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
778                DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
779            })?,
780            Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
781                DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
782            })?,
783            aggressor_side,
784            TradeId::new(&trade.id),
785            UnixNanos::from(trade_ts as u64),
786            ts_init,
787        );
788        ticks.push(Data::Trade(tick));
789    }
790
791    Ok(ticks)
792}
793
794/// Parses a single candle into a [`Bar`].
795///
796/// When `timestamp_on_close` is true, `ts_event` is set to bar close time
797/// (started_at + interval). When false, uses the venue-native open time.
798///
799/// # Errors
800///
801/// Returns an error if OHLCV/timestamp parsing fails.
802pub fn parse_candle_bar(
803    bar_type: BarType,
804    instrument: &InstrumentAny,
805    candle: &DydxCandle,
806    timestamp_on_close: bool,
807    ts_init: UnixNanos,
808) -> DydxWsResult<Bar> {
809    let open = Decimal::from_str(&candle.open)
810        .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
811    let high = Decimal::from_str(&candle.high)
812        .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
813    let low = Decimal::from_str(&candle.low)
814        .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
815    let close = Decimal::from_str(&candle.close)
816        .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
817    let volume = candle
818        .base_token_volume
819        .as_deref()
820        .map(Decimal::from_str)
821        .transpose()
822        .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?
823        .unwrap_or(Decimal::ZERO);
824
825    let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
826        DydxWsError::Parse(format!(
827            "Timestamp out of range for candle at {}",
828            candle.started_at
829        ))
830    })?;
831    let mut ts_event = UnixNanos::from(started_at_nanos as u64);
832
833    if timestamp_on_close {
834        let interval_ns = bar_type
835            .spec()
836            .timedelta()
837            .num_nanoseconds()
838            .ok_or_else(|| DydxWsError::Parse("Bar interval overflow".to_string()))?;
839        let updated = (started_at_nanos as u64)
840            .checked_add(interval_ns as u64)
841            .ok_or_else(|| {
842                DydxWsError::Parse("Bar timestamp overflowed adjusting to close time".to_string())
843            })?;
844        ts_event = UnixNanos::from(updated);
845    }
846
847    let bar = Bar::new(
848        bar_type,
849        Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
850            DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
851        })?,
852        Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
853            DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
854        })?,
855        Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
856            DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
857        })?,
858        Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
859            DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
860        })?,
861        Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
862            DydxWsError::Parse(format!(
863                "Failed to create volume Quantity from decimal: {e}"
864            ))
865        })?,
866        ts_event,
867        ts_init,
868    );
869
870    Ok(bar)
871}
872
873#[cfg(test)]
874mod tests {
875    use std::str::FromStr;
876
877    use nautilus_model::{
878        data::{BarType, Data},
879        enums::{
880            AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType,
881            PositionSideSpecified,
882        },
883        identifiers::{AccountId, InstrumentId, Symbol, Venue},
884        instruments::{CryptoPerpetual, InstrumentAny},
885        types::{Currency, Price, Quantity},
886    };
887    use rstest::rstest;
888    use rust_decimal_macros::dec;
889    use ustr::Ustr;
890
891    use super::*;
892    use crate::{
893        common::{
894            enums::{
895                DydxFillType, DydxLiquidity, DydxMarketStatus, DydxOrderStatus, DydxOrderType,
896                DydxPositionSide, DydxPositionStatus, DydxTickerType, DydxTimeInForce,
897            },
898            testing::load_json_fixture,
899        },
900        http::models::PerpetualMarket,
901        websocket::messages::{DydxPerpetualPosition, DydxWsFillSubaccountMessageContents},
902    };
903
904    /// Creates a test market with BTC-USD ticker and specified clob_pair_id.
905    fn create_test_market(ticker: &str, clob_pair_id: u32) -> PerpetualMarket {
906        PerpetualMarket {
907            clob_pair_id,
908            ticker: Ustr::from(ticker),
909            status: DydxMarketStatus::Active,
910            base_asset: Some(Ustr::from("BTC")),
911            quote_asset: Some(Ustr::from("USD")),
912            step_size: dec!(0.001),
913            tick_size: dec!(0.01),
914            index_price: Some(dec!(50000)),
915            oracle_price: Some(dec!(50000)),
916            price_change_24h: dec!(0),
917            next_funding_rate: dec!(0),
918            next_funding_at: None,
919            min_order_size: Some(dec!(0.001)),
920            market_type: None,
921            initial_margin_fraction: dec!(0.05),
922            maintenance_margin_fraction: dec!(0.03),
923            base_position_notional: None,
924            incremental_position_size: None,
925            incremental_initial_margin_fraction: None,
926            max_position_size: None,
927            open_interest: dec!(1000),
928            atomic_resolution: -10,
929            quantum_conversion_exponent: -9,
930            subticks_per_tick: 1000000,
931            step_base_quantums: 1000000,
932            is_reduce_only: false,
933        }
934    }
935
936    /// Creates an InstrumentCache populated with the test instrument.
937    fn create_test_instrument_cache() -> InstrumentCache {
938        let cache = InstrumentCache::new();
939        let instrument = create_test_instrument();
940        let market = create_test_market("BTC-USD", 1);
941        cache.insert(instrument, market);
942        cache
943    }
944
945    fn create_test_instrument() -> InstrumentAny {
946        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
947
948        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
949            instrument_id,
950            Symbol::new("BTC-USD"),
951            Currency::BTC(),
952            Currency::USD(),
953            Currency::USD(),
954            false,
955            2,
956            8,
957            Price::new(0.01, 2),
958            Quantity::new(0.001, 8),
959            Some(Quantity::new(1.0, 0)),
960            Some(Quantity::new(0.001, 8)),
961            Some(Quantity::new(100000.0, 8)),
962            Some(Quantity::new(0.001, 8)),
963            None,
964            None,
965            Some(Price::new(1000000.0, 2)),
966            Some(Price::new(0.01, 2)),
967            Some(rust_decimal_macros::dec!(0.05)),
968            Some(rust_decimal_macros::dec!(0.03)),
969            Some(rust_decimal_macros::dec!(0.0002)),
970            Some(rust_decimal_macros::dec!(0.0005)),
971            None, // info: Option<Params>
972            UnixNanos::default(),
973            UnixNanos::default(),
974        ))
975    }
976
977    #[rstest]
978    fn test_convert_ws_order_to_http_basic() {
979        let ws_order = DydxWsOrderSubaccountMessageContents {
980            id: "order123".to_string(),
981            subaccount_id: "dydx1test/0".to_string(),
982            client_id: "12345".to_string(),
983            clob_pair_id: "1".to_string(),
984            side: OrderSide::Buy,
985            size: "1.5".to_string(),
986            price: "50000.0".to_string(),
987            status: DydxOrderStatus::PartiallyFilled,
988            order_type: DydxOrderType::Limit,
989            time_in_force: DydxTimeInForce::Gtt,
990            post_only: false,
991            reduce_only: false,
992            order_flags: "0".to_string(),
993            good_til_block: Some("1000".to_string()),
994            good_til_block_time: None,
995            created_at_height: Some("900".to_string()),
996            client_metadata: Some("0".to_string()),
997            trigger_price: None,
998            total_filled: Some("0.5".to_string()),
999            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
1000            updated_at_height: Some("950".to_string()),
1001        };
1002
1003        let result = convert_ws_order_to_http(&ws_order);
1004        assert!(result.is_ok());
1005
1006        let http_order = result.unwrap();
1007        assert_eq!(http_order.id, "order123");
1008        assert_eq!(http_order.clob_pair_id, 1);
1009        assert_eq!(http_order.size.to_string(), "1.5");
1010        assert_eq!(http_order.total_filled, rust_decimal_macros::dec!(0.5)); // 0.5 filled
1011        assert_eq!(http_order.status, DydxOrderStatus::PartiallyFilled);
1012    }
1013
1014    #[rstest]
1015    fn test_parse_ws_order_report_success() {
1016        let ws_order = DydxWsOrderSubaccountMessageContents {
1017            id: "order456".to_string(),
1018            subaccount_id: "dydx1test/0".to_string(),
1019            client_id: "67890".to_string(),
1020            clob_pair_id: "1".to_string(),
1021            side: OrderSide::Sell,
1022            size: "2.0".to_string(),
1023            price: "51000.0".to_string(),
1024            status: DydxOrderStatus::Open,
1025            order_type: DydxOrderType::Limit,
1026            time_in_force: DydxTimeInForce::Gtt,
1027            post_only: true,
1028            reduce_only: false,
1029            order_flags: "0".to_string(),
1030            good_til_block: Some("2000".to_string()),
1031            good_til_block_time: None,
1032            created_at_height: Some("1800".to_string()),
1033            client_metadata: Some("0".to_string()),
1034            trigger_price: None,
1035            total_filled: Some("0.0".to_string()),
1036            updated_at: None,
1037            updated_at_height: None,
1038        };
1039
1040        let instrument_cache = create_test_instrument_cache();
1041        let encoder = ClientOrderIdEncoder::new();
1042
1043        let account_id = AccountId::new("DYDX-001");
1044        let ts_init = UnixNanos::default();
1045        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1046
1047        let result = parse_ws_order_report(
1048            &ws_order,
1049            &instrument_cache,
1050            &order_contexts,
1051            &encoder,
1052            account_id,
1053            ts_init,
1054        );
1055
1056        assert!(result.is_ok());
1057        let report = result.unwrap();
1058        assert_eq!(report.account_id, account_id);
1059        assert_eq!(report.order_side, OrderSide::Sell);
1060    }
1061
1062    #[rstest]
1063    fn test_parse_ws_order_report_missing_instrument() {
1064        let ws_order = DydxWsOrderSubaccountMessageContents {
1065            id: "order789".to_string(),
1066            subaccount_id: "dydx1test/0".to_string(),
1067            client_id: "11111".to_string(),
1068            clob_pair_id: "99".to_string(), // Non-existent
1069            side: OrderSide::Buy,
1070            size: "1.0".to_string(),
1071            price: "50000.0".to_string(),
1072            status: DydxOrderStatus::Open,
1073            order_type: DydxOrderType::Market,
1074            time_in_force: DydxTimeInForce::Ioc,
1075            post_only: false,
1076            reduce_only: false,
1077            order_flags: "0".to_string(),
1078            good_til_block: Some("1000".to_string()),
1079            good_til_block_time: None,
1080            created_at_height: Some("900".to_string()),
1081            client_metadata: Some("0".to_string()),
1082            trigger_price: None,
1083            total_filled: Some("0.0".to_string()),
1084            updated_at: None,
1085            updated_at_height: None,
1086        };
1087
1088        let instrument_cache = InstrumentCache::new(); // Empty cache
1089        let encoder = ClientOrderIdEncoder::new();
1090        let account_id = AccountId::new("DYDX-001");
1091        let ts_init = UnixNanos::default();
1092        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1093
1094        let result = parse_ws_order_report(
1095            &ws_order,
1096            &instrument_cache,
1097            &order_contexts,
1098            &encoder,
1099            account_id,
1100            ts_init,
1101        );
1102
1103        assert!(result.is_err());
1104        assert!(
1105            result
1106                .unwrap_err()
1107                .to_string()
1108                .contains("No instrument cached")
1109        );
1110    }
1111
1112    #[rstest]
1113    fn test_convert_ws_fill_to_http() {
1114        let ws_fill = DydxWsFillSubaccountMessageContents {
1115            id: "fill123".to_string(),
1116            subaccount_id: "sub1".to_string(),
1117            side: OrderSide::Buy,
1118            liquidity: DydxLiquidity::Maker,
1119            fill_type: DydxFillType::Limit,
1120            market: "BTC-USD".into(),
1121            market_type: Some(DydxTickerType::Perpetual),
1122            price: "50000.5".to_string(),
1123            size: "0.1".to_string(),
1124            fee: "-2.5".to_string(), // Negative for maker rebate
1125            created_at: "2024-01-15T10:30:00Z".to_string(),
1126            created_at_height: Some("12345".to_string()),
1127            order_id: Some("order456".to_string()),
1128            client_metadata: Some("999".to_string()),
1129        };
1130
1131        let result = convert_ws_fill_to_http(&ws_fill);
1132        assert!(result.is_ok());
1133
1134        let http_fill = result.unwrap();
1135        assert_eq!(http_fill.id, "fill123");
1136        assert_eq!(http_fill.side, OrderSide::Buy);
1137        assert_eq!(http_fill.liquidity, DydxLiquidity::Maker);
1138        assert_eq!(http_fill.price, rust_decimal_macros::dec!(50000.5));
1139        assert_eq!(http_fill.size, rust_decimal_macros::dec!(0.1));
1140        assert_eq!(http_fill.fee, rust_decimal_macros::dec!(-2.5));
1141        assert_eq!(http_fill.created_at_height, 12345);
1142        assert_eq!(http_fill.order_id, "order456");
1143        assert_eq!(http_fill.client_metadata, 999);
1144    }
1145
1146    #[rstest]
1147    fn test_parse_ws_fill_report_success() {
1148        let instrument_cache = create_test_instrument_cache();
1149        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1150
1151        // dYdX WS fills use market format "BTC-USD" (not "BTC-USD-PERP")
1152        // but the instrument symbol is "BTC-USD-PERP"
1153        let ws_fill = DydxWsFillSubaccountMessageContents {
1154            id: "fill789".to_string(),
1155            subaccount_id: "sub1".to_string(),
1156            side: OrderSide::Sell,
1157            liquidity: DydxLiquidity::Taker,
1158            fill_type: DydxFillType::Limit,
1159            market: "BTC-USD".into(),
1160            market_type: Some(DydxTickerType::Perpetual),
1161            price: "49500.0".to_string(),
1162            size: "0.5".to_string(),
1163            fee: "12.375".to_string(), // Positive for taker fee
1164            created_at: "2024-01-15T11:00:00Z".to_string(),
1165            created_at_height: Some("12400".to_string()),
1166            order_id: Some("order999".to_string()),
1167            client_metadata: Some("888".to_string()),
1168        };
1169
1170        let account_id = AccountId::new("DYDX-001");
1171        let ts_init = UnixNanos::default();
1172        let order_id_map = DashMap::new();
1173        let order_contexts = DashMap::new();
1174        let encoder = ClientOrderIdEncoder::new();
1175
1176        let result = parse_ws_fill_report(
1177            &ws_fill,
1178            &instrument_cache,
1179            &order_id_map,
1180            &order_contexts,
1181            &encoder,
1182            account_id,
1183            ts_init,
1184        );
1185        assert!(result.is_ok());
1186
1187        let fill_report = result.unwrap();
1188        assert_eq!(fill_report.instrument_id, instrument_id);
1189        assert_eq!(fill_report.venue_order_id.as_str(), "order999");
1190        assert_eq!(fill_report.last_qty.as_f64(), 0.5);
1191        assert_eq!(fill_report.last_px.as_f64(), 49500.0);
1192        assert_eq!(fill_report.commission.as_decimal(), dec!(12.38));
1193    }
1194
1195    #[rstest]
1196    fn test_parse_ws_fill_report_missing_instrument() {
1197        let instrument_cache = InstrumentCache::new(); // Empty - no instruments cached
1198
1199        let ws_fill = DydxWsFillSubaccountMessageContents {
1200            id: "fill000".to_string(),
1201            subaccount_id: "sub1".to_string(),
1202            side: OrderSide::Buy,
1203            liquidity: DydxLiquidity::Maker,
1204            fill_type: DydxFillType::Limit,
1205            market: "ETH-USD-PERP".into(),
1206            market_type: Some(DydxTickerType::Perpetual),
1207            price: "3000.0".to_string(),
1208            size: "1.0".to_string(),
1209            fee: "-1.5".to_string(),
1210            created_at: "2024-01-15T12:00:00Z".to_string(),
1211            created_at_height: Some("12500".to_string()),
1212            order_id: Some("order111".to_string()),
1213            client_metadata: Some("777".to_string()),
1214        };
1215
1216        let account_id = AccountId::new("DYDX-001");
1217        let ts_init = UnixNanos::default();
1218        let order_id_map = DashMap::new();
1219        let order_contexts = DashMap::new();
1220        let encoder = ClientOrderIdEncoder::new();
1221
1222        let result = parse_ws_fill_report(
1223            &ws_fill,
1224            &instrument_cache,
1225            &order_id_map,
1226            &order_contexts,
1227            &encoder,
1228            account_id,
1229            ts_init,
1230        );
1231        assert!(result.is_err());
1232        assert!(
1233            result
1234                .unwrap_err()
1235                .to_string()
1236                .contains("No instrument cached for market")
1237        );
1238    }
1239
1240    #[rstest]
1241    fn test_convert_ws_position_to_http() {
1242        let ws_position = DydxPerpetualPosition {
1243            market: "BTC-USD".into(),
1244            status: DydxPositionStatus::Open,
1245            side: DydxPositionSide::Long,
1246            size: "1.5".to_string(),
1247            max_size: "2.0".to_string(),
1248            entry_price: "50000.0".to_string(),
1249            exit_price: None,
1250            realized_pnl: "100.0".to_string(),
1251            unrealized_pnl: "250.5".to_string(),
1252            created_at: "2024-01-15T10:00:00Z".to_string(),
1253            closed_at: None,
1254            sum_open: "5.0".to_string(),
1255            sum_close: "3.5".to_string(),
1256            net_funding: "-10.25".to_string(),
1257        };
1258
1259        let result = convert_ws_position_to_http(&ws_position);
1260        assert!(result.is_ok());
1261
1262        let http_position = result.unwrap();
1263        assert_eq!(http_position.market, "BTC-USD");
1264        assert_eq!(http_position.status, DydxPositionStatus::Open);
1265        assert_eq!(http_position.side, DydxPositionSide::Long); // Positive size = Long
1266        assert_eq!(http_position.size, rust_decimal_macros::dec!(1.5));
1267        assert_eq!(http_position.max_size, rust_decimal_macros::dec!(2.0));
1268        assert_eq!(
1269            http_position.entry_price,
1270            rust_decimal_macros::dec!(50000.0)
1271        );
1272        assert_eq!(http_position.exit_price, None);
1273        assert_eq!(http_position.realized_pnl, rust_decimal_macros::dec!(100.0));
1274        assert_eq!(
1275            http_position.unrealized_pnl,
1276            rust_decimal_macros::dec!(250.5)
1277        );
1278        assert_eq!(http_position.sum_open, rust_decimal_macros::dec!(5.0));
1279        assert_eq!(http_position.sum_close, rust_decimal_macros::dec!(3.5));
1280        assert_eq!(http_position.net_funding, rust_decimal_macros::dec!(-10.25));
1281    }
1282
1283    /// The converter must preserve the venue-supplied `side`, not re-derive it from
1284    /// the sign of `size`. A zero-size position reported as `Long` must stay `Long`,
1285    /// and a mismatched (Long, negative-size) payload must retain the venue side.
1286    #[rstest]
1287    #[case::long_positive(DydxPositionSide::Long, "1.0", DydxPositionSide::Long)]
1288    #[case::short_negative(DydxPositionSide::Short, "-1.0", DydxPositionSide::Short)]
1289    #[case::long_zero(DydxPositionSide::Long, "0.0", DydxPositionSide::Long)]
1290    #[case::short_zero(DydxPositionSide::Short, "0.0", DydxPositionSide::Short)]
1291    #[case::long_with_negative_size(DydxPositionSide::Long, "-1.0", DydxPositionSide::Long)]
1292    #[case::short_with_positive_size(DydxPositionSide::Short, "1.0", DydxPositionSide::Short)]
1293    fn test_convert_ws_position_preserves_venue_side(
1294        #[case] venue_side: DydxPositionSide,
1295        #[case] size: &str,
1296        #[case] expected_side: DydxPositionSide,
1297    ) {
1298        let ws_position = DydxPerpetualPosition {
1299            market: "BTC-USD".into(),
1300            status: DydxPositionStatus::Open,
1301            side: venue_side,
1302            size: size.to_string(),
1303            max_size: "1.0".to_string(),
1304            entry_price: "50000.0".to_string(),
1305            exit_price: None,
1306            realized_pnl: "0.0".to_string(),
1307            unrealized_pnl: "0.0".to_string(),
1308            created_at: "2024-01-15T10:00:00Z".to_string(),
1309            closed_at: None,
1310            sum_open: "0.0".to_string(),
1311            sum_close: "0.0".to_string(),
1312            net_funding: "0.0".to_string(),
1313        };
1314
1315        let http_position =
1316            convert_ws_position_to_http(&ws_position).expect("conversion should succeed");
1317        assert_eq!(http_position.side, expected_side);
1318    }
1319
1320    /// End-to-end verification that the venue-supplied side flows through to the
1321    /// emitted `PositionStatusReport`. The previous implementation re-derived side
1322    /// from `size.is_sign_positive()` inside `parse_position_status_report`, which
1323    /// silently overrode the venue side for the mismatched case below.
1324    #[rstest]
1325    fn test_ws_position_report_emits_venue_side_for_mismatched_size() {
1326        use nautilus_model::enums::PositionSideSpecified;
1327
1328        let instrument_cache = create_test_instrument_cache();
1329        // Venue reports a Short position but the `size` field would round to
1330        // positive via the legacy sign check. The report must show Short.
1331        let ws_position = DydxPerpetualPosition {
1332            market: "BTC-USD".into(),
1333            status: DydxPositionStatus::Open,
1334            side: DydxPositionSide::Short,
1335            size: "1.0".to_string(),
1336            max_size: "1.0".to_string(),
1337            entry_price: "50000.0".to_string(),
1338            exit_price: None,
1339            realized_pnl: "0.0".to_string(),
1340            unrealized_pnl: "0.0".to_string(),
1341            created_at: "2024-01-15T10:00:00Z".to_string(),
1342            closed_at: None,
1343            sum_open: "0.0".to_string(),
1344            sum_close: "0.0".to_string(),
1345            net_funding: "0.0".to_string(),
1346        };
1347
1348        let report = parse_ws_position_report(
1349            &ws_position,
1350            &instrument_cache,
1351            AccountId::new("DYDX-001"),
1352            UnixNanos::default(),
1353        )
1354        .expect("parse should succeed");
1355        assert_eq!(report.position_side, PositionSideSpecified::Short);
1356    }
1357
1358    #[rstest]
1359    fn test_parse_ws_position_report_success() {
1360        let instrument_cache = create_test_instrument_cache();
1361        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1362
1363        let ws_position = DydxPerpetualPosition {
1364            market: "BTC-USD".into(),
1365            status: DydxPositionStatus::Open,
1366            side: DydxPositionSide::Long,
1367            size: "0.5".to_string(),
1368            max_size: "1.0".to_string(),
1369            entry_price: "49500.0".to_string(),
1370            exit_price: None,
1371            realized_pnl: "0.0".to_string(),
1372            unrealized_pnl: "125.0".to_string(),
1373            created_at: "2024-01-15T09:00:00Z".to_string(),
1374            closed_at: None,
1375            sum_open: "0.5".to_string(),
1376            sum_close: "0.0".to_string(),
1377            net_funding: "-2.5".to_string(),
1378        };
1379
1380        let account_id = AccountId::new("DYDX-001");
1381        let ts_init = UnixNanos::default();
1382
1383        let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1384        assert!(result.is_ok());
1385
1386        let position_report = result.unwrap();
1387        assert_eq!(position_report.instrument_id, instrument_id);
1388        assert_eq!(position_report.position_side, PositionSideSpecified::Long);
1389        assert_eq!(position_report.quantity.as_f64(), 0.5);
1390        // avg_px_open should be entry_price
1391        assert!(position_report.avg_px_open.is_some());
1392    }
1393
1394    #[rstest]
1395    fn test_parse_ws_position_report_short() {
1396        let instrument_cache = create_test_instrument_cache();
1397        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1398
1399        let ws_position = DydxPerpetualPosition {
1400            market: "BTC-USD".into(),
1401            status: DydxPositionStatus::Open,
1402            side: DydxPositionSide::Short,
1403            size: "-0.25".to_string(), // Negative for short
1404            max_size: "0.5".to_string(),
1405            entry_price: "51000.0".to_string(),
1406            exit_price: None,
1407            realized_pnl: "50.0".to_string(),
1408            unrealized_pnl: "-75.25".to_string(),
1409            created_at: "2024-01-15T08:00:00Z".to_string(),
1410            closed_at: None,
1411            sum_open: "0.25".to_string(),
1412            sum_close: "0.0".to_string(),
1413            net_funding: "1.5".to_string(),
1414        };
1415
1416        let account_id = AccountId::new("DYDX-001");
1417        let ts_init = UnixNanos::default();
1418
1419        let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1420        assert!(result.is_ok());
1421
1422        let position_report = result.unwrap();
1423        assert_eq!(position_report.instrument_id, instrument_id);
1424        assert_eq!(position_report.position_side, PositionSideSpecified::Short);
1425        assert_eq!(position_report.quantity.as_f64(), 0.25); // Quantity is always positive
1426    }
1427
1428    #[rstest]
1429    fn test_parse_ws_position_report_missing_instrument() {
1430        let instrument_cache = InstrumentCache::new(); // Empty - no instruments cached
1431
1432        let ws_position = DydxPerpetualPosition {
1433            market: "ETH-USD-PERP".into(),
1434            status: DydxPositionStatus::Open,
1435            side: DydxPositionSide::Long,
1436            size: "5.0".to_string(),
1437            max_size: "10.0".to_string(),
1438            entry_price: "3000.0".to_string(),
1439            exit_price: None,
1440            realized_pnl: "0.0".to_string(),
1441            unrealized_pnl: "500.0".to_string(),
1442            created_at: "2024-01-15T07:00:00Z".to_string(),
1443            closed_at: None,
1444            sum_open: "5.0".to_string(),
1445            sum_close: "0.0".to_string(),
1446            net_funding: "-5.0".to_string(),
1447        };
1448
1449        let account_id = AccountId::new("DYDX-001");
1450        let ts_init = UnixNanos::default();
1451
1452        let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1453        assert!(result.is_err());
1454        assert!(
1455            result
1456                .unwrap_err()
1457                .to_string()
1458                .contains("No instrument cached for market")
1459        );
1460    }
1461
1462    #[rstest]
1463    #[case(DydxOrderStatus::Filled, "2.0")]
1464    #[case(DydxOrderStatus::Canceled, "0.0")]
1465    #[case(DydxOrderStatus::BestEffortCanceled, "0.5")]
1466    #[case(DydxOrderStatus::BestEffortOpened, "0.0")]
1467    #[case(DydxOrderStatus::Untriggered, "0.0")]
1468    fn test_parse_ws_order_various_statuses(
1469        #[case] status: DydxOrderStatus,
1470        #[case] total_filled: &str,
1471    ) {
1472        let ws_order = DydxWsOrderSubaccountMessageContents {
1473            id: format!("order_{status:?}"),
1474            subaccount_id: "dydx1test/0".to_string(),
1475            client_id: "99999".to_string(),
1476            clob_pair_id: "1".to_string(),
1477            side: OrderSide::Buy,
1478            size: "2.0".to_string(),
1479            price: "50000.0".to_string(),
1480            status,
1481            order_type: DydxOrderType::Limit,
1482            time_in_force: DydxTimeInForce::Gtt,
1483            post_only: false,
1484            reduce_only: false,
1485            order_flags: "0".to_string(),
1486            good_til_block: Some("1000".to_string()),
1487            good_til_block_time: None,
1488            created_at_height: Some("900".to_string()),
1489            client_metadata: Some("0".to_string()),
1490            trigger_price: None,
1491            total_filled: Some(total_filled.to_string()),
1492            updated_at: Some("2024-11-14T10:00:00Z".to_string()),
1493            updated_at_height: Some("950".to_string()),
1494        };
1495
1496        let instrument_cache = create_test_instrument_cache();
1497        let encoder = ClientOrderIdEncoder::new();
1498
1499        let account_id = AccountId::new("DYDX-001");
1500        let ts_init = UnixNanos::default();
1501        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1502
1503        let result = parse_ws_order_report(
1504            &ws_order,
1505            &instrument_cache,
1506            &order_contexts,
1507            &encoder,
1508            account_id,
1509            ts_init,
1510        );
1511
1512        assert!(
1513            result.is_ok(),
1514            "Failed to parse order with status {status:?}"
1515        );
1516        let report = result.unwrap();
1517
1518        // Verify status conversion
1519        let expected_status = match status {
1520            DydxOrderStatus::Open
1521            | DydxOrderStatus::BestEffortOpened
1522            | DydxOrderStatus::Untriggered => OrderStatus::Accepted,
1523            DydxOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
1524            DydxOrderStatus::Filled => OrderStatus::Filled,
1525            DydxOrderStatus::Canceled | DydxOrderStatus::BestEffortCanceled => {
1526                OrderStatus::Canceled
1527            }
1528        };
1529        assert_eq!(report.order_status, expected_status);
1530    }
1531
1532    #[rstest]
1533    fn test_parse_ws_order_with_trigger_price() {
1534        let ws_order = DydxWsOrderSubaccountMessageContents {
1535            id: "conditional_order".to_string(),
1536            subaccount_id: "dydx1test/0".to_string(),
1537            client_id: "88888".to_string(),
1538            clob_pair_id: "1".to_string(),
1539            side: OrderSide::Sell,
1540            size: "1.0".to_string(),
1541            price: "52000.0".to_string(),
1542            status: DydxOrderStatus::Untriggered,
1543            order_type: DydxOrderType::StopLimit,
1544            time_in_force: DydxTimeInForce::Gtt,
1545            post_only: false,
1546            reduce_only: true,
1547            order_flags: "32".to_string(),
1548            good_til_block: None,
1549            good_til_block_time: Some("2024-12-31T23:59:59Z".to_string()),
1550            created_at_height: Some("1000".to_string()),
1551            client_metadata: Some("100".to_string()),
1552            trigger_price: Some("51500.0".to_string()),
1553            total_filled: Some("0.0".to_string()),
1554            updated_at: Some("2024-11-14T11:00:00Z".to_string()),
1555            updated_at_height: Some("1050".to_string()),
1556        };
1557
1558        let instrument_cache = create_test_instrument_cache();
1559        let encoder = ClientOrderIdEncoder::new();
1560
1561        let account_id = AccountId::new("DYDX-001");
1562        let ts_init = UnixNanos::default();
1563        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1564
1565        let result = parse_ws_order_report(
1566            &ws_order,
1567            &instrument_cache,
1568            &order_contexts,
1569            &encoder,
1570            account_id,
1571            ts_init,
1572        );
1573
1574        assert!(result.is_ok());
1575        let report = result.unwrap();
1576        assert_eq!(report.order_status, OrderStatus::PendingUpdate);
1577        // Trigger price should be parsed and available in the report
1578        assert!(report.trigger_price.is_some());
1579    }
1580
1581    #[rstest]
1582    fn test_parse_ws_order_market_type() {
1583        let ws_order = DydxWsOrderSubaccountMessageContents {
1584            id: "market_order".to_string(),
1585            subaccount_id: "dydx1test/0".to_string(),
1586            client_id: "77777".to_string(),
1587            clob_pair_id: "1".to_string(),
1588            side: OrderSide::Buy,
1589            size: "0.5".to_string(),
1590            price: "50000.0".to_string(), // Market orders still have a price
1591            status: DydxOrderStatus::Filled,
1592            order_type: DydxOrderType::Market,
1593            time_in_force: DydxTimeInForce::Ioc,
1594            post_only: false,
1595            reduce_only: false,
1596            order_flags: "0".to_string(),
1597            good_til_block: Some("1000".to_string()),
1598            good_til_block_time: None,
1599            created_at_height: Some("900".to_string()),
1600            client_metadata: Some("0".to_string()),
1601            trigger_price: None,
1602            total_filled: Some("0.5".to_string()),
1603            updated_at: Some("2024-11-14T10:01:00Z".to_string()),
1604            updated_at_height: Some("901".to_string()),
1605        };
1606
1607        let instrument_cache = create_test_instrument_cache();
1608        let encoder = ClientOrderIdEncoder::new();
1609
1610        let account_id = AccountId::new("DYDX-001");
1611        let ts_init = UnixNanos::default();
1612        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1613
1614        let result = parse_ws_order_report(
1615            &ws_order,
1616            &instrument_cache,
1617            &order_contexts,
1618            &encoder,
1619            account_id,
1620            ts_init,
1621        );
1622
1623        assert!(result.is_ok());
1624        let report = result.unwrap();
1625        assert_eq!(report.order_type, OrderType::Market);
1626        assert_eq!(report.order_status, OrderStatus::Filled);
1627    }
1628
1629    #[rstest]
1630    fn test_parse_ws_order_invalid_clob_pair_id() {
1631        let ws_order = DydxWsOrderSubaccountMessageContents {
1632            id: "bad_order".to_string(),
1633            subaccount_id: "dydx1test/0".to_string(),
1634            client_id: "12345".to_string(),
1635            clob_pair_id: "not_a_number".to_string(), // Invalid
1636            side: OrderSide::Buy,
1637            size: "1.0".to_string(),
1638            price: "50000.0".to_string(),
1639            status: DydxOrderStatus::Open,
1640            order_type: DydxOrderType::Limit,
1641            time_in_force: DydxTimeInForce::Gtt,
1642            post_only: false,
1643            reduce_only: false,
1644            order_flags: "0".to_string(),
1645            good_til_block: Some("1000".to_string()),
1646            good_til_block_time: None,
1647            created_at_height: Some("900".to_string()),
1648            client_metadata: Some("0".to_string()),
1649            trigger_price: None,
1650            total_filled: Some("0.0".to_string()),
1651            updated_at: None,
1652            updated_at_height: None,
1653        };
1654
1655        let instrument_cache = InstrumentCache::new(); // Empty cache
1656        let encoder = ClientOrderIdEncoder::new();
1657        let account_id = AccountId::new("DYDX-001");
1658        let ts_init = UnixNanos::default();
1659        let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1660
1661        let result = parse_ws_order_report(
1662            &ws_order,
1663            &instrument_cache,
1664            &order_contexts,
1665            &encoder,
1666            account_id,
1667            ts_init,
1668        );
1669
1670        assert!(result.is_err());
1671        assert!(
1672            result
1673                .unwrap_err()
1674                .to_string()
1675                .contains("Failed to parse clob_pair_id")
1676        );
1677    }
1678
1679    #[rstest]
1680    fn test_parse_ws_position_closed() {
1681        let instrument_cache = create_test_instrument_cache();
1682        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1683
1684        let ws_position = DydxPerpetualPosition {
1685            market: "BTC-USD".into(),
1686            status: DydxPositionStatus::Closed,
1687            side: DydxPositionSide::Long,
1688            size: "0.0".to_string(), // Closed = zero size
1689            max_size: "2.0".to_string(),
1690            entry_price: "48000.0".to_string(),
1691            exit_price: Some("52000.0".to_string()),
1692            realized_pnl: "2000.0".to_string(),
1693            unrealized_pnl: "0.0".to_string(),
1694            created_at: "2024-01-10T09:00:00Z".to_string(),
1695            closed_at: Some("2024-01-15T14:00:00Z".to_string()),
1696            sum_open: "5.0".to_string(),
1697            sum_close: "5.0".to_string(), // Fully closed
1698            net_funding: "-25.5".to_string(),
1699        };
1700
1701        let account_id = AccountId::new("DYDX-001");
1702        let ts_init = UnixNanos::default();
1703
1704        let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1705        assert!(result.is_ok());
1706
1707        let position_report = result.unwrap();
1708        assert_eq!(position_report.instrument_id, instrument_id);
1709        // Closed position should have zero quantity
1710        assert_eq!(position_report.quantity.as_f64(), 0.0);
1711    }
1712
1713    #[rstest]
1714    fn test_parse_ws_fill_with_maker_rebate() {
1715        let instrument_cache = create_test_instrument_cache();
1716
1717        let ws_fill = DydxWsFillSubaccountMessageContents {
1718            id: "fill_rebate".to_string(),
1719            subaccount_id: "sub1".to_string(),
1720            side: OrderSide::Buy,
1721            liquidity: DydxLiquidity::Maker,
1722            fill_type: DydxFillType::Limit,
1723            market: "BTC-USD".into(),
1724            market_type: Some(DydxTickerType::Perpetual),
1725            price: "50000.0".to_string(),
1726            size: "1.0".to_string(),
1727            fee: "-15.0".to_string(), // Negative fee = rebate
1728            created_at: "2024-01-15T13:00:00Z".to_string(),
1729            created_at_height: Some("13000".to_string()),
1730            order_id: Some("order_maker".to_string()),
1731            client_metadata: Some("200".to_string()),
1732        };
1733
1734        let account_id = AccountId::new("DYDX-001");
1735        let ts_init = UnixNanos::default();
1736        let order_id_map = DashMap::new();
1737        let order_contexts = DashMap::new();
1738        let encoder = ClientOrderIdEncoder::new();
1739
1740        let result = parse_ws_fill_report(
1741            &ws_fill,
1742            &instrument_cache,
1743            &order_id_map,
1744            &order_contexts,
1745            &encoder,
1746            account_id,
1747            ts_init,
1748        );
1749        assert!(result.is_ok());
1750
1751        let fill_report = result.unwrap();
1752        assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1753        assert!(fill_report.commission.as_decimal() < dec!(0));
1754    }
1755
1756    #[rstest]
1757    fn test_parse_ws_fill_taker_with_fee() {
1758        let instrument_cache = create_test_instrument_cache();
1759
1760        let ws_fill = DydxWsFillSubaccountMessageContents {
1761            id: "fill_taker".to_string(),
1762            subaccount_id: "sub2".to_string(),
1763            side: OrderSide::Sell,
1764            liquidity: DydxLiquidity::Taker,
1765            fill_type: DydxFillType::Limit,
1766            market: "BTC-USD".into(),
1767            market_type: Some(DydxTickerType::Perpetual),
1768            price: "49800.0".to_string(),
1769            size: "0.75".to_string(),
1770            fee: "18.675".to_string(), // Positive fee for taker
1771            created_at: "2024-01-15T14:00:00Z".to_string(),
1772            created_at_height: Some("14000".to_string()),
1773            order_id: Some("order_taker".to_string()),
1774            client_metadata: Some("300".to_string()),
1775        };
1776
1777        let account_id = AccountId::new("DYDX-001");
1778        let ts_init = UnixNanos::default();
1779        let order_id_map = DashMap::new();
1780        let order_contexts = DashMap::new();
1781        let encoder = ClientOrderIdEncoder::new();
1782
1783        let result = parse_ws_fill_report(
1784            &ws_fill,
1785            &instrument_cache,
1786            &order_id_map,
1787            &order_contexts,
1788            &encoder,
1789            account_id,
1790            ts_init,
1791        );
1792        assert!(result.is_ok());
1793
1794        let fill_report = result.unwrap();
1795        assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
1796        assert_eq!(fill_report.order_side, OrderSide::Sell);
1797        assert!(fill_report.commission.as_decimal() > dec!(0));
1798    }
1799
1800    #[rstest]
1801    fn test_parse_orderbook_snapshot() {
1802        let json = load_json_fixture("ws_orderbook_subscribed.json");
1803        let contents: DydxOrderbookSnapshotContents =
1804            serde_json::from_value(json["contents"].clone())
1805                .expect("Failed to parse orderbook snapshot contents");
1806
1807        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
1808        let ts_init = UnixNanos::from(1_000_000_000u64);
1809
1810        let deltas = parse_orderbook_snapshot(&instrument_id, &contents, 2, 8, ts_init)
1811            .expect("Failed to parse orderbook snapshot");
1812
1813        // 1 clear + 3 bids + 3 asks = 7 deltas
1814        assert_eq!(deltas.deltas.len(), 7);
1815
1816        assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1817        assert_eq!(deltas.deltas[1].action, BookAction::Add);
1818        assert_eq!(deltas.deltas[1].order.side, OrderSide::Buy);
1819        assert_eq!(deltas.deltas[1].order.price.to_string(), "43240.00");
1820        assert_eq!(deltas.deltas[1].order.size.to_string(), "1.50000000");
1821
1822        assert_eq!(deltas.deltas[4].action, BookAction::Add);
1823        assert_eq!(deltas.deltas[4].order.side, OrderSide::Sell);
1824        assert_eq!(deltas.deltas[4].order.price.to_string(), "43250.00");
1825        assert_eq!(deltas.deltas[4].order.size.to_string(), "1.20000000");
1826
1827        // Every snapshot delta must carry F_SNAPSHOT. The Clear carries F_SNAPSHOT only
1828        // (not last); every intermediate delta carries F_SNAPSHOT only; the terminator
1829        // carries F_SNAPSHOT | F_LAST.
1830        let snapshot = RecordFlag::F_SNAPSHOT as u8;
1831        let last_flag = RecordFlag::F_LAST as u8;
1832
1833        assert_eq!(deltas.deltas[0].flags, snapshot, "Clear missing F_SNAPSHOT");
1834        for (idx, delta) in deltas.deltas.iter().enumerate().skip(1) {
1835            let expected = if idx == deltas.deltas.len() - 1 {
1836                snapshot | last_flag
1837            } else {
1838                snapshot
1839            };
1840            assert_eq!(
1841                delta.flags, expected,
1842                "delta at index {idx} has wrong flags: got {:#010b}, expected {expected:#010b}",
1843                delta.flags,
1844            );
1845        }
1846    }
1847
1848    #[rstest]
1849    #[case::empty_book(vec![], vec![], 1)]
1850    #[case::bids_only(vec![("100.0", "1.0")], vec![], 2)]
1851    #[case::asks_only(vec![], vec![("101.0", "2.0")], 2)]
1852    fn test_parse_orderbook_snapshot_flag_shapes(
1853        #[case] bids: Vec<(&str, &str)>,
1854        #[case] asks: Vec<(&str, &str)>,
1855        #[case] expected_len: usize,
1856    ) {
1857        use crate::websocket::messages::DydxPriceLevel;
1858        let contents = DydxOrderbookSnapshotContents {
1859            bids: if bids.is_empty() {
1860                None
1861            } else {
1862                Some(
1863                    bids.into_iter()
1864                        .map(|(p, s)| DydxPriceLevel {
1865                            price: p.to_string(),
1866                            size: s.to_string(),
1867                        })
1868                        .collect(),
1869                )
1870            },
1871            asks: if asks.is_empty() {
1872                None
1873            } else {
1874                Some(
1875                    asks.into_iter()
1876                        .map(|(p, s)| DydxPriceLevel {
1877                            price: p.to_string(),
1878                            size: s.to_string(),
1879                        })
1880                        .collect(),
1881                )
1882            },
1883        };
1884        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
1885        let ts_init = UnixNanos::from(1_000_000_000u64);
1886
1887        let deltas = parse_orderbook_snapshot(&instrument_id, &contents, 2, 8, ts_init)
1888            .expect("Failed to parse orderbook snapshot");
1889
1890        let snapshot = RecordFlag::F_SNAPSHOT as u8;
1891        let last_flag = RecordFlag::F_LAST as u8;
1892
1893        assert_eq!(deltas.deltas.len(), expected_len);
1894
1895        if expected_len == 1 {
1896            // Empty book: Clear alone must carry F_SNAPSHOT | F_LAST so buffered
1897            // subscribers flush when the book is empty.
1898            assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1899            assert_eq!(deltas.deltas[0].flags, snapshot | last_flag);
1900        } else {
1901            // Non-empty: Clear carries F_SNAPSHOT only; terminator carries both.
1902            assert_eq!(deltas.deltas[0].flags, snapshot);
1903            let terminator = deltas.deltas.last().unwrap();
1904            assert_eq!(terminator.flags, snapshot | last_flag);
1905        }
1906    }
1907
1908    #[rstest]
1909    fn test_parse_orderbook_deltas_update() {
1910        let json = load_json_fixture("ws_orderbook_update.json");
1911        let contents: DydxOrderbookContents = serde_json::from_value(json["contents"].clone())
1912            .expect("Failed to parse orderbook update contents");
1913
1914        let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
1915        let ts_init = UnixNanos::from(1_000_000_000u64);
1916
1917        let deltas = parse_orderbook_deltas(&instrument_id, &contents, 2, 8, ts_init)
1918            .expect("Failed to parse orderbook deltas");
1919
1920        // 2 bids + 2 asks = 4 deltas
1921        assert_eq!(deltas.deltas.len(), 4);
1922
1923        assert_eq!(deltas.deltas[0].action, BookAction::Update);
1924        assert_eq!(deltas.deltas[0].order.side, OrderSide::Buy);
1925        assert_eq!(deltas.deltas[0].order.price.to_string(), "43240.00");
1926
1927        // First ask with size 0.0 should be a Delete
1928        assert_eq!(deltas.deltas[2].action, BookAction::Delete);
1929        assert_eq!(deltas.deltas[2].order.side, OrderSide::Sell);
1930        assert_eq!(deltas.deltas[2].order.price.to_string(), "43250.00");
1931
1932        assert_eq!(deltas.deltas[3].action, BookAction::Update);
1933        assert_eq!(deltas.deltas[3].order.side, OrderSide::Sell);
1934    }
1935
1936    #[rstest]
1937    fn test_parse_trade_ticks_ws() {
1938        let json = load_json_fixture("ws_trades_subscribed.json");
1939        let contents: DydxTradeContents = serde_json::from_value(json["contents"].clone())
1940            .expect("Failed to parse trade contents");
1941
1942        let instrument = create_test_instrument();
1943        let instrument_id = instrument.id();
1944        let ts_init = UnixNanos::from(1_000_000_000u64);
1945
1946        let ticks = parse_trade_ticks(instrument_id, &instrument, &contents, ts_init)
1947            .expect("Failed to parse trade ticks");
1948
1949        assert_eq!(ticks.len(), 1);
1950        if let Data::Trade(tick) = &ticks[0] {
1951            assert_eq!(tick.instrument_id, instrument_id);
1952            assert_eq!(tick.price.to_string(), "43250.00");
1953            assert_eq!(tick.size.to_string(), "0.50000000");
1954            assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
1955            assert_eq!(tick.trade_id.to_string(), "trade-001");
1956        } else {
1957            panic!("Expected Trade data");
1958        }
1959    }
1960
1961    #[rstest]
1962    #[case(true)]
1963    #[case(false)]
1964    fn test_parse_candle_bar_timestamp_on_close(#[case] timestamp_on_close: bool) {
1965        let json = load_json_fixture("ws_candles_subscribed.json");
1966        let candles_value = &json["contents"]["candles"];
1967        let candles: Vec<DydxCandle> =
1968            serde_json::from_value(candles_value.clone()).expect("Failed to parse candle array");
1969
1970        let instrument = create_test_instrument();
1971        let bar_type = BarType::from_str("BTC-USD-PERP.DYDX-1-MINUTE-LAST-EXTERNAL")
1972            .expect("Failed to parse bar type");
1973        let ts_init = UnixNanos::from(1_000_000_000u64);
1974
1975        let bar = parse_candle_bar(
1976            bar_type,
1977            &instrument,
1978            &candles[0],
1979            timestamp_on_close,
1980            ts_init,
1981        )
1982        .expect("Failed to parse candle bar");
1983
1984        assert_eq!(bar.bar_type, bar_type);
1985        assert_eq!(bar.open.to_string(), "43100.00");
1986        assert_eq!(bar.high.to_string(), "43500.00");
1987        assert_eq!(bar.low.to_string(), "43000.00");
1988        assert_eq!(bar.close.to_string(), "43400.00");
1989        assert_eq!(bar.volume.to_string(), "12.34500000");
1990
1991        // 2024-01-01T00:00:00.000Z = 1_704_067_200_000_000_000 ns
1992        let started_at_ns = 1_704_067_200_000_000_000u64;
1993        let one_min_ns = 60_000_000_000u64;
1994
1995        if timestamp_on_close {
1996            assert_eq!(bar.ts_event.as_u64(), started_at_ns + one_min_ns);
1997        } else {
1998            assert_eq!(bar.ts_event.as_u64(), started_at_ns);
1999        }
2000    }
2001
2002    #[rstest]
2003    fn test_deserialize_market_trading_update_with_status() {
2004        let json = load_json_fixture("ws_markets_status_update.json");
2005        let contents: super::super::messages::DydxMarketsContents =
2006            serde_json::from_value(json["contents"].clone())
2007                .expect("Failed to deserialize markets contents");
2008
2009        let trading = contents.trading.expect("Expected trading data");
2010        assert_eq!(trading.len(), 2);
2011
2012        let btc = trading.get("BTC-USD").expect("Expected BTC-USD");
2013        assert_eq!(btc.status, Some(DydxMarketStatus::Paused));
2014        assert_eq!(btc.next_funding_rate, Some("0.0001".to_string()));
2015
2016        let eth = trading.get("ETH-USD").expect("Expected ETH-USD");
2017        assert_eq!(eth.status, Some(DydxMarketStatus::Active));
2018    }
2019
2020    #[rstest]
2021    #[case("ACTIVE", DydxMarketStatus::Active)]
2022    #[case("PAUSED", DydxMarketStatus::Paused)]
2023    #[case("CANCEL_ONLY", DydxMarketStatus::CancelOnly)]
2024    #[case("POST_ONLY", DydxMarketStatus::PostOnly)]
2025    #[case("INITIALIZING", DydxMarketStatus::Initializing)]
2026    #[case("FINAL_SETTLEMENT", DydxMarketStatus::FinalSettlement)]
2027    fn test_deserialize_market_status_variants(
2028        #[case] status_str: &str,
2029        #[case] expected: DydxMarketStatus,
2030    ) {
2031        let json_str = format!(r#"{{"status": "{status_str}"}}"#);
2032        let update: super::super::messages::DydxMarketTradingUpdate =
2033            serde_json::from_str(&json_str).expect("Failed to deserialize");
2034        assert_eq!(update.status, Some(expected));
2035    }
2036
2037    #[rstest]
2038    fn test_deserialize_market_trading_update_without_status() {
2039        let json_str = r#"{"nextFundingRate": "0.0001"}"#;
2040        let update: super::super::messages::DydxMarketTradingUpdate =
2041            serde_json::from_str(json_str).expect("Failed to deserialize");
2042        assert_eq!(update.status, None);
2043        assert_eq!(update.next_funding_rate, Some("0.0001".to_string()));
2044    }
2045}