1use std::str::FromStr;
23
24use anyhow::Context;
25use chrono::{DateTime, Utc};
26use dashmap::DashMap;
27use nautilus_core::UnixNanos;
28use nautilus_model::{
29 data::{Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, TradeTick},
30 enums::{AggressorSide, BookAction, OrderSide, OrderStatus, RecordFlag},
31 identifiers::{AccountId, InstrumentId, TradeId},
32 instruments::{Instrument, InstrumentAny},
33 reports::{FillReport, OrderStatusReport, PositionStatusReport},
34 types::{Price, Quantity},
35};
36use rust_decimal::Decimal;
37
38use super::{DydxWsError, DydxWsResult};
39use crate::{
40 common::{
41 enums::{DydxOrderStatus, DydxTickerType},
42 instrument_cache::InstrumentCache,
43 },
44 execution::{encoder::ClientOrderIdEncoder, types::OrderContext},
45 http::{
46 models::{Fill, Order, PerpetualPosition},
47 parse::{parse_fill_report, parse_order_status_report, parse_position_status_report},
48 },
49 websocket::messages::{
50 DydxCandle, DydxOrderbookContents, DydxOrderbookSnapshotContents, DydxPerpetualPosition,
51 DydxTradeContents, DydxWsFillSubaccountMessageContents,
52 DydxWsOrderSubaccountMessageContents,
53 },
54};
55
56pub fn parse_ws_order_report(
78 ws_order: &DydxWsOrderSubaccountMessageContents,
79 instrument_cache: &InstrumentCache,
80 order_contexts: &DashMap<u32, OrderContext>,
81 encoder: &ClientOrderIdEncoder,
82 account_id: AccountId,
83 ts_init: UnixNanos,
84) -> anyhow::Result<OrderStatusReport> {
85 let clob_pair_id: u32 = ws_order.clob_pair_id.parse().context(format!(
86 "Failed to parse clob_pair_id '{}'",
87 ws_order.clob_pair_id
88 ))?;
89
90 let instrument = instrument_cache
91 .get_by_clob_id(clob_pair_id)
92 .ok_or_else(|| {
93 instrument_cache.log_missing_clob_pair_id(clob_pair_id);
94 anyhow::anyhow!("No instrument cached for clob_pair_id {clob_pair_id}")
95 })?;
96
97 let http_order = convert_ws_order_to_http(ws_order)?;
98 let mut report = parse_order_status_report(&http_order, &instrument, account_id, ts_init)?;
99
100 let dydx_client_id = ws_order.client_id.parse::<u32>().ok();
101 let dydx_client_metadata = ws_order
102 .client_metadata
103 .as_ref()
104 .and_then(|s| s.parse::<u32>().ok())
105 .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
106
107 log::debug!(
108 "[WS_ORDER_RECV] dYdX client_id='{}' meta={:#x} (parsed u32={:?}) | status={:?} | clob_pair={} | side={:?} | size={} | filled={}",
109 ws_order.client_id,
110 dydx_client_metadata,
111 dydx_client_id,
112 ws_order.status,
113 ws_order.clob_pair_id,
114 ws_order.side,
115 ws_order.size,
116 ws_order.total_filled.as_deref().unwrap_or("?")
117 );
118
119 if let Some(client_id) = dydx_client_id {
122 if let Some(ctx) = order_contexts.get(&client_id) {
123 log::debug!(
124 "[WS_ORDER_RECV] DECODE via order_contexts: dYdX u32={} -> Nautilus '{}'",
125 client_id,
126 ctx.client_order_id
127 );
128 report.client_order_id = Some(ctx.client_order_id);
129 } else if let Some(client_order_id) =
130 encoder.decode_if_known(client_id, dydx_client_metadata)
131 {
132 log::debug!(
133 "[WS_ORDER_RECV] DECODE via encoder fallback: dYdX u32={client_id} meta={dydx_client_metadata:#x} -> Nautilus '{client_order_id}'"
134 );
135 report.client_order_id = Some(client_order_id);
136 } else {
137 log::debug!(
138 "[WS_ORDER_RECV] Unknown order: dYdX u32={client_id} meta={dydx_client_metadata:#x} (external or previous session)"
139 );
140 }
141 } else {
142 log::warn!(
143 "[WS_ORDER_RECV] Could not parse client_id '{}' as u32",
144 ws_order.client_id
145 );
146 }
147
148 if matches!(ws_order.status, DydxOrderStatus::Untriggered) && ws_order.trigger_price.is_some() {
152 report.order_status = OrderStatus::PendingUpdate;
153 }
154
155 Ok(report)
156}
157
158fn convert_ws_order_to_http(
164 ws_order: &DydxWsOrderSubaccountMessageContents,
165) -> anyhow::Result<Order> {
166 let clob_pair_id: u32 = ws_order
167 .clob_pair_id
168 .parse()
169 .context("Failed to parse clob_pair_id")?;
170
171 let size: Decimal = ws_order.size.parse().context("Failed to parse size")?;
172
173 let total_filled: Decimal = ws_order
174 .total_filled
175 .as_ref()
176 .map(|s| s.parse())
177 .transpose()
178 .context("Failed to parse total_filled")?
179 .unwrap_or(Decimal::ZERO);
180
181 let remaining_size = (size - total_filled).max(Decimal::ZERO);
183
184 let price: Decimal = ws_order.price.parse().context("Failed to parse price")?;
185
186 let created_at_height: u64 = ws_order
187 .created_at_height
188 .as_ref()
189 .map(|s| s.parse())
190 .transpose()
191 .context("Failed to parse created_at_height")?
192 .unwrap_or(0);
193
194 let client_metadata: u32 = ws_order
195 .client_metadata
196 .as_ref()
197 .ok_or_else(|| anyhow::anyhow!("Missing required field: client_metadata"))?
198 .parse()
199 .context("Failed to parse client_metadata")?;
200
201 let order_flags: u32 = ws_order
202 .order_flags
203 .parse()
204 .context("Failed to parse order_flags")?;
205
206 let good_til_block = ws_order
207 .good_til_block
208 .as_ref()
209 .and_then(|s| s.parse::<u64>().ok());
210
211 let good_til_block_time = ws_order
212 .good_til_block_time
213 .as_ref()
214 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
215 .map(|dt| dt.with_timezone(&Utc));
216
217 let trigger_price = ws_order
218 .trigger_price
219 .as_ref()
220 .and_then(|s| Decimal::from_str(s).ok());
221
222 let updated_at = ws_order
224 .updated_at
225 .as_ref()
226 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
227 .map(|dt| dt.with_timezone(&Utc));
228
229 let updated_at_height = ws_order
231 .updated_at_height
232 .as_ref()
233 .and_then(|s| s.parse::<u64>().ok());
234
235 let total_filled = size.checked_sub(remaining_size).unwrap_or(Decimal::ZERO);
236
237 Ok(Order {
238 id: ws_order.id.clone(),
239 subaccount_id: ws_order.subaccount_id.clone(),
240 client_id: ws_order.client_id.clone(),
241 clob_pair_id,
242 side: ws_order.side,
243 size,
244 total_filled,
245 price,
246 status: ws_order.status,
247 order_type: ws_order.order_type,
248 time_in_force: ws_order.time_in_force,
249 reduce_only: ws_order.reduce_only,
250 post_only: ws_order.post_only,
251 order_flags,
252 good_til_block,
253 good_til_block_time,
254 created_at_height: Some(created_at_height),
255 client_metadata,
256 trigger_price,
257 condition_type: None, conditional_order_trigger_subticks: None, execution: None, updated_at,
261 updated_at_height,
262 ticker: None, subaccount_number: 0, order_router_address: None, })
266}
267
268pub fn parse_ws_fill_report(
281 ws_fill: &DydxWsFillSubaccountMessageContents,
282 instrument_cache: &InstrumentCache,
283 order_id_map: &DashMap<String, (u32, u32)>,
284 order_contexts: &DashMap<u32, OrderContext>,
285 encoder: &ClientOrderIdEncoder,
286 account_id: AccountId,
287 ts_init: UnixNanos,
288) -> anyhow::Result<FillReport> {
289 let instrument = instrument_cache
290 .get_by_market(&ws_fill.market)
291 .ok_or_else(|| {
292 let available: Vec<String> = instrument_cache
293 .all_instruments()
294 .into_iter()
295 .map(|inst| inst.id().symbol.to_string())
296 .collect();
297 anyhow::anyhow!(
298 "No instrument cached for market '{}'. Available: {:?}",
299 ws_fill.market,
300 available
301 )
302 })?;
303
304 let http_fill = convert_ws_fill_to_http(ws_fill)?;
305 let mut report = parse_fill_report(&http_fill, &instrument, account_id, ts_init)?;
306
307 if let Some(ref order_id) = ws_fill.order_id {
309 if let Some(entry) = order_id_map.get(order_id) {
310 let (client_id, client_metadata) = *entry.value();
311 if let Some(ctx) = order_contexts.get(&client_id) {
312 report.client_order_id = Some(ctx.client_order_id);
313 } else if let Some(client_order_id) =
314 encoder.decode_if_known(client_id, client_metadata)
315 {
316 report.client_order_id = Some(client_order_id);
317 } else {
318 log::debug!(
319 "[WS_FILL_RECV] Unknown order: order_id={order_id} -> client_id={client_id} meta={client_metadata:#x} (external or previous session)",
320 );
321 }
322 } else {
323 log::warn!(
324 "[WS_FILL_RECV] No order_id mapping for '{order_id}', fill cannot be correlated",
325 );
326 }
327 }
328
329 Ok(report)
330}
331
332fn convert_ws_fill_to_http(ws_fill: &DydxWsFillSubaccountMessageContents) -> anyhow::Result<Fill> {
338 let price: Decimal = ws_fill.price.parse().context("Failed to parse price")?;
339 let size: Decimal = ws_fill.size.parse().context("Failed to parse size")?;
340 let fee: Decimal = ws_fill.fee.parse().context("Failed to parse fee")?;
341
342 let created_at_height: u64 = ws_fill
343 .created_at_height
344 .as_ref()
345 .map(|s| s.parse())
346 .transpose()
347 .context("Failed to parse created_at_height")?
348 .unwrap_or(0);
349
350 let client_metadata: u32 = ws_fill
351 .client_metadata
352 .as_ref()
353 .ok_or_else(|| anyhow::anyhow!("Missing required field: client_metadata"))?
354 .parse()
355 .context("Failed to parse client_metadata")?;
356
357 let order_id = ws_fill
358 .order_id
359 .clone()
360 .ok_or_else(|| anyhow::anyhow!("Missing required field: order_id"))?;
361
362 let created_at = DateTime::parse_from_rfc3339(&ws_fill.created_at)
363 .context("Failed to parse created_at")?
364 .with_timezone(&Utc);
365
366 Ok(Fill {
367 id: ws_fill.id.clone(),
368 side: ws_fill.side,
369 liquidity: ws_fill.liquidity,
370 fill_type: ws_fill.fill_type,
371 market: ws_fill.market,
372 market_type: ws_fill.market_type.unwrap_or(DydxTickerType::Perpetual),
373 price,
374 size,
375 fee,
376 created_at,
377 created_at_height,
378 order_id,
379 client_metadata,
380 })
381}
382
383pub fn parse_ws_position_report(
395 ws_position: &DydxPerpetualPosition,
396 instrument_cache: &InstrumentCache,
397 account_id: AccountId,
398 ts_init: UnixNanos,
399) -> anyhow::Result<PositionStatusReport> {
400 let instrument = instrument_cache
401 .get_by_market(&ws_position.market)
402 .ok_or_else(|| {
403 let available: Vec<String> = instrument_cache
404 .all_instruments()
405 .into_iter()
406 .map(|inst| inst.id().symbol.to_string())
407 .collect();
408 anyhow::anyhow!(
409 "No instrument cached for market '{}'. Available: {:?}",
410 ws_position.market,
411 available
412 )
413 })?;
414
415 let http_position = convert_ws_position_to_http(ws_position)?;
416 parse_position_status_report(&http_position, &instrument, account_id, ts_init)
417}
418
419fn convert_ws_position_to_http(
425 ws_position: &DydxPerpetualPosition,
426) -> anyhow::Result<PerpetualPosition> {
427 let size: Decimal = ws_position.size.parse().context("Failed to parse size")?;
428
429 let max_size: Decimal = ws_position
430 .max_size
431 .parse()
432 .context("Failed to parse max_size")?;
433
434 let entry_price: Decimal = ws_position
435 .entry_price
436 .parse()
437 .context("Failed to parse entry_price")?;
438
439 let exit_price: Option<Decimal> = ws_position
440 .exit_price
441 .as_ref()
442 .map(|s| s.parse())
443 .transpose()
444 .context("Failed to parse exit_price")?;
445
446 let realized_pnl: Decimal = ws_position
447 .realized_pnl
448 .parse()
449 .context("Failed to parse realized_pnl")?;
450
451 let unrealized_pnl: Decimal = ws_position
452 .unrealized_pnl
453 .parse()
454 .context("Failed to parse unrealized_pnl")?;
455
456 let sum_open: Decimal = ws_position
457 .sum_open
458 .parse()
459 .context("Failed to parse sum_open")?;
460
461 let sum_close: Decimal = ws_position
462 .sum_close
463 .parse()
464 .context("Failed to parse sum_close")?;
465
466 let net_funding: Decimal = ws_position
467 .net_funding
468 .parse()
469 .context("Failed to parse net_funding")?;
470
471 let created_at = DateTime::parse_from_rfc3339(&ws_position.created_at)
472 .context("Failed to parse created_at")?
473 .with_timezone(&Utc);
474
475 let closed_at = ws_position
476 .closed_at
477 .as_ref()
478 .map(|s| DateTime::parse_from_rfc3339(s))
479 .transpose()
480 .context("Failed to parse closed_at")?
481 .map(|dt| dt.with_timezone(&Utc));
482
483 let side = ws_position.side;
486
487 Ok(PerpetualPosition {
488 market: ws_position.market,
489 status: ws_position.status,
490 side,
491 size,
492 max_size,
493 entry_price,
494 exit_price,
495 realized_pnl,
496 created_at_height: 0, created_at,
498 sum_open,
499 sum_close,
500 net_funding,
501 unrealized_pnl,
502 closed_at,
503 })
504}
505
506pub fn parse_orderbook_snapshot(
516 instrument_id: &InstrumentId,
517 contents: &DydxOrderbookSnapshotContents,
518 price_precision: u8,
519 size_precision: u8,
520 ts_init: UnixNanos,
521) -> DydxWsResult<OrderBookDeltas> {
522 let bids = contents.bids.as_deref().unwrap_or(&[]);
523 let asks = contents.asks.as_deref().unwrap_or(&[]);
524
525 let mut deltas = Vec::with_capacity(1 + bids.len() + asks.len());
526 let snapshot_flag = RecordFlag::F_SNAPSHOT as u8;
527
528 if bids.is_empty() && asks.is_empty() {
530 let clear_flags = snapshot_flag | RecordFlag::F_LAST as u8;
531 let mut clear_delta = OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init);
532 clear_delta.flags = clear_flags;
533 deltas.push(clear_delta);
534 return Ok(OrderBookDeltas::new(*instrument_id, deltas));
535 }
536
537 let mut clear_delta = OrderBookDelta::clear(*instrument_id, 0, ts_init, ts_init);
539 clear_delta.flags = snapshot_flag;
540 deltas.push(clear_delta);
541
542 let bids_len = bids.len();
543 let asks_len = asks.len();
544
545 for (idx, bid) in bids.iter().enumerate() {
546 let is_last = idx == bids_len - 1 && asks_len == 0;
547 let flags = if is_last {
548 snapshot_flag | RecordFlag::F_LAST as u8
549 } else {
550 snapshot_flag
551 };
552
553 let price = Decimal::from_str(&bid.price)
554 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
555
556 let size = Decimal::from_str(&bid.size)
557 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
558
559 let order = BookOrder::new(
560 OrderSide::Buy,
561 Price::from_decimal_dp(price, price_precision).map_err(|e| {
562 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
563 })?,
564 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
565 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
566 })?,
567 0,
568 );
569
570 deltas.push(OrderBookDelta::new(
571 *instrument_id,
572 BookAction::Add,
573 order,
574 flags,
575 0,
576 ts_init,
577 ts_init,
578 ));
579 }
580
581 for (idx, ask) in asks.iter().enumerate() {
582 let is_last = idx == asks_len - 1;
583 let flags = if is_last {
584 snapshot_flag | RecordFlag::F_LAST as u8
585 } else {
586 snapshot_flag
587 };
588
589 let price = Decimal::from_str(&ask.price)
590 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
591
592 let size = Decimal::from_str(&ask.size)
593 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
594
595 let order = BookOrder::new(
596 OrderSide::Sell,
597 Price::from_decimal_dp(price, price_precision).map_err(|e| {
598 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
599 })?,
600 Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
601 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
602 })?,
603 0,
604 );
605
606 deltas.push(OrderBookDelta::new(
607 *instrument_id,
608 BookAction::Add,
609 order,
610 flags,
611 0,
612 ts_init,
613 ts_init,
614 ));
615 }
616
617 Ok(OrderBookDeltas::new(*instrument_id, deltas))
618}
619
620pub fn parse_orderbook_deltas(
626 instrument_id: &InstrumentId,
627 contents: &DydxOrderbookContents,
628 price_precision: u8,
629 size_precision: u8,
630 ts_init: UnixNanos,
631) -> DydxWsResult<OrderBookDeltas> {
632 let deltas = parse_orderbook_deltas_with_flag(
633 instrument_id,
634 contents,
635 price_precision,
636 size_precision,
637 ts_init,
638 true,
639 )?;
640 Ok(OrderBookDeltas::new(*instrument_id, deltas))
641}
642
643pub fn parse_orderbook_deltas_with_flag(
649 instrument_id: &InstrumentId,
650 contents: &DydxOrderbookContents,
651 price_precision: u8,
652 size_precision: u8,
653 ts_init: UnixNanos,
654 is_last_message: bool,
655) -> DydxWsResult<Vec<OrderBookDelta>> {
656 let mut deltas = Vec::new();
657
658 let bids = contents.bids.as_deref().unwrap_or(&[]);
659 let asks = contents.asks.as_deref().unwrap_or(&[]);
660
661 let bids_len = bids.len();
662 let asks_len = asks.len();
663
664 for (idx, (price_str, size_str)) in bids.iter().enumerate() {
665 let is_last = is_last_message && idx == bids_len - 1 && asks_len == 0;
666 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
667
668 let price = Decimal::from_str(price_str)
669 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid price: {e}")))?;
670
671 let size = Decimal::from_str(size_str)
672 .map_err(|e| DydxWsError::Parse(format!("Failed to parse bid size: {e}")))?;
673
674 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
675 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
676 })?;
677 let action = if qty.is_zero() {
678 BookAction::Delete
679 } else {
680 BookAction::Update
681 };
682
683 let order = BookOrder::new(
684 OrderSide::Buy,
685 Price::from_decimal_dp(price, price_precision).map_err(|e| {
686 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
687 })?,
688 qty,
689 0,
690 );
691
692 deltas.push(OrderBookDelta::new(
693 *instrument_id,
694 action,
695 order,
696 flags,
697 0,
698 ts_init,
699 ts_init,
700 ));
701 }
702
703 for (idx, (price_str, size_str)) in asks.iter().enumerate() {
704 let is_last = is_last_message && idx == asks_len - 1;
705 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
706
707 let price = Decimal::from_str(price_str)
708 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask price: {e}")))?;
709
710 let size = Decimal::from_str(size_str)
711 .map_err(|e| DydxWsError::Parse(format!("Failed to parse ask size: {e}")))?;
712
713 let qty = Quantity::from_decimal_dp(size, size_precision).map_err(|e| {
714 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
715 })?;
716 let action = if qty.is_zero() {
717 BookAction::Delete
718 } else {
719 BookAction::Update
720 };
721
722 let order = BookOrder::new(
723 OrderSide::Sell,
724 Price::from_decimal_dp(price, price_precision).map_err(|e| {
725 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
726 })?,
727 qty,
728 0,
729 );
730
731 deltas.push(OrderBookDelta::new(
732 *instrument_id,
733 action,
734 order,
735 flags,
736 0,
737 ts_init,
738 ts_init,
739 ));
740 }
741
742 Ok(deltas)
743}
744
745pub fn parse_trade_ticks(
751 instrument_id: InstrumentId,
752 instrument: &InstrumentAny,
753 contents: &DydxTradeContents,
754 ts_init: UnixNanos,
755) -> DydxWsResult<Vec<Data>> {
756 let mut ticks = Vec::new();
757
758 for trade in &contents.trades {
759 let aggressor_side = match trade.side {
760 OrderSide::Buy => AggressorSide::Buyer,
761 OrderSide::Sell => AggressorSide::Seller,
762 _ => continue,
763 };
764
765 let price = Decimal::from_str(&trade.price)
766 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade price: {e}")))?;
767
768 let size = Decimal::from_str(&trade.size)
769 .map_err(|e| DydxWsError::Parse(format!("Failed to parse trade size: {e}")))?;
770
771 let trade_ts = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
772 DydxWsError::Parse(format!("Timestamp out of range for trade {}", trade.id))
773 })?;
774
775 let tick = TradeTick::new(
776 instrument_id,
777 Price::from_decimal_dp(price, instrument.price_precision()).map_err(|e| {
778 DydxWsError::Parse(format!("Failed to create Price from decimal: {e}"))
779 })?,
780 Quantity::from_decimal_dp(size, instrument.size_precision()).map_err(|e| {
781 DydxWsError::Parse(format!("Failed to create Quantity from decimal: {e}"))
782 })?,
783 aggressor_side,
784 TradeId::new(&trade.id),
785 UnixNanos::from(trade_ts as u64),
786 ts_init,
787 );
788 ticks.push(Data::Trade(tick));
789 }
790
791 Ok(ticks)
792}
793
794pub fn parse_candle_bar(
803 bar_type: BarType,
804 instrument: &InstrumentAny,
805 candle: &DydxCandle,
806 timestamp_on_close: bool,
807 ts_init: UnixNanos,
808) -> DydxWsResult<Bar> {
809 let open = Decimal::from_str(&candle.open)
810 .map_err(|e| DydxWsError::Parse(format!("Failed to parse open: {e}")))?;
811 let high = Decimal::from_str(&candle.high)
812 .map_err(|e| DydxWsError::Parse(format!("Failed to parse high: {e}")))?;
813 let low = Decimal::from_str(&candle.low)
814 .map_err(|e| DydxWsError::Parse(format!("Failed to parse low: {e}")))?;
815 let close = Decimal::from_str(&candle.close)
816 .map_err(|e| DydxWsError::Parse(format!("Failed to parse close: {e}")))?;
817 let volume = candle
818 .base_token_volume
819 .as_deref()
820 .map(Decimal::from_str)
821 .transpose()
822 .map_err(|e| DydxWsError::Parse(format!("Failed to parse volume: {e}")))?
823 .unwrap_or(Decimal::ZERO);
824
825 let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
826 DydxWsError::Parse(format!(
827 "Timestamp out of range for candle at {}",
828 candle.started_at
829 ))
830 })?;
831 let mut ts_event = UnixNanos::from(started_at_nanos as u64);
832
833 if timestamp_on_close {
834 let interval_ns = bar_type
835 .spec()
836 .timedelta()
837 .num_nanoseconds()
838 .ok_or_else(|| DydxWsError::Parse("Bar interval overflow".to_string()))?;
839 let updated = (started_at_nanos as u64)
840 .checked_add(interval_ns as u64)
841 .ok_or_else(|| {
842 DydxWsError::Parse("Bar timestamp overflowed adjusting to close time".to_string())
843 })?;
844 ts_event = UnixNanos::from(updated);
845 }
846
847 let bar = Bar::new(
848 bar_type,
849 Price::from_decimal_dp(open, instrument.price_precision()).map_err(|e| {
850 DydxWsError::Parse(format!("Failed to create open Price from decimal: {e}"))
851 })?,
852 Price::from_decimal_dp(high, instrument.price_precision()).map_err(|e| {
853 DydxWsError::Parse(format!("Failed to create high Price from decimal: {e}"))
854 })?,
855 Price::from_decimal_dp(low, instrument.price_precision()).map_err(|e| {
856 DydxWsError::Parse(format!("Failed to create low Price from decimal: {e}"))
857 })?,
858 Price::from_decimal_dp(close, instrument.price_precision()).map_err(|e| {
859 DydxWsError::Parse(format!("Failed to create close Price from decimal: {e}"))
860 })?,
861 Quantity::from_decimal_dp(volume, instrument.size_precision()).map_err(|e| {
862 DydxWsError::Parse(format!(
863 "Failed to create volume Quantity from decimal: {e}"
864 ))
865 })?,
866 ts_event,
867 ts_init,
868 );
869
870 Ok(bar)
871}
872
873#[cfg(test)]
874mod tests {
875 use std::str::FromStr;
876
877 use nautilus_model::{
878 data::{BarType, Data},
879 enums::{
880 AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType,
881 PositionSideSpecified,
882 },
883 identifiers::{AccountId, InstrumentId, Symbol, Venue},
884 instruments::{CryptoPerpetual, InstrumentAny},
885 types::{Currency, Price, Quantity},
886 };
887 use rstest::rstest;
888 use rust_decimal_macros::dec;
889 use ustr::Ustr;
890
891 use super::*;
892 use crate::{
893 common::{
894 enums::{
895 DydxFillType, DydxLiquidity, DydxMarketStatus, DydxOrderStatus, DydxOrderType,
896 DydxPositionSide, DydxPositionStatus, DydxTickerType, DydxTimeInForce,
897 },
898 testing::load_json_fixture,
899 },
900 http::models::PerpetualMarket,
901 websocket::messages::{DydxPerpetualPosition, DydxWsFillSubaccountMessageContents},
902 };
903
904 fn create_test_market(ticker: &str, clob_pair_id: u32) -> PerpetualMarket {
906 PerpetualMarket {
907 clob_pair_id,
908 ticker: Ustr::from(ticker),
909 status: DydxMarketStatus::Active,
910 base_asset: Some(Ustr::from("BTC")),
911 quote_asset: Some(Ustr::from("USD")),
912 step_size: dec!(0.001),
913 tick_size: dec!(0.01),
914 index_price: Some(dec!(50000)),
915 oracle_price: Some(dec!(50000)),
916 price_change_24h: dec!(0),
917 next_funding_rate: dec!(0),
918 next_funding_at: None,
919 min_order_size: Some(dec!(0.001)),
920 market_type: None,
921 initial_margin_fraction: dec!(0.05),
922 maintenance_margin_fraction: dec!(0.03),
923 base_position_notional: None,
924 incremental_position_size: None,
925 incremental_initial_margin_fraction: None,
926 max_position_size: None,
927 open_interest: dec!(1000),
928 atomic_resolution: -10,
929 quantum_conversion_exponent: -9,
930 subticks_per_tick: 1000000,
931 step_base_quantums: 1000000,
932 is_reduce_only: false,
933 }
934 }
935
936 fn create_test_instrument_cache() -> InstrumentCache {
938 let cache = InstrumentCache::new();
939 let instrument = create_test_instrument();
940 let market = create_test_market("BTC-USD", 1);
941 cache.insert(instrument, market);
942 cache
943 }
944
945 fn create_test_instrument() -> InstrumentAny {
946 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
947
948 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
949 instrument_id,
950 Symbol::new("BTC-USD"),
951 Currency::BTC(),
952 Currency::USD(),
953 Currency::USD(),
954 false,
955 2,
956 8,
957 Price::new(0.01, 2),
958 Quantity::new(0.001, 8),
959 Some(Quantity::new(1.0, 0)),
960 Some(Quantity::new(0.001, 8)),
961 Some(Quantity::new(100000.0, 8)),
962 Some(Quantity::new(0.001, 8)),
963 None,
964 None,
965 Some(Price::new(1000000.0, 2)),
966 Some(Price::new(0.01, 2)),
967 Some(rust_decimal_macros::dec!(0.05)),
968 Some(rust_decimal_macros::dec!(0.03)),
969 Some(rust_decimal_macros::dec!(0.0002)),
970 Some(rust_decimal_macros::dec!(0.0005)),
971 None, UnixNanos::default(),
973 UnixNanos::default(),
974 ))
975 }
976
977 #[rstest]
978 fn test_convert_ws_order_to_http_basic() {
979 let ws_order = DydxWsOrderSubaccountMessageContents {
980 id: "order123".to_string(),
981 subaccount_id: "dydx1test/0".to_string(),
982 client_id: "12345".to_string(),
983 clob_pair_id: "1".to_string(),
984 side: OrderSide::Buy,
985 size: "1.5".to_string(),
986 price: "50000.0".to_string(),
987 status: DydxOrderStatus::PartiallyFilled,
988 order_type: DydxOrderType::Limit,
989 time_in_force: DydxTimeInForce::Gtt,
990 post_only: false,
991 reduce_only: false,
992 order_flags: "0".to_string(),
993 good_til_block: Some("1000".to_string()),
994 good_til_block_time: None,
995 created_at_height: Some("900".to_string()),
996 client_metadata: Some("0".to_string()),
997 trigger_price: None,
998 total_filled: Some("0.5".to_string()),
999 updated_at: Some("2024-11-14T10:00:00Z".to_string()),
1000 updated_at_height: Some("950".to_string()),
1001 };
1002
1003 let result = convert_ws_order_to_http(&ws_order);
1004 assert!(result.is_ok());
1005
1006 let http_order = result.unwrap();
1007 assert_eq!(http_order.id, "order123");
1008 assert_eq!(http_order.clob_pair_id, 1);
1009 assert_eq!(http_order.size.to_string(), "1.5");
1010 assert_eq!(http_order.total_filled, rust_decimal_macros::dec!(0.5)); assert_eq!(http_order.status, DydxOrderStatus::PartiallyFilled);
1012 }
1013
1014 #[rstest]
1015 fn test_parse_ws_order_report_success() {
1016 let ws_order = DydxWsOrderSubaccountMessageContents {
1017 id: "order456".to_string(),
1018 subaccount_id: "dydx1test/0".to_string(),
1019 client_id: "67890".to_string(),
1020 clob_pair_id: "1".to_string(),
1021 side: OrderSide::Sell,
1022 size: "2.0".to_string(),
1023 price: "51000.0".to_string(),
1024 status: DydxOrderStatus::Open,
1025 order_type: DydxOrderType::Limit,
1026 time_in_force: DydxTimeInForce::Gtt,
1027 post_only: true,
1028 reduce_only: false,
1029 order_flags: "0".to_string(),
1030 good_til_block: Some("2000".to_string()),
1031 good_til_block_time: None,
1032 created_at_height: Some("1800".to_string()),
1033 client_metadata: Some("0".to_string()),
1034 trigger_price: None,
1035 total_filled: Some("0.0".to_string()),
1036 updated_at: None,
1037 updated_at_height: None,
1038 };
1039
1040 let instrument_cache = create_test_instrument_cache();
1041 let encoder = ClientOrderIdEncoder::new();
1042
1043 let account_id = AccountId::new("DYDX-001");
1044 let ts_init = UnixNanos::default();
1045 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1046
1047 let result = parse_ws_order_report(
1048 &ws_order,
1049 &instrument_cache,
1050 &order_contexts,
1051 &encoder,
1052 account_id,
1053 ts_init,
1054 );
1055
1056 assert!(result.is_ok());
1057 let report = result.unwrap();
1058 assert_eq!(report.account_id, account_id);
1059 assert_eq!(report.order_side, OrderSide::Sell);
1060 }
1061
1062 #[rstest]
1063 fn test_parse_ws_order_report_missing_instrument() {
1064 let ws_order = DydxWsOrderSubaccountMessageContents {
1065 id: "order789".to_string(),
1066 subaccount_id: "dydx1test/0".to_string(),
1067 client_id: "11111".to_string(),
1068 clob_pair_id: "99".to_string(), side: OrderSide::Buy,
1070 size: "1.0".to_string(),
1071 price: "50000.0".to_string(),
1072 status: DydxOrderStatus::Open,
1073 order_type: DydxOrderType::Market,
1074 time_in_force: DydxTimeInForce::Ioc,
1075 post_only: false,
1076 reduce_only: false,
1077 order_flags: "0".to_string(),
1078 good_til_block: Some("1000".to_string()),
1079 good_til_block_time: None,
1080 created_at_height: Some("900".to_string()),
1081 client_metadata: Some("0".to_string()),
1082 trigger_price: None,
1083 total_filled: Some("0.0".to_string()),
1084 updated_at: None,
1085 updated_at_height: None,
1086 };
1087
1088 let instrument_cache = InstrumentCache::new(); let encoder = ClientOrderIdEncoder::new();
1090 let account_id = AccountId::new("DYDX-001");
1091 let ts_init = UnixNanos::default();
1092 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1093
1094 let result = parse_ws_order_report(
1095 &ws_order,
1096 &instrument_cache,
1097 &order_contexts,
1098 &encoder,
1099 account_id,
1100 ts_init,
1101 );
1102
1103 assert!(result.is_err());
1104 assert!(
1105 result
1106 .unwrap_err()
1107 .to_string()
1108 .contains("No instrument cached")
1109 );
1110 }
1111
1112 #[rstest]
1113 fn test_convert_ws_fill_to_http() {
1114 let ws_fill = DydxWsFillSubaccountMessageContents {
1115 id: "fill123".to_string(),
1116 subaccount_id: "sub1".to_string(),
1117 side: OrderSide::Buy,
1118 liquidity: DydxLiquidity::Maker,
1119 fill_type: DydxFillType::Limit,
1120 market: "BTC-USD".into(),
1121 market_type: Some(DydxTickerType::Perpetual),
1122 price: "50000.5".to_string(),
1123 size: "0.1".to_string(),
1124 fee: "-2.5".to_string(), created_at: "2024-01-15T10:30:00Z".to_string(),
1126 created_at_height: Some("12345".to_string()),
1127 order_id: Some("order456".to_string()),
1128 client_metadata: Some("999".to_string()),
1129 };
1130
1131 let result = convert_ws_fill_to_http(&ws_fill);
1132 assert!(result.is_ok());
1133
1134 let http_fill = result.unwrap();
1135 assert_eq!(http_fill.id, "fill123");
1136 assert_eq!(http_fill.side, OrderSide::Buy);
1137 assert_eq!(http_fill.liquidity, DydxLiquidity::Maker);
1138 assert_eq!(http_fill.price, rust_decimal_macros::dec!(50000.5));
1139 assert_eq!(http_fill.size, rust_decimal_macros::dec!(0.1));
1140 assert_eq!(http_fill.fee, rust_decimal_macros::dec!(-2.5));
1141 assert_eq!(http_fill.created_at_height, 12345);
1142 assert_eq!(http_fill.order_id, "order456");
1143 assert_eq!(http_fill.client_metadata, 999);
1144 }
1145
1146 #[rstest]
1147 fn test_parse_ws_fill_report_success() {
1148 let instrument_cache = create_test_instrument_cache();
1149 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1150
1151 let ws_fill = DydxWsFillSubaccountMessageContents {
1154 id: "fill789".to_string(),
1155 subaccount_id: "sub1".to_string(),
1156 side: OrderSide::Sell,
1157 liquidity: DydxLiquidity::Taker,
1158 fill_type: DydxFillType::Limit,
1159 market: "BTC-USD".into(),
1160 market_type: Some(DydxTickerType::Perpetual),
1161 price: "49500.0".to_string(),
1162 size: "0.5".to_string(),
1163 fee: "12.375".to_string(), created_at: "2024-01-15T11:00:00Z".to_string(),
1165 created_at_height: Some("12400".to_string()),
1166 order_id: Some("order999".to_string()),
1167 client_metadata: Some("888".to_string()),
1168 };
1169
1170 let account_id = AccountId::new("DYDX-001");
1171 let ts_init = UnixNanos::default();
1172 let order_id_map = DashMap::new();
1173 let order_contexts = DashMap::new();
1174 let encoder = ClientOrderIdEncoder::new();
1175
1176 let result = parse_ws_fill_report(
1177 &ws_fill,
1178 &instrument_cache,
1179 &order_id_map,
1180 &order_contexts,
1181 &encoder,
1182 account_id,
1183 ts_init,
1184 );
1185 assert!(result.is_ok());
1186
1187 let fill_report = result.unwrap();
1188 assert_eq!(fill_report.instrument_id, instrument_id);
1189 assert_eq!(fill_report.venue_order_id.as_str(), "order999");
1190 assert_eq!(fill_report.last_qty.as_f64(), 0.5);
1191 assert_eq!(fill_report.last_px.as_f64(), 49500.0);
1192 assert_eq!(fill_report.commission.as_decimal(), dec!(12.38));
1193 }
1194
1195 #[rstest]
1196 fn test_parse_ws_fill_report_missing_instrument() {
1197 let instrument_cache = InstrumentCache::new(); let ws_fill = DydxWsFillSubaccountMessageContents {
1200 id: "fill000".to_string(),
1201 subaccount_id: "sub1".to_string(),
1202 side: OrderSide::Buy,
1203 liquidity: DydxLiquidity::Maker,
1204 fill_type: DydxFillType::Limit,
1205 market: "ETH-USD-PERP".into(),
1206 market_type: Some(DydxTickerType::Perpetual),
1207 price: "3000.0".to_string(),
1208 size: "1.0".to_string(),
1209 fee: "-1.5".to_string(),
1210 created_at: "2024-01-15T12:00:00Z".to_string(),
1211 created_at_height: Some("12500".to_string()),
1212 order_id: Some("order111".to_string()),
1213 client_metadata: Some("777".to_string()),
1214 };
1215
1216 let account_id = AccountId::new("DYDX-001");
1217 let ts_init = UnixNanos::default();
1218 let order_id_map = DashMap::new();
1219 let order_contexts = DashMap::new();
1220 let encoder = ClientOrderIdEncoder::new();
1221
1222 let result = parse_ws_fill_report(
1223 &ws_fill,
1224 &instrument_cache,
1225 &order_id_map,
1226 &order_contexts,
1227 &encoder,
1228 account_id,
1229 ts_init,
1230 );
1231 assert!(result.is_err());
1232 assert!(
1233 result
1234 .unwrap_err()
1235 .to_string()
1236 .contains("No instrument cached for market")
1237 );
1238 }
1239
1240 #[rstest]
1241 fn test_convert_ws_position_to_http() {
1242 let ws_position = DydxPerpetualPosition {
1243 market: "BTC-USD".into(),
1244 status: DydxPositionStatus::Open,
1245 side: DydxPositionSide::Long,
1246 size: "1.5".to_string(),
1247 max_size: "2.0".to_string(),
1248 entry_price: "50000.0".to_string(),
1249 exit_price: None,
1250 realized_pnl: "100.0".to_string(),
1251 unrealized_pnl: "250.5".to_string(),
1252 created_at: "2024-01-15T10:00:00Z".to_string(),
1253 closed_at: None,
1254 sum_open: "5.0".to_string(),
1255 sum_close: "3.5".to_string(),
1256 net_funding: "-10.25".to_string(),
1257 };
1258
1259 let result = convert_ws_position_to_http(&ws_position);
1260 assert!(result.is_ok());
1261
1262 let http_position = result.unwrap();
1263 assert_eq!(http_position.market, "BTC-USD");
1264 assert_eq!(http_position.status, DydxPositionStatus::Open);
1265 assert_eq!(http_position.side, DydxPositionSide::Long); assert_eq!(http_position.size, rust_decimal_macros::dec!(1.5));
1267 assert_eq!(http_position.max_size, rust_decimal_macros::dec!(2.0));
1268 assert_eq!(
1269 http_position.entry_price,
1270 rust_decimal_macros::dec!(50000.0)
1271 );
1272 assert_eq!(http_position.exit_price, None);
1273 assert_eq!(http_position.realized_pnl, rust_decimal_macros::dec!(100.0));
1274 assert_eq!(
1275 http_position.unrealized_pnl,
1276 rust_decimal_macros::dec!(250.5)
1277 );
1278 assert_eq!(http_position.sum_open, rust_decimal_macros::dec!(5.0));
1279 assert_eq!(http_position.sum_close, rust_decimal_macros::dec!(3.5));
1280 assert_eq!(http_position.net_funding, rust_decimal_macros::dec!(-10.25));
1281 }
1282
1283 #[rstest]
1287 #[case::long_positive(DydxPositionSide::Long, "1.0", DydxPositionSide::Long)]
1288 #[case::short_negative(DydxPositionSide::Short, "-1.0", DydxPositionSide::Short)]
1289 #[case::long_zero(DydxPositionSide::Long, "0.0", DydxPositionSide::Long)]
1290 #[case::short_zero(DydxPositionSide::Short, "0.0", DydxPositionSide::Short)]
1291 #[case::long_with_negative_size(DydxPositionSide::Long, "-1.0", DydxPositionSide::Long)]
1292 #[case::short_with_positive_size(DydxPositionSide::Short, "1.0", DydxPositionSide::Short)]
1293 fn test_convert_ws_position_preserves_venue_side(
1294 #[case] venue_side: DydxPositionSide,
1295 #[case] size: &str,
1296 #[case] expected_side: DydxPositionSide,
1297 ) {
1298 let ws_position = DydxPerpetualPosition {
1299 market: "BTC-USD".into(),
1300 status: DydxPositionStatus::Open,
1301 side: venue_side,
1302 size: size.to_string(),
1303 max_size: "1.0".to_string(),
1304 entry_price: "50000.0".to_string(),
1305 exit_price: None,
1306 realized_pnl: "0.0".to_string(),
1307 unrealized_pnl: "0.0".to_string(),
1308 created_at: "2024-01-15T10:00:00Z".to_string(),
1309 closed_at: None,
1310 sum_open: "0.0".to_string(),
1311 sum_close: "0.0".to_string(),
1312 net_funding: "0.0".to_string(),
1313 };
1314
1315 let http_position =
1316 convert_ws_position_to_http(&ws_position).expect("conversion should succeed");
1317 assert_eq!(http_position.side, expected_side);
1318 }
1319
1320 #[rstest]
1325 fn test_ws_position_report_emits_venue_side_for_mismatched_size() {
1326 use nautilus_model::enums::PositionSideSpecified;
1327
1328 let instrument_cache = create_test_instrument_cache();
1329 let ws_position = DydxPerpetualPosition {
1332 market: "BTC-USD".into(),
1333 status: DydxPositionStatus::Open,
1334 side: DydxPositionSide::Short,
1335 size: "1.0".to_string(),
1336 max_size: "1.0".to_string(),
1337 entry_price: "50000.0".to_string(),
1338 exit_price: None,
1339 realized_pnl: "0.0".to_string(),
1340 unrealized_pnl: "0.0".to_string(),
1341 created_at: "2024-01-15T10:00:00Z".to_string(),
1342 closed_at: None,
1343 sum_open: "0.0".to_string(),
1344 sum_close: "0.0".to_string(),
1345 net_funding: "0.0".to_string(),
1346 };
1347
1348 let report = parse_ws_position_report(
1349 &ws_position,
1350 &instrument_cache,
1351 AccountId::new("DYDX-001"),
1352 UnixNanos::default(),
1353 )
1354 .expect("parse should succeed");
1355 assert_eq!(report.position_side, PositionSideSpecified::Short);
1356 }
1357
1358 #[rstest]
1359 fn test_parse_ws_position_report_success() {
1360 let instrument_cache = create_test_instrument_cache();
1361 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1362
1363 let ws_position = DydxPerpetualPosition {
1364 market: "BTC-USD".into(),
1365 status: DydxPositionStatus::Open,
1366 side: DydxPositionSide::Long,
1367 size: "0.5".to_string(),
1368 max_size: "1.0".to_string(),
1369 entry_price: "49500.0".to_string(),
1370 exit_price: None,
1371 realized_pnl: "0.0".to_string(),
1372 unrealized_pnl: "125.0".to_string(),
1373 created_at: "2024-01-15T09:00:00Z".to_string(),
1374 closed_at: None,
1375 sum_open: "0.5".to_string(),
1376 sum_close: "0.0".to_string(),
1377 net_funding: "-2.5".to_string(),
1378 };
1379
1380 let account_id = AccountId::new("DYDX-001");
1381 let ts_init = UnixNanos::default();
1382
1383 let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1384 assert!(result.is_ok());
1385
1386 let position_report = result.unwrap();
1387 assert_eq!(position_report.instrument_id, instrument_id);
1388 assert_eq!(position_report.position_side, PositionSideSpecified::Long);
1389 assert_eq!(position_report.quantity.as_f64(), 0.5);
1390 assert!(position_report.avg_px_open.is_some());
1392 }
1393
1394 #[rstest]
1395 fn test_parse_ws_position_report_short() {
1396 let instrument_cache = create_test_instrument_cache();
1397 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1398
1399 let ws_position = DydxPerpetualPosition {
1400 market: "BTC-USD".into(),
1401 status: DydxPositionStatus::Open,
1402 side: DydxPositionSide::Short,
1403 size: "-0.25".to_string(), max_size: "0.5".to_string(),
1405 entry_price: "51000.0".to_string(),
1406 exit_price: None,
1407 realized_pnl: "50.0".to_string(),
1408 unrealized_pnl: "-75.25".to_string(),
1409 created_at: "2024-01-15T08:00:00Z".to_string(),
1410 closed_at: None,
1411 sum_open: "0.25".to_string(),
1412 sum_close: "0.0".to_string(),
1413 net_funding: "1.5".to_string(),
1414 };
1415
1416 let account_id = AccountId::new("DYDX-001");
1417 let ts_init = UnixNanos::default();
1418
1419 let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1420 assert!(result.is_ok());
1421
1422 let position_report = result.unwrap();
1423 assert_eq!(position_report.instrument_id, instrument_id);
1424 assert_eq!(position_report.position_side, PositionSideSpecified::Short);
1425 assert_eq!(position_report.quantity.as_f64(), 0.25); }
1427
1428 #[rstest]
1429 fn test_parse_ws_position_report_missing_instrument() {
1430 let instrument_cache = InstrumentCache::new(); let ws_position = DydxPerpetualPosition {
1433 market: "ETH-USD-PERP".into(),
1434 status: DydxPositionStatus::Open,
1435 side: DydxPositionSide::Long,
1436 size: "5.0".to_string(),
1437 max_size: "10.0".to_string(),
1438 entry_price: "3000.0".to_string(),
1439 exit_price: None,
1440 realized_pnl: "0.0".to_string(),
1441 unrealized_pnl: "500.0".to_string(),
1442 created_at: "2024-01-15T07:00:00Z".to_string(),
1443 closed_at: None,
1444 sum_open: "5.0".to_string(),
1445 sum_close: "0.0".to_string(),
1446 net_funding: "-5.0".to_string(),
1447 };
1448
1449 let account_id = AccountId::new("DYDX-001");
1450 let ts_init = UnixNanos::default();
1451
1452 let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1453 assert!(result.is_err());
1454 assert!(
1455 result
1456 .unwrap_err()
1457 .to_string()
1458 .contains("No instrument cached for market")
1459 );
1460 }
1461
1462 #[rstest]
1463 #[case(DydxOrderStatus::Filled, "2.0")]
1464 #[case(DydxOrderStatus::Canceled, "0.0")]
1465 #[case(DydxOrderStatus::BestEffortCanceled, "0.5")]
1466 #[case(DydxOrderStatus::BestEffortOpened, "0.0")]
1467 #[case(DydxOrderStatus::Untriggered, "0.0")]
1468 fn test_parse_ws_order_various_statuses(
1469 #[case] status: DydxOrderStatus,
1470 #[case] total_filled: &str,
1471 ) {
1472 let ws_order = DydxWsOrderSubaccountMessageContents {
1473 id: format!("order_{status:?}"),
1474 subaccount_id: "dydx1test/0".to_string(),
1475 client_id: "99999".to_string(),
1476 clob_pair_id: "1".to_string(),
1477 side: OrderSide::Buy,
1478 size: "2.0".to_string(),
1479 price: "50000.0".to_string(),
1480 status,
1481 order_type: DydxOrderType::Limit,
1482 time_in_force: DydxTimeInForce::Gtt,
1483 post_only: false,
1484 reduce_only: false,
1485 order_flags: "0".to_string(),
1486 good_til_block: Some("1000".to_string()),
1487 good_til_block_time: None,
1488 created_at_height: Some("900".to_string()),
1489 client_metadata: Some("0".to_string()),
1490 trigger_price: None,
1491 total_filled: Some(total_filled.to_string()),
1492 updated_at: Some("2024-11-14T10:00:00Z".to_string()),
1493 updated_at_height: Some("950".to_string()),
1494 };
1495
1496 let instrument_cache = create_test_instrument_cache();
1497 let encoder = ClientOrderIdEncoder::new();
1498
1499 let account_id = AccountId::new("DYDX-001");
1500 let ts_init = UnixNanos::default();
1501 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1502
1503 let result = parse_ws_order_report(
1504 &ws_order,
1505 &instrument_cache,
1506 &order_contexts,
1507 &encoder,
1508 account_id,
1509 ts_init,
1510 );
1511
1512 assert!(
1513 result.is_ok(),
1514 "Failed to parse order with status {status:?}"
1515 );
1516 let report = result.unwrap();
1517
1518 let expected_status = match status {
1520 DydxOrderStatus::Open
1521 | DydxOrderStatus::BestEffortOpened
1522 | DydxOrderStatus::Untriggered => OrderStatus::Accepted,
1523 DydxOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
1524 DydxOrderStatus::Filled => OrderStatus::Filled,
1525 DydxOrderStatus::Canceled | DydxOrderStatus::BestEffortCanceled => {
1526 OrderStatus::Canceled
1527 }
1528 };
1529 assert_eq!(report.order_status, expected_status);
1530 }
1531
1532 #[rstest]
1533 fn test_parse_ws_order_with_trigger_price() {
1534 let ws_order = DydxWsOrderSubaccountMessageContents {
1535 id: "conditional_order".to_string(),
1536 subaccount_id: "dydx1test/0".to_string(),
1537 client_id: "88888".to_string(),
1538 clob_pair_id: "1".to_string(),
1539 side: OrderSide::Sell,
1540 size: "1.0".to_string(),
1541 price: "52000.0".to_string(),
1542 status: DydxOrderStatus::Untriggered,
1543 order_type: DydxOrderType::StopLimit,
1544 time_in_force: DydxTimeInForce::Gtt,
1545 post_only: false,
1546 reduce_only: true,
1547 order_flags: "32".to_string(),
1548 good_til_block: None,
1549 good_til_block_time: Some("2024-12-31T23:59:59Z".to_string()),
1550 created_at_height: Some("1000".to_string()),
1551 client_metadata: Some("100".to_string()),
1552 trigger_price: Some("51500.0".to_string()),
1553 total_filled: Some("0.0".to_string()),
1554 updated_at: Some("2024-11-14T11:00:00Z".to_string()),
1555 updated_at_height: Some("1050".to_string()),
1556 };
1557
1558 let instrument_cache = create_test_instrument_cache();
1559 let encoder = ClientOrderIdEncoder::new();
1560
1561 let account_id = AccountId::new("DYDX-001");
1562 let ts_init = UnixNanos::default();
1563 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1564
1565 let result = parse_ws_order_report(
1566 &ws_order,
1567 &instrument_cache,
1568 &order_contexts,
1569 &encoder,
1570 account_id,
1571 ts_init,
1572 );
1573
1574 assert!(result.is_ok());
1575 let report = result.unwrap();
1576 assert_eq!(report.order_status, OrderStatus::PendingUpdate);
1577 assert!(report.trigger_price.is_some());
1579 }
1580
1581 #[rstest]
1582 fn test_parse_ws_order_market_type() {
1583 let ws_order = DydxWsOrderSubaccountMessageContents {
1584 id: "market_order".to_string(),
1585 subaccount_id: "dydx1test/0".to_string(),
1586 client_id: "77777".to_string(),
1587 clob_pair_id: "1".to_string(),
1588 side: OrderSide::Buy,
1589 size: "0.5".to_string(),
1590 price: "50000.0".to_string(), status: DydxOrderStatus::Filled,
1592 order_type: DydxOrderType::Market,
1593 time_in_force: DydxTimeInForce::Ioc,
1594 post_only: false,
1595 reduce_only: false,
1596 order_flags: "0".to_string(),
1597 good_til_block: Some("1000".to_string()),
1598 good_til_block_time: None,
1599 created_at_height: Some("900".to_string()),
1600 client_metadata: Some("0".to_string()),
1601 trigger_price: None,
1602 total_filled: Some("0.5".to_string()),
1603 updated_at: Some("2024-11-14T10:01:00Z".to_string()),
1604 updated_at_height: Some("901".to_string()),
1605 };
1606
1607 let instrument_cache = create_test_instrument_cache();
1608 let encoder = ClientOrderIdEncoder::new();
1609
1610 let account_id = AccountId::new("DYDX-001");
1611 let ts_init = UnixNanos::default();
1612 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1613
1614 let result = parse_ws_order_report(
1615 &ws_order,
1616 &instrument_cache,
1617 &order_contexts,
1618 &encoder,
1619 account_id,
1620 ts_init,
1621 );
1622
1623 assert!(result.is_ok());
1624 let report = result.unwrap();
1625 assert_eq!(report.order_type, OrderType::Market);
1626 assert_eq!(report.order_status, OrderStatus::Filled);
1627 }
1628
1629 #[rstest]
1630 fn test_parse_ws_order_invalid_clob_pair_id() {
1631 let ws_order = DydxWsOrderSubaccountMessageContents {
1632 id: "bad_order".to_string(),
1633 subaccount_id: "dydx1test/0".to_string(),
1634 client_id: "12345".to_string(),
1635 clob_pair_id: "not_a_number".to_string(), side: OrderSide::Buy,
1637 size: "1.0".to_string(),
1638 price: "50000.0".to_string(),
1639 status: DydxOrderStatus::Open,
1640 order_type: DydxOrderType::Limit,
1641 time_in_force: DydxTimeInForce::Gtt,
1642 post_only: false,
1643 reduce_only: false,
1644 order_flags: "0".to_string(),
1645 good_til_block: Some("1000".to_string()),
1646 good_til_block_time: None,
1647 created_at_height: Some("900".to_string()),
1648 client_metadata: Some("0".to_string()),
1649 trigger_price: None,
1650 total_filled: Some("0.0".to_string()),
1651 updated_at: None,
1652 updated_at_height: None,
1653 };
1654
1655 let instrument_cache = InstrumentCache::new(); let encoder = ClientOrderIdEncoder::new();
1657 let account_id = AccountId::new("DYDX-001");
1658 let ts_init = UnixNanos::default();
1659 let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
1660
1661 let result = parse_ws_order_report(
1662 &ws_order,
1663 &instrument_cache,
1664 &order_contexts,
1665 &encoder,
1666 account_id,
1667 ts_init,
1668 );
1669
1670 assert!(result.is_err());
1671 assert!(
1672 result
1673 .unwrap_err()
1674 .to_string()
1675 .contains("Failed to parse clob_pair_id")
1676 );
1677 }
1678
1679 #[rstest]
1680 fn test_parse_ws_position_closed() {
1681 let instrument_cache = create_test_instrument_cache();
1682 let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
1683
1684 let ws_position = DydxPerpetualPosition {
1685 market: "BTC-USD".into(),
1686 status: DydxPositionStatus::Closed,
1687 side: DydxPositionSide::Long,
1688 size: "0.0".to_string(), max_size: "2.0".to_string(),
1690 entry_price: "48000.0".to_string(),
1691 exit_price: Some("52000.0".to_string()),
1692 realized_pnl: "2000.0".to_string(),
1693 unrealized_pnl: "0.0".to_string(),
1694 created_at: "2024-01-10T09:00:00Z".to_string(),
1695 closed_at: Some("2024-01-15T14:00:00Z".to_string()),
1696 sum_open: "5.0".to_string(),
1697 sum_close: "5.0".to_string(), net_funding: "-25.5".to_string(),
1699 };
1700
1701 let account_id = AccountId::new("DYDX-001");
1702 let ts_init = UnixNanos::default();
1703
1704 let result = parse_ws_position_report(&ws_position, &instrument_cache, account_id, ts_init);
1705 assert!(result.is_ok());
1706
1707 let position_report = result.unwrap();
1708 assert_eq!(position_report.instrument_id, instrument_id);
1709 assert_eq!(position_report.quantity.as_f64(), 0.0);
1711 }
1712
1713 #[rstest]
1714 fn test_parse_ws_fill_with_maker_rebate() {
1715 let instrument_cache = create_test_instrument_cache();
1716
1717 let ws_fill = DydxWsFillSubaccountMessageContents {
1718 id: "fill_rebate".to_string(),
1719 subaccount_id: "sub1".to_string(),
1720 side: OrderSide::Buy,
1721 liquidity: DydxLiquidity::Maker,
1722 fill_type: DydxFillType::Limit,
1723 market: "BTC-USD".into(),
1724 market_type: Some(DydxTickerType::Perpetual),
1725 price: "50000.0".to_string(),
1726 size: "1.0".to_string(),
1727 fee: "-15.0".to_string(), created_at: "2024-01-15T13:00:00Z".to_string(),
1729 created_at_height: Some("13000".to_string()),
1730 order_id: Some("order_maker".to_string()),
1731 client_metadata: Some("200".to_string()),
1732 };
1733
1734 let account_id = AccountId::new("DYDX-001");
1735 let ts_init = UnixNanos::default();
1736 let order_id_map = DashMap::new();
1737 let order_contexts = DashMap::new();
1738 let encoder = ClientOrderIdEncoder::new();
1739
1740 let result = parse_ws_fill_report(
1741 &ws_fill,
1742 &instrument_cache,
1743 &order_id_map,
1744 &order_contexts,
1745 &encoder,
1746 account_id,
1747 ts_init,
1748 );
1749 assert!(result.is_ok());
1750
1751 let fill_report = result.unwrap();
1752 assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1753 assert!(fill_report.commission.as_decimal() < dec!(0));
1754 }
1755
1756 #[rstest]
1757 fn test_parse_ws_fill_taker_with_fee() {
1758 let instrument_cache = create_test_instrument_cache();
1759
1760 let ws_fill = DydxWsFillSubaccountMessageContents {
1761 id: "fill_taker".to_string(),
1762 subaccount_id: "sub2".to_string(),
1763 side: OrderSide::Sell,
1764 liquidity: DydxLiquidity::Taker,
1765 fill_type: DydxFillType::Limit,
1766 market: "BTC-USD".into(),
1767 market_type: Some(DydxTickerType::Perpetual),
1768 price: "49800.0".to_string(),
1769 size: "0.75".to_string(),
1770 fee: "18.675".to_string(), created_at: "2024-01-15T14:00:00Z".to_string(),
1772 created_at_height: Some("14000".to_string()),
1773 order_id: Some("order_taker".to_string()),
1774 client_metadata: Some("300".to_string()),
1775 };
1776
1777 let account_id = AccountId::new("DYDX-001");
1778 let ts_init = UnixNanos::default();
1779 let order_id_map = DashMap::new();
1780 let order_contexts = DashMap::new();
1781 let encoder = ClientOrderIdEncoder::new();
1782
1783 let result = parse_ws_fill_report(
1784 &ws_fill,
1785 &instrument_cache,
1786 &order_id_map,
1787 &order_contexts,
1788 &encoder,
1789 account_id,
1790 ts_init,
1791 );
1792 assert!(result.is_ok());
1793
1794 let fill_report = result.unwrap();
1795 assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
1796 assert_eq!(fill_report.order_side, OrderSide::Sell);
1797 assert!(fill_report.commission.as_decimal() > dec!(0));
1798 }
1799
1800 #[rstest]
1801 fn test_parse_orderbook_snapshot() {
1802 let json = load_json_fixture("ws_orderbook_subscribed.json");
1803 let contents: DydxOrderbookSnapshotContents =
1804 serde_json::from_value(json["contents"].clone())
1805 .expect("Failed to parse orderbook snapshot contents");
1806
1807 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
1808 let ts_init = UnixNanos::from(1_000_000_000u64);
1809
1810 let deltas = parse_orderbook_snapshot(&instrument_id, &contents, 2, 8, ts_init)
1811 .expect("Failed to parse orderbook snapshot");
1812
1813 assert_eq!(deltas.deltas.len(), 7);
1815
1816 assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1817 assert_eq!(deltas.deltas[1].action, BookAction::Add);
1818 assert_eq!(deltas.deltas[1].order.side, OrderSide::Buy);
1819 assert_eq!(deltas.deltas[1].order.price.to_string(), "43240.00");
1820 assert_eq!(deltas.deltas[1].order.size.to_string(), "1.50000000");
1821
1822 assert_eq!(deltas.deltas[4].action, BookAction::Add);
1823 assert_eq!(deltas.deltas[4].order.side, OrderSide::Sell);
1824 assert_eq!(deltas.deltas[4].order.price.to_string(), "43250.00");
1825 assert_eq!(deltas.deltas[4].order.size.to_string(), "1.20000000");
1826
1827 let snapshot = RecordFlag::F_SNAPSHOT as u8;
1831 let last_flag = RecordFlag::F_LAST as u8;
1832
1833 assert_eq!(deltas.deltas[0].flags, snapshot, "Clear missing F_SNAPSHOT");
1834 for (idx, delta) in deltas.deltas.iter().enumerate().skip(1) {
1835 let expected = if idx == deltas.deltas.len() - 1 {
1836 snapshot | last_flag
1837 } else {
1838 snapshot
1839 };
1840 assert_eq!(
1841 delta.flags, expected,
1842 "delta at index {idx} has wrong flags: got {:#010b}, expected {expected:#010b}",
1843 delta.flags,
1844 );
1845 }
1846 }
1847
1848 #[rstest]
1849 #[case::empty_book(vec![], vec![], 1)]
1850 #[case::bids_only(vec![("100.0", "1.0")], vec![], 2)]
1851 #[case::asks_only(vec![], vec![("101.0", "2.0")], 2)]
1852 fn test_parse_orderbook_snapshot_flag_shapes(
1853 #[case] bids: Vec<(&str, &str)>,
1854 #[case] asks: Vec<(&str, &str)>,
1855 #[case] expected_len: usize,
1856 ) {
1857 use crate::websocket::messages::DydxPriceLevel;
1858 let contents = DydxOrderbookSnapshotContents {
1859 bids: if bids.is_empty() {
1860 None
1861 } else {
1862 Some(
1863 bids.into_iter()
1864 .map(|(p, s)| DydxPriceLevel {
1865 price: p.to_string(),
1866 size: s.to_string(),
1867 })
1868 .collect(),
1869 )
1870 },
1871 asks: if asks.is_empty() {
1872 None
1873 } else {
1874 Some(
1875 asks.into_iter()
1876 .map(|(p, s)| DydxPriceLevel {
1877 price: p.to_string(),
1878 size: s.to_string(),
1879 })
1880 .collect(),
1881 )
1882 },
1883 };
1884 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
1885 let ts_init = UnixNanos::from(1_000_000_000u64);
1886
1887 let deltas = parse_orderbook_snapshot(&instrument_id, &contents, 2, 8, ts_init)
1888 .expect("Failed to parse orderbook snapshot");
1889
1890 let snapshot = RecordFlag::F_SNAPSHOT as u8;
1891 let last_flag = RecordFlag::F_LAST as u8;
1892
1893 assert_eq!(deltas.deltas.len(), expected_len);
1894
1895 if expected_len == 1 {
1896 assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1899 assert_eq!(deltas.deltas[0].flags, snapshot | last_flag);
1900 } else {
1901 assert_eq!(deltas.deltas[0].flags, snapshot);
1903 let terminator = deltas.deltas.last().unwrap();
1904 assert_eq!(terminator.flags, snapshot | last_flag);
1905 }
1906 }
1907
1908 #[rstest]
1909 fn test_parse_orderbook_deltas_update() {
1910 let json = load_json_fixture("ws_orderbook_update.json");
1911 let contents: DydxOrderbookContents = serde_json::from_value(json["contents"].clone())
1912 .expect("Failed to parse orderbook update contents");
1913
1914 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
1915 let ts_init = UnixNanos::from(1_000_000_000u64);
1916
1917 let deltas = parse_orderbook_deltas(&instrument_id, &contents, 2, 8, ts_init)
1918 .expect("Failed to parse orderbook deltas");
1919
1920 assert_eq!(deltas.deltas.len(), 4);
1922
1923 assert_eq!(deltas.deltas[0].action, BookAction::Update);
1924 assert_eq!(deltas.deltas[0].order.side, OrderSide::Buy);
1925 assert_eq!(deltas.deltas[0].order.price.to_string(), "43240.00");
1926
1927 assert_eq!(deltas.deltas[2].action, BookAction::Delete);
1929 assert_eq!(deltas.deltas[2].order.side, OrderSide::Sell);
1930 assert_eq!(deltas.deltas[2].order.price.to_string(), "43250.00");
1931
1932 assert_eq!(deltas.deltas[3].action, BookAction::Update);
1933 assert_eq!(deltas.deltas[3].order.side, OrderSide::Sell);
1934 }
1935
1936 #[rstest]
1937 fn test_parse_trade_ticks_ws() {
1938 let json = load_json_fixture("ws_trades_subscribed.json");
1939 let contents: DydxTradeContents = serde_json::from_value(json["contents"].clone())
1940 .expect("Failed to parse trade contents");
1941
1942 let instrument = create_test_instrument();
1943 let instrument_id = instrument.id();
1944 let ts_init = UnixNanos::from(1_000_000_000u64);
1945
1946 let ticks = parse_trade_ticks(instrument_id, &instrument, &contents, ts_init)
1947 .expect("Failed to parse trade ticks");
1948
1949 assert_eq!(ticks.len(), 1);
1950 if let Data::Trade(tick) = &ticks[0] {
1951 assert_eq!(tick.instrument_id, instrument_id);
1952 assert_eq!(tick.price.to_string(), "43250.00");
1953 assert_eq!(tick.size.to_string(), "0.50000000");
1954 assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
1955 assert_eq!(tick.trade_id.to_string(), "trade-001");
1956 } else {
1957 panic!("Expected Trade data");
1958 }
1959 }
1960
1961 #[rstest]
1962 #[case(true)]
1963 #[case(false)]
1964 fn test_parse_candle_bar_timestamp_on_close(#[case] timestamp_on_close: bool) {
1965 let json = load_json_fixture("ws_candles_subscribed.json");
1966 let candles_value = &json["contents"]["candles"];
1967 let candles: Vec<DydxCandle> =
1968 serde_json::from_value(candles_value.clone()).expect("Failed to parse candle array");
1969
1970 let instrument = create_test_instrument();
1971 let bar_type = BarType::from_str("BTC-USD-PERP.DYDX-1-MINUTE-LAST-EXTERNAL")
1972 .expect("Failed to parse bar type");
1973 let ts_init = UnixNanos::from(1_000_000_000u64);
1974
1975 let bar = parse_candle_bar(
1976 bar_type,
1977 &instrument,
1978 &candles[0],
1979 timestamp_on_close,
1980 ts_init,
1981 )
1982 .expect("Failed to parse candle bar");
1983
1984 assert_eq!(bar.bar_type, bar_type);
1985 assert_eq!(bar.open.to_string(), "43100.00");
1986 assert_eq!(bar.high.to_string(), "43500.00");
1987 assert_eq!(bar.low.to_string(), "43000.00");
1988 assert_eq!(bar.close.to_string(), "43400.00");
1989 assert_eq!(bar.volume.to_string(), "12.34500000");
1990
1991 let started_at_ns = 1_704_067_200_000_000_000u64;
1993 let one_min_ns = 60_000_000_000u64;
1994
1995 if timestamp_on_close {
1996 assert_eq!(bar.ts_event.as_u64(), started_at_ns + one_min_ns);
1997 } else {
1998 assert_eq!(bar.ts_event.as_u64(), started_at_ns);
1999 }
2000 }
2001
2002 #[rstest]
2003 fn test_deserialize_market_trading_update_with_status() {
2004 let json = load_json_fixture("ws_markets_status_update.json");
2005 let contents: super::super::messages::DydxMarketsContents =
2006 serde_json::from_value(json["contents"].clone())
2007 .expect("Failed to deserialize markets contents");
2008
2009 let trading = contents.trading.expect("Expected trading data");
2010 assert_eq!(trading.len(), 2);
2011
2012 let btc = trading.get("BTC-USD").expect("Expected BTC-USD");
2013 assert_eq!(btc.status, Some(DydxMarketStatus::Paused));
2014 assert_eq!(btc.next_funding_rate, Some("0.0001".to_string()));
2015
2016 let eth = trading.get("ETH-USD").expect("Expected ETH-USD");
2017 assert_eq!(eth.status, Some(DydxMarketStatus::Active));
2018 }
2019
2020 #[rstest]
2021 #[case("ACTIVE", DydxMarketStatus::Active)]
2022 #[case("PAUSED", DydxMarketStatus::Paused)]
2023 #[case("CANCEL_ONLY", DydxMarketStatus::CancelOnly)]
2024 #[case("POST_ONLY", DydxMarketStatus::PostOnly)]
2025 #[case("INITIALIZING", DydxMarketStatus::Initializing)]
2026 #[case("FINAL_SETTLEMENT", DydxMarketStatus::FinalSettlement)]
2027 fn test_deserialize_market_status_variants(
2028 #[case] status_str: &str,
2029 #[case] expected: DydxMarketStatus,
2030 ) {
2031 let json_str = format!(r#"{{"status": "{status_str}"}}"#);
2032 let update: super::super::messages::DydxMarketTradingUpdate =
2033 serde_json::from_str(&json_str).expect("Failed to deserialize");
2034 assert_eq!(update.status, Some(expected));
2035 }
2036
2037 #[rstest]
2038 fn test_deserialize_market_trading_update_without_status() {
2039 let json_str = r#"{"nextFundingRate": "0.0001"}"#;
2040 let update: super::super::messages::DydxMarketTradingUpdate =
2041 serde_json::from_str(json_str).expect("Failed to deserialize");
2042 assert_eq!(update.status, None);
2043 assert_eq!(update.next_funding_rate, Some("0.0001".to_string()));
2044 }
2045}