Skip to main content

nautilus_kraken/websocket/spot_v2/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message parsers for converting Kraken streaming data to Nautilus domain models.
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::{UUID4, nanos::UnixNanos};
21use nautilus_model::{
22    data::{Bar, BarSpecification, BarType, BookOrder, OrderBookDelta, QuoteTick, TradeTick},
23    enums::{
24        AggregationSource, AggressorSide, BarAggregation, BookAction, LiquiditySide, OrderSide,
25        OrderStatus, OrderType, PriceType, TimeInForce, TriggerType,
26    },
27    identifiers::{AccountId, ClientOrderId, InstrumentId, TradeId, VenueOrderId},
28    instruments::{Instrument, any::InstrumentAny},
29    reports::{FillReport, OrderStatusReport},
30    types::{Currency, Money, Price, Quantity},
31};
32
33use super::{
34    enums::{KrakenExecType, KrakenLiquidityInd, KrakenWsOrderStatus},
35    messages::{
36        KrakenWsBookData, KrakenWsBookLevel, KrakenWsExecutionData, KrakenWsOhlcData,
37        KrakenWsTickerData, KrakenWsTradeData,
38    },
39};
40use crate::common::enums::{KrakenOrderSide, KrakenOrderType, KrakenTimeInForce};
41
42/// Parses Kraken WebSocket ticker data into a Nautilus quote tick.
43///
44/// # Errors
45///
46/// Returns an error if:
47/// - Bid or ask price/quantity cannot be parsed.
48pub fn parse_quote_tick(
49    ticker: &KrakenWsTickerData,
50    instrument: &InstrumentAny,
51    ts_init: UnixNanos,
52) -> anyhow::Result<QuoteTick> {
53    let instrument_id = instrument.id();
54    let price_precision = instrument.price_precision();
55    let size_precision = instrument.size_precision();
56
57    let bid_price = Price::new_checked(ticker.bid, price_precision).with_context(|| {
58        format!("Failed to construct bid Price with precision {price_precision}")
59    })?;
60    let bid_size = Quantity::new_checked(ticker.bid_qty, size_precision).with_context(|| {
61        format!("Failed to construct bid Quantity with precision {size_precision}")
62    })?;
63
64    let ask_price = Price::new_checked(ticker.ask, price_precision).with_context(|| {
65        format!("Failed to construct ask Price with precision {price_precision}")
66    })?;
67    let ask_size = Quantity::new_checked(ticker.ask_qty, size_precision).with_context(|| {
68        format!("Failed to construct ask Quantity with precision {size_precision}")
69    })?;
70
71    let ts_event = datetime_to_nanos(ticker.timestamp, "ticker.timestamp")?;
72
73    Ok(QuoteTick::new(
74        instrument_id,
75        bid_price,
76        ask_price,
77        bid_size,
78        ask_size,
79        ts_event,
80        ts_init,
81    ))
82}
83
84/// Parses Kraken WebSocket trade data into a Nautilus trade tick.
85///
86/// # Errors
87///
88/// Returns an error if:
89/// - Price or quantity cannot be parsed.
90/// - Timestamp is invalid.
91pub fn parse_trade_tick(
92    trade: &KrakenWsTradeData,
93    instrument: &InstrumentAny,
94    ts_init: UnixNanos,
95) -> anyhow::Result<TradeTick> {
96    let instrument_id = instrument.id();
97    let price_precision = instrument.price_precision();
98    let size_precision = instrument.size_precision();
99
100    let price = Price::new_checked(trade.price, price_precision)
101        .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
102    let size = Quantity::new_checked(trade.qty, size_precision)
103        .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
104
105    let aggressor = match trade.side {
106        KrakenOrderSide::Buy => AggressorSide::Buyer,
107        KrakenOrderSide::Sell => AggressorSide::Seller,
108    };
109
110    let trade_id = TradeId::new_checked(trade.trade_id.to_string())?;
111    let ts_event = datetime_to_nanos(trade.timestamp, "trade.timestamp")?;
112
113    TradeTick::new_checked(
114        instrument_id,
115        price,
116        size,
117        aggressor,
118        trade_id,
119        ts_event,
120        ts_init,
121    )
122    .context("Failed to construct TradeTick from Kraken WebSocket trade")
123}
124
125/// Parses Kraken WebSocket book data into Nautilus order book deltas.
126///
127/// Returns a vector of deltas, one for each bid and ask level.
128///
129/// # Errors
130///
131/// Returns an error if:
132/// - Price or quantity cannot be parsed.
133/// - Timestamp is invalid.
134pub fn parse_book_deltas(
135    book: &KrakenWsBookData,
136    instrument: &InstrumentAny,
137    sequence: u64,
138    ts_init: UnixNanos,
139) -> anyhow::Result<Vec<OrderBookDelta>> {
140    let instrument_id = instrument.id();
141    let price_precision = instrument.price_precision();
142    let size_precision = instrument.size_precision();
143
144    let ts_event = datetime_to_nanos(book.timestamp, "book.timestamp")?;
145
146    let mut deltas = Vec::new();
147    let mut current_sequence = sequence;
148
149    if let Some(ref bids) = book.bids {
150        for level in bids {
151            let delta = parse_book_level(
152                level,
153                OrderSide::Buy,
154                instrument_id,
155                price_precision,
156                size_precision,
157                current_sequence,
158                ts_event,
159                ts_init,
160            )?;
161            deltas.push(delta);
162            current_sequence += 1;
163        }
164    }
165
166    if let Some(ref asks) = book.asks {
167        for level in asks {
168            let delta = parse_book_level(
169                level,
170                OrderSide::Sell,
171                instrument_id,
172                price_precision,
173                size_precision,
174                current_sequence,
175                ts_event,
176                ts_init,
177            )?;
178            deltas.push(delta);
179            current_sequence += 1;
180        }
181    }
182
183    Ok(deltas)
184}
185
186#[expect(clippy::too_many_arguments)]
187fn parse_book_level(
188    level: &KrakenWsBookLevel,
189    side: OrderSide,
190    instrument_id: InstrumentId,
191    price_precision: u8,
192    size_precision: u8,
193    sequence: u64,
194    ts_event: UnixNanos,
195    ts_init: UnixNanos,
196) -> anyhow::Result<OrderBookDelta> {
197    let price = Price::new_checked(level.price, price_precision)
198        .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
199    let size = Quantity::new_checked(level.qty, size_precision)
200        .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
201
202    // Determine action based on quantity
203    let action = if size.raw == 0 {
204        BookAction::Delete
205    } else {
206        BookAction::Update
207    };
208
209    // Create order ID from price (Kraken doesn't provide order IDs)
210    let order_id = price.raw as u64;
211    let order = BookOrder::new(side, price, size, order_id);
212
213    Ok(OrderBookDelta::new(
214        instrument_id,
215        action,
216        order,
217        0, // flags
218        sequence,
219        ts_event,
220        ts_init,
221    ))
222}
223
224fn datetime_to_nanos(value: DateTime<Utc>, field: &str) -> anyhow::Result<UnixNanos> {
225    let nanos = value
226        .timestamp_nanos_opt()
227        .with_context(|| format!("Failed to convert {field}='{value}' to nanoseconds"))?;
228    Ok(UnixNanos::from(nanos as u64))
229}
230
231/// Parses Kraken WebSocket OHLC data into a Nautilus bar.
232///
233/// The bar's `ts_event` is computed as `interval_begin` + `interval` minutes.
234///
235/// # Errors
236///
237/// Returns an error if:
238/// - Price or quantity values cannot be parsed.
239/// - The interval cannot be converted to a valid bar specification.
240pub fn parse_ws_bar(
241    ohlc: &KrakenWsOhlcData,
242    instrument: &InstrumentAny,
243    ts_init: UnixNanos,
244) -> anyhow::Result<Bar> {
245    let instrument_id = instrument.id();
246    let price_precision = instrument.price_precision();
247    let size_precision = instrument.size_precision();
248
249    let open = Price::new_checked(ohlc.open, price_precision)?;
250    let high = Price::new_checked(ohlc.high, price_precision)?;
251    let low = Price::new_checked(ohlc.low, price_precision)?;
252    let close = Price::new_checked(ohlc.close, price_precision)?;
253    let volume = Quantity::new_checked(ohlc.volume, size_precision)?;
254
255    let bar_spec = interval_to_bar_spec(ohlc.interval)?;
256    let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
257
258    // Compute bar close time: interval_begin + interval minutes
259    let interval_secs = i64::from(ohlc.interval) * 60;
260    let close_time = ohlc.interval_begin + chrono::Duration::seconds(interval_secs);
261    let ts_event = UnixNanos::from(close_time.timestamp_nanos_opt().unwrap_or(0) as u64);
262
263    Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
264}
265
266/// Converts a Kraken OHLC interval (minutes) to a Nautilus bar specification.
267fn interval_to_bar_spec(interval: u32) -> anyhow::Result<BarSpecification> {
268    let (step, aggregation) = match interval {
269        1 => (1, BarAggregation::Minute),
270        5 => (5, BarAggregation::Minute),
271        15 => (15, BarAggregation::Minute),
272        30 => (30, BarAggregation::Minute),
273        60 => (1, BarAggregation::Hour),
274        240 => (4, BarAggregation::Hour),
275        1440 => (1, BarAggregation::Day),
276        10080 => (1, BarAggregation::Week),
277        21600 => (15, BarAggregation::Day), // 21600 min = 360 hours = 15 days
278        _ => anyhow::bail!("Unsupported Kraken OHLC interval: {interval}"),
279    };
280
281    Ok(BarSpecification::new(step, aggregation, PriceType::Last))
282}
283
284/// Parses Kraken execution type and order status to Nautilus order status.
285fn parse_order_status(
286    exec_type: KrakenExecType,
287    order_status: Option<KrakenWsOrderStatus>,
288) -> OrderStatus {
289    match exec_type {
290        KrakenExecType::Canceled => return OrderStatus::Canceled,
291        KrakenExecType::Expired => return OrderStatus::Expired,
292        KrakenExecType::Filled => return OrderStatus::Filled,
293        KrakenExecType::Trade => {
294            return match order_status {
295                Some(KrakenWsOrderStatus::Filled) => OrderStatus::Filled,
296                Some(KrakenWsOrderStatus::PartiallyFilled) | None => OrderStatus::PartiallyFilled,
297                Some(status) => status.into(),
298            };
299        }
300        _ => {}
301    }
302
303    match order_status {
304        Some(status) => status.into(),
305        None => OrderStatus::Accepted,
306    }
307}
308
309/// Parses Kraken order type to Nautilus order type.
310fn parse_order_type(order_type: Option<KrakenOrderType>) -> OrderType {
311    match order_type {
312        Some(KrakenOrderType::Market) => OrderType::Market,
313        Some(KrakenOrderType::Limit) => OrderType::Limit,
314        Some(KrakenOrderType::StopLoss) => OrderType::StopMarket,
315        Some(KrakenOrderType::TakeProfit) => OrderType::MarketIfTouched,
316        Some(KrakenOrderType::StopLossLimit) => OrderType::StopLimit,
317        Some(KrakenOrderType::TakeProfitLimit) => OrderType::LimitIfTouched,
318        // Trailing stops lack offset fields in WS reports, map to non-trailing equivalents
319        Some(KrakenOrderType::TrailingStop) => OrderType::StopMarket,
320        Some(KrakenOrderType::TrailingStopLimit) => OrderType::StopLimit,
321        Some(KrakenOrderType::SettlePosition) => OrderType::Market,
322        None => OrderType::Limit,
323    }
324}
325
326/// Parses Kraken order side to Nautilus order side.
327fn parse_order_side(side: Option<KrakenOrderSide>) -> OrderSide {
328    match side {
329        Some(KrakenOrderSide::Buy) => OrderSide::Buy,
330        Some(KrakenOrderSide::Sell) => OrderSide::Sell,
331        None => OrderSide::Buy,
332    }
333}
334
335/// Parses Kraken time-in-force to Nautilus time-in-force.
336fn parse_time_in_force(
337    time_in_force: Option<KrakenTimeInForce>,
338    post_only: Option<bool>,
339) -> TimeInForce {
340    // Handle post_only flag
341    if post_only == Some(true) {
342        return TimeInForce::Gtc;
343    }
344
345    match time_in_force {
346        Some(KrakenTimeInForce::GoodTilCancelled) => TimeInForce::Gtc,
347        Some(KrakenTimeInForce::ImmediateOrCancel) => TimeInForce::Ioc,
348        Some(KrakenTimeInForce::GoodTilDate) => TimeInForce::Gtd,
349        None => TimeInForce::Gtc,
350    }
351}
352
353fn parse_liquidity_side(liquidity_ind: Option<KrakenLiquidityInd>) -> LiquiditySide {
354    liquidity_ind.map_or(LiquiditySide::NoLiquiditySide, Into::into)
355}
356
357/// Parses a Kraken WebSocket execution message into an [`OrderStatusReport`].
358///
359/// # Errors
360///
361/// Returns an error if required fields are missing or cannot be parsed.
362pub fn parse_ws_order_status_report(
363    exec: &KrakenWsExecutionData,
364    instrument: &InstrumentAny,
365    account_id: AccountId,
366    cached_order_qty: Option<f64>,
367    ts_init: UnixNanos,
368) -> anyhow::Result<OrderStatusReport> {
369    let instrument_id = instrument.id();
370    let venue_order_id = VenueOrderId::new(&exec.order_id);
371    let order_side = parse_order_side(exec.side);
372    let order_type = parse_order_type(exec.order_type);
373    let time_in_force = parse_time_in_force(exec.time_in_force, exec.post_only);
374    let order_status = parse_order_status(exec.exec_type, exec.order_status);
375
376    let price_precision = instrument.price_precision();
377    let size_precision = instrument.size_precision();
378
379    // Quantity fallback: order_qty -> cached -> cum_qty -> last_qty (for trade snapshots)
380    let last_qty = exec
381        .last_qty
382        .map(|qty| Quantity::new_checked(qty, size_precision))
383        .transpose()
384        .context("Failed to parse last_qty")?;
385
386    let filled_qty = exec
387        .cum_qty
388        .map(|qty| Quantity::new_checked(qty, size_precision))
389        .transpose()
390        .context("Failed to parse cum_qty")?
391        .or(last_qty)
392        .unwrap_or_else(|| Quantity::new(0.0, size_precision));
393
394    let quantity = exec
395        .order_qty
396        .or(cached_order_qty)
397        .map(|qty| Quantity::new_checked(qty, size_precision))
398        .transpose()
399        .context("Failed to parse order_qty")?
400        .unwrap_or(filled_qty);
401
402    let ts_event = datetime_to_nanos(exec.timestamp, "execution.timestamp")?;
403
404    let mut report = OrderStatusReport::new(
405        account_id,
406        instrument_id,
407        None, // client_order_id set below if present
408        venue_order_id,
409        order_side,
410        order_type,
411        time_in_force,
412        order_status,
413        quantity,
414        filled_qty,
415        ts_event,
416        ts_event,
417        ts_init,
418        Some(UUID4::new()),
419    );
420
421    if let Some(ref cl_ord_id) = exec.cl_ord_id
422        && !cl_ord_id.is_empty()
423    {
424        report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
425    }
426
427    // Price fallback: limit_price -> avg_price -> last_price
428    // Note: pending_new messages may not include any price fields, which is fine for
429    // orders we submitted (engine already has the price from submission)
430    let price_value = exec
431        .limit_price
432        .filter(|&p| p > 0.0)
433        .or(exec.avg_price.filter(|&p| p > 0.0))
434        .or(exec.last_price.filter(|&p| p > 0.0));
435
436    if let Some(px) = price_value {
437        let price =
438            Price::new_checked(px, price_precision).context("Failed to parse order price")?;
439        report = report.with_price(price);
440    }
441
442    // avg_px fallback: avg_price -> cum_cost / cum_qty -> last_price (for single trades/snapshots)
443    let avg_px = exec
444        .avg_price
445        .filter(|&p| p > 0.0)
446        .or_else(|| match (exec.cum_cost, exec.cum_qty) {
447            (Some(cost), Some(qty)) if qty > 0.0 => Some(cost / qty),
448            _ => None,
449        })
450        .or_else(|| exec.last_price.filter(|&p| p > 0.0));
451
452    if let Some(avg_price) = avg_px {
453        report = report.with_avg_px(avg_price)?;
454    }
455
456    if exec.post_only == Some(true) {
457        report = report.with_post_only(true);
458    }
459
460    if exec.reduce_only == Some(true) {
461        report = report.with_reduce_only(true);
462    }
463
464    if let Some(ref reason) = exec.reason
465        && !reason.is_empty()
466    {
467        report = report.with_cancel_reason(reason.clone());
468    }
469
470    // Set trigger type for conditional orders (WebSocket doesn't provide trigger field)
471    let is_conditional = matches!(
472        order_type,
473        OrderType::StopMarket
474            | OrderType::StopLimit
475            | OrderType::MarketIfTouched
476            | OrderType::LimitIfTouched
477    );
478
479    if is_conditional {
480        report = report.with_trigger_type(TriggerType::Default);
481    }
482
483    Ok(report)
484}
485
486/// Parses a Kraken WebSocket trade execution into a [`FillReport`].
487///
488/// This should only be called when exec_type is "trade".
489///
490/// # Errors
491///
492/// Returns an error if required fields are missing or cannot be parsed.
493pub fn parse_ws_fill_report(
494    exec: &KrakenWsExecutionData,
495    instrument: &InstrumentAny,
496    account_id: AccountId,
497    ts_init: UnixNanos,
498) -> anyhow::Result<FillReport> {
499    let instrument_id = instrument.id();
500    let venue_order_id = VenueOrderId::new(&exec.order_id);
501
502    let exec_id = exec
503        .exec_id
504        .as_ref()
505        .context("Missing exec_id for trade execution")?;
506    let trade_id =
507        TradeId::new_checked(exec_id).context("Invalid exec_id in Kraken trade execution")?;
508
509    let order_side = parse_order_side(exec.side);
510
511    let price_precision = instrument.price_precision();
512    let size_precision = instrument.size_precision();
513
514    let last_qty = exec
515        .last_qty
516        .map(|qty| Quantity::new_checked(qty, size_precision))
517        .transpose()
518        .context("Failed to parse last_qty")?
519        .context("Missing last_qty for trade execution")?;
520
521    let last_px = exec
522        .last_price
523        .map(|px| Price::new_checked(px, price_precision))
524        .transpose()
525        .context("Failed to parse last_price")?
526        .context("Missing last_price for trade execution")?;
527
528    let liquidity_side = parse_liquidity_side(exec.liquidity_ind);
529
530    // Calculate commission from fees array
531    let commission = if let Some(ref fees) = exec.fees {
532        if let Some(fee) = fees.first() {
533            let currency = Currency::get_or_create_crypto(&fee.asset);
534            Money::new(fee.qty.abs(), currency)
535        } else {
536            Money::new(0.0, instrument.quote_currency())
537        }
538    } else {
539        Money::new(0.0, instrument.quote_currency())
540    };
541
542    let ts_event = datetime_to_nanos(exec.timestamp, "execution.timestamp")?;
543
544    let client_order_id = exec
545        .cl_ord_id
546        .as_ref()
547        .filter(|s| !s.is_empty())
548        .map(ClientOrderId::new);
549
550    Ok(FillReport::new(
551        account_id,
552        instrument_id,
553        venue_order_id,
554        trade_id,
555        order_side,
556        last_qty,
557        last_px,
558        commission,
559        liquidity_side,
560        client_order_id,
561        None, // venue_position_id
562        ts_event,
563        ts_init,
564        None, // report_id
565    ))
566}
567
#[cfg(test)]
mod tests {
    use nautilus_model::{identifiers::Symbol, types::Currency};
    use rstest::rstest;

    use super::*;
    use crate::{common::consts::KRAKEN_VENUE, websocket::spot_v2::messages::KrakenWsMessage};

    const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);

    /// Reads a JSON fixture from the `test_data` directory, panicking with the path on failure.
    fn load_test_json(filename: &str) -> String {
        let path = format!("test_data/{filename}");
        match std::fs::read_to_string(&path) {
            Ok(contents) => contents,
            Err(e) => panic!("Failed to load test data from {path}: {e}"),
        }
    }

    /// Loads a fixture as a [`KrakenWsMessage`] and returns its first data element.
    fn first_payload(filename: &str) -> serde_json::Value {
        let json = load_test_json(filename);
        let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
        message.data[0].clone()
    }

    /// Creates a BTC/USD currency pair with price precision 1 and size precision 8.
    fn create_mock_instrument() -> InstrumentAny {
        use nautilus_model::instruments::currency_pair::CurrencyPair;

        let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
        let pair = CurrencyPair::new(
            instrument_id,
            Symbol::new("XBTUSDT"),
            Currency::BTC(),
            Currency::USDT(),
            1, // price_precision
            8, // size_precision
            Price::from("0.1"),
            Quantity::from("0.00000001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None, // info
            TS,
            TS,
        );
        InstrumentAny::CurrencyPair(pair)
    }

    #[rstest]
    fn test_parse_quote_tick() {
        let ticker: KrakenWsTickerData =
            serde_json::from_value(first_payload("ws_ticker_snapshot.json")).unwrap();

        let instrument = create_mock_instrument();
        let quote_tick = parse_quote_tick(&ticker, &instrument, TS).unwrap();

        assert_eq!(quote_tick.instrument_id, instrument.id());
        assert!(quote_tick.bid_price.as_f64() > 0.0);
        assert!(quote_tick.ask_price.as_f64() > 0.0);
        assert!(quote_tick.bid_size.as_f64() > 0.0);
        assert!(quote_tick.ask_size.as_f64() > 0.0);

        let expected_ts_event = UnixNanos::from(1_671_960_659_123_456_000);
        assert_eq!(quote_tick.ts_event, expected_ts_event);
        assert_eq!(quote_tick.ts_init, TS);
    }

    #[rstest]
    fn test_parse_trade_tick() {
        let trade: KrakenWsTradeData =
            serde_json::from_value(first_payload("ws_trade_update.json")).unwrap();

        let instrument = create_mock_instrument();
        let trade_tick = parse_trade_tick(&trade, &instrument, TS).unwrap();

        assert_eq!(trade_tick.instrument_id, instrument.id());
        assert!(trade_tick.price.as_f64() > 0.0);
        assert!(trade_tick.size.as_f64() > 0.0);
        assert!(matches!(
            trade_tick.aggressor_side,
            AggressorSide::Buyer | AggressorSide::Seller
        ));

        let expected_ts_event = UnixNanos::from(1_696_613_755_440_295_000);
        assert_eq!(trade_tick.ts_event, expected_ts_event);
        assert_eq!(trade_tick.ts_init, TS);
    }

    #[rstest]
    fn test_parse_book_deltas_snapshot() {
        let book: KrakenWsBookData =
            serde_json::from_value(first_payload("ws_book_snapshot.json")).unwrap();

        let instrument = create_mock_instrument();
        let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();

        assert!(!deltas.is_empty());

        // Both sides of the book must be represented in a snapshot
        let count_side =
            |side: OrderSide| deltas.iter().filter(|d| d.order.side == side).count();
        let bid_count = count_side(OrderSide::Buy);
        let ask_count = count_side(OrderSide::Sell);

        assert!(bid_count > 0);
        assert!(ask_count > 0);

        // The first delta carries a populated level for the expected instrument
        let first_delta = &deltas[0];
        assert_eq!(first_delta.instrument_id, instrument.id());
        assert!(first_delta.order.price.as_f64() > 0.0);
        assert!(first_delta.order.size.as_f64() > 0.0);

        let expected_ts_event = UnixNanos::from(1_696_613_755_440_295_000);
        assert!(deltas.iter().all(|d| d.ts_event == expected_ts_event));
        assert!(deltas.iter().all(|d| d.ts_init == TS));
    }

    #[rstest]
    fn test_parse_book_deltas_update() {
        let book: KrakenWsBookData =
            serde_json::from_value(first_payload("ws_book_update.json")).unwrap();

        let instrument = create_mock_instrument();
        let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();

        assert!(!deltas.is_empty());

        // An update must yield at least one delta for the expected instrument
        let first_delta = &deltas[0];
        assert_eq!(first_delta.instrument_id, instrument.id());
        assert!(first_delta.order.price.as_f64() > 0.0);

        let expected_ts_event = UnixNanos::from(1_696_613_755_440_295_000);
        assert!(deltas.iter().all(|d| d.ts_event == expected_ts_event));
        assert!(deltas.iter().all(|d| d.ts_init == TS));
    }

    #[rstest]
    fn test_datetime_to_nanos() {
        let dt: DateTime<Utc> = "2023-10-06T17:35:55.440295Z".parse().unwrap();
        let result = datetime_to_nanos(dt, "test").unwrap();
        assert_eq!(result, UnixNanos::from(1_696_613_755_440_295_000));
    }

    #[rstest]
    fn test_datetime_to_nanos_out_of_range_errors() {
        let dt: DateTime<Utc> = "1500-01-01T00:00:00Z".parse().unwrap();
        let result = datetime_to_nanos(dt, "test");
        assert!(result.is_err());

        // The error message must mention the field name that was passed in
        let err = result.unwrap_err().to_string();
        assert!(err.contains("test"));
    }

    #[rstest]
    fn test_parse_ws_bar() {
        let ohlc: KrakenWsOhlcData =
            serde_json::from_value(first_payload("ws_ohlc_update.json")).unwrap();

        let instrument = create_mock_instrument();
        let bar = parse_ws_bar(&ohlc, &instrument, TS).unwrap();

        assert_eq!(bar.bar_type.instrument_id(), instrument.id());
        assert!(bar.open.as_f64() > 0.0);
        assert!(bar.high.as_f64() > 0.0);
        assert!(bar.low.as_f64() > 0.0);
        assert!(bar.close.as_f64() > 0.0);
        assert!(bar.volume.as_f64() > 0.0);

        let spec = bar.bar_type.spec();
        assert_eq!(spec.step.get(), 1);
        assert_eq!(spec.aggregation, BarAggregation::Minute);
        assert_eq!(spec.price_type, PriceType::Last);

        // Verify ts_event is computed as interval_begin + interval (close time)
        // interval_begin is 2023-10-04T16:25:00Z, interval is 1 minute, so close is 16:26:00Z
        let expected_close = ohlc.interval_begin + chrono::Duration::minutes(1);
        let expected_nanos = expected_close.timestamp_nanos_opt().unwrap() as u64;
        assert_eq!(bar.ts_event, UnixNanos::from(expected_nanos));
    }

    #[rstest]
    fn test_interval_to_bar_spec() {
        // (interval in minutes, expected step, expected aggregation)
        let test_cases = [
            (1, 1, BarAggregation::Minute),
            (5, 5, BarAggregation::Minute),
            (15, 15, BarAggregation::Minute),
            (30, 30, BarAggregation::Minute),
            (60, 1, BarAggregation::Hour),
            (240, 4, BarAggregation::Hour),
            (1440, 1, BarAggregation::Day),
            (10080, 1, BarAggregation::Week),
            (21600, 15, BarAggregation::Day), // 21600 min = 15 days
        ];

        for (interval, expected_step, expected_aggregation) in test_cases {
            let spec = interval_to_bar_spec(interval).unwrap();
            assert_eq!(
                spec.step.get(),
                expected_step,
                "Failed for interval {interval}"
            );
            assert_eq!(
                spec.aggregation, expected_aggregation,
                "Failed for interval {interval}"
            );
            assert_eq!(spec.price_type, PriceType::Last);
        }
    }

    #[rstest]
    fn test_interval_to_bar_spec_invalid() {
        assert!(interval_to_bar_spec(999).is_err());
    }
}