1use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::{UUID4, nanos::UnixNanos};
21use nautilus_model::{
22 data::{Bar, BarSpecification, BarType, BookOrder, OrderBookDelta, QuoteTick, TradeTick},
23 enums::{
24 AggregationSource, AggressorSide, BarAggregation, BookAction, LiquiditySide, OrderSide,
25 OrderStatus, OrderType, PriceType, TimeInForce, TriggerType,
26 },
27 identifiers::{AccountId, ClientOrderId, InstrumentId, TradeId, VenueOrderId},
28 instruments::{Instrument, any::InstrumentAny},
29 reports::{FillReport, OrderStatusReport},
30 types::{Currency, Money, Price, Quantity},
31};
32
33use super::{
34 enums::{KrakenExecType, KrakenLiquidityInd, KrakenWsOrderStatus},
35 messages::{
36 KrakenWsBookData, KrakenWsBookLevel, KrakenWsExecutionData, KrakenWsOhlcData,
37 KrakenWsTickerData, KrakenWsTradeData,
38 },
39};
40use crate::common::enums::{KrakenOrderSide, KrakenOrderType, KrakenTimeInForce};
41
42pub fn parse_quote_tick(
49 ticker: &KrakenWsTickerData,
50 instrument: &InstrumentAny,
51 ts_init: UnixNanos,
52) -> anyhow::Result<QuoteTick> {
53 let instrument_id = instrument.id();
54 let price_precision = instrument.price_precision();
55 let size_precision = instrument.size_precision();
56
57 let bid_price = Price::new_checked(ticker.bid, price_precision).with_context(|| {
58 format!("Failed to construct bid Price with precision {price_precision}")
59 })?;
60 let bid_size = Quantity::new_checked(ticker.bid_qty, size_precision).with_context(|| {
61 format!("Failed to construct bid Quantity with precision {size_precision}")
62 })?;
63
64 let ask_price = Price::new_checked(ticker.ask, price_precision).with_context(|| {
65 format!("Failed to construct ask Price with precision {price_precision}")
66 })?;
67 let ask_size = Quantity::new_checked(ticker.ask_qty, size_precision).with_context(|| {
68 format!("Failed to construct ask Quantity with precision {size_precision}")
69 })?;
70
71 let ts_event = datetime_to_nanos(ticker.timestamp, "ticker.timestamp")?;
72
73 Ok(QuoteTick::new(
74 instrument_id,
75 bid_price,
76 ask_price,
77 bid_size,
78 ask_size,
79 ts_event,
80 ts_init,
81 ))
82}
83
84pub fn parse_trade_tick(
92 trade: &KrakenWsTradeData,
93 instrument: &InstrumentAny,
94 ts_init: UnixNanos,
95) -> anyhow::Result<TradeTick> {
96 let instrument_id = instrument.id();
97 let price_precision = instrument.price_precision();
98 let size_precision = instrument.size_precision();
99
100 let price = Price::new_checked(trade.price, price_precision)
101 .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
102 let size = Quantity::new_checked(trade.qty, size_precision)
103 .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
104
105 let aggressor = match trade.side {
106 KrakenOrderSide::Buy => AggressorSide::Buyer,
107 KrakenOrderSide::Sell => AggressorSide::Seller,
108 };
109
110 let trade_id = TradeId::new_checked(trade.trade_id.to_string())?;
111 let ts_event = datetime_to_nanos(trade.timestamp, "trade.timestamp")?;
112
113 TradeTick::new_checked(
114 instrument_id,
115 price,
116 size,
117 aggressor,
118 trade_id,
119 ts_event,
120 ts_init,
121 )
122 .context("Failed to construct TradeTick from Kraken WebSocket trade")
123}
124
125pub fn parse_book_deltas(
135 book: &KrakenWsBookData,
136 instrument: &InstrumentAny,
137 sequence: u64,
138 ts_init: UnixNanos,
139) -> anyhow::Result<Vec<OrderBookDelta>> {
140 let instrument_id = instrument.id();
141 let price_precision = instrument.price_precision();
142 let size_precision = instrument.size_precision();
143
144 let ts_event = datetime_to_nanos(book.timestamp, "book.timestamp")?;
145
146 let mut deltas = Vec::new();
147 let mut current_sequence = sequence;
148
149 if let Some(ref bids) = book.bids {
150 for level in bids {
151 let delta = parse_book_level(
152 level,
153 OrderSide::Buy,
154 instrument_id,
155 price_precision,
156 size_precision,
157 current_sequence,
158 ts_event,
159 ts_init,
160 )?;
161 deltas.push(delta);
162 current_sequence += 1;
163 }
164 }
165
166 if let Some(ref asks) = book.asks {
167 for level in asks {
168 let delta = parse_book_level(
169 level,
170 OrderSide::Sell,
171 instrument_id,
172 price_precision,
173 size_precision,
174 current_sequence,
175 ts_event,
176 ts_init,
177 )?;
178 deltas.push(delta);
179 current_sequence += 1;
180 }
181 }
182
183 Ok(deltas)
184}
185
186#[expect(clippy::too_many_arguments)]
187fn parse_book_level(
188 level: &KrakenWsBookLevel,
189 side: OrderSide,
190 instrument_id: InstrumentId,
191 price_precision: u8,
192 size_precision: u8,
193 sequence: u64,
194 ts_event: UnixNanos,
195 ts_init: UnixNanos,
196) -> anyhow::Result<OrderBookDelta> {
197 let price = Price::new_checked(level.price, price_precision)
198 .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
199 let size = Quantity::new_checked(level.qty, size_precision)
200 .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
201
202 let action = if size.raw == 0 {
204 BookAction::Delete
205 } else {
206 BookAction::Update
207 };
208
209 let order_id = price.raw as u64;
211 let order = BookOrder::new(side, price, size, order_id);
212
213 Ok(OrderBookDelta::new(
214 instrument_id,
215 action,
216 order,
217 0, sequence,
219 ts_event,
220 ts_init,
221 ))
222}
223
224fn datetime_to_nanos(value: DateTime<Utc>, field: &str) -> anyhow::Result<UnixNanos> {
225 let nanos = value
226 .timestamp_nanos_opt()
227 .with_context(|| format!("Failed to convert {field}='{value}' to nanoseconds"))?;
228 Ok(UnixNanos::from(nanos as u64))
229}
230
231pub fn parse_ws_bar(
241 ohlc: &KrakenWsOhlcData,
242 instrument: &InstrumentAny,
243 ts_init: UnixNanos,
244) -> anyhow::Result<Bar> {
245 let instrument_id = instrument.id();
246 let price_precision = instrument.price_precision();
247 let size_precision = instrument.size_precision();
248
249 let open = Price::new_checked(ohlc.open, price_precision)?;
250 let high = Price::new_checked(ohlc.high, price_precision)?;
251 let low = Price::new_checked(ohlc.low, price_precision)?;
252 let close = Price::new_checked(ohlc.close, price_precision)?;
253 let volume = Quantity::new_checked(ohlc.volume, size_precision)?;
254
255 let bar_spec = interval_to_bar_spec(ohlc.interval)?;
256 let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
257
258 let interval_secs = i64::from(ohlc.interval) * 60;
260 let close_time = ohlc.interval_begin + chrono::Duration::seconds(interval_secs);
261 let ts_event = UnixNanos::from(close_time.timestamp_nanos_opt().unwrap_or(0) as u64);
262
263 Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
264}
265
266fn interval_to_bar_spec(interval: u32) -> anyhow::Result<BarSpecification> {
268 let (step, aggregation) = match interval {
269 1 => (1, BarAggregation::Minute),
270 5 => (5, BarAggregation::Minute),
271 15 => (15, BarAggregation::Minute),
272 30 => (30, BarAggregation::Minute),
273 60 => (1, BarAggregation::Hour),
274 240 => (4, BarAggregation::Hour),
275 1440 => (1, BarAggregation::Day),
276 10080 => (1, BarAggregation::Week),
277 21600 => (15, BarAggregation::Day), _ => anyhow::bail!("Unsupported Kraken OHLC interval: {interval}"),
279 };
280
281 Ok(BarSpecification::new(step, aggregation, PriceType::Last))
282}
283
284fn parse_order_status(
286 exec_type: KrakenExecType,
287 order_status: Option<KrakenWsOrderStatus>,
288) -> OrderStatus {
289 match exec_type {
290 KrakenExecType::Canceled => return OrderStatus::Canceled,
291 KrakenExecType::Expired => return OrderStatus::Expired,
292 KrakenExecType::Filled => return OrderStatus::Filled,
293 KrakenExecType::Trade => {
294 return match order_status {
295 Some(KrakenWsOrderStatus::Filled) => OrderStatus::Filled,
296 Some(KrakenWsOrderStatus::PartiallyFilled) | None => OrderStatus::PartiallyFilled,
297 Some(status) => status.into(),
298 };
299 }
300 _ => {}
301 }
302
303 match order_status {
304 Some(status) => status.into(),
305 None => OrderStatus::Accepted,
306 }
307}
308
309fn parse_order_type(order_type: Option<KrakenOrderType>) -> OrderType {
311 match order_type {
312 Some(KrakenOrderType::Market) => OrderType::Market,
313 Some(KrakenOrderType::Limit) => OrderType::Limit,
314 Some(KrakenOrderType::StopLoss) => OrderType::StopMarket,
315 Some(KrakenOrderType::TakeProfit) => OrderType::MarketIfTouched,
316 Some(KrakenOrderType::StopLossLimit) => OrderType::StopLimit,
317 Some(KrakenOrderType::TakeProfitLimit) => OrderType::LimitIfTouched,
318 Some(KrakenOrderType::TrailingStop) => OrderType::StopMarket,
320 Some(KrakenOrderType::TrailingStopLimit) => OrderType::StopLimit,
321 Some(KrakenOrderType::SettlePosition) => OrderType::Market,
322 None => OrderType::Limit,
323 }
324}
325
326fn parse_order_side(side: Option<KrakenOrderSide>) -> OrderSide {
328 match side {
329 Some(KrakenOrderSide::Buy) => OrderSide::Buy,
330 Some(KrakenOrderSide::Sell) => OrderSide::Sell,
331 None => OrderSide::Buy,
332 }
333}
334
335fn parse_time_in_force(
337 time_in_force: Option<KrakenTimeInForce>,
338 post_only: Option<bool>,
339) -> TimeInForce {
340 if post_only == Some(true) {
342 return TimeInForce::Gtc;
343 }
344
345 match time_in_force {
346 Some(KrakenTimeInForce::GoodTilCancelled) => TimeInForce::Gtc,
347 Some(KrakenTimeInForce::ImmediateOrCancel) => TimeInForce::Ioc,
348 Some(KrakenTimeInForce::GoodTilDate) => TimeInForce::Gtd,
349 None => TimeInForce::Gtc,
350 }
351}
352
353fn parse_liquidity_side(liquidity_ind: Option<KrakenLiquidityInd>) -> LiquiditySide {
354 liquidity_ind.map_or(LiquiditySide::NoLiquiditySide, Into::into)
355}
356
357pub fn parse_ws_order_status_report(
363 exec: &KrakenWsExecutionData,
364 instrument: &InstrumentAny,
365 account_id: AccountId,
366 cached_order_qty: Option<f64>,
367 ts_init: UnixNanos,
368) -> anyhow::Result<OrderStatusReport> {
369 let instrument_id = instrument.id();
370 let venue_order_id = VenueOrderId::new(&exec.order_id);
371 let order_side = parse_order_side(exec.side);
372 let order_type = parse_order_type(exec.order_type);
373 let time_in_force = parse_time_in_force(exec.time_in_force, exec.post_only);
374 let order_status = parse_order_status(exec.exec_type, exec.order_status);
375
376 let price_precision = instrument.price_precision();
377 let size_precision = instrument.size_precision();
378
379 let last_qty = exec
381 .last_qty
382 .map(|qty| Quantity::new_checked(qty, size_precision))
383 .transpose()
384 .context("Failed to parse last_qty")?;
385
386 let filled_qty = exec
387 .cum_qty
388 .map(|qty| Quantity::new_checked(qty, size_precision))
389 .transpose()
390 .context("Failed to parse cum_qty")?
391 .or(last_qty)
392 .unwrap_or_else(|| Quantity::new(0.0, size_precision));
393
394 let quantity = exec
395 .order_qty
396 .or(cached_order_qty)
397 .map(|qty| Quantity::new_checked(qty, size_precision))
398 .transpose()
399 .context("Failed to parse order_qty")?
400 .unwrap_or(filled_qty);
401
402 let ts_event = datetime_to_nanos(exec.timestamp, "execution.timestamp")?;
403
404 let mut report = OrderStatusReport::new(
405 account_id,
406 instrument_id,
407 None, venue_order_id,
409 order_side,
410 order_type,
411 time_in_force,
412 order_status,
413 quantity,
414 filled_qty,
415 ts_event,
416 ts_event,
417 ts_init,
418 Some(UUID4::new()),
419 );
420
421 if let Some(ref cl_ord_id) = exec.cl_ord_id
422 && !cl_ord_id.is_empty()
423 {
424 report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
425 }
426
427 let price_value = exec
431 .limit_price
432 .filter(|&p| p > 0.0)
433 .or(exec.avg_price.filter(|&p| p > 0.0))
434 .or(exec.last_price.filter(|&p| p > 0.0));
435
436 if let Some(px) = price_value {
437 let price =
438 Price::new_checked(px, price_precision).context("Failed to parse order price")?;
439 report = report.with_price(price);
440 }
441
442 let avg_px = exec
444 .avg_price
445 .filter(|&p| p > 0.0)
446 .or_else(|| match (exec.cum_cost, exec.cum_qty) {
447 (Some(cost), Some(qty)) if qty > 0.0 => Some(cost / qty),
448 _ => None,
449 })
450 .or_else(|| exec.last_price.filter(|&p| p > 0.0));
451
452 if let Some(avg_price) = avg_px {
453 report = report.with_avg_px(avg_price)?;
454 }
455
456 if exec.post_only == Some(true) {
457 report = report.with_post_only(true);
458 }
459
460 if exec.reduce_only == Some(true) {
461 report = report.with_reduce_only(true);
462 }
463
464 if let Some(ref reason) = exec.reason
465 && !reason.is_empty()
466 {
467 report = report.with_cancel_reason(reason.clone());
468 }
469
470 let is_conditional = matches!(
472 order_type,
473 OrderType::StopMarket
474 | OrderType::StopLimit
475 | OrderType::MarketIfTouched
476 | OrderType::LimitIfTouched
477 );
478
479 if is_conditional {
480 report = report.with_trigger_type(TriggerType::Default);
481 }
482
483 Ok(report)
484}
485
486pub fn parse_ws_fill_report(
494 exec: &KrakenWsExecutionData,
495 instrument: &InstrumentAny,
496 account_id: AccountId,
497 ts_init: UnixNanos,
498) -> anyhow::Result<FillReport> {
499 let instrument_id = instrument.id();
500 let venue_order_id = VenueOrderId::new(&exec.order_id);
501
502 let exec_id = exec
503 .exec_id
504 .as_ref()
505 .context("Missing exec_id for trade execution")?;
506 let trade_id =
507 TradeId::new_checked(exec_id).context("Invalid exec_id in Kraken trade execution")?;
508
509 let order_side = parse_order_side(exec.side);
510
511 let price_precision = instrument.price_precision();
512 let size_precision = instrument.size_precision();
513
514 let last_qty = exec
515 .last_qty
516 .map(|qty| Quantity::new_checked(qty, size_precision))
517 .transpose()
518 .context("Failed to parse last_qty")?
519 .context("Missing last_qty for trade execution")?;
520
521 let last_px = exec
522 .last_price
523 .map(|px| Price::new_checked(px, price_precision))
524 .transpose()
525 .context("Failed to parse last_price")?
526 .context("Missing last_price for trade execution")?;
527
528 let liquidity_side = parse_liquidity_side(exec.liquidity_ind);
529
530 let commission = if let Some(ref fees) = exec.fees {
532 if let Some(fee) = fees.first() {
533 let currency = Currency::get_or_create_crypto(&fee.asset);
534 Money::new(fee.qty.abs(), currency)
535 } else {
536 Money::new(0.0, instrument.quote_currency())
537 }
538 } else {
539 Money::new(0.0, instrument.quote_currency())
540 };
541
542 let ts_event = datetime_to_nanos(exec.timestamp, "execution.timestamp")?;
543
544 let client_order_id = exec
545 .cl_ord_id
546 .as_ref()
547 .filter(|s| !s.is_empty())
548 .map(ClientOrderId::new);
549
550 Ok(FillReport::new(
551 account_id,
552 instrument_id,
553 venue_order_id,
554 trade_id,
555 order_side,
556 last_qty,
557 last_px,
558 commission,
559 liquidity_side,
560 client_order_id,
561 None, ts_event,
563 ts_init,
564 None, ))
566}
567
568#[cfg(test)]
569mod tests {
570 use nautilus_model::{identifiers::Symbol, types::Currency};
571 use rstest::rstest;
572
573 use super::*;
574 use crate::{common::consts::KRAKEN_VENUE, websocket::spot_v2::messages::KrakenWsMessage};
575
576 const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);
577
578 fn load_test_json(filename: &str) -> String {
579 let path = format!("test_data/{filename}");
580 std::fs::read_to_string(&path)
581 .unwrap_or_else(|e| panic!("Failed to load test data from {path}: {e}"))
582 }
583
584 fn create_mock_instrument() -> InstrumentAny {
585 use nautilus_model::instruments::currency_pair::CurrencyPair;
586
587 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
588 InstrumentAny::CurrencyPair(CurrencyPair::new(
589 instrument_id,
590 Symbol::new("XBTUSDT"),
591 Currency::BTC(),
592 Currency::USDT(),
593 1, 8, Price::from("0.1"),
596 Quantity::from("0.00000001"),
597 None,
598 None,
599 None,
600 None,
601 None,
602 None,
603 None,
604 None,
605 None,
606 None,
607 None,
608 None,
609 None, TS,
611 TS,
612 ))
613 }
614
615 #[rstest]
616 fn test_parse_quote_tick() {
617 let json = load_test_json("ws_ticker_snapshot.json");
618 let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
619 let ticker: KrakenWsTickerData = serde_json::from_value(message.data[0].clone()).unwrap();
620
621 let instrument = create_mock_instrument();
622 let quote_tick = parse_quote_tick(&ticker, &instrument, TS).unwrap();
623
624 assert_eq!(quote_tick.instrument_id, instrument.id());
625 assert!(quote_tick.bid_price.as_f64() > 0.0);
626 assert!(quote_tick.ask_price.as_f64() > 0.0);
627 assert!(quote_tick.bid_size.as_f64() > 0.0);
628 assert!(quote_tick.ask_size.as_f64() > 0.0);
629 assert_eq!(
630 quote_tick.ts_event,
631 UnixNanos::from(1_671_960_659_123_456_000)
632 );
633 assert_eq!(quote_tick.ts_init, TS);
634 }
635
636 #[rstest]
637 fn test_parse_trade_tick() {
638 let json = load_test_json("ws_trade_update.json");
639 let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
640 let trade: KrakenWsTradeData = serde_json::from_value(message.data[0].clone()).unwrap();
641
642 let instrument = create_mock_instrument();
643 let trade_tick = parse_trade_tick(&trade, &instrument, TS).unwrap();
644
645 assert_eq!(trade_tick.instrument_id, instrument.id());
646 assert!(trade_tick.price.as_f64() > 0.0);
647 assert!(trade_tick.size.as_f64() > 0.0);
648 assert!(matches!(
649 trade_tick.aggressor_side,
650 AggressorSide::Buyer | AggressorSide::Seller
651 ));
652 assert_eq!(
653 trade_tick.ts_event,
654 UnixNanos::from(1_696_613_755_440_295_000)
655 );
656 assert_eq!(trade_tick.ts_init, TS);
657 }
658
659 #[rstest]
660 fn test_parse_book_deltas_snapshot() {
661 let json = load_test_json("ws_book_snapshot.json");
662 let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
663 let book: KrakenWsBookData = serde_json::from_value(message.data[0].clone()).unwrap();
664
665 let instrument = create_mock_instrument();
666 let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();
667
668 assert!(!deltas.is_empty());
669
670 let bid_count = deltas
672 .iter()
673 .filter(|d| d.order.side == OrderSide::Buy)
674 .count();
675 let ask_count = deltas
676 .iter()
677 .filter(|d| d.order.side == OrderSide::Sell)
678 .count();
679
680 assert!(bid_count > 0);
681 assert!(ask_count > 0);
682
683 let first_delta = &deltas[0];
685 assert_eq!(first_delta.instrument_id, instrument.id());
686 assert!(first_delta.order.price.as_f64() > 0.0);
687 assert!(first_delta.order.size.as_f64() > 0.0);
688
689 let expected_ts_event = UnixNanos::from(1_696_613_755_440_295_000);
690 assert!(deltas.iter().all(|d| d.ts_event == expected_ts_event));
691 assert!(deltas.iter().all(|d| d.ts_init == TS));
692 }
693
694 #[rstest]
695 fn test_parse_book_deltas_update() {
696 let json = load_test_json("ws_book_update.json");
697 let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
698 let book: KrakenWsBookData = serde_json::from_value(message.data[0].clone()).unwrap();
699
700 let instrument = create_mock_instrument();
701 let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();
702
703 assert!(!deltas.is_empty());
704
705 let first_delta = &deltas[0];
707 assert_eq!(first_delta.instrument_id, instrument.id());
708 assert!(first_delta.order.price.as_f64() > 0.0);
709
710 let expected_ts_event = UnixNanos::from(1_696_613_755_440_295_000);
711 assert!(deltas.iter().all(|d| d.ts_event == expected_ts_event));
712 assert!(deltas.iter().all(|d| d.ts_init == TS));
713 }
714
715 #[rstest]
716 fn test_datetime_to_nanos() {
717 let dt = "2023-10-06T17:35:55.440295Z"
718 .parse::<DateTime<Utc>>()
719 .unwrap();
720 let result = datetime_to_nanos(dt, "test").unwrap();
721 assert_eq!(result, UnixNanos::from(1_696_613_755_440_295_000));
722 }
723
724 #[rstest]
725 fn test_datetime_to_nanos_out_of_range_errors() {
726 let dt = "1500-01-01T00:00:00Z".parse::<DateTime<Utc>>().unwrap();
727 let result = datetime_to_nanos(dt, "test");
728 assert!(result.is_err());
729 let err = result.unwrap_err().to_string();
730 assert!(err.contains("test"));
731 }
732
733 #[rstest]
734 fn test_parse_ws_bar() {
735 let json = load_test_json("ws_ohlc_update.json");
736 let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
737 let ohlc: KrakenWsOhlcData = serde_json::from_value(message.data[0].clone()).unwrap();
738
739 let instrument = create_mock_instrument();
740 let bar = parse_ws_bar(&ohlc, &instrument, TS).unwrap();
741
742 assert_eq!(bar.bar_type.instrument_id(), instrument.id());
743 assert!(bar.open.as_f64() > 0.0);
744 assert!(bar.high.as_f64() > 0.0);
745 assert!(bar.low.as_f64() > 0.0);
746 assert!(bar.close.as_f64() > 0.0);
747 assert!(bar.volume.as_f64() > 0.0);
748
749 let spec = bar.bar_type.spec();
750 assert_eq!(spec.step.get(), 1);
751 assert_eq!(spec.aggregation, BarAggregation::Minute);
752 assert_eq!(spec.price_type, PriceType::Last);
753
754 let expected_close = ohlc.interval_begin + chrono::Duration::minutes(1);
757 let expected_ts_event =
758 UnixNanos::from(expected_close.timestamp_nanos_opt().unwrap() as u64);
759 assert_eq!(bar.ts_event, expected_ts_event);
760 }
761
762 #[rstest]
763 fn test_interval_to_bar_spec() {
764 let test_cases = [
765 (1, 1, BarAggregation::Minute),
766 (5, 5, BarAggregation::Minute),
767 (15, 15, BarAggregation::Minute),
768 (30, 30, BarAggregation::Minute),
769 (60, 1, BarAggregation::Hour),
770 (240, 4, BarAggregation::Hour),
771 (1440, 1, BarAggregation::Day),
772 (10080, 1, BarAggregation::Week),
773 (21600, 15, BarAggregation::Day), ];
775
776 for (interval, expected_step, expected_aggregation) in test_cases {
777 let spec = interval_to_bar_spec(interval).unwrap();
778 assert_eq!(
779 spec.step.get(),
780 expected_step,
781 "Failed for interval {interval}"
782 );
783 assert_eq!(
784 spec.aggregation, expected_aggregation,
785 "Failed for interval {interval}"
786 );
787 assert_eq!(spec.price_type, PriceType::Last);
788 }
789 }
790
791 #[rstest]
792 fn test_interval_to_bar_spec_invalid() {
793 let result = interval_to_bar_spec(999);
794 assert!(result.is_err());
795 }
796}