Skip to main content

nautilus_okx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Functions translating raw OKX WebSocket frames into Nautilus data types.
17
18use std::str::FromStr;
19
20use ahash::AHashMap;
21use anyhow::Context;
22use nautilus_core::{UUID4, nanos::UnixNanos};
23use nautilus_model::{
24    data::{
25        Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
26        InstrumentStatus, MarkPriceUpdate, OptionGreekValues, OrderBookDelta, OrderBookDeltas,
27        OrderBookDeltas_API, OrderBookDepth10, QuoteTick, TradeTick, depth::DEPTH10_LEN,
28        option_chain::OptionGreeks,
29    },
30    enums::{
31        AggregationSource, AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus,
32        OrderType, RecordFlag, TimeInForce, TrailingOffsetType, TriggerType,
33    },
34    events::{OrderAccepted, OrderCanceled, OrderExpired, OrderTriggered, OrderUpdated},
35    identifiers::{
36        AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
37    },
38    instruments::{Instrument, InstrumentAny},
39    reports::{FillReport, OrderStatusReport},
40    types::{Money, Price, Quantity},
41};
42use rust_decimal::Decimal;
43use ustr::Ustr;
44
45use super::{
46    enums::OKXWsChannel,
47    messages::{
48        OKXAlgoOrderMsg, OKXBookMsg, OKXCandleMsg, OKXIndexPriceMsg, OKXMarkPriceMsg,
49        OKXOptionSummaryMsg, OKXOrderMsg, OKXTickerMsg, OKXTradeMsg, OrderBookEntry,
50    },
51};
52use crate::{
53    common::{
54        consts::{OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE},
55        enums::{
56            OKXAlgoOrderType, OKXBookAction, OKXCandleConfirm, OKXGreeksType, OKXInstrumentStatus,
57            OKXInstrumentType, OKXOrderCategory, OKXOrderStatus, OKXOrderType, OKXSide,
58            OKXTargetCurrency, OKXTriggerType,
59        },
60        models::OKXInstrument,
61        parse::{
62            determine_order_type_with_alt, is_market_price, okx_channel_to_bar_spec,
63            okx_status_to_market_action, parse_client_order_id, parse_fee, parse_fee_currency,
64            parse_funding_rate_msg, parse_instrument_any, parse_message_vec,
65            parse_millisecond_timestamp, parse_price, parse_quantity,
66        },
67    },
68    websocket::messages::{ExecutionReport, NautilusWsMessage, OKXFundingRateMsg},
69};
70
71/// Extracts fee rates from a cached instrument.
72///
73/// Returns a tuple of (margin_init, margin_maint, maker_fee, taker_fee).
74/// All values are None if the instrument type doesn't support fees.
75pub(crate) fn extract_fees_from_cached_instrument(
76    instrument: &InstrumentAny,
77) -> (
78    Option<Decimal>,
79    Option<Decimal>,
80    Option<Decimal>,
81    Option<Decimal>,
82) {
83    match instrument {
84        InstrumentAny::CurrencyPair(pair) => (
85            Some(pair.margin_init),
86            Some(pair.margin_maint),
87            Some(pair.maker_fee),
88            Some(pair.taker_fee),
89        ),
90        InstrumentAny::CryptoPerpetual(perp) => (
91            Some(perp.margin_init),
92            Some(perp.margin_maint),
93            Some(perp.maker_fee),
94            Some(perp.taker_fee),
95        ),
96        InstrumentAny::CryptoFuture(future) => (
97            Some(future.margin_init),
98            Some(future.margin_maint),
99            Some(future.maker_fee),
100            Some(future.taker_fee),
101        ),
102        InstrumentAny::CryptoOption(option) => (
103            Some(option.margin_init),
104            Some(option.margin_maint),
105            Some(option.maker_fee),
106            Some(option.taker_fee),
107        ),
108        _ => (None, None, None, None),
109    }
110}
111
112/// Represents the result of parsing an OKX order message into a specific event.
113#[derive(Debug, Clone)]
114pub enum ParsedOrderEvent {
115    /// Order has been accepted by the venue.
116    Accepted(OrderAccepted),
117    /// Order has been canceled.
118    Canceled(OrderCanceled),
119    /// Order has expired (e.g., GTD order reached expiration time).
120    Expired(OrderExpired),
121    /// Stop/algo order has been triggered.
122    Triggered(OrderTriggered),
123    /// Order has been modified (price, quantity, or venue order ID changed).
124    Updated(OrderUpdated),
125    /// Order fill event.
126    Fill(FillReport),
127    /// Status update that doesn't map to a specific event (for reconciliation/external orders).
128    StatusOnly(Box<OrderStatusReport>),
129    /// Duplicate message detected (e.g. reconnect replay with unchanged fill).
130    /// The dispatcher should update caches but not emit any event.
131    Skipped,
132}
133
134/// Snapshot of order state for detecting updates.
135#[derive(Debug, Clone)]
136pub struct OrderStateSnapshot {
137    pub venue_order_id: VenueOrderId,
138    pub quantity: Quantity,
139    pub price: Option<Price>,
140}
141
142/// Parses an OKX order message into a specific order event.
143///
144/// This function determines the appropriate event type based on:
145/// - Current order status from OKX
146/// - Whether there's a new fill
147/// - Whether the order was updated (price/quantity change)
148/// - Whether it's an algo order that triggered
149///
150/// # Errors
151///
152/// Returns an error if parsing order identifiers or numeric fields fails.
153#[expect(clippy::too_many_arguments)]
154pub fn parse_order_event(
155    msg: &OKXOrderMsg,
156    client_order_id: ClientOrderId,
157    account_id: AccountId,
158    trader_id: TraderId,
159    strategy_id: StrategyId,
160    instrument: &InstrumentAny,
161    previous_fee: Option<Money>,
162    previous_filled_qty: Option<Quantity>,
163    previous_state: Option<&OrderStateSnapshot>,
164    ts_init: UnixNanos,
165) -> anyhow::Result<ParsedOrderEvent> {
166    let venue_order_id = VenueOrderId::new(msg.ord_id);
167    let instrument_id = instrument.id();
168
169    let has_new_fill = (!msg.fill_sz.is_empty() && msg.fill_sz != "0")
170        || !msg.trade_id.is_empty()
171        || has_acc_fill_sz_increased(
172            &msg.acc_fill_sz,
173            previous_filled_qty,
174            instrument.size_precision(),
175        );
176
177    // Check for order updates, but skip when other events take precedence:
178    // - Fill events: fill data must be processed, update detection secondary
179    // - Terminal states: handled by specific branches below
180    let skip_update_check = has_new_fill
181        || matches!(
182            msg.state,
183            OKXOrderStatus::Filled | OKXOrderStatus::Canceled | OKXOrderStatus::MmpCanceled
184        );
185
186    if !skip_update_check
187        && let Some(prev) = previous_state
188        && is_order_updated_excluding_venue_id_for_live(msg, prev, instrument)?
189    {
190        let ts_event = parse_millisecond_timestamp(msg.u_time);
191        let quantity = parse_quantity(&msg.sz, instrument.size_precision())?;
192        let price = if is_market_price(&msg.px) {
193            None
194        } else {
195            Some(parse_price(&msg.px, instrument.price_precision())?)
196        };
197
198        return Ok(ParsedOrderEvent::Updated(OrderUpdated::new(
199            trader_id,
200            strategy_id,
201            instrument_id,
202            client_order_id,
203            quantity,
204            UUID4::new(),
205            ts_event,
206            ts_init,
207            false, // reconciliation
208            Some(venue_order_id),
209            Some(account_id),
210            price,
211            None,  // trigger_price
212            None,  // protection_price
213            false, // is_quote_quantity
214        )));
215    }
216
217    match msg.state {
218        OKXOrderStatus::Filled | OKXOrderStatus::PartiallyFilled if has_new_fill => {
219            match parse_fill_report(
220                msg,
221                instrument,
222                account_id,
223                previous_fee,
224                previous_filled_qty,
225                ts_init,
226            )? {
227                Some(report) => Ok(ParsedOrderEvent::Fill(report)),
228                None => Ok(ParsedOrderEvent::Skipped),
229            }
230        }
231        OKXOrderStatus::Live => {
232            let ts_event = parse_millisecond_timestamp(msg.c_time);
233            Ok(ParsedOrderEvent::Accepted(OrderAccepted::new(
234                trader_id,
235                strategy_id,
236                instrument_id,
237                client_order_id,
238                venue_order_id,
239                account_id,
240                UUID4::new(),
241                ts_event,
242                ts_init,
243                false, // reconciliation
244            )))
245        }
246        OKXOrderStatus::Canceled | OKXOrderStatus::MmpCanceled => {
247            let ts_event = parse_millisecond_timestamp(msg.u_time);
248
249            if is_order_expired_by_reason(msg) {
250                Ok(ParsedOrderEvent::Expired(OrderExpired::new(
251                    trader_id,
252                    strategy_id,
253                    instrument_id,
254                    client_order_id,
255                    UUID4::new(),
256                    ts_event,
257                    ts_init,
258                    false,
259                    Some(venue_order_id),
260                    Some(account_id),
261                )))
262            } else {
263                Ok(ParsedOrderEvent::Canceled(OrderCanceled::new(
264                    trader_id,
265                    strategy_id,
266                    instrument_id,
267                    client_order_id,
268                    UUID4::new(),
269                    ts_event,
270                    ts_init,
271                    false,
272                    Some(venue_order_id),
273                    Some(account_id),
274                )))
275            }
276        }
277        OKXOrderStatus::Effective | OKXOrderStatus::OrderPlaced => {
278            let ts_event = parse_millisecond_timestamp(msg.u_time);
279            Ok(ParsedOrderEvent::Triggered(OrderTriggered::new(
280                trader_id,
281                strategy_id,
282                instrument_id,
283                client_order_id,
284                UUID4::new(),
285                ts_event,
286                ts_init,
287                false,
288                Some(venue_order_id),
289                Some(account_id),
290            )))
291        }
292        _ => {
293            // PartiallyFilled without new fill or other states - use status report
294            parse_order_status_report(msg, instrument, account_id, ts_init)
295                .map(|r| ParsedOrderEvent::StatusOnly(Box::new(r)))
296        }
297    }
298}
299
300/// Case-insensitive substring check.
301#[inline]
302/// Builds a deterministic synthesized `TradeId` for fills where OKX omits
303/// the venue `trade_id`. Hashes the immutable fill fields with FNV-1a so
304/// the result fits inside the 36-character `TradeId` cap and stays stable
305/// across reconnect replays of the same physical fill.
306fn synthesize_trade_id(msg: &OKXOrderMsg) -> String {
307    const FNV_OFFSET: u64 = 0xcbf29ce484222325;
308    const FNV_PRIME: u64 = 0x100000001b3;
309
310    let mut hasher: u64 = FNV_OFFSET;
311    let mut update = |bytes: &[u8]| {
312        for byte in bytes {
313            hasher ^= u64::from(*byte);
314            hasher = hasher.wrapping_mul(FNV_PRIME);
315        }
316        // Field separator so that "ab" + "c" doesn't hash to the same as "a" + "bc".
317        hasher ^= 0xff;
318        hasher = hasher.wrapping_mul(FNV_PRIME);
319    };
320
321    update(msg.ord_id.as_bytes());
322    update(msg.fill_time.to_string().as_bytes());
323    update(msg.fill_sz.as_bytes());
324    update(msg.fill_px.as_bytes());
325    update(msg.acc_fill_sz.as_deref().unwrap_or("").as_bytes());
326
327    format!("synth-{hasher:016x}")
328}
329
/// Case-insensitive substring check.
///
/// Returns `true` when `needle` occurs anywhere in `haystack`, comparing bytes with ASCII
/// case folding only (non-ASCII bytes must match exactly). An empty `needle` is contained
/// in every haystack, including the empty one.
#[inline]
fn contains_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
    // `slice::windows` panics on a window size of zero, so answer the empty case up front.
    if needle.is_empty() {
        return true;
    }

    let needle = needle.as_bytes();
    haystack
        .as_bytes()
        .windows(needle.len())
        .any(|window| window.eq_ignore_ascii_case(needle))
}
336
337/// Determines if a Canceled order is actually an Expired order based on cancel reason.
338fn is_order_expired_by_reason(msg: &OKXOrderMsg) -> bool {
339    if let Some(ref reason) = msg.cancel_source_reason
340        && (contains_ignore_ascii_case(reason, "expir")
341            || contains_ignore_ascii_case(reason, "gtd")
342            || contains_ignore_ascii_case(reason, "timeout")
343            || contains_ignore_ascii_case(reason, "time_expired"))
344    {
345        return true;
346    }
347
348    // OKX cancel source codes that indicate expiration
349    if let Some(ref source) = msg.cancel_source
350        && (source == "5" || source == "time_expired" || source == "gtd_expired")
351    {
352        return true;
353    }
354
355    false
356}
357
358/// Checks if order parameters have been updated compared to previous state.
359///
360/// For Live status, venue_id changes are ignored because algo triggers create new IDs
361/// but should emit OrderAccepted, not OrderUpdated. Price/quantity changes still count.
362fn is_order_updated_excluding_venue_id_for_live(
363    msg: &OKXOrderMsg,
364    previous: &OrderStateSnapshot,
365    instrument: &InstrumentAny,
366) -> anyhow::Result<bool> {
367    // For non-Live statuses, venue_id change indicates amendment
368    if msg.state != OKXOrderStatus::Live {
369        let current_venue_id = VenueOrderId::new(msg.ord_id);
370        if previous.venue_order_id != current_venue_id {
371            return Ok(true);
372        }
373    }
374
375    let current_qty = parse_quantity(&msg.sz, instrument.size_precision())?;
376    if previous.quantity != current_qty {
377        return Ok(true);
378    }
379
380    // Price change only applies to limit orders
381    if !is_market_price(&msg.px) {
382        let current_price = parse_price(&msg.px, instrument.price_precision())?;
383
384        if let Some(prev_price) = previous.price
385            && prev_price != current_price
386        {
387            return Ok(true);
388        }
389    }
390
391    Ok(false)
392}
393
/// Reports whether the message differs from the previous order state (used by tests).
///
/// Unlike the live variant, a venue order ID change always counts as an update here.
/// Quantity changes always count; price changes count when the message is not
/// market-priced and a previous price was recorded.
#[cfg(test)]
fn is_order_updated(
    msg: &OKXOrderMsg,
    previous: &OrderStateSnapshot,
    instrument: &InstrumentAny,
) -> anyhow::Result<bool> {
    // A different venue order ID indicates an amendment.
    if VenueOrderId::new(msg.ord_id) != previous.venue_order_id {
        return Ok(true);
    }

    let quantity_changed = parse_quantity(&msg.sz, instrument.size_precision())? != previous.quantity;
    if quantity_changed {
        return Ok(true);
    }

    // Market-priced messages have no limit price to compare.
    if is_market_price(&msg.px) {
        return Ok(false);
    }

    let current_price = parse_price(&msg.px, instrument.price_precision())?;
    Ok(matches!(previous.price, Some(prev_price) if prev_price != current_price))
}
426
427/// Parses vector of OKX book messages into Nautilus order book deltas.
428///
429/// # Errors
430///
431/// Returns an error if any underlying book message cannot be parsed.
432pub fn parse_book_msg_vec(
433    data: Vec<OKXBookMsg>,
434    instrument_id: &InstrumentId,
435    price_precision: u8,
436    size_precision: u8,
437    action: OKXBookAction,
438    ts_init: UnixNanos,
439) -> anyhow::Result<Vec<Data>> {
440    let mut deltas = Vec::with_capacity(data.len());
441
442    for msg in data {
443        let deltas_api = OrderBookDeltas_API::new(parse_book_msg(
444            &msg,
445            *instrument_id,
446            price_precision,
447            size_precision,
448            &action,
449            ts_init,
450        )?);
451        deltas.push(Data::Deltas(deltas_api));
452    }
453
454    Ok(deltas)
455}
456
457/// Parses vector of OKX ticker messages into Nautilus quote ticks.
458///
459/// # Errors
460///
461/// Returns an error if any ticker message fails to parse.
462pub fn parse_ticker_msg_vec(
463    data: serde_json::Value,
464    instrument_id: &InstrumentId,
465    price_precision: u8,
466    size_precision: u8,
467    ts_init: UnixNanos,
468) -> anyhow::Result<Vec<Data>> {
469    parse_message_vec(
470        data,
471        |msg| {
472            parse_ticker_msg(
473                msg,
474                *instrument_id,
475                price_precision,
476                size_precision,
477                ts_init,
478            )
479        },
480        Data::Quote,
481    )
482}
483
484/// Parses vector of OKX book messages into Nautilus quote ticks.
485///
486/// # Errors
487///
488/// Returns an error if any quote message fails to parse.
489pub fn parse_quote_msg_vec(
490    data: serde_json::Value,
491    instrument_id: &InstrumentId,
492    price_precision: u8,
493    size_precision: u8,
494    ts_init: UnixNanos,
495) -> anyhow::Result<Vec<Data>> {
496    parse_message_vec(
497        data,
498        |msg| {
499            parse_quote_msg(
500                msg,
501                *instrument_id,
502                price_precision,
503                size_precision,
504                ts_init,
505            )
506        },
507        Data::Quote,
508    )
509}
510
511/// Parses vector of OKX trade messages into Nautilus trade ticks.
512///
513/// # Errors
514///
515/// Returns an error if any trade message fails to parse.
516pub fn parse_trade_msg_vec(
517    data: serde_json::Value,
518    instrument_id: &InstrumentId,
519    price_precision: u8,
520    size_precision: u8,
521    ts_init: UnixNanos,
522) -> anyhow::Result<Vec<Data>> {
523    parse_message_vec(
524        data,
525        |msg| {
526            parse_trade_msg(
527                msg,
528                *instrument_id,
529                price_precision,
530                size_precision,
531                ts_init,
532            )
533        },
534        Data::Trade,
535    )
536}
537
538/// Parses vector of OKX mark price messages into Nautilus mark price updates.
539///
540/// # Errors
541///
542/// Returns an error if any mark price message fails to parse.
543pub fn parse_mark_price_msg_vec(
544    data: serde_json::Value,
545    instrument_id: &InstrumentId,
546    price_precision: u8,
547    ts_init: UnixNanos,
548) -> anyhow::Result<Vec<Data>> {
549    parse_message_vec(
550        data,
551        |msg| parse_mark_price_msg(msg, *instrument_id, price_precision, ts_init),
552        Data::MarkPriceUpdate,
553    )
554}
555
556/// Parses vector of OKX index price messages into Nautilus index price updates.
557///
558/// # Errors
559///
560/// Returns an error if any index price message fails to parse.
561pub fn parse_index_price_msg_vec(
562    data: serde_json::Value,
563    instrument_id: &InstrumentId,
564    price_precision: u8,
565    ts_init: UnixNanos,
566) -> anyhow::Result<Vec<Data>> {
567    parse_message_vec(
568        data,
569        |msg| parse_index_price_msg(msg, *instrument_id, price_precision, ts_init),
570        Data::IndexPriceUpdate,
571    )
572}
573
574/// Parses vector of OKX funding rate messages into Nautilus funding rate updates.
575/// Includes caching to filter out duplicate funding rates.
576///
577/// # Errors
578///
579/// Returns an error if any funding rate message fails to parse.
580pub fn parse_funding_rate_msg_vec(
581    data: serde_json::Value,
582    instrument_id: &InstrumentId,
583    ts_init: UnixNanos,
584    funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
585) -> anyhow::Result<Vec<FundingRateUpdate>> {
586    let msgs: Vec<OKXFundingRateMsg> = serde_json::from_value(data)?;
587
588    let mut result = Vec::with_capacity(msgs.len());
589
590    for msg in &msgs {
591        let cache_key = (msg.funding_rate, msg.funding_time);
592
593        if let Some(cached) = funding_cache.get(&msg.inst_id)
594            && *cached == cache_key
595        {
596            continue; // Skip duplicate
597        }
598
599        // New or changed funding rate, update cache and parse
600        funding_cache.insert(msg.inst_id, cache_key);
601        let funding_rate = parse_funding_rate_msg(msg, *instrument_id, ts_init)?;
602        result.push(funding_rate);
603    }
604
605    Ok(result)
606}
607
608/// Parses vector of OKX candle messages into Nautilus bars.
609///
610/// # Errors
611///
612/// Returns an error if candle messages cannot be deserialized or parsed.
613pub fn parse_candle_msg_vec(
614    data: serde_json::Value,
615    instrument_id: &InstrumentId,
616    price_precision: u8,
617    size_precision: u8,
618    spec: BarSpecification,
619    ts_init: UnixNanos,
620) -> anyhow::Result<Vec<Data>> {
621    let msgs: Vec<OKXCandleMsg> = serde_json::from_value(data)?;
622    let bar_type = BarType::new(*instrument_id, spec, AggregationSource::External);
623    let mut bars = Vec::with_capacity(msgs.len());
624
625    for msg in msgs {
626        // Only process completed candles to avoid duplicate/partial bars
627        if msg.confirm == OKXCandleConfirm::Closed {
628            let bar = parse_candle_msg(&msg, bar_type, price_precision, size_precision, ts_init)?;
629            bars.push(Data::Bar(bar));
630        }
631    }
632
633    Ok(bars)
634}
635
636/// Parses vector of OKX book messages into Nautilus depth10 updates.
637///
638/// # Errors
639///
640/// Returns an error if any book10 message fails to parse.
641pub fn parse_book10_msg_vec(
642    data: Vec<OKXBookMsg>,
643    instrument_id: &InstrumentId,
644    price_precision: u8,
645    size_precision: u8,
646    ts_init: UnixNanos,
647) -> anyhow::Result<Vec<Data>> {
648    let mut depth10_updates = Vec::with_capacity(data.len());
649
650    for msg in data {
651        let depth10 = parse_book10_msg(
652            &msg,
653            *instrument_id,
654            price_precision,
655            size_precision,
656            ts_init,
657        )?;
658        depth10_updates.push(Data::Depth10(Box::new(depth10)));
659    }
660
661    Ok(depth10_updates)
662}
663
664/// Parses an OKX book message into Nautilus order book deltas.
665///
666/// # Errors
667///
668/// Returns an error if bid or ask levels contain values that cannot be parsed.
669pub fn parse_book_msg(
670    msg: &OKXBookMsg,
671    instrument_id: InstrumentId,
672    price_precision: u8,
673    size_precision: u8,
674    action: &OKXBookAction,
675    ts_init: UnixNanos,
676) -> anyhow::Result<OrderBookDeltas> {
677    let flags = if action == &OKXBookAction::Snapshot {
678        RecordFlag::F_SNAPSHOT as u8
679    } else {
680        0
681    };
682    let ts_event = parse_millisecond_timestamp(msg.ts);
683
684    let mut deltas = Vec::with_capacity(msg.asks.len() + msg.bids.len());
685
686    for bid in &msg.bids {
687        let book_action = match action {
688            OKXBookAction::Snapshot => BookAction::Add,
689            _ => match bid.size.as_str() {
690                "0" => BookAction::Delete,
691                _ => BookAction::Update,
692            },
693        };
694        let price = parse_price(&bid.price, price_precision)?;
695        let size = parse_quantity(&bid.size, size_precision)?;
696        let order_id = 0; // TBD
697        let order = BookOrder::new(OrderSide::Buy, price, size, order_id);
698        let delta = OrderBookDelta::new(
699            instrument_id,
700            book_action,
701            order,
702            flags,
703            msg.seq_id,
704            ts_event,
705            ts_init,
706        );
707        deltas.push(delta);
708    }
709
710    for ask in &msg.asks {
711        let book_action = match action {
712            OKXBookAction::Snapshot => BookAction::Add,
713            _ => match ask.size.as_str() {
714                "0" => BookAction::Delete,
715                _ => BookAction::Update,
716            },
717        };
718        let price = parse_price(&ask.price, price_precision)?;
719        let size = parse_quantity(&ask.size, size_precision)?;
720        let order_id = 0; // TBD
721        let order = BookOrder::new(OrderSide::Sell, price, size, order_id);
722        let delta = OrderBookDelta::new(
723            instrument_id,
724            book_action,
725            order,
726            flags,
727            msg.seq_id,
728            ts_event,
729            ts_init,
730        );
731        deltas.push(delta);
732    }
733
734    OrderBookDeltas::new_checked(instrument_id, deltas)
735}
736
737/// Parses an OKX book message into a Nautilus quote tick.
738///
739/// # Errors
740///
741/// Returns an error if any quote levels contain values that cannot be parsed.
742pub fn parse_quote_msg(
743    msg: &OKXBookMsg,
744    instrument_id: InstrumentId,
745    price_precision: u8,
746    size_precision: u8,
747    ts_init: UnixNanos,
748) -> anyhow::Result<QuoteTick> {
749    let best_bid: &OrderBookEntry = msg
750        .bids
751        .first()
752        .ok_or_else(|| anyhow::anyhow!("Empty bids array for {instrument_id}"))?;
753    let best_ask: &OrderBookEntry = msg
754        .asks
755        .first()
756        .ok_or_else(|| anyhow::anyhow!("Empty asks array for {instrument_id}"))?;
757
758    let bid_price = parse_price(&best_bid.price, price_precision)?;
759    let ask_price = parse_price(&best_ask.price, price_precision)?;
760    let bid_size = parse_quantity(&best_bid.size, size_precision)?;
761    let ask_size = parse_quantity(&best_ask.size, size_precision)?;
762    let ts_event = parse_millisecond_timestamp(msg.ts);
763
764    QuoteTick::new_checked(
765        instrument_id,
766        bid_price,
767        ask_price,
768        bid_size,
769        ask_size,
770        ts_event,
771        ts_init,
772    )
773}
774
775/// Parses an OKX book message into a Nautilus [`OrderBookDepth10`].
776///
777/// Converts order book data into a fixed-depth snapshot with top 10 levels for both sides.
778///
779/// # Errors
780///
781/// Returns an error if price or size fields cannot be parsed for any level.
782pub fn parse_book10_msg(
783    msg: &OKXBookMsg,
784    instrument_id: InstrumentId,
785    price_precision: u8,
786    size_precision: u8,
787    ts_init: UnixNanos,
788) -> anyhow::Result<OrderBookDepth10> {
789    // Initialize arrays - need to fill all 10 levels even if we have fewer
790    let mut bids: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
791    let mut asks: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
792    let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
793    let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
794
795    // Parse available bid levels (up to 10)
796    let bid_len = msg.bids.len().min(DEPTH10_LEN);
797
798    for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
799        let price = parse_price(&level.price, price_precision)?;
800        let size = parse_quantity(&level.size, size_precision)?;
801        let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
802
803        let bid_order = BookOrder::new(OrderSide::Buy, price, size, 0);
804        bids[i] = bid_order;
805        bid_counts[i] = orders_count;
806    }
807
808    // Fill remaining bid slots with empty Buy orders (not NULL orders)
809    for i in bid_len..DEPTH10_LEN {
810        bids[i] = BookOrder::new(
811            OrderSide::Buy,
812            Price::zero(price_precision),
813            Quantity::zero(size_precision),
814            0,
815        );
816        bid_counts[i] = 0;
817    }
818
819    // Parse available ask levels (up to 10)
820    let ask_len = msg.asks.len().min(DEPTH10_LEN);
821
822    for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
823        let price = parse_price(&level.price, price_precision)?;
824        let size = parse_quantity(&level.size, size_precision)?;
825        let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
826
827        let ask_order = BookOrder::new(OrderSide::Sell, price, size, 0);
828        asks[i] = ask_order;
829        ask_counts[i] = orders_count;
830    }
831
832    // Fill remaining ask slots with empty Sell orders (not NULL orders)
833    for i in ask_len..DEPTH10_LEN {
834        asks[i] = BookOrder::new(
835            OrderSide::Sell,
836            Price::zero(price_precision),
837            Quantity::zero(size_precision),
838            0,
839        );
840        ask_counts[i] = 0;
841    }
842
843    let ts_event = parse_millisecond_timestamp(msg.ts);
844
845    Ok(OrderBookDepth10::new(
846        instrument_id,
847        bids,
848        asks,
849        bid_counts,
850        ask_counts,
851        RecordFlag::F_SNAPSHOT as u8,
852        msg.seq_id, // Use sequence ID for OKX L2 books
853        ts_event,
854        ts_init,
855    ))
856}
857
858/// Parses an OKX ticker message into a Nautilus quote tick.
859///
860/// # Errors
861///
862/// Returns an error if bid or ask values cannot be parsed from the message.
863pub fn parse_ticker_msg(
864    msg: &OKXTickerMsg,
865    instrument_id: InstrumentId,
866    price_precision: u8,
867    size_precision: u8,
868    ts_init: UnixNanos,
869) -> anyhow::Result<QuoteTick> {
870    let bid_price = parse_price(&msg.bid_px, price_precision)?;
871    let ask_price = parse_price(&msg.ask_px, price_precision)?;
872    let bid_size = parse_quantity(&msg.bid_sz, size_precision)?;
873    let ask_size = parse_quantity(&msg.ask_sz, size_precision)?;
874    let ts_event = parse_millisecond_timestamp(msg.ts);
875
876    QuoteTick::new_checked(
877        instrument_id,
878        bid_price,
879        ask_price,
880        bid_size,
881        ask_size,
882        ts_event,
883        ts_init,
884    )
885}
886
887/// Parses an OKX trade message into a Nautilus trade tick.
888///
889/// # Errors
890///
891/// Returns an error if trade prices or sizes cannot be parsed.
892pub fn parse_trade_msg(
893    msg: &OKXTradeMsg,
894    instrument_id: InstrumentId,
895    price_precision: u8,
896    size_precision: u8,
897    ts_init: UnixNanos,
898) -> anyhow::Result<TradeTick> {
899    let price = parse_price(&msg.px, price_precision)?;
900    let size = parse_quantity(&msg.sz, size_precision)?;
901    let aggressor_side: AggressorSide = msg.side.into();
902    let trade_id = TradeId::new(&msg.trade_id);
903    let ts_event = parse_millisecond_timestamp(msg.ts);
904
905    TradeTick::new_checked(
906        instrument_id,
907        price,
908        size,
909        aggressor_side,
910        trade_id,
911        ts_event,
912        ts_init,
913    )
914}
915
916/// Parses an OKX mark price message into a Nautilus mark price update.
917///
918/// # Errors
919///
920/// Returns an error if the mark price fails to parse.
921pub fn parse_mark_price_msg(
922    msg: &OKXMarkPriceMsg,
923    instrument_id: InstrumentId,
924    price_precision: u8,
925    ts_init: UnixNanos,
926) -> anyhow::Result<MarkPriceUpdate> {
927    let price = parse_price(&msg.mark_px, price_precision)?;
928    let ts_event = parse_millisecond_timestamp(msg.ts);
929
930    Ok(MarkPriceUpdate::new(
931        instrument_id,
932        price,
933        ts_event,
934        ts_init,
935    ))
936}
937
938/// Parses an OKX index price message into a Nautilus index price update.
939///
940/// # Errors
941///
942/// Returns an error if the index price fails to parse.
943pub fn parse_index_price_msg(
944    msg: &OKXIndexPriceMsg,
945    instrument_id: InstrumentId,
946    price_precision: u8,
947    ts_init: UnixNanos,
948) -> anyhow::Result<IndexPriceUpdate> {
949    let price = parse_price(&msg.idx_px, price_precision)?;
950    let ts_event = parse_millisecond_timestamp(msg.ts);
951
952    Ok(IndexPriceUpdate::new(
953        instrument_id,
954        price,
955        ts_event,
956        ts_init,
957    ))
958}
959
960/// Parses an OKX candle message into a Nautilus bar.
961///
962/// # Errors
963///
964/// Returns an error if candle price or volume fields cannot be parsed.
965pub fn parse_candle_msg(
966    msg: &OKXCandleMsg,
967    bar_type: BarType,
968    price_precision: u8,
969    size_precision: u8,
970    ts_init: UnixNanos,
971) -> anyhow::Result<Bar> {
972    let open = parse_price(&msg.o, price_precision)?;
973    let high = parse_price(&msg.h, price_precision)?;
974    let low = parse_price(&msg.l, price_precision)?;
975    let close = parse_price(&msg.c, price_precision)?;
976    let volume = parse_quantity(&msg.vol, size_precision)?;
977    let ts_event = parse_millisecond_timestamp(msg.ts);
978
979    Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
980}
981
982/// Parses vector of OKX order messages into Nautilus execution reports.
983///
984/// # Errors
985///
986/// Returns an error if any contained order messages cannot be parsed.
987pub fn parse_order_msg_vec(
988    data: &[OKXOrderMsg],
989    account_id: AccountId,
990    instruments: &AHashMap<Ustr, InstrumentAny>,
991    fee_cache: &mut AHashMap<Ustr, Money>,
992    filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
993    ts_init: UnixNanos,
994) -> anyhow::Result<Vec<ExecutionReport>> {
995    let mut order_reports = Vec::with_capacity(data.len());
996
997    for msg in data {
998        match parse_order_msg(
999            msg,
1000            account_id,
1001            instruments,
1002            fee_cache,
1003            filled_qty_cache,
1004            ts_init,
1005        ) {
1006            Ok(report) => order_reports.push(report),
1007            Err(e) => log::error!("Failed to parse execution report from message: {e}"),
1008        }
1009
1010        if let Some(instrument) = instruments.get(&msg.inst_id) {
1011            update_fee_fill_caches(msg, instrument, fee_cache, filled_qty_cache);
1012        }
1013    }
1014
1015    Ok(order_reports)
1016}
1017
1018/// Updates fee and fill caches from a raw OKX order message.
1019///
1020/// Call after parsing each message so subsequent messages in the same batch
1021/// see the correct cumulative fee and filled quantity.
1022pub fn update_fee_fill_caches(
1023    msg: &OKXOrderMsg,
1024    instrument: &InstrumentAny,
1025    fee_cache: &mut AHashMap<Ustr, Money>,
1026    filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
1027) {
1028    if let Some(ref fee_str) = msg.fee
1029        && !fee_str.is_empty()
1030    {
1031        let fee_dec = Decimal::from_str(fee_str).unwrap_or_default();
1032        let fee_ccy = parse_fee_currency(msg.fee_ccy.as_str(), fee_dec, || {
1033            format!("update_fee_fill_caches ord_id={}", msg.ord_id)
1034        });
1035
1036        if let Ok(total_fee) = crate::common::parse::parse_fee(Some(fee_str.as_str()), fee_ccy) {
1037            fee_cache.insert(msg.ord_id, total_fee);
1038        }
1039    }
1040
1041    if let Some(ref acc_fill_sz) = msg.acc_fill_sz
1042        && !acc_fill_sz.is_empty()
1043        && acc_fill_sz != "0"
1044        && let Ok(qty) = parse_quantity(acc_fill_sz, instrument.size_precision())
1045    {
1046        filled_qty_cache.insert(msg.ord_id, qty);
1047    }
1048}
1049
1050/// Checks if acc_fill_sz has increased compared to the previous filled quantity.
1051fn has_acc_fill_sz_increased(
1052    acc_fill_sz: &Option<String>,
1053    previous_filled_qty: Option<Quantity>,
1054    size_precision: u8,
1055) -> bool {
1056    if let Some(acc_str) = acc_fill_sz {
1057        if acc_str.is_empty() || acc_str == "0" {
1058            return false;
1059        }
1060
1061        if let Ok(current_filled) = parse_quantity(acc_str, size_precision) {
1062            if let Some(prev_qty) = previous_filled_qty {
1063                return current_filled > prev_qty;
1064            }
1065            return !current_filled.is_zero();
1066        }
1067    }
1068    false
1069}
1070
1071/// Parses a single OKX order message into an [`ExecutionReport`].
1072///
1073/// # Errors
1074///
1075/// Returns an error if the instrument cannot be found or if parsing the
1076/// underlying order payload fails.
1077pub fn parse_order_msg(
1078    msg: &OKXOrderMsg,
1079    account_id: AccountId,
1080    instruments: &AHashMap<Ustr, InstrumentAny>,
1081    fee_cache: &AHashMap<Ustr, Money>,
1082    filled_qty_cache: &AHashMap<Ustr, Quantity>,
1083    ts_init: UnixNanos,
1084) -> anyhow::Result<ExecutionReport> {
1085    let instrument = instruments
1086        .get(&msg.inst_id)
1087        .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
1088
1089    let previous_fee = fee_cache.get(&msg.ord_id).copied();
1090    let previous_filled_qty = filled_qty_cache.get(&msg.ord_id).copied();
1091
1092    let has_new_fill = (!msg.fill_sz.is_empty() && msg.fill_sz != "0")
1093        || !msg.trade_id.is_empty()
1094        || has_acc_fill_sz_increased(
1095            &msg.acc_fill_sz,
1096            previous_filled_qty,
1097            instrument.size_precision(),
1098        );
1099
1100    match msg.state {
1101        OKXOrderStatus::Filled | OKXOrderStatus::PartiallyFilled if has_new_fill => {
1102            match parse_fill_report(
1103                msg,
1104                instrument,
1105                account_id,
1106                previous_fee,
1107                previous_filled_qty,
1108                ts_init,
1109            )? {
1110                Some(report) => Ok(ExecutionReport::Fill(report)),
1111                None => parse_order_status_report(msg, instrument, account_id, ts_init)
1112                    .map(ExecutionReport::Order),
1113            }
1114        }
1115        _ => parse_order_status_report(msg, instrument, account_id, ts_init)
1116            .map(ExecutionReport::Order),
1117    }
1118}
1119
1120/// Parses an OKX algo order message into a Nautilus execution report.
1121///
1122/// # Errors
1123///
1124/// Returns an error if the instrument cannot be found or if message fields
1125/// fail to parse.
1126pub fn parse_algo_order_msg(
1127    msg: &OKXAlgoOrderMsg,
1128    account_id: AccountId,
1129    instruments: &AHashMap<Ustr, InstrumentAny>,
1130    ts_init: UnixNanos,
1131) -> anyhow::Result<Option<ExecutionReport>> {
1132    // Skip unsupported advance algo types (iceberg, twap)
1133    if matches!(
1134        msg.ord_type,
1135        OKXAlgoOrderType::Iceberg | OKXAlgoOrderType::Twap
1136    ) {
1137        log::debug!("Skipping unsupported algo order type: {:?}", msg.ord_type);
1138        return Ok(None);
1139    }
1140
1141    let inst = instruments
1142        .get(&msg.inst_id)
1143        .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
1144
1145    parse_algo_order_status_report(msg, inst, account_id, ts_init)
1146        .map(ExecutionReport::Order)
1147        .map(Some)
1148}
1149
1150/// Parses an OKX algo order message into a Nautilus order status report.
1151///
1152/// # Errors
1153///
1154/// Returns an error if any order identifiers or numeric fields cannot be
1155/// parsed.
1156pub fn parse_algo_order_status_report(
1157    msg: &OKXAlgoOrderMsg,
1158    instrument: &InstrumentAny,
1159    account_id: AccountId,
1160    ts_init: UnixNanos,
1161) -> anyhow::Result<OrderStatusReport> {
1162    // For algo orders, use algo_cl_ord_id if cl_ord_id is empty
1163    let client_order_id = if msg.cl_ord_id.is_empty() {
1164        parse_client_order_id(&msg.algo_cl_ord_id)
1165    } else {
1166        parse_client_order_id(&msg.cl_ord_id)
1167    };
1168
1169    // For algo orders that haven't triggered, ord_id will be empty, use algo_id instead
1170    let venue_order_id = if msg.ord_id.is_empty() {
1171        VenueOrderId::new(msg.algo_id.as_str())
1172    } else {
1173        VenueOrderId::new(msg.ord_id.as_str())
1174    };
1175
1176    let order_side: OrderSide = msg.side.into();
1177
1178    let algo_fields = parse_algo_order_fields(msg)?;
1179
1180    let status: OrderStatus = msg.state.into();
1181
1182    let quantity = parse_algo_order_quantity(msg, instrument)?;
1183
1184    // For algo orders, actual_sz represents filled quantity (if any)
1185    let filled_qty = if msg.actual_sz.is_empty() || msg.actual_sz == "0" {
1186        Quantity::zero(instrument.size_precision())
1187    } else {
1188        parse_quantity(msg.actual_sz.as_str(), instrument.size_precision())?
1189    };
1190
1191    // Parse limit price if it exists (not -1)
1192    let price = if is_market_price(algo_fields.ord_px) {
1193        None
1194    } else {
1195        Some(parse_price(
1196            algo_fields.ord_px,
1197            instrument.price_precision(),
1198        )?)
1199    };
1200
1201    let trigger_type = match algo_fields.trigger_px_type {
1202        OKXTriggerType::Last => TriggerType::LastPrice,
1203        OKXTriggerType::Mark => TriggerType::MarkPrice,
1204        OKXTriggerType::Index => TriggerType::IndexPrice,
1205        OKXTriggerType::None => TriggerType::Default,
1206    };
1207
1208    let ts_accepted = parse_millisecond_timestamp(msg.c_time);
1209    let ts_last = parse_millisecond_timestamp(msg.u_time);
1210
1211    let mut report = OrderStatusReport::new(
1212        account_id,
1213        instrument.id(),
1214        client_order_id,
1215        venue_order_id,
1216        order_side,
1217        algo_fields.order_type,
1218        TimeInForce::Gtc,
1219        status,
1220        quantity,
1221        filled_qty,
1222        ts_accepted,
1223        ts_last,
1224        ts_init,
1225        None,
1226    );
1227
1228    if !algo_fields.trigger_px.is_empty() {
1229        report.trigger_price = Some(parse_price(
1230            algo_fields.trigger_px,
1231            instrument.price_precision(),
1232        )?);
1233    }
1234
1235    report.trigger_type = Some(trigger_type);
1236
1237    if let Some(limit_price) = price {
1238        report.price = Some(limit_price);
1239    }
1240
1241    if algo_fields.order_type == OrderType::TrailingStopMarket {
1242        if !msg.callback_ratio.is_empty() {
1243            // OKX ratio is e.g. "0.01" for 1%, convert to basis points
1244            let ratio = Decimal::from_str(&msg.callback_ratio)?;
1245            report.trailing_offset = Some(ratio * Decimal::new(10_000, 0));
1246            report.trailing_offset_type = TrailingOffsetType::BasisPoints;
1247        } else if !msg.callback_spread.is_empty() {
1248            report.trailing_offset = Some(Decimal::from_str(&msg.callback_spread)?);
1249            report.trailing_offset_type = TrailingOffsetType::Price;
1250        }
1251    }
1252
1253    if msg.reduce_only == "true" {
1254        report = report.with_reduce_only(true);
1255    }
1256
1257    Ok(report)
1258}
1259
/// Order type and price fields selected from an OKX algo order message.
///
/// The string fields borrow from the source message, hence the `'a` lifetime.
struct AlgoOrderFields<'a> {
    /// Nautilus order type chosen for the algo order.
    order_type: OrderType,
    /// Raw trigger price string; may be empty when the message carries none.
    trigger_px: &'a str,
    /// OKX price type the trigger is evaluated against.
    trigger_px_type: OKXTriggerType,
    /// Raw order price string; checked with `is_market_price` to tell market from limit.
    ord_px: &'a str,
}
1266
1267fn parse_algo_order_fields(msg: &OKXAlgoOrderMsg) -> anyhow::Result<AlgoOrderFields<'_>> {
1268    match msg.ord_type {
1269        OKXAlgoOrderType::MoveOrderStop => Ok(AlgoOrderFields {
1270            order_type: OrderType::TrailingStopMarket,
1271            trigger_px: msg.trigger_px.as_str(),
1272            trigger_px_type: msg.trigger_px_type,
1273            ord_px: msg.ord_px.as_str(),
1274        }),
1275        OKXAlgoOrderType::Conditional | OKXAlgoOrderType::Oco => {
1276            if msg.tp_trigger_px.is_empty() {
1277                let (trigger_px, trigger_px_type, ord_px) = if msg.sl_trigger_px.is_empty() {
1278                    (
1279                        msg.trigger_px.as_str(),
1280                        msg.trigger_px_type,
1281                        msg.ord_px.as_str(),
1282                    )
1283                } else {
1284                    (
1285                        msg.sl_trigger_px.as_str(),
1286                        msg.sl_trigger_px_type,
1287                        msg.sl_ord_px.as_str(),
1288                    )
1289                };
1290
1291                Ok(AlgoOrderFields {
1292                    order_type: if is_market_price(ord_px) {
1293                        OrderType::StopMarket
1294                    } else {
1295                        OrderType::StopLimit
1296                    },
1297                    trigger_px,
1298                    trigger_px_type,
1299                    ord_px,
1300                })
1301            } else {
1302                let ord_px = msg.tp_ord_px.as_str();
1303                Ok(AlgoOrderFields {
1304                    order_type: if is_market_price(ord_px) {
1305                        OrderType::MarketIfTouched
1306                    } else {
1307                        OrderType::LimitIfTouched
1308                    },
1309                    trigger_px: msg.tp_trigger_px.as_str(),
1310                    trigger_px_type: msg.tp_trigger_px_type,
1311                    ord_px,
1312                })
1313            }
1314        }
1315        OKXAlgoOrderType::Trigger => Ok(AlgoOrderFields {
1316            order_type: if is_market_price(&msg.ord_px) {
1317                OrderType::StopMarket
1318            } else {
1319                OrderType::StopLimit
1320            },
1321            trigger_px: msg.trigger_px.as_str(),
1322            trigger_px_type: msg.trigger_px_type,
1323            ord_px: msg.ord_px.as_str(),
1324        }),
1325        _ => anyhow::bail!("Unsupported algo order type: {:?}", msg.ord_type),
1326    }
1327}
1328
1329fn parse_algo_order_quantity(
1330    msg: &OKXAlgoOrderMsg,
1331    instrument: &InstrumentAny,
1332) -> anyhow::Result<Quantity> {
1333    if !msg.sz.is_empty() {
1334        return parse_quantity(msg.sz.as_str(), instrument.size_precision());
1335    }
1336
1337    if !msg.close_fraction.is_empty()
1338        || !msg.sl_trigger_px.is_empty()
1339        || !msg.tp_trigger_px.is_empty()
1340    {
1341        return Ok(Quantity::zero(instrument.size_precision()));
1342    }
1343
1344    anyhow::bail!("Missing sz for algo order {}", msg.algo_id)
1345}
1346
1347/// Parses an OKX order message into a Nautilus order status report.
1348///
1349/// # Errors
1350///
1351/// Returns an error if order metadata or numeric values cannot be parsed.
1352pub fn parse_order_status_report(
1353    msg: &OKXOrderMsg,
1354    instrument: &InstrumentAny,
1355    account_id: AccountId,
1356    ts_init: UnixNanos,
1357) -> anyhow::Result<OrderStatusReport> {
1358    // For triggered algo child orders, OKX assigns a new cl_ord_id and keeps
1359    // the parent's ID in algo_cl_ord_id. Prefer the parent ID so the report
1360    // matches the tracked order in Nautilus.
1361    let client_order_id = msg
1362        .algo_cl_ord_id
1363        .as_deref()
1364        .and_then(parse_client_order_id)
1365        .or_else(|| parse_client_order_id(&msg.cl_ord_id));
1366    let venue_order_id = VenueOrderId::new(msg.ord_id);
1367    let order_side: OrderSide = msg.side.into();
1368
1369    let okx_order_type = msg.ord_type;
1370
1371    // Determine order type based on presence of limit price for certain OKX order types
1372    let order_type = match okx_order_type {
1373        OKXOrderType::Trigger => {
1374            if is_market_price(&msg.px) {
1375                OrderType::StopMarket
1376            } else {
1377                OrderType::StopLimit
1378            }
1379        }
1380        OKXOrderType::Fok | OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => {
1381            determine_order_type_with_alt(
1382                okx_order_type,
1383                &msg.px,
1384                msg.px_vol.as_deref().unwrap_or(""),
1385                msg.px_usd.as_deref().unwrap_or(""),
1386            )
1387        }
1388        _ => msg.ord_type.into(),
1389    };
1390    let order_status: OrderStatus = msg.state.into();
1391
1392    let time_in_force = match okx_order_type {
1393        OKXOrderType::Fok | OKXOrderType::OpFok => TimeInForce::Fok,
1394        OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => TimeInForce::Ioc,
1395        _ => TimeInForce::Gtc,
1396    };
1397
1398    let size_precision = instrument.size_precision();
1399
1400    // Parse quantities based on target currency
1401    // OKX always returns acc_fill_sz in base currency, but sz depends on tgt_ccy
1402
1403    // Determine if this is a quote-quantity order
1404    // Method 1: Explicit tgt_ccy field set to QuoteCcy
1405    let is_quote_qty_explicit = msg.tgt_ccy == Some(OKXTargetCurrency::QuoteCcy);
1406
1407    // Method 2: Use OKX defaults when tgt_ccy is None (old orders or missing field)
1408    // OKX API defaults for SPOT market orders: BUY orders use quote_ccy, SELL orders use base_ccy
1409    // Note: tgtCcy only applies to SPOT market orders (not limit orders)
1410    // For limit orders, sz is always in base currency regardless of side
1411    let is_quote_qty_heuristic = msg.tgt_ccy.is_none()
1412        && (msg.inst_type == OKXInstrumentType::Spot || msg.inst_type == OKXInstrumentType::Margin)
1413        && msg.side == OKXSide::Buy
1414        && order_type == OrderType::Market;
1415
1416    let (quantity, filled_qty) = if is_quote_qty_explicit || is_quote_qty_heuristic {
1417        // Quote-quantity order: sz is in quote currency, need to convert to base
1418        let sz_quote_dec = Decimal::from_str(&msg.sz).map_err(|e| {
1419            anyhow::anyhow!("Failed to parse sz='{}' as quote quantity: {}", msg.sz, e)
1420        })?;
1421
1422        // Determine the price to use for conversion
1423        // Priority: 1) limit price (px) for limit orders, 2) avg_px for market orders
1424        let conversion_price_dec =
1425            if !is_market_price(&msg.px) {
1426                // Limit order: use the limit price (msg.px)
1427                Some(
1428                    Decimal::from_str(&msg.px)
1429                        .map_err(|e| anyhow::anyhow!("Failed to parse px='{}': {}", msg.px, e))?,
1430                )
1431            } else if !msg.avg_px.is_empty() && msg.avg_px != "0" {
1432                // Market order with fills: use average fill price
1433                Some(Decimal::from_str(&msg.avg_px).map_err(|e| {
1434                    anyhow::anyhow!("Failed to parse avg_px='{}': {}", msg.avg_px, e)
1435                })?)
1436            } else {
1437                None
1438            };
1439
1440        // Convert quote quantity to base: quantity_base = sz_quote / price
1441        let quantity_base = if let Some(price) = conversion_price_dec {
1442            if price.is_zero() {
1443                parse_quantity(&msg.sz, size_precision)?
1444            } else {
1445                Quantity::from_decimal_dp(sz_quote_dec / price, size_precision)?
1446            }
1447        } else {
1448            // No price available, can't convert - use sz as-is temporarily
1449            // This will be corrected once the order gets filled and price is available
1450            parse_quantity(&msg.sz, size_precision)?
1451        };
1452
1453        let filled_qty =
1454            parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?;
1455
1456        (quantity_base, filled_qty)
1457    } else {
1458        // Base-quantity order: both sz and acc_fill_sz are in base currency
1459        let quantity = parse_quantity(&msg.sz, size_precision)?;
1460        let filled_qty =
1461            parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?;
1462
1463        (quantity, filled_qty)
1464    };
1465
1466    // For quote-quantity orders marked as FILLED, adjust quantity to match filled_qty
1467    // to avoid precision mismatches from quote-to-base conversion
1468    let (quantity, filled_qty) = if (is_quote_qty_explicit || is_quote_qty_heuristic)
1469        && msg.state == OKXOrderStatus::Filled
1470        && filled_qty.is_positive()
1471    {
1472        (filled_qty, filled_qty)
1473    } else {
1474        (quantity, filled_qty)
1475    };
1476
1477    let ts_accepted = parse_millisecond_timestamp(msg.c_time);
1478    let ts_last = parse_millisecond_timestamp(msg.u_time);
1479
1480    let is_liquidation = matches!(
1481        msg.category,
1482        OKXOrderCategory::FullLiquidation | OKXOrderCategory::PartialLiquidation
1483    );
1484
1485    let is_adl = msg.category == OKXOrderCategory::Adl;
1486
1487    if is_liquidation {
1488        log::warn!(
1489            "Liquidation order status update: order_id={}, category={:?}, inst_id={}, state={:?}",
1490            msg.ord_id.as_str(),
1491            msg.category,
1492            msg.inst_id.as_str(),
1493            msg.state,
1494        );
1495    }
1496
1497    if is_adl {
1498        log::warn!(
1499            "ADL (Auto-Deleveraging) order status update: order_id={}, inst_id={}, state={:?}",
1500            msg.ord_id.as_str(),
1501            msg.inst_id.as_str(),
1502            msg.state,
1503        );
1504    }
1505
1506    let mut report = OrderStatusReport::new(
1507        account_id,
1508        instrument.id(),
1509        client_order_id,
1510        venue_order_id,
1511        order_side,
1512        order_type,
1513        time_in_force,
1514        order_status,
1515        quantity,
1516        filled_qty,
1517        ts_accepted,
1518        ts_last,
1519        ts_init,
1520        None, // Generate UUID4 automatically
1521    );
1522
1523    let price_precision = instrument.price_precision();
1524
1525    if okx_order_type == OKXOrderType::Trigger {
1526        // For triggered orders coming through regular orders channel,
1527        // set the price if it's a stop-limit order
1528        if !is_market_price(&msg.px)
1529            && let Ok(price) = parse_price(&msg.px, price_precision)
1530        {
1531            report = report.with_price(price);
1532        }
1533    } else {
1534        // For regular orders, use px field
1535        if !is_market_price(&msg.px)
1536            && let Ok(price) = parse_price(&msg.px, price_precision)
1537        {
1538            report = report.with_price(price);
1539        }
1540    }
1541
1542    if !msg.avg_px.is_empty()
1543        && let Ok(decimal) = Decimal::from_str(&msg.avg_px)
1544    {
1545        report.avg_px = Some(decimal);
1546    }
1547
1548    if matches!(
1549        msg.ord_type,
1550        OKXOrderType::PostOnly | OKXOrderType::MmpAndPostOnly
1551    ) || matches!(
1552        msg.cancel_source.as_deref(),
1553        Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
1554    ) || matches!(
1555        msg.cancel_source_reason.as_deref(),
1556        Some(reason) if reason.contains("POST_ONLY")
1557    ) {
1558        report = report.with_post_only(true);
1559    }
1560
1561    if msg.reduce_only == "true" {
1562        report = report.with_reduce_only(true);
1563    }
1564
1565    let mut linked_ids = Vec::new();
1566
1567    if let Some(algo_cl_ord_id) = msg
1568        .algo_cl_ord_id
1569        .as_ref()
1570        .filter(|value| !value.is_empty())
1571    {
1572        let algo_client_id = ClientOrderId::new(algo_cl_ord_id.as_str());
1573        if report.client_order_id != Some(algo_client_id) {
1574            linked_ids.push(algo_client_id);
1575        }
1576    }
1577
1578    if let Some(attach_algo_cl_ord_id) = msg
1579        .attach_algo_cl_ord_id
1580        .as_ref()
1581        .filter(|value| !value.is_empty())
1582    {
1583        let attach_client_id = ClientOrderId::new(attach_algo_cl_ord_id.as_str());
1584        if report.client_order_id != Some(attach_client_id)
1585            && !linked_ids.contains(&attach_client_id)
1586        {
1587            linked_ids.push(attach_client_id);
1588        }
1589    }
1590
1591    for attach_algo in &msg.attach_algo_ords {
1592        if attach_algo.attach_algo_cl_ord_id.is_empty() {
1593            continue;
1594        }
1595
1596        let attach_client_id = ClientOrderId::new(attach_algo.attach_algo_cl_ord_id.as_str());
1597        if report.client_order_id != Some(attach_client_id)
1598            && !linked_ids.contains(&attach_client_id)
1599        {
1600            linked_ids.push(attach_client_id);
1601        }
1602    }
1603
1604    if !linked_ids.is_empty() {
1605        report = report.with_linked_order_ids(linked_ids);
1606    }
1607
1608    if let Some(reason) = msg
1609        .cancel_source_reason
1610        .as_ref()
1611        .filter(|reason| !reason.is_empty())
1612    {
1613        report = report.with_cancel_reason(reason.clone());
1614    } else if let Some(source) = msg
1615        .cancel_source
1616        .as_ref()
1617        .filter(|source| !source.is_empty())
1618    {
1619        let reason = if source == OKX_POST_ONLY_CANCEL_SOURCE {
1620            OKX_POST_ONLY_CANCEL_REASON.to_string()
1621        } else {
1622            format!("cancel_source={source}")
1623        };
1624        report = report.with_cancel_reason(reason);
1625    }
1626
1627    Ok(report)
1628}
1629
1630/// Parses an OKX order message into a Nautilus fill report.
1631///
1632/// # Errors
1633///
1634/// Returns an error if order quantities, prices, or fees cannot be parsed.
1635pub fn parse_fill_report(
1636    msg: &OKXOrderMsg,
1637    instrument: &InstrumentAny,
1638    account_id: AccountId,
1639    previous_fee: Option<Money>,
1640    previous_filled_qty: Option<Quantity>,
1641    ts_init: UnixNanos,
1642) -> anyhow::Result<Option<FillReport>> {
1643    // For triggered algo child orders, prefer the parent algo_cl_ord_id
1644    let client_order_id = msg
1645        .algo_cl_ord_id
1646        .as_deref()
1647        .and_then(parse_client_order_id)
1648        .or_else(|| parse_client_order_id(&msg.cl_ord_id));
1649    let venue_order_id = VenueOrderId::new(msg.ord_id);
1650
1651    // OKX may not provide a `trade_id` (some algo trigger payloads, manual
1652    // settlements). Derive a deterministic id from the immutable fill fields
1653    // via FNV-1a so reconnect replays of the same fill collapse to one event
1654    // in the downstream `WsDispatchState::check_and_insert_trade` dedup. A
1655    // random UUID4 here would defeat dedup, since each replay would mint a
1656    // new id. The hash output keeps the synthesized id within `TradeId`'s
1657    // 36-character cap.
1658    let trade_id = if msg.trade_id.is_empty() {
1659        let synthetic = synthesize_trade_id(msg);
1660        TradeId::new(&synthetic)
1661    } else {
1662        TradeId::new(&msg.trade_id)
1663    };
1664
1665    let order_side: OrderSide = msg.side.into();
1666
1667    let price_precision = instrument.price_precision();
1668    let size_precision = instrument.size_precision();
1669
1670    let price_str = if !msg.fill_px.is_empty() {
1671        &msg.fill_px
1672    } else if !msg.avg_px.is_empty() {
1673        &msg.avg_px
1674    } else {
1675        &msg.px
1676    };
1677    let last_px = parse_price(price_str, price_precision).map_err(|e| {
1678        anyhow::anyhow!(
1679            "Failed to parse price (fill_px='{}', avg_px='{}', px='{}'): {}",
1680            msg.fill_px,
1681            msg.avg_px,
1682            msg.px,
1683            e
1684        )
1685    })?;
1686
1687    // OKX provides fillSz (incremental fill) or accFillSz (cumulative total)
1688    // If fillSz is provided, use it directly as the incremental fill quantity
1689    let last_qty = if !msg.fill_sz.is_empty() && msg.fill_sz != "0" {
1690        parse_quantity(&msg.fill_sz, size_precision)
1691            .map_err(|e| anyhow::anyhow!("Failed to parse fill_sz='{}': {e}", msg.fill_sz,))?
1692    } else if let Some(ref acc_fill_sz) = msg.acc_fill_sz {
1693        // If fillSz is missing but accFillSz is available, calculate incremental fill
1694        if !acc_fill_sz.is_empty() && acc_fill_sz != "0" {
1695            let current_filled = parse_quantity(acc_fill_sz, size_precision).map_err(|e| {
1696                anyhow::anyhow!("Failed to parse acc_fill_sz='{acc_fill_sz}': {e}",)
1697            })?;
1698
1699            // Calculate incremental fill as: current_total - previous_total
1700            if let Some(prev_qty) = previous_filled_qty {
1701                if current_filled < prev_qty {
1702                    anyhow::bail!(
1703                        "Cumulative fill went backwards: acc_fill_sz='{acc_fill_sz}' < previous_filled_qty={prev_qty} \
1704                         (possible stale data after reconnect)"
1705                    );
1706                }
1707                let incremental = current_filled - prev_qty;
1708                if incremental.is_zero() {
1709                    log::debug!(
1710                        "Skipping duplicate fill: acc_fill_sz='{acc_fill_sz}' unchanged from previous={prev_qty}"
1711                    );
1712                    return Ok(None);
1713                }
1714                incremental
1715            } else {
1716                // First fill, use accumulated as incremental
1717                current_filled
1718            }
1719        } else {
1720            anyhow::bail!(
1721                "Cannot determine fill quantity: fill_sz is empty/zero and acc_fill_sz is empty/zero"
1722            );
1723        }
1724    } else {
1725        anyhow::bail!(
1726            "Cannot determine fill quantity: fill_sz='{}' and acc_fill_sz is None",
1727            msg.fill_sz
1728        );
1729    };
1730
1731    let fee_str = msg.fee.as_deref().unwrap_or("0");
1732    let fee_dec = Decimal::from_str(fee_str)
1733        .map_err(|e| anyhow::anyhow!("Failed to parse fee '{fee_str}': {e}"))?;
1734
1735    let fee_currency = parse_fee_currency(msg.fee_ccy.as_str(), fee_dec, || {
1736        format!("fill report for inst_id={}", msg.inst_id)
1737    });
1738
1739    // OKX sends fees as negative numbers (e.g., "-2.5" for a $2.5 charge), parse_fee negates to positive
1740    let total_fee = parse_fee(msg.fee.as_deref(), fee_currency)
1741        .map_err(|e| anyhow::anyhow!("Failed to parse fee={:?}: {}", msg.fee, e))?;
1742
1743    // OKX sends cumulative fees, so we subtract the previous total to get this fill's fee
1744    let commission = if let Some(previous_fee) = previous_fee {
1745        if total_fee.currency == previous_fee.currency {
1746            let incremental = total_fee - previous_fee;
1747
1748            if incremental < Money::zero(fee_currency) {
1749                log::debug!(
1750                    "Negative incremental fee detected - likely a maker rebate or fee refund: order_id={}, total_fee={}, previous_fee={}, incremental={}",
1751                    msg.ord_id.as_str(),
1752                    total_fee,
1753                    previous_fee,
1754                    incremental,
1755                );
1756            }
1757
1758            // Skip corruption check when previous is negative (rebate), as transitions from
1759            // rebate to charge legitimately have incremental > total (e.g., -1 → +2 gives +3)
1760            if previous_fee >= Money::zero(fee_currency)
1761                && total_fee > Money::zero(fee_currency)
1762                && incremental > total_fee
1763            {
1764                log::error!(
1765                    "Incremental fee exceeds total fee - likely fee cache corruption, using total fee as fallback: order_id={}, total_fee={}, previous_fee={}, incremental={}",
1766                    msg.ord_id.as_str(),
1767                    total_fee,
1768                    previous_fee,
1769                    incremental,
1770                );
1771                total_fee
1772            } else {
1773                incremental
1774            }
1775        } else {
1776            log::warn!(
1777                "Fee currency changed from {} to {} for order_id={}, using total fee as commission",
1778                previous_fee.currency.code,
1779                total_fee.currency.code,
1780                msg.ord_id.as_str(),
1781            );
1782            total_fee
1783        }
1784    } else {
1785        total_fee
1786    };
1787
1788    let liquidity_side: LiquiditySide = msg.exec_type.into();
1789    let ts_event = parse_millisecond_timestamp(msg.fill_time);
1790
1791    let is_liquidation = matches!(
1792        msg.category,
1793        OKXOrderCategory::FullLiquidation | OKXOrderCategory::PartialLiquidation
1794    );
1795
1796    let is_adl = msg.category == OKXOrderCategory::Adl;
1797
1798    if is_liquidation {
1799        log::warn!(
1800            "Liquidation order detected: order_id={}, category={:?}, inst_id={}, side={:?}, fill_sz={}, fill_px={}",
1801            msg.ord_id.as_str(),
1802            msg.category,
1803            msg.inst_id.as_str(),
1804            msg.side,
1805            msg.fill_sz,
1806            msg.fill_px,
1807        );
1808    }
1809
1810    if is_adl {
1811        log::warn!(
1812            "ADL (Auto-Deleveraging) order detected: order_id={}, inst_id={}, side={:?}, fill_sz={}, fill_px={}",
1813            msg.ord_id.as_str(),
1814            msg.inst_id.as_str(),
1815            msg.side,
1816            msg.fill_sz,
1817            msg.fill_px,
1818        );
1819    }
1820
1821    let report = FillReport::new(
1822        account_id,
1823        instrument.id(),
1824        venue_order_id,
1825        trade_id,
1826        order_side,
1827        last_qty,
1828        last_px,
1829        commission,
1830        liquidity_side,
1831        client_order_id,
1832        None,
1833        ts_event,
1834        ts_init,
1835        None, // Generate UUID4 automatically
1836    );
1837
1838    Ok(Some(report))
1839}
1840
1841/// Parses an option summary payload into [`OptionGreeks`].
1842///
1843/// Selects Black-Scholes (`delta_bs`, `gamma_bs`, `vega_bs`, `theta_bs`) or
1844/// price-adjusted (`delta`, `gamma`, `vega`, `theta`) greeks based on `greeks_type`.
1845/// BS greeks align with what Deribit and Bybit provide; PA greeks are denominated
1846/// in the underlying/coin units and match OKX's native contract convention.
1847///
1848/// # Errors
1849///
1850/// Returns an error if any of the greeks or volatility fields cannot be parsed as f64.
1851pub fn parse_option_summary_greeks(
1852    msg: &OKXOptionSummaryMsg,
1853    instrument_id: &InstrumentId,
1854    greeks_type: OKXGreeksType,
1855    ts_init: UnixNanos,
1856) -> anyhow::Result<OptionGreeks> {
1857    let ts_event = UnixNanos::from(msg.ts * 1_000_000);
1858
1859    let (delta_s, gamma_s, vega_s, theta_s, delta_ctx, gamma_ctx, vega_ctx, theta_ctx) =
1860        match greeks_type {
1861            OKXGreeksType::Bs => (
1862                &msg.delta_bs,
1863                &msg.gamma_bs,
1864                &msg.vega_bs,
1865                &msg.theta_bs,
1866                "invalid delta_bs",
1867                "invalid gamma_bs",
1868                "invalid vega_bs",
1869                "invalid theta_bs",
1870            ),
1871            OKXGreeksType::Pa => (
1872                &msg.delta,
1873                &msg.gamma,
1874                &msg.vega,
1875                &msg.theta,
1876                "invalid delta (pa)",
1877                "invalid gamma (pa)",
1878                "invalid vega (pa)",
1879                "invalid theta (pa)",
1880            ),
1881        };
1882
1883    let delta: f64 = delta_s.parse().context(delta_ctx)?;
1884    let gamma: f64 = gamma_s.parse().context(gamma_ctx)?;
1885    let vega: f64 = vega_s.parse().context(vega_ctx)?;
1886    let theta: f64 = theta_s.parse().context(theta_ctx)?;
1887
1888    let bid_iv: f64 = msg.bid_vol.parse().context("invalid bid_vol")?;
1889    let ask_iv: f64 = msg.ask_vol.parse().context("invalid ask_vol")?;
1890    let mark_iv: f64 = msg.mark_vol.parse().context("invalid mark_vol")?;
1891
1892    let underlying_price = msg
1893        .fwd_px
1894        .as_deref()
1895        .filter(|s| !s.is_empty())
1896        .map(|s| s.parse::<f64>())
1897        .transpose()
1898        .context("invalid fwd_px")?;
1899
1900    Ok(OptionGreeks {
1901        instrument_id: *instrument_id,
1902        convention: greeks_type.into(),
1903        greeks: OptionGreekValues {
1904            delta,
1905            gamma,
1906            vega,
1907            theta,
1908            rho: 0.0, // OKX does not provide rho
1909        },
1910        mark_iv: Some(mark_iv),
1911        bid_iv: Some(bid_iv),
1912        ask_iv: Some(ask_iv),
1913        underlying_price,
1914        open_interest: None,
1915        ts_event,
1916        ts_init,
1917    })
1918}
1919
1920/// Parses OKX WebSocket message payloads into Nautilus data structures.
1921///
1922/// # Errors
1923///
1924/// Returns an error if the payload cannot be deserialized or if downstream
1925/// parsing routines fail.
1926///
1927/// # Panics
1928///
1929/// Panics only in the case where `okx_channel_to_bar_spec(channel)` returns
1930/// `None` after a prior `is_some` check – an unreachable scenario indicating a
1931/// logic error.
1932#[expect(clippy::too_many_arguments)]
1933pub fn parse_ws_message_data(
1934    channel: &OKXWsChannel,
1935    data: serde_json::Value,
1936    instrument_id: &InstrumentId,
1937    price_precision: u8,
1938    size_precision: u8,
1939    ts_init: UnixNanos,
1940    funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
1941    instruments_cache: &AHashMap<Ustr, InstrumentAny>,
1942) -> anyhow::Result<Option<NautilusWsMessage>> {
1943    match channel {
1944        OKXWsChannel::Instruments => {
1945            if let Ok(msg) = serde_json::from_value::<OKXInstrument>(data) {
1946                // Look up cached instrument to extract existing fees
1947                let (margin_init, margin_maint, maker_fee, taker_fee) =
1948                    instruments_cache.get(&Ustr::from(&msg.inst_id)).map_or(
1949                        (None, None, None, None),
1950                        extract_fees_from_cached_instrument,
1951                    );
1952
1953                let status_action = okx_status_to_market_action(msg.state);
1954
1955                match parse_instrument_any(
1956                    &msg,
1957                    margin_init,
1958                    margin_maint,
1959                    maker_fee,
1960                    taker_fee,
1961                    ts_init,
1962                )? {
1963                    Some(inst_any) => {
1964                        let status = InstrumentStatus::new(
1965                            inst_any.id(),
1966                            status_action,
1967                            ts_init,
1968                            ts_init,
1969                            None,
1970                            None,
1971                            Some(matches!(msg.state, OKXInstrumentStatus::Live)),
1972                            None,
1973                            None,
1974                        );
1975                        Ok(Some(NautilusWsMessage::Instrument(
1976                            Box::new(inst_any),
1977                            Some(status),
1978                        )))
1979                    }
1980                    None => {
1981                        log::warn!("Empty instrument payload: {msg:?}");
1982                        Ok(None)
1983                    }
1984                }
1985            } else {
1986                anyhow::bail!("Failed to deserialize instrument payload")
1987            }
1988        }
1989        OKXWsChannel::BboTbt => {
1990            let data_vec = parse_quote_msg_vec(
1991                data,
1992                instrument_id,
1993                price_precision,
1994                size_precision,
1995                ts_init,
1996            )?;
1997            Ok(Some(NautilusWsMessage::Data(data_vec)))
1998        }
1999        OKXWsChannel::Tickers => {
2000            let data_vec = parse_ticker_msg_vec(
2001                data,
2002                instrument_id,
2003                price_precision,
2004                size_precision,
2005                ts_init,
2006            )?;
2007            Ok(Some(NautilusWsMessage::Data(data_vec)))
2008        }
2009        OKXWsChannel::Trades => {
2010            let data_vec = parse_trade_msg_vec(
2011                data,
2012                instrument_id,
2013                price_precision,
2014                size_precision,
2015                ts_init,
2016            )?;
2017            Ok(Some(NautilusWsMessage::Data(data_vec)))
2018        }
2019        OKXWsChannel::MarkPrice => {
2020            let data_vec = parse_mark_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
2021            Ok(Some(NautilusWsMessage::Data(data_vec)))
2022        }
2023        OKXWsChannel::IndexTickers => {
2024            let data_vec =
2025                parse_index_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
2026            Ok(Some(NautilusWsMessage::Data(data_vec)))
2027        }
2028        OKXWsChannel::FundingRate => {
2029            let data_vec = parse_funding_rate_msg_vec(data, instrument_id, ts_init, funding_cache)?;
2030            Ok(Some(NautilusWsMessage::FundingRates(data_vec)))
2031        }
2032        channel if okx_channel_to_bar_spec(channel).is_some() => {
2033            let bar_spec = okx_channel_to_bar_spec(channel).expect("bar_spec checked above");
2034            let data_vec = parse_candle_msg_vec(
2035                data,
2036                instrument_id,
2037                price_precision,
2038                size_precision,
2039                bar_spec,
2040                ts_init,
2041            )?;
2042            Ok(Some(NautilusWsMessage::Data(data_vec)))
2043        }
2044        OKXWsChannel::Books
2045        | OKXWsChannel::BooksTbt
2046        | OKXWsChannel::Books5
2047        | OKXWsChannel::Books50Tbt => {
2048            if let Ok(book_msgs) = serde_json::from_value::<Vec<OKXBookMsg>>(data) {
2049                let data_vec = parse_book10_msg_vec(
2050                    book_msgs,
2051                    instrument_id,
2052                    price_precision,
2053                    size_precision,
2054                    ts_init,
2055                )?;
2056                Ok(Some(NautilusWsMessage::Data(data_vec)))
2057            } else {
2058                anyhow::bail!("Failed to deserialize Books channel data as Vec<OKXBookMsg>")
2059            }
2060        }
2061        _ => {
2062            log::warn!("Unsupported channel for message parsing: {channel:?}");
2063            Ok(None)
2064        }
2065    }
2066}
2067
2068#[cfg(test)]
2069mod tests {
2070    use ahash::AHashMap;
2071    use nautilus_core::nanos::UnixNanos;
2072    use nautilus_model::{
2073        data::bar::BAR_SPEC_1_DAY_LAST,
2074        enums::GreeksConvention,
2075        identifiers::{ClientOrderId, Symbol},
2076        instruments::CryptoPerpetual,
2077        types::Currency,
2078    };
2079    use rstest::rstest;
2080    use rust_decimal::Decimal;
2081    use rust_decimal_macros::dec;
2082    use ustr::Ustr;
2083
2084    use super::*;
2085    use crate::{
2086        OKXPositionSide,
2087        common::{
2088            enums::{
2089                OKXExecType, OKXInstrumentType, OKXOrderType, OKXPriceType, OKXQuickMarginType,
2090                OKXSelfTradePreventionMode, OKXSide, OKXTradeMode,
2091            },
2092            parse::parse_account_state,
2093            testing::load_test_json,
2094        },
2095        http::models::OKXAccount,
2096        websocket::messages::{OKXAlgoOrderMsg, OKXAttachedAlgoOrd, OKXWebSocketArg, OKXWsFrame},
2097    };
2098
2099    fn create_stub_instrument() -> CryptoPerpetual {
2100        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2101        CryptoPerpetual::new(
2102            instrument_id,
2103            Symbol::from("BTC-USDT-SWAP"),
2104            Currency::BTC(),
2105            Currency::USDT(),
2106            Currency::USDT(),
2107            false,
2108            2,
2109            8,
2110            Price::from("0.01"),
2111            Quantity::from("0.00000001"),
2112            None,
2113            None,
2114            None,
2115            None,
2116            None,
2117            None,
2118            None,
2119            None,
2120            None,
2121            None,
2122            None,
2123            None,
2124            None,
2125            UnixNanos::default(),
2126            UnixNanos::default(),
2127        )
2128    }
2129
2130    fn create_stub_order_msg(
2131        fill_sz: &str,
2132        acc_fill_sz: Option<String>,
2133        order_id: &str,
2134        trade_id: &str,
2135    ) -> OKXOrderMsg {
2136        OKXOrderMsg {
2137            acc_fill_sz,
2138            algo_id: None,
2139            avg_px: "50000.0".to_string(),
2140            c_time: 1746947317401,
2141            cancel_source: None,
2142            cancel_source_reason: None,
2143            category: OKXOrderCategory::Normal,
2144            ccy: Ustr::from("USDT"),
2145            cl_ord_id: "test_order_1".to_string(),
2146            algo_cl_ord_id: None,
2147            attach_algo_cl_ord_id: None,
2148            attach_algo_ords: Vec::new(),
2149            fee: Some("-1.0".to_string()),
2150            fee_ccy: Ustr::from("USDT"),
2151            fill_fee: None,
2152            fill_fee_ccy: None,
2153            fill_mark_px: None,
2154            fill_mark_vol: None,
2155            fill_px_vol: None,
2156            fill_px_usd: None,
2157            fill_fwd_px: None,
2158            fill_notional_usd: None,
2159            fill_pnl: None,
2160            fill_px: "50000.0".to_string(),
2161            fill_sz: fill_sz.to_string(),
2162            fill_time: 1746947317402,
2163            inst_id: Ustr::from("BTC-USDT-SWAP"),
2164            inst_type: OKXInstrumentType::Swap,
2165            is_tp_limit: None,
2166            lever: "2.0".to_string(),
2167            linked_algo_ord: None,
2168            notional_usd: None,
2169            ord_id: Ustr::from(order_id),
2170            ord_type: OKXOrderType::Market,
2171            pnl: "0".to_string(),
2172            pos_side: OKXPositionSide::Long,
2173            px: String::new(),
2174            px_type: OKXPriceType::None,
2175            px_usd: None,
2176            px_vol: None,
2177            quick_mgn_type: OKXQuickMarginType::None,
2178            rebate: None,
2179            rebate_ccy: None,
2180            reduce_only: "false".to_string(),
2181            side: OKXSide::Buy,
2182            sl_ord_px: None,
2183            sl_trigger_px: None,
2184            sl_trigger_px_type: None,
2185            source: None,
2186            state: OKXOrderStatus::PartiallyFilled,
2187            stp_id: None,
2188            stp_mode: OKXSelfTradePreventionMode::None,
2189            exec_type: OKXExecType::Taker,
2190            sz: "0.03".to_string(),
2191            tag: None,
2192            td_mode: OKXTradeMode::Isolated,
2193            tgt_ccy: None,
2194            tp_ord_px: None,
2195            tp_trigger_px: None,
2196            tp_trigger_px_type: None,
2197            trade_id: trade_id.to_string(),
2198            u_time: 1746947317402,
2199            amend_result: None,
2200            req_id: None,
2201            code: None,
2202            msg: None,
2203        }
2204    }
2205
2206    #[rstest]
2207    fn test_parse_books_snapshot() {
2208        let json_data = load_test_json("ws_books_snapshot.json");
2209        let msg: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2210        let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
2211            OKXWsFrame::BookData { data, action, .. } => (data, action),
2212            _ => panic!("Expected a `BookData` variant"),
2213        };
2214
2215        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2216        let deltas = parse_book_msg(
2217            &okx_books[0],
2218            instrument_id,
2219            2,
2220            1,
2221            &action,
2222            UnixNanos::default(),
2223        )
2224        .unwrap();
2225
2226        assert_eq!(deltas.instrument_id, instrument_id);
2227        assert_eq!(deltas.deltas.len(), 16);
2228        assert_eq!(deltas.flags, 32);
2229        assert_eq!(deltas.sequence, 123456);
2230        assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
2231        assert_eq!(deltas.ts_init, UnixNanos::default());
2232
2233        // Verify some individual deltas are parsed correctly
2234        assert!(!deltas.deltas.is_empty());
2235        // Snapshot should have both bid and ask deltas
2236        assert!(
2237            deltas.deltas.iter().any(|d| d.order.side == OrderSide::Buy),
2238            "Should have bid deltas"
2239        );
2240        assert!(
2241            deltas
2242                .deltas
2243                .iter()
2244                .any(|d| d.order.side == OrderSide::Sell),
2245            "Should have ask deltas"
2246        );
2247    }
2248
2249    #[rstest]
2250    fn test_parse_books_update() {
2251        let json_data = load_test_json("ws_books_update.json");
2252        let msg: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2253        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2254        let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
2255            OKXWsFrame::BookData { data, action, .. } => (data, action),
2256            _ => panic!("Expected a `BookData` variant"),
2257        };
2258
2259        let deltas = parse_book_msg(
2260            &okx_books[0],
2261            instrument_id,
2262            2,
2263            1,
2264            &action,
2265            UnixNanos::default(),
2266        )
2267        .unwrap();
2268
2269        assert_eq!(deltas.instrument_id, instrument_id);
2270        assert_eq!(deltas.deltas.len(), 16);
2271        assert_eq!(deltas.flags, 0);
2272        assert_eq!(deltas.sequence, 123457);
2273        assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
2274        assert_eq!(deltas.ts_init, UnixNanos::default());
2275
2276        // Verify some individual deltas are parsed correctly
2277        assert!(!deltas.deltas.is_empty());
2278        // Update should also have both bid and ask deltas
2279        assert!(
2280            deltas.deltas.iter().any(|d| d.order.side == OrderSide::Buy),
2281            "Should have bid deltas"
2282        );
2283        assert!(
2284            deltas
2285                .deltas
2286                .iter()
2287                .any(|d| d.order.side == OrderSide::Sell),
2288            "Should have ask deltas"
2289        );
2290    }
2291
2292    #[rstest]
2293    fn test_parse_tickers() {
2294        let json_data = load_test_json("ws_tickers.json");
2295        let msg: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2296        let okx_tickers: Vec<OKXTickerMsg> = match msg {
2297            OKXWsFrame::Data { data, .. } => serde_json::from_value(data).unwrap(),
2298            _ => panic!("Expected a `Data` variant"),
2299        };
2300
2301        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2302        let trade =
2303            parse_ticker_msg(&okx_tickers[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
2304
2305        assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
2306        assert_eq!(trade.bid_price, Price::from("8888.88"));
2307        assert_eq!(trade.ask_price, Price::from("9999.99"));
2308        assert_eq!(trade.bid_size, Quantity::from(5));
2309        assert_eq!(trade.ask_size, Quantity::from(11));
2310        assert_eq!(trade.ts_event, UnixNanos::from(1597026383085000000));
2311        assert_eq!(trade.ts_init, UnixNanos::default());
2312    }
2313
2314    #[rstest]
2315    fn test_parse_quotes() {
2316        let json_data = load_test_json("ws_bbo_tbt.json");
2317        let msg: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2318        let okx_quotes: Vec<OKXBookMsg> = match msg {
2319            OKXWsFrame::Data { data, .. } => serde_json::from_value(data).unwrap(),
2320            _ => panic!("Expected a `Data` variant"),
2321        };
2322        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2323
2324        let quote =
2325            parse_quote_msg(&okx_quotes[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
2326
2327        assert_eq!(quote.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
2328        assert_eq!(quote.bid_price, Price::from("8476.97"));
2329        assert_eq!(quote.ask_price, Price::from("8476.98"));
2330        assert_eq!(quote.bid_size, Quantity::from(256));
2331        assert_eq!(quote.ask_size, Quantity::from(415));
2332        assert_eq!(quote.ts_event, UnixNanos::from(1597026383085000000));
2333        assert_eq!(quote.ts_init, UnixNanos::default());
2334    }
2335
2336    #[rstest]
2337    fn test_parse_trades() {
2338        let json_data = load_test_json("ws_trades.json");
2339        let msg: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2340        let okx_trades: Vec<OKXTradeMsg> = match msg {
2341            OKXWsFrame::Data { data, .. } => serde_json::from_value(data).unwrap(),
2342            _ => panic!("Expected a `Data` variant"),
2343        };
2344
2345        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2346        let trade =
2347            parse_trade_msg(&okx_trades[0], instrument_id, 1, 8, UnixNanos::default()).unwrap();
2348
2349        assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
2350        assert_eq!(trade.price, Price::from("42219.9"));
2351        assert_eq!(trade.size, Quantity::from("0.12060306"));
2352        assert_eq!(trade.aggressor_side, AggressorSide::Buyer);
2353        assert_eq!(trade.trade_id, TradeId::from("130639474"));
2354        assert_eq!(trade.ts_event, UnixNanos::from(1630048897897000000));
2355        assert_eq!(trade.ts_init, UnixNanos::default());
2356    }
2357
2358    #[rstest]
2359    fn test_parse_candle() {
2360        let json_data = load_test_json("ws_candle.json");
2361        let msg: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2362        let okx_candles: Vec<OKXCandleMsg> = match msg {
2363            OKXWsFrame::Data { data, .. } => serde_json::from_value(data).unwrap(),
2364            _ => panic!("Expected a `Data` variant"),
2365        };
2366
2367        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2368        let bar_type = BarType::new(
2369            instrument_id,
2370            BAR_SPEC_1_DAY_LAST,
2371            AggregationSource::External,
2372        );
2373        let bar = parse_candle_msg(&okx_candles[0], bar_type, 2, 0, UnixNanos::default()).unwrap();
2374
2375        assert_eq!(bar.bar_type, bar_type);
2376        assert_eq!(bar.open, Price::from("8533.02"));
2377        assert_eq!(bar.high, Price::from("8553.74"));
2378        assert_eq!(bar.low, Price::from("8527.17"));
2379        assert_eq!(bar.close, Price::from("8548.26"));
2380        assert_eq!(bar.volume, Quantity::from(45247));
2381        assert_eq!(bar.ts_event, UnixNanos::from(1597026383085000000));
2382        assert_eq!(bar.ts_init, UnixNanos::default());
2383    }
2384
2385    #[rstest]
2386    fn test_parse_funding_rate() {
2387        let json_data = load_test_json("ws_funding_rate.json");
2388        let msg: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2389
2390        let okx_funding_rates: Vec<crate::websocket::messages::OKXFundingRateMsg> = match msg {
2391            OKXWsFrame::Data { data, .. } => serde_json::from_value(data).unwrap(),
2392            _ => panic!("Expected a `Data` variant"),
2393        };
2394
2395        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2396        let funding_rate =
2397            parse_funding_rate_msg(&okx_funding_rates[0], instrument_id, UnixNanos::default())
2398                .unwrap();
2399
2400        assert_eq!(funding_rate.instrument_id, instrument_id);
2401        assert_eq!(funding_rate.rate, dec!(0.0001));
2402        assert_eq!(funding_rate.interval, Some(8 * 60));
2403        assert_eq!(
2404            funding_rate.next_funding_ns,
2405            Some(UnixNanos::from(1744590349506000000))
2406        );
2407        assert_eq!(funding_rate.ts_event, UnixNanos::from(1744590349506000000));
2408        assert_eq!(funding_rate.ts_init, UnixNanos::default());
2409    }
2410
2411    #[rstest]
2412    fn test_parse_book_vec() {
2413        let json_data = load_test_json("ws_books_snapshot.json");
2414        let event: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2415        let (msgs, action): (Vec<OKXBookMsg>, OKXBookAction) = match event {
2416            OKXWsFrame::BookData { data, action, .. } => (data, action),
2417            _ => panic!("Expected BookData"),
2418        };
2419
2420        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2421        let deltas_vec =
2422            parse_book_msg_vec(msgs, &instrument_id, 8, 1, action, UnixNanos::default()).unwrap();
2423
2424        assert_eq!(deltas_vec.len(), 1);
2425
2426        if let Data::Deltas(d) = &deltas_vec[0] {
2427            assert_eq!(d.sequence, 123456);
2428        } else {
2429            panic!("Expected Deltas");
2430        }
2431    }
2432
2433    #[rstest]
2434    fn test_parse_ticker_vec() {
2435        let json_data = load_test_json("ws_tickers.json");
2436        let event: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2437        let data_val: serde_json::Value = match event {
2438            OKXWsFrame::Data { data, .. } => data,
2439            _ => panic!("Expected Data"),
2440        };
2441
2442        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2443        let quotes_vec =
2444            parse_ticker_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
2445
2446        assert_eq!(quotes_vec.len(), 1);
2447
2448        if let Data::Quote(q) = &quotes_vec[0] {
2449            assert_eq!(q.bid_price, Price::from("8888.88000000"));
2450            assert_eq!(q.ask_price, Price::from("9999.99"));
2451        } else {
2452            panic!("Expected Quote");
2453        }
2454    }
2455
2456    #[rstest]
2457    fn test_parse_trade_vec() {
2458        let json_data = load_test_json("ws_trades.json");
2459        let event: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2460        let data_val: serde_json::Value = match event {
2461            OKXWsFrame::Data { data, .. } => data,
2462            _ => panic!("Expected Data"),
2463        };
2464
2465        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2466        let trades_vec =
2467            parse_trade_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
2468
2469        assert_eq!(trades_vec.len(), 1);
2470
2471        if let Data::Trade(t) = &trades_vec[0] {
2472            assert_eq!(t.trade_id, TradeId::new("130639474"));
2473        } else {
2474            panic!("Expected Trade");
2475        }
2476    }
2477
2478    #[rstest]
2479    fn test_parse_candle_vec() {
2480        let json_data = load_test_json("ws_candle.json");
2481        let event: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2482        let data_val: serde_json::Value = match event {
2483            OKXWsFrame::Data { data, .. } => data,
2484            _ => panic!("Expected Data"),
2485        };
2486
2487        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2488        let bars_vec = parse_candle_msg_vec(
2489            data_val,
2490            &instrument_id,
2491            2,
2492            1,
2493            BAR_SPEC_1_DAY_LAST,
2494            UnixNanos::default(),
2495        )
2496        .unwrap();
2497
2498        assert_eq!(bars_vec.len(), 1);
2499
2500        if let Data::Bar(b) = &bars_vec[0] {
2501            assert_eq!(b.open, Price::from("8533.02"));
2502        } else {
2503            panic!("Expected Bar");
2504        }
2505    }
2506
2507    #[rstest]
2508    fn test_parse_book_message() {
2509        let json_data = load_test_json("ws_bbo_tbt.json");
2510        let msg: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2511        let (okx_books, arg): (Vec<OKXBookMsg>, OKXWebSocketArg) = match msg {
2512            OKXWsFrame::Data { data, arg, .. } => (serde_json::from_value(data).unwrap(), arg),
2513            _ => panic!("Expected a `Data` variant"),
2514        };
2515
2516        assert_eq!(arg.channel, OKXWsChannel::BboTbt);
2517        assert_eq!(arg.inst_id.as_ref().unwrap(), &Ustr::from("BTC-USDT"));
2518        assert_eq!(arg.inst_type, None);
2519        assert_eq!(okx_books.len(), 1);
2520
2521        let book_msg = &okx_books[0];
2522
2523        // Check asks
2524        assert_eq!(book_msg.asks.len(), 1);
2525        let ask = &book_msg.asks[0];
2526        assert_eq!(ask.price, "8476.98");
2527        assert_eq!(ask.size, "415");
2528        assert_eq!(ask.liquidated_orders_count, "0");
2529        assert_eq!(ask.orders_count, "13");
2530
2531        // Check bids
2532        assert_eq!(book_msg.bids.len(), 1);
2533        let bid = &book_msg.bids[0];
2534        assert_eq!(bid.price, "8476.97");
2535        assert_eq!(bid.size, "256");
2536        assert_eq!(bid.liquidated_orders_count, "0");
2537        assert_eq!(bid.orders_count, "12");
2538        assert_eq!(book_msg.ts, 1597026383085);
2539        assert_eq!(book_msg.seq_id, 123456);
2540        assert_eq!(book_msg.checksum, None);
2541        assert_eq!(book_msg.prev_seq_id, None);
2542    }
2543
2544    #[rstest]
2545    fn test_parse_ws_account_message() {
2546        let json_data = load_test_json("ws_account.json");
2547        let msg: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2548
2549        let OKXWsFrame::Data { data, .. } = msg else {
2550            panic!("Expected OKXWsFrame::Data");
2551        };
2552
2553        let accounts: Vec<OKXAccount> = serde_json::from_value(data).unwrap();
2554
2555        assert_eq!(accounts.len(), 1);
2556        let account = &accounts[0];
2557
2558        assert_eq!(account.total_eq, "100.56089404807182");
2559        assert_eq!(account.details.len(), 3);
2560
2561        let usdt_detail = &account.details[0];
2562        assert_eq!(usdt_detail.ccy, "USDT");
2563        assert_eq!(usdt_detail.avail_bal, "100.52768569797846");
2564        assert_eq!(usdt_detail.cash_bal, "100.52768569797846");
2565
2566        let btc_detail = &account.details[1];
2567        assert_eq!(btc_detail.ccy, "BTC");
2568        assert_eq!(btc_detail.avail_bal, "0.0000000051");
2569
2570        let eth_detail = &account.details[2];
2571        assert_eq!(eth_detail.ccy, "ETH");
2572        assert_eq!(eth_detail.avail_bal, "0.000000185");
2573
2574        let account_id = AccountId::new("OKX-001");
2575        let ts_init = UnixNanos::default();
2576        let account_state = parse_account_state(account, account_id, ts_init);
2577
2578        assert!(account_state.is_ok());
2579        let state = account_state.unwrap();
2580        assert_eq!(state.account_id, account_id);
2581        assert_eq!(state.balances.len(), 3);
2582    }
2583
2584    #[rstest]
2585    fn test_parse_ws_account_message_empty_balance() {
2586        // GH-3772: OKX returns empty strings and empty details for zero-balance accounts
2587        let json_data = load_test_json("ws_account_empty.json");
2588        let msg: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2589
2590        let OKXWsFrame::Data { data, .. } = msg else {
2591            panic!("Expected OKXWsFrame::Data");
2592        };
2593
2594        let accounts: Vec<OKXAccount> = serde_json::from_value(data).unwrap();
2595        assert_eq!(accounts.len(), 1);
2596
2597        let account = &accounts[0];
2598        assert!(account.details.is_empty());
2599        assert_eq!(account.total_eq, "0");
2600
2601        let account_id = AccountId::new("OKX-001");
2602        let account_state = parse_account_state(account, account_id, UnixNanos::default()).unwrap();
2603
2604        assert_eq!(account_state.account_id, account_id);
2605        assert_eq!(account_state.margins.len(), 0);
2606        assert_eq!(account_state.balances.len(), 1);
2607
2608        let balance = &account_state.balances[0];
2609        assert_eq!(balance.total, Money::new(0.0, Currency::USD()));
2610        assert_eq!(balance.free, Money::new(0.0, Currency::USD()));
2611        assert_eq!(balance.locked, Money::new(0.0, Currency::USD()));
2612    }
2613
2614    #[rstest]
2615    fn test_parse_order_msg() {
2616        let json_data = load_test_json("ws_orders.json");
2617        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2618
2619        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2620
2621        let account_id = AccountId::new("OKX-001");
2622        let mut instruments = AHashMap::new();
2623
2624        // Create a mock instrument for testing
2625        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2626        let instrument = CryptoPerpetual::new(
2627            instrument_id,
2628            Symbol::from("BTC-USDT-SWAP"),
2629            Currency::BTC(),
2630            Currency::USDT(),
2631            Currency::USDT(),
2632            false, // is_inverse
2633            2,     // price_precision
2634            8,     // size_precision
2635            Price::from("0.01"),
2636            Quantity::from("0.00000001"),
2637            None, // multiplier
2638            None, // lot_size
2639            None, // max_quantity
2640            None, // min_quantity
2641            None, // max_notional
2642            None, // min_notional
2643            None, // max_price
2644            None, // min_price
2645            None, // margin_init
2646            None, // margin_maint
2647            None, // maker_fee
2648            None, // taker_fee
2649            None, // info
2650            UnixNanos::default(),
2651            UnixNanos::default(),
2652        );
2653
2654        instruments.insert(
2655            Ustr::from("BTC-USDT-SWAP"),
2656            InstrumentAny::CryptoPerpetual(instrument),
2657        );
2658
2659        let ts_init = UnixNanos::default();
2660        let mut fee_cache = AHashMap::new();
2661        let mut filled_qty_cache = AHashMap::new();
2662
2663        let result = parse_order_msg_vec(
2664            &data,
2665            account_id,
2666            &instruments,
2667            &mut fee_cache,
2668            &mut filled_qty_cache,
2669            ts_init,
2670        );
2671
2672        assert!(result.is_ok());
2673        let order_reports = result.unwrap();
2674        assert_eq!(order_reports.len(), 1);
2675
2676        // Verify the parsed order report
2677        let report = &order_reports[0];
2678
2679        if let ExecutionReport::Fill(fill_report) = report {
2680            assert_eq!(fill_report.account_id, account_id);
2681            assert_eq!(fill_report.instrument_id, instrument_id);
2682            assert_eq!(
2683                fill_report.client_order_id,
2684                Some(ClientOrderId::new("001BTCUSDT20250106001"))
2685            );
2686            assert_eq!(
2687                fill_report.venue_order_id,
2688                VenueOrderId::new("2497956918703120384")
2689            );
2690            assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
2691            assert_eq!(fill_report.order_side, OrderSide::Buy);
2692            assert_eq!(fill_report.last_px, Price::from("103698.90"));
2693            assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
2694            assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
2695        } else {
2696            panic!("Expected Fill report for filled order");
2697        }
2698    }
2699
2700    #[rstest]
2701    fn test_parse_order_status_report() {
2702        let json_data = load_test_json("ws_orders.json");
2703        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2704        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2705        let order_msg = &data[0];
2706
2707        let account_id = AccountId::new("OKX-001");
2708        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2709        let instrument = CryptoPerpetual::new(
2710            instrument_id,
2711            Symbol::from("BTC-USDT-SWAP"),
2712            Currency::BTC(),
2713            Currency::USDT(),
2714            Currency::USDT(),
2715            false, // is_inverse
2716            2,     // price_precision
2717            8,     // size_precision
2718            Price::from("0.01"),
2719            Quantity::from("0.00000001"),
2720            None,
2721            None,
2722            None,
2723            None,
2724            None,
2725            None,
2726            None,
2727            None,
2728            None,
2729            None,
2730            None,
2731            None,
2732            None,
2733            UnixNanos::default(),
2734            UnixNanos::default(),
2735        );
2736
2737        let ts_init = UnixNanos::default();
2738
2739        let result = parse_order_status_report(
2740            order_msg,
2741            &InstrumentAny::CryptoPerpetual(instrument),
2742            account_id,
2743            ts_init,
2744        );
2745
2746        assert!(result.is_ok());
2747        let order_status_report = result.unwrap();
2748
2749        assert_eq!(order_status_report.account_id, account_id);
2750        assert_eq!(order_status_report.instrument_id, instrument_id);
2751        assert_eq!(
2752            order_status_report.client_order_id,
2753            Some(ClientOrderId::new("001BTCUSDT20250106001"))
2754        );
2755        assert_eq!(
2756            order_status_report.venue_order_id,
2757            VenueOrderId::new("2497956918703120384")
2758        );
2759        assert_eq!(order_status_report.order_side, OrderSide::Buy);
2760        assert_eq!(order_status_report.order_status, OrderStatus::Filled);
2761        assert_eq!(order_status_report.quantity, Quantity::from("0.03000000"));
2762        assert_eq!(order_status_report.filled_qty, Quantity::from("0.03000000"));
2763    }
2764
2765    #[rstest]
2766    fn test_parse_fill_report() {
2767        let json_data = load_test_json("ws_orders.json");
2768        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2769        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2770        let order_msg = &data[0];
2771
2772        let account_id = AccountId::new("OKX-001");
2773        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2774        let instrument = CryptoPerpetual::new(
2775            instrument_id,
2776            Symbol::from("BTC-USDT-SWAP"),
2777            Currency::BTC(),
2778            Currency::USDT(),
2779            Currency::USDT(),
2780            false, // is_inverse
2781            2,     // price_precision
2782            8,     // size_precision
2783            Price::from("0.01"),
2784            Quantity::from("0.00000001"),
2785            None,
2786            None,
2787            None,
2788            None,
2789            None,
2790            None,
2791            None,
2792            None,
2793            None,
2794            None,
2795            None,
2796            None,
2797            None,
2798            UnixNanos::default(),
2799            UnixNanos::default(),
2800        );
2801
2802        let ts_init = UnixNanos::default();
2803
2804        let result = parse_fill_report(
2805            order_msg,
2806            &InstrumentAny::CryptoPerpetual(instrument),
2807            account_id,
2808            None,
2809            None,
2810            ts_init,
2811        );
2812
2813        assert!(result.is_ok());
2814        let fill_report = result.unwrap().unwrap();
2815
2816        assert_eq!(fill_report.account_id, account_id);
2817        assert_eq!(fill_report.instrument_id, instrument_id);
2818        assert_eq!(
2819            fill_report.client_order_id,
2820            Some(ClientOrderId::new("001BTCUSDT20250106001"))
2821        );
2822        assert_eq!(
2823            fill_report.venue_order_id,
2824            VenueOrderId::new("2497956918703120384")
2825        );
2826        assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
2827        assert_eq!(fill_report.order_side, OrderSide::Buy);
2828        assert_eq!(fill_report.last_px, Price::from("103698.90"));
2829        assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
2830        assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
2831    }
2832
2833    #[rstest]
2834    fn test_parse_book10_msg() {
2835        let json_data = load_test_json("ws_books_snapshot.json");
2836        let event: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2837        let msgs: Vec<OKXBookMsg> = match event {
2838            OKXWsFrame::BookData { data, .. } => data,
2839            _ => panic!("Expected BookData"),
2840        };
2841
2842        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2843        let depth10 =
2844            parse_book10_msg(&msgs[0], instrument_id, 2, 0, UnixNanos::default()).unwrap();
2845
2846        assert_eq!(depth10.instrument_id, instrument_id);
2847        assert_eq!(depth10.sequence, 123456);
2848        assert_eq!(depth10.ts_event, UnixNanos::from(1597026383085000000));
2849        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
2850
2851        // Check bid levels (available in test data: 8 levels)
2852        assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
2853        assert_eq!(depth10.bids[0].size, Quantity::from("256"));
2854        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
2855        assert_eq!(depth10.bid_counts[0], 12);
2856
2857        assert_eq!(depth10.bids[1].price, Price::from("8475.55"));
2858        assert_eq!(depth10.bids[1].size, Quantity::from("101"));
2859        assert_eq!(depth10.bid_counts[1], 1);
2860
2861        // Check that levels beyond available data are padded with empty orders
2862        assert_eq!(depth10.bids[8].price, Price::from("0"));
2863        assert_eq!(depth10.bids[8].size, Quantity::from("0"));
2864        assert_eq!(depth10.bid_counts[8], 0);
2865
2866        // Check ask levels (available in test data: 8 levels)
2867        assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
2868        assert_eq!(depth10.asks[0].size, Quantity::from("415"));
2869        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
2870        assert_eq!(depth10.ask_counts[0], 13);
2871
2872        assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
2873        assert_eq!(depth10.asks[1].size, Quantity::from("7"));
2874        assert_eq!(depth10.ask_counts[1], 2);
2875
2876        // Check that levels beyond available data are padded with empty orders
2877        assert_eq!(depth10.asks[8].price, Price::from("0"));
2878        assert_eq!(depth10.asks[8].size, Quantity::from("0"));
2879        assert_eq!(depth10.ask_counts[8], 0);
2880    }
2881
2882    #[rstest]
2883    fn test_parse_book10_msg_vec() {
2884        let json_data = load_test_json("ws_books_snapshot.json");
2885        let event: OKXWsFrame = serde_json::from_str(&json_data).unwrap();
2886        let msgs: Vec<OKXBookMsg> = match event {
2887            OKXWsFrame::BookData { data, .. } => data,
2888            _ => panic!("Expected BookData"),
2889        };
2890
2891        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2892        let depth10_vec =
2893            parse_book10_msg_vec(msgs, &instrument_id, 2, 0, UnixNanos::default()).unwrap();
2894
2895        assert_eq!(depth10_vec.len(), 1);
2896
2897        if let Data::Depth10(d) = &depth10_vec[0] {
2898            assert_eq!(d.instrument_id, instrument_id);
2899            assert_eq!(d.sequence, 123456);
2900            assert_eq!(d.bids[0].price, Price::from("8476.97"));
2901            assert_eq!(d.asks[0].price, Price::from("8476.98"));
2902        } else {
2903            panic!("Expected Depth10");
2904        }
2905    }
2906
2907    #[rstest]
2908    fn test_parse_fill_report_with_fee_cache() {
2909        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2910        let instrument = CryptoPerpetual::new(
2911            instrument_id,
2912            Symbol::from("BTC-USDT-SWAP"),
2913            Currency::BTC(),
2914            Currency::USDT(),
2915            Currency::USDT(),
2916            false, // is_inverse
2917            2,     // price_precision
2918            8,     // size_precision
2919            Price::from("0.01"),
2920            Quantity::from("0.00000001"),
2921            None, // multiplier
2922            None, // lot_size
2923            None, // max_quantity
2924            None, // min_quantity
2925            None, // max_notional
2926            None, // min_notional
2927            None, // max_price
2928            None, // min_price
2929            None, // margin_init
2930            None, // margin_maint
2931            None, // maker_fee
2932            None, // taker_fee
2933            None, // info
2934            UnixNanos::default(),
2935            UnixNanos::default(),
2936        );
2937
2938        let account_id = AccountId::new("OKX-001");
2939        let ts_init = UnixNanos::default();
2940
2941        // First fill: 0.01 BTC out of 0.03 BTC total (1/3)
2942        let order_msg_1 = OKXOrderMsg {
2943            acc_fill_sz: Some("0.01".to_string()),
2944            algo_id: None,
2945            avg_px: "50000.0".to_string(),
2946            c_time: 1746947317401,
2947            cancel_source: None,
2948            cancel_source_reason: None,
2949            category: OKXOrderCategory::Normal,
2950            ccy: Ustr::from("USDT"),
2951            cl_ord_id: "test_order_1".to_string(),
2952            algo_cl_ord_id: None,
2953            attach_algo_cl_ord_id: None,
2954            attach_algo_ords: Vec::new(),
2955            fee: Some("-1.0".to_string()), // Total fee so far
2956            fee_ccy: Ustr::from("USDT"),
2957            fill_fee: None,
2958            fill_fee_ccy: None,
2959            fill_mark_px: None,
2960            fill_mark_vol: None,
2961            fill_px_vol: None,
2962            fill_px_usd: None,
2963            fill_fwd_px: None,
2964            fill_notional_usd: None,
2965            fill_pnl: None,
2966            fill_px: "50000.0".to_string(),
2967            fill_sz: "0.01".to_string(),
2968            fill_time: 1746947317402,
2969            inst_id: Ustr::from("BTC-USDT-SWAP"),
2970            inst_type: OKXInstrumentType::Swap,
2971            is_tp_limit: None,
2972            lever: "2.0".to_string(),
2973            linked_algo_ord: None,
2974            notional_usd: None,
2975            ord_id: Ustr::from("1234567890"),
2976            ord_type: OKXOrderType::Market,
2977            pnl: "0".to_string(),
2978            pos_side: OKXPositionSide::Long,
2979            px: String::new(),
2980            px_type: OKXPriceType::None,
2981            px_usd: None,
2982            px_vol: None,
2983            quick_mgn_type: OKXQuickMarginType::None,
2984            rebate: None,
2985            rebate_ccy: None,
2986            reduce_only: "false".to_string(),
2987            side: OKXSide::Buy,
2988            sl_ord_px: None,
2989            sl_trigger_px: None,
2990            sl_trigger_px_type: None,
2991            source: None,
2992            state: OKXOrderStatus::PartiallyFilled,
2993            stp_id: None,
2994            stp_mode: OKXSelfTradePreventionMode::None,
2995            exec_type: OKXExecType::Maker,
2996            sz: "0.03".to_string(), // Total order size
2997            tag: None,
2998            td_mode: OKXTradeMode::Isolated,
2999            tgt_ccy: None,
3000            tp_ord_px: None,
3001            tp_trigger_px: None,
3002            tp_trigger_px_type: None,
3003            trade_id: "trade_1".to_string(),
3004            u_time: 1746947317402,
3005            amend_result: None,
3006            req_id: None,
3007            code: None,
3008            msg: None,
3009        };
3010
3011        let fill_report_1 = parse_fill_report(
3012            &order_msg_1,
3013            &InstrumentAny::CryptoPerpetual(instrument.clone()),
3014            account_id,
3015            None,
3016            None,
3017            ts_init,
3018        )
3019        .unwrap()
3020        .unwrap();
3021
3022        // First fill should get the full fee since there's no previous fee
3023        assert_eq!(fill_report_1.commission, Money::new(1.0, Currency::USDT()));
3024
3025        // Second fill: 0.02 BTC more, now 0.03 BTC total (completely filled)
3026        let order_msg_2 = OKXOrderMsg {
3027            acc_fill_sz: Some("0.03".to_string()),
3028            algo_id: None,
3029            avg_px: "50000.0".to_string(),
3030            c_time: 1746947317401,
3031            cancel_source: None,
3032            cancel_source_reason: None,
3033            category: OKXOrderCategory::Normal,
3034            ccy: Ustr::from("USDT"),
3035            cl_ord_id: "test_order_1".to_string(),
3036            algo_cl_ord_id: None,
3037            attach_algo_cl_ord_id: None,
3038            attach_algo_ords: Vec::new(),
3039            fee: Some("-3.0".to_string()), // Same total fee
3040            fee_ccy: Ustr::from("USDT"),
3041            fill_fee: None,
3042            fill_fee_ccy: None,
3043            fill_mark_px: None,
3044            fill_mark_vol: None,
3045            fill_px_vol: None,
3046            fill_px_usd: None,
3047            fill_fwd_px: None,
3048            fill_notional_usd: None,
3049            fill_pnl: None,
3050            fill_px: "50000.0".to_string(),
3051            fill_sz: "0.02".to_string(),
3052            fill_time: 1746947317403,
3053            inst_id: Ustr::from("BTC-USDT-SWAP"),
3054            inst_type: OKXInstrumentType::Swap,
3055            is_tp_limit: None,
3056            lever: "2.0".to_string(),
3057            linked_algo_ord: None,
3058            notional_usd: None,
3059            ord_id: Ustr::from("1234567890"),
3060            ord_type: OKXOrderType::Market,
3061            pnl: "0".to_string(),
3062            pos_side: OKXPositionSide::Long,
3063            px: String::new(),
3064            px_type: OKXPriceType::None,
3065            px_usd: None,
3066            px_vol: None,
3067            quick_mgn_type: OKXQuickMarginType::None,
3068            rebate: None,
3069            rebate_ccy: None,
3070            reduce_only: "false".to_string(),
3071            side: OKXSide::Buy,
3072            sl_ord_px: None,
3073            sl_trigger_px: None,
3074            sl_trigger_px_type: None,
3075            source: None,
3076            state: OKXOrderStatus::Filled,
3077            stp_id: None,
3078            stp_mode: OKXSelfTradePreventionMode::None,
3079            exec_type: OKXExecType::Maker,
3080            sz: "0.03".to_string(), // Same total order size
3081            tag: None,
3082            td_mode: OKXTradeMode::Isolated,
3083            tgt_ccy: None,
3084            tp_ord_px: None,
3085            tp_trigger_px: None,
3086            tp_trigger_px_type: None,
3087            trade_id: "trade_2".to_string(),
3088            u_time: 1746947317403,
3089            amend_result: None,
3090            req_id: None,
3091            code: None,
3092            msg: None,
3093        };
3094
3095        let fill_report_2 = parse_fill_report(
3096            &order_msg_2,
3097            &InstrumentAny::CryptoPerpetual(instrument),
3098            account_id,
3099            Some(fill_report_1.commission),
3100            Some(fill_report_1.last_qty),
3101            ts_init,
3102        )
3103        .unwrap()
3104        .unwrap();
3105
3106        // Second fill should get total_fee - previous_fee = 3.0 - 1.0 = 2.0
3107        assert_eq!(fill_report_2.commission, Money::new(2.0, Currency::USDT()));
3108
3109        // Test passed - fee was correctly split proportionally
3110    }
3111
3112    #[rstest]
3113    fn test_parse_fill_report_with_maker_rebates() {
3114        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3115        let instrument = CryptoPerpetual::new(
3116            instrument_id,
3117            Symbol::from("BTC-USDT-SWAP"),
3118            Currency::BTC(),
3119            Currency::USDT(),
3120            Currency::USDT(),
3121            false,
3122            2,
3123            8,
3124            Price::from("0.01"),
3125            Quantity::from("0.00000001"),
3126            None,
3127            None,
3128            None,
3129            None,
3130            None,
3131            None,
3132            None,
3133            None,
3134            None,
3135            None,
3136            None,
3137            None,
3138            None,
3139            UnixNanos::default(),
3140            UnixNanos::default(),
3141        );
3142
3143        let account_id = AccountId::new("OKX-001");
3144        let ts_init = UnixNanos::default();
3145
3146        // First fill: maker rebate of $0.5 (OKX sends as "0.5", parse_fee makes it -0.5)
3147        let order_msg_1 = OKXOrderMsg {
3148            acc_fill_sz: Some("0.01".to_string()),
3149            algo_id: None,
3150            avg_px: "50000.0".to_string(),
3151            c_time: 1746947317401,
3152            cancel_source: None,
3153            cancel_source_reason: None,
3154            category: OKXOrderCategory::Normal,
3155            ccy: Ustr::from("USDT"),
3156            cl_ord_id: "test_order_rebate".to_string(),
3157            algo_cl_ord_id: None,
3158            attach_algo_cl_ord_id: None,
3159            attach_algo_ords: Vec::new(),
3160            fee: Some("0.5".to_string()), // Rebate: positive value from OKX
3161            fee_ccy: Ustr::from("USDT"),
3162            fill_fee: None,
3163            fill_fee_ccy: None,
3164            fill_mark_px: None,
3165            fill_mark_vol: None,
3166            fill_px_vol: None,
3167            fill_px_usd: None,
3168            fill_fwd_px: None,
3169            fill_notional_usd: None,
3170            fill_pnl: None,
3171            fill_px: "50000.0".to_string(),
3172            fill_sz: "0.01".to_string(),
3173            fill_time: 1746947317402,
3174            inst_id: Ustr::from("BTC-USDT-SWAP"),
3175            inst_type: OKXInstrumentType::Swap,
3176            is_tp_limit: None,
3177            lever: "2.0".to_string(),
3178            linked_algo_ord: None,
3179            notional_usd: None,
3180            ord_id: Ustr::from("rebate_order_123"),
3181            ord_type: OKXOrderType::Market,
3182            pnl: "0".to_string(),
3183            pos_side: OKXPositionSide::Long,
3184            px: String::new(),
3185            px_type: OKXPriceType::None,
3186            px_usd: None,
3187            px_vol: None,
3188            quick_mgn_type: OKXQuickMarginType::None,
3189            rebate: None,
3190            rebate_ccy: None,
3191            reduce_only: "false".to_string(),
3192            side: OKXSide::Buy,
3193            sl_ord_px: None,
3194            sl_trigger_px: None,
3195            sl_trigger_px_type: None,
3196            source: None,
3197            state: OKXOrderStatus::PartiallyFilled,
3198            stp_id: None,
3199            stp_mode: OKXSelfTradePreventionMode::None,
3200            exec_type: OKXExecType::Maker,
3201            sz: "0.02".to_string(),
3202            tag: None,
3203            td_mode: OKXTradeMode::Isolated,
3204            tgt_ccy: None,
3205            tp_ord_px: None,
3206            tp_trigger_px: None,
3207            tp_trigger_px_type: None,
3208            trade_id: "trade_rebate_1".to_string(),
3209            u_time: 1746947317402,
3210            amend_result: None,
3211            req_id: None,
3212            code: None,
3213            msg: None,
3214        };
3215
3216        let fill_report_1 = parse_fill_report(
3217            &order_msg_1,
3218            &InstrumentAny::CryptoPerpetual(instrument.clone()),
3219            account_id,
3220            None,
3221            None,
3222            ts_init,
3223        )
3224        .unwrap()
3225        .unwrap();
3226
3227        // First fill gets the full rebate (negative commission)
3228        assert_eq!(fill_report_1.commission, Money::new(-0.5, Currency::USDT()));
3229
3230        // Second fill: another maker rebate of $0.3, cumulative now $0.8
3231        let order_msg_2 = OKXOrderMsg {
3232            acc_fill_sz: Some("0.02".to_string()),
3233            algo_id: None,
3234            avg_px: "50000.0".to_string(),
3235            c_time: 1746947317401,
3236            cancel_source: None,
3237            cancel_source_reason: None,
3238            category: OKXOrderCategory::Normal,
3239            ccy: Ustr::from("USDT"),
3240            cl_ord_id: "test_order_rebate".to_string(),
3241            algo_cl_ord_id: None,
3242            attach_algo_cl_ord_id: None,
3243            attach_algo_ords: Vec::new(),
3244            fee: Some("0.8".to_string()), // Cumulative rebate
3245            fee_ccy: Ustr::from("USDT"),
3246            fill_fee: None,
3247            fill_fee_ccy: None,
3248            fill_mark_px: None,
3249            fill_mark_vol: None,
3250            fill_px_vol: None,
3251            fill_px_usd: None,
3252            fill_fwd_px: None,
3253            fill_notional_usd: None,
3254            fill_pnl: None,
3255            fill_px: "50000.0".to_string(),
3256            fill_sz: "0.01".to_string(),
3257            fill_time: 1746947317403,
3258            inst_id: Ustr::from("BTC-USDT-SWAP"),
3259            inst_type: OKXInstrumentType::Swap,
3260            is_tp_limit: None,
3261            lever: "2.0".to_string(),
3262            linked_algo_ord: None,
3263            notional_usd: None,
3264            ord_id: Ustr::from("rebate_order_123"),
3265            ord_type: OKXOrderType::Market,
3266            pnl: "0".to_string(),
3267            pos_side: OKXPositionSide::Long,
3268            px: String::new(),
3269            px_type: OKXPriceType::None,
3270            px_usd: None,
3271            px_vol: None,
3272            quick_mgn_type: OKXQuickMarginType::None,
3273            rebate: None,
3274            rebate_ccy: None,
3275            reduce_only: "false".to_string(),
3276            side: OKXSide::Buy,
3277            sl_ord_px: None,
3278            sl_trigger_px: None,
3279            sl_trigger_px_type: None,
3280            source: None,
3281            state: OKXOrderStatus::Filled,
3282            stp_id: None,
3283            stp_mode: OKXSelfTradePreventionMode::None,
3284            exec_type: OKXExecType::Maker,
3285            sz: "0.02".to_string(),
3286            tag: None,
3287            td_mode: OKXTradeMode::Isolated,
3288            tgt_ccy: None,
3289            tp_ord_px: None,
3290            tp_trigger_px: None,
3291            tp_trigger_px_type: None,
3292            trade_id: "trade_rebate_2".to_string(),
3293            u_time: 1746947317403,
3294            amend_result: None,
3295            req_id: None,
3296            code: None,
3297            msg: None,
3298        };
3299
3300        let fill_report_2 = parse_fill_report(
3301            &order_msg_2,
3302            &InstrumentAny::CryptoPerpetual(instrument),
3303            account_id,
3304            Some(fill_report_1.commission),
3305            Some(fill_report_1.last_qty),
3306            ts_init,
3307        )
3308        .unwrap()
3309        .unwrap();
3310
3311        // Second fill: incremental = -0.8 - (-0.5) = -0.3
3312        assert_eq!(fill_report_2.commission, Money::new(-0.3, Currency::USDT()));
3313    }
3314
3315    #[rstest]
3316    fn test_parse_fill_report_rebate_to_charge_transition() {
3317        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3318        let instrument = CryptoPerpetual::new(
3319            instrument_id,
3320            Symbol::from("BTC-USDT-SWAP"),
3321            Currency::BTC(),
3322            Currency::USDT(),
3323            Currency::USDT(),
3324            false,
3325            2,
3326            8,
3327            Price::from("0.01"),
3328            Quantity::from("0.00000001"),
3329            None,
3330            None,
3331            None,
3332            None,
3333            None,
3334            None,
3335            None,
3336            None,
3337            None,
3338            None,
3339            None,
3340            None,
3341            None,
3342            UnixNanos::default(),
3343            UnixNanos::default(),
3344        );
3345
3346        let account_id = AccountId::new("OKX-001");
3347        let ts_init = UnixNanos::default();
3348
3349        // First fill: maker rebate of $1.0
3350        let order_msg_1 = OKXOrderMsg {
3351            acc_fill_sz: Some("0.01".to_string()),
3352            algo_id: None,
3353            avg_px: "50000.0".to_string(),
3354            c_time: 1746947317401,
3355            cancel_source: None,
3356            cancel_source_reason: None,
3357            category: OKXOrderCategory::Normal,
3358            ccy: Ustr::from("USDT"),
3359            cl_ord_id: "test_order_transition".to_string(),
3360            algo_cl_ord_id: None,
3361            attach_algo_cl_ord_id: None,
3362            attach_algo_ords: Vec::new(),
3363            fee: Some("1.0".to_string()), // Rebate from OKX
3364            fee_ccy: Ustr::from("USDT"),
3365            fill_fee: None,
3366            fill_fee_ccy: None,
3367            fill_mark_px: None,
3368            fill_mark_vol: None,
3369            fill_px_vol: None,
3370            fill_px_usd: None,
3371            fill_fwd_px: None,
3372            fill_notional_usd: None,
3373            fill_pnl: None,
3374            fill_px: "50000.0".to_string(),
3375            fill_sz: "0.01".to_string(),
3376            fill_time: 1746947317402,
3377            inst_id: Ustr::from("BTC-USDT-SWAP"),
3378            inst_type: OKXInstrumentType::Swap,
3379            is_tp_limit: None,
3380            lever: "2.0".to_string(),
3381            linked_algo_ord: None,
3382            notional_usd: None,
3383            ord_id: Ustr::from("transition_order_456"),
3384            ord_type: OKXOrderType::Market,
3385            pnl: "0".to_string(),
3386            pos_side: OKXPositionSide::Long,
3387            px: String::new(),
3388            px_type: OKXPriceType::None,
3389            px_usd: None,
3390            px_vol: None,
3391            quick_mgn_type: OKXQuickMarginType::None,
3392            rebate: None,
3393            rebate_ccy: None,
3394            reduce_only: "false".to_string(),
3395            side: OKXSide::Buy,
3396            sl_ord_px: None,
3397            sl_trigger_px: None,
3398            sl_trigger_px_type: None,
3399            source: None,
3400            state: OKXOrderStatus::PartiallyFilled,
3401            stp_id: None,
3402            stp_mode: OKXSelfTradePreventionMode::None,
3403            exec_type: OKXExecType::Maker,
3404            sz: "0.02".to_string(),
3405            tag: None,
3406            td_mode: OKXTradeMode::Isolated,
3407            tgt_ccy: None,
3408            tp_ord_px: None,
3409            tp_trigger_px: None,
3410            tp_trigger_px_type: None,
3411            trade_id: "trade_transition_1".to_string(),
3412            u_time: 1746947317402,
3413            amend_result: None,
3414            req_id: None,
3415            code: None,
3416            msg: None,
3417        };
3418
3419        let fill_report_1 = parse_fill_report(
3420            &order_msg_1,
3421            &InstrumentAny::CryptoPerpetual(instrument.clone()),
3422            account_id,
3423            None,
3424            None,
3425            ts_init,
3426        )
3427        .unwrap()
3428        .unwrap();
3429
3430        // First fill gets rebate (negative)
3431        assert_eq!(fill_report_1.commission, Money::new(-1.0, Currency::USDT()));
3432
3433        // Second fill: taker charge of $5.0, net cumulative is now $2.0 charge
3434        // This is the edge case: incremental = 2.0 - (-1.0) = 3.0, which exceeds total (2.0)
3435        // But it's legitimate, not corruption
3436        let order_msg_2 = OKXOrderMsg {
3437            acc_fill_sz: Some("0.02".to_string()),
3438            algo_id: None,
3439            avg_px: "50000.0".to_string(),
3440            c_time: 1746947317401,
3441            cancel_source: None,
3442            cancel_source_reason: None,
3443            category: OKXOrderCategory::Normal,
3444            ccy: Ustr::from("USDT"),
3445            cl_ord_id: "test_order_transition".to_string(),
3446            algo_cl_ord_id: None,
3447            attach_algo_cl_ord_id: None,
3448            attach_algo_ords: Vec::new(),
3449            fee: Some("-2.0".to_string()), // Now a charge (negative from OKX)
3450            fee_ccy: Ustr::from("USDT"),
3451            fill_fee: None,
3452            fill_fee_ccy: None,
3453            fill_mark_px: None,
3454            fill_mark_vol: None,
3455            fill_px_vol: None,
3456            fill_px_usd: None,
3457            fill_fwd_px: None,
3458            fill_notional_usd: None,
3459            fill_pnl: None,
3460            fill_px: "50000.0".to_string(),
3461            fill_sz: "0.01".to_string(),
3462            fill_time: 1746947317403,
3463            inst_id: Ustr::from("BTC-USDT-SWAP"),
3464            inst_type: OKXInstrumentType::Swap,
3465            is_tp_limit: None,
3466            lever: "2.0".to_string(),
3467            linked_algo_ord: None,
3468            notional_usd: None,
3469            ord_id: Ustr::from("transition_order_456"),
3470            ord_type: OKXOrderType::Market,
3471            pnl: "0".to_string(),
3472            pos_side: OKXPositionSide::Long,
3473            px: String::new(),
3474            px_type: OKXPriceType::None,
3475            px_usd: None,
3476            px_vol: None,
3477            quick_mgn_type: OKXQuickMarginType::None,
3478            rebate: None,
3479            rebate_ccy: None,
3480            reduce_only: "false".to_string(),
3481            side: OKXSide::Buy,
3482            sl_ord_px: None,
3483            sl_trigger_px: None,
3484            sl_trigger_px_type: None,
3485            source: None,
3486            state: OKXOrderStatus::Filled,
3487            stp_id: None,
3488            stp_mode: OKXSelfTradePreventionMode::None,
3489            exec_type: OKXExecType::Taker,
3490            sz: "0.02".to_string(),
3491            tag: None,
3492            td_mode: OKXTradeMode::Isolated,
3493            tgt_ccy: None,
3494            tp_ord_px: None,
3495            tp_trigger_px: None,
3496            tp_trigger_px_type: None,
3497            trade_id: "trade_transition_2".to_string(),
3498            u_time: 1746947317403,
3499            amend_result: None,
3500            req_id: None,
3501            code: None,
3502            msg: None,
3503        };
3504
3505        let fill_report_2 = parse_fill_report(
3506            &order_msg_2,
3507            &InstrumentAny::CryptoPerpetual(instrument),
3508            account_id,
3509            Some(fill_report_1.commission),
3510            Some(fill_report_1.last_qty),
3511            ts_init,
3512        )
3513        .unwrap()
3514        .unwrap();
3515
3516        // Second fill: incremental = 2.0 - (-1.0) = 3.0
3517        // This should NOT trigger corruption detection because previous was negative
3518        assert_eq!(fill_report_2.commission, Money::new(3.0, Currency::USDT()));
3519    }
3520
3521    #[rstest]
3522    fn test_parse_fill_report_negative_incremental() {
3523        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3524        let instrument = CryptoPerpetual::new(
3525            instrument_id,
3526            Symbol::from("BTC-USDT-SWAP"),
3527            Currency::BTC(),
3528            Currency::USDT(),
3529            Currency::USDT(),
3530            false,
3531            2,
3532            8,
3533            Price::from("0.01"),
3534            Quantity::from("0.00000001"),
3535            None,
3536            None,
3537            None,
3538            None,
3539            None,
3540            None,
3541            None,
3542            None,
3543            None,
3544            None,
3545            None,
3546            None,
3547            None,
3548            UnixNanos::default(),
3549            UnixNanos::default(),
3550        );
3551
3552        let account_id = AccountId::new("OKX-001");
3553        let ts_init = UnixNanos::default();
3554
3555        // First fill: charge of $2.0
3556        let order_msg_1 = OKXOrderMsg {
3557            acc_fill_sz: Some("0.01".to_string()),
3558            algo_id: None,
3559            avg_px: "50000.0".to_string(),
3560            c_time: 1746947317401,
3561            cancel_source: None,
3562            cancel_source_reason: None,
3563            category: OKXOrderCategory::Normal,
3564            ccy: Ustr::from("USDT"),
3565            cl_ord_id: "test_order_neg_inc".to_string(),
3566            algo_cl_ord_id: None,
3567            attach_algo_cl_ord_id: None,
3568            attach_algo_ords: Vec::new(),
3569            fee: Some("-2.0".to_string()),
3570            fee_ccy: Ustr::from("USDT"),
3571            fill_fee: None,
3572            fill_fee_ccy: None,
3573            fill_mark_px: None,
3574            fill_mark_vol: None,
3575            fill_px_vol: None,
3576            fill_px_usd: None,
3577            fill_fwd_px: None,
3578            fill_notional_usd: None,
3579            fill_pnl: None,
3580            fill_px: "50000.0".to_string(),
3581            fill_sz: "0.01".to_string(),
3582            fill_time: 1746947317402,
3583            inst_id: Ustr::from("BTC-USDT-SWAP"),
3584            inst_type: OKXInstrumentType::Swap,
3585            is_tp_limit: None,
3586            lever: "2.0".to_string(),
3587            linked_algo_ord: None,
3588            notional_usd: None,
3589            ord_id: Ustr::from("neg_inc_order_789"),
3590            ord_type: OKXOrderType::Market,
3591            pnl: "0".to_string(),
3592            pos_side: OKXPositionSide::Long,
3593            px: String::new(),
3594            px_type: OKXPriceType::None,
3595            px_usd: None,
3596            px_vol: None,
3597            quick_mgn_type: OKXQuickMarginType::None,
3598            rebate: None,
3599            rebate_ccy: None,
3600            reduce_only: "false".to_string(),
3601            side: OKXSide::Buy,
3602            sl_ord_px: None,
3603            sl_trigger_px: None,
3604            sl_trigger_px_type: None,
3605            source: None,
3606            state: OKXOrderStatus::PartiallyFilled,
3607            stp_id: None,
3608            stp_mode: OKXSelfTradePreventionMode::None,
3609            exec_type: OKXExecType::Taker,
3610            sz: "0.02".to_string(),
3611            tag: None,
3612            td_mode: OKXTradeMode::Isolated,
3613            tgt_ccy: None,
3614            tp_ord_px: None,
3615            tp_trigger_px: None,
3616            tp_trigger_px_type: None,
3617            trade_id: "trade_neg_inc_1".to_string(),
3618            u_time: 1746947317402,
3619            amend_result: None,
3620            req_id: None,
3621            code: None,
3622            msg: None,
3623        };
3624
3625        let fill_report_1 = parse_fill_report(
3626            &order_msg_1,
3627            &InstrumentAny::CryptoPerpetual(instrument.clone()),
3628            account_id,
3629            None,
3630            None,
3631            ts_init,
3632        )
3633        .unwrap()
3634        .unwrap();
3635
3636        assert_eq!(fill_report_1.commission, Money::new(2.0, Currency::USDT()));
3637
3638        // Second fill: charge reduced to $1.5 total (refund or maker rebate on this fill)
3639        // Incremental = 1.5 - 2.0 = -0.5 (negative incremental triggers debug log)
3640        let order_msg_2 = OKXOrderMsg {
3641            acc_fill_sz: Some("0.02".to_string()),
3642            algo_id: None,
3643            avg_px: "50000.0".to_string(),
3644            c_time: 1746947317401,
3645            cancel_source: None,
3646            cancel_source_reason: None,
3647            category: OKXOrderCategory::Normal,
3648            ccy: Ustr::from("USDT"),
3649            cl_ord_id: "test_order_neg_inc".to_string(),
3650            algo_cl_ord_id: None,
3651            attach_algo_cl_ord_id: None,
3652            attach_algo_ords: Vec::new(),
3653            fee: Some("-1.5".to_string()), // Total reduced
3654            fee_ccy: Ustr::from("USDT"),
3655            fill_fee: None,
3656            fill_fee_ccy: None,
3657            fill_mark_px: None,
3658            fill_mark_vol: None,
3659            fill_px_vol: None,
3660            fill_px_usd: None,
3661            fill_fwd_px: None,
3662            fill_notional_usd: None,
3663            fill_pnl: None,
3664            fill_px: "50000.0".to_string(),
3665            fill_sz: "0.01".to_string(),
3666            fill_time: 1746947317403,
3667            inst_id: Ustr::from("BTC-USDT-SWAP"),
3668            inst_type: OKXInstrumentType::Swap,
3669            is_tp_limit: None,
3670            lever: "2.0".to_string(),
3671            linked_algo_ord: None,
3672            notional_usd: None,
3673            ord_id: Ustr::from("neg_inc_order_789"),
3674            ord_type: OKXOrderType::Market,
3675            pnl: "0".to_string(),
3676            pos_side: OKXPositionSide::Long,
3677            px: String::new(),
3678            px_type: OKXPriceType::None,
3679            px_usd: None,
3680            px_vol: None,
3681            quick_mgn_type: OKXQuickMarginType::None,
3682            rebate: None,
3683            rebate_ccy: None,
3684            reduce_only: "false".to_string(),
3685            side: OKXSide::Buy,
3686            sl_ord_px: None,
3687            sl_trigger_px: None,
3688            sl_trigger_px_type: None,
3689            source: None,
3690            state: OKXOrderStatus::Filled,
3691            stp_id: None,
3692            stp_mode: OKXSelfTradePreventionMode::None,
3693            exec_type: OKXExecType::Maker,
3694            sz: "0.02".to_string(),
3695            tag: None,
3696            td_mode: OKXTradeMode::Isolated,
3697            tgt_ccy: None,
3698            tp_ord_px: None,
3699            tp_trigger_px: None,
3700            tp_trigger_px_type: None,
3701            trade_id: "trade_neg_inc_2".to_string(),
3702            u_time: 1746947317403,
3703            amend_result: None,
3704            req_id: None,
3705            code: None,
3706            msg: None,
3707        };
3708
3709        let fill_report_2 = parse_fill_report(
3710            &order_msg_2,
3711            &InstrumentAny::CryptoPerpetual(instrument),
3712            account_id,
3713            Some(fill_report_1.commission),
3714            Some(fill_report_1.last_qty),
3715            ts_init,
3716        )
3717        .unwrap()
3718        .unwrap();
3719
3720        // Incremental is negative: 1.5 - 2.0 = -0.5
3721        assert_eq!(fill_report_2.commission, Money::new(-0.5, Currency::USDT()));
3722    }
3723
3724    #[rstest]
3725    fn test_parse_fill_report_fee_currency_change_no_panic() {
3726        let instrument = create_stub_instrument();
3727        let account_id = AccountId::new("OKX-001");
3728        let ts_init = UnixNanos::default();
3729
3730        // First fill charged in USDT
3731        let previous_fee = Money::new(1.0, Currency::USDT());
3732
3733        // Second fill charged in BTC (fee currency changed)
3734        let mut order_msg =
3735            create_stub_order_msg("0.01", Some("0.02".to_string()), "1234567890", "trade_2");
3736        order_msg.fee = Some("-0.00005".to_string());
3737        order_msg.fee_ccy = Ustr::from("BTC");
3738
3739        let result = parse_fill_report(
3740            &order_msg,
3741            &InstrumentAny::CryptoPerpetual(instrument),
3742            account_id,
3743            Some(previous_fee),
3744            Some(Quantity::from("0.01")),
3745            ts_init,
3746        );
3747
3748        let fill_report = result.unwrap().unwrap();
3749        assert_eq!(fill_report.commission.currency, Currency::BTC());
3750    }
3751
3752    #[rstest]
3753    fn test_parse_fill_report_empty_fill_sz_first_fill() {
3754        let instrument = create_stub_instrument();
3755        let account_id = AccountId::new("OKX-001");
3756        let ts_init = UnixNanos::default();
3757
3758        let order_msg =
3759            create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "trade_1");
3760
3761        let fill_report = parse_fill_report(
3762            &order_msg,
3763            &InstrumentAny::CryptoPerpetual(instrument),
3764            account_id,
3765            None,
3766            None,
3767            ts_init,
3768        )
3769        .unwrap()
3770        .unwrap();
3771
3772        assert_eq!(fill_report.last_qty, Quantity::from("0.01"));
3773    }
3774
3775    #[rstest]
3776    fn test_parse_fill_report_empty_fill_sz_subsequent_fills() {
3777        let instrument = create_stub_instrument();
3778        let account_id = AccountId::new("OKX-001");
3779        let ts_init = UnixNanos::default();
3780
3781        let order_msg_1 =
3782            create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "trade_1");
3783
3784        let fill_report_1 = parse_fill_report(
3785            &order_msg_1,
3786            &InstrumentAny::CryptoPerpetual(instrument.clone()),
3787            account_id,
3788            None,
3789            None,
3790            ts_init,
3791        )
3792        .unwrap()
3793        .unwrap();
3794
3795        assert_eq!(fill_report_1.last_qty, Quantity::from("0.01"));
3796
3797        let order_msg_2 =
3798            create_stub_order_msg("", Some("0.03".to_string()), "1234567890", "trade_2");
3799
3800        let fill_report_2 = parse_fill_report(
3801            &order_msg_2,
3802            &InstrumentAny::CryptoPerpetual(instrument),
3803            account_id,
3804            Some(fill_report_1.commission),
3805            Some(fill_report_1.last_qty),
3806            ts_init,
3807        )
3808        .unwrap()
3809        .unwrap();
3810
3811        assert_eq!(fill_report_2.last_qty, Quantity::from("0.02"));
3812    }
3813
3814    #[rstest]
3815    fn test_parse_fill_report_error_both_empty() {
3816        let instrument = create_stub_instrument();
3817        let account_id = AccountId::new("OKX-001");
3818        let ts_init = UnixNanos::default();
3819
3820        let order_msg = create_stub_order_msg("", Some(String::new()), "1234567890", "trade_1");
3821
3822        let result = parse_fill_report(
3823            &order_msg,
3824            &InstrumentAny::CryptoPerpetual(instrument),
3825            account_id,
3826            None,
3827            None,
3828            ts_init,
3829        );
3830
3831        assert!(result.is_err());
3832        let err_msg = result.unwrap_err().to_string();
3833        assert!(err_msg.contains("Cannot determine fill quantity"));
3834        assert!(err_msg.contains("empty/zero"));
3835    }
3836
3837    #[rstest]
3838    fn test_parse_fill_report_error_acc_fill_sz_none() {
3839        let instrument = create_stub_instrument();
3840        let account_id = AccountId::new("OKX-001");
3841        let ts_init = UnixNanos::default();
3842
3843        let order_msg = create_stub_order_msg("", None, "1234567890", "trade_1");
3844
3845        let result = parse_fill_report(
3846            &order_msg,
3847            &InstrumentAny::CryptoPerpetual(instrument),
3848            account_id,
3849            None,
3850            None,
3851            ts_init,
3852        );
3853
3854        assert!(result.is_err());
3855        let err_msg = result.unwrap_err().to_string();
3856        assert!(err_msg.contains("Cannot determine fill quantity"));
3857        assert!(err_msg.contains("acc_fill_sz is None"));
3858    }
3859
3860    #[rstest]
3861    fn test_parse_fill_report_error_acc_fill_sz_less_than_previous() {
3862        let instrument = create_stub_instrument();
3863        let account_id = AccountId::new("OKX-001");
3864        let ts_init = UnixNanos::default();
3865
3866        // acc_fill_sz (0.01) < previous_filled_qty (0.03) — stale data after reconnect
3867        let order_msg =
3868            create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "trade_2");
3869
3870        let result = parse_fill_report(
3871            &order_msg,
3872            &InstrumentAny::CryptoPerpetual(instrument),
3873            account_id,
3874            None,
3875            Some(Quantity::from("0.03")),
3876            ts_init,
3877        );
3878
3879        assert!(result.is_err());
3880        let err_msg = result.unwrap_err().to_string();
3881        assert!(err_msg.contains("Cumulative fill went backwards"));
3882    }
3883
3884    #[rstest]
3885    fn test_parse_order_msg_acc_fill_sz_only_update() {
3886        // Test that we emit fill reports when OKX only updates acc_fill_sz without fill_sz or trade_id
3887        let instrument = create_stub_instrument();
3888        let account_id = AccountId::new("OKX-001");
3889        let ts_init = UnixNanos::default();
3890
3891        let mut instruments = AHashMap::new();
3892        instruments.insert(
3893            Ustr::from("BTC-USDT-SWAP"),
3894            InstrumentAny::CryptoPerpetual(instrument),
3895        );
3896
3897        let fee_cache = AHashMap::new();
3898        let mut filled_qty_cache = AHashMap::new();
3899
3900        // First update: acc_fill_sz = 0.01, no fill_sz, no trade_id
3901        let msg_1 = create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "");
3902
3903        let report_1 = parse_order_msg(
3904            &msg_1,
3905            account_id,
3906            &instruments,
3907            &fee_cache,
3908            &filled_qty_cache,
3909            ts_init,
3910        )
3911        .unwrap();
3912
3913        // Should generate a fill report (not a status report)
3914        assert!(matches!(report_1, ExecutionReport::Fill(_)));
3915        if let ExecutionReport::Fill(fill) = &report_1 {
3916            assert_eq!(fill.last_qty, Quantity::from("0.01"));
3917        }
3918
3919        // Update cache
3920        filled_qty_cache.insert(Ustr::from("1234567890"), Quantity::from("0.01"));
3921
3922        // Second update: acc_fill_sz increased to 0.03, still no fill_sz or trade_id
3923        let msg_2 = create_stub_order_msg("", Some("0.03".to_string()), "1234567890", "");
3924
3925        let report_2 = parse_order_msg(
3926            &msg_2,
3927            account_id,
3928            &instruments,
3929            &fee_cache,
3930            &filled_qty_cache,
3931            ts_init,
3932        )
3933        .unwrap();
3934
3935        // Should still generate a fill report for the incremental 0.02
3936        assert!(matches!(report_2, ExecutionReport::Fill(_)));
3937        if let ExecutionReport::Fill(fill) = &report_2 {
3938            assert_eq!(fill.last_qty, Quantity::from("0.02"));
3939        }
3940    }
3941
3942    #[rstest]
3943    fn test_parse_book10_msg_partial_levels() {
3944        // Test with fewer than 10 levels - should pad with empty orders
3945        let book_msg = OKXBookMsg {
3946            asks: vec![
3947                OrderBookEntry {
3948                    price: "8476.98".to_string(),
3949                    size: "415".to_string(),
3950                    liquidated_orders_count: "0".to_string(),
3951                    orders_count: "13".to_string(),
3952                },
3953                OrderBookEntry {
3954                    price: "8477.00".to_string(),
3955                    size: "7".to_string(),
3956                    liquidated_orders_count: "0".to_string(),
3957                    orders_count: "2".to_string(),
3958                },
3959            ],
3960            bids: vec![OrderBookEntry {
3961                price: "8476.97".to_string(),
3962                size: "256".to_string(),
3963                liquidated_orders_count: "0".to_string(),
3964                orders_count: "12".to_string(),
3965            }],
3966            ts: 1597026383085,
3967            checksum: None,
3968            prev_seq_id: None,
3969            seq_id: 123456,
3970        };
3971
3972        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
3973        let depth10 =
3974            parse_book10_msg(&book_msg, instrument_id, 2, 0, UnixNanos::default()).unwrap();
3975
3976        // Check that first levels have data
3977        assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
3978        assert_eq!(depth10.bids[0].size, Quantity::from("256"));
3979        assert_eq!(depth10.bid_counts[0], 12);
3980
3981        // Check that remaining levels are padded with default (empty) orders
3982        assert_eq!(depth10.bids[1].price, Price::from("0"));
3983        assert_eq!(depth10.bids[1].size, Quantity::from("0"));
3984        assert_eq!(depth10.bid_counts[1], 0);
3985
3986        // Check asks
3987        assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
3988        assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
3989        assert_eq!(depth10.asks[2].price, Price::from("0")); // padded with empty
3990    }
3991
3992    #[rstest]
3993    fn test_parse_algo_order_msg_stop_market() {
3994        let json_data = load_test_json("ws_orders_algo.json");
3995        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3996        let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3997
3998        // Test first algo order (stop market sell)
3999        let msg = &data[0];
4000        assert_eq!(msg.algo_id, "706620792746729472");
4001        assert_eq!(msg.algo_cl_ord_id, "STOP001BTCUSDT20250120");
4002        assert_eq!(msg.state, OKXOrderStatus::Live);
4003        assert_eq!(msg.ord_px, "-1"); // Market order indicator
4004
4005        let account_id = AccountId::new("OKX-001");
4006        let mut instruments = AHashMap::new();
4007
4008        // Create mock instrument
4009        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
4010        let instrument = CryptoPerpetual::new(
4011            instrument_id,
4012            Symbol::from("BTC-USDT-SWAP"),
4013            Currency::BTC(),
4014            Currency::USDT(),
4015            Currency::USDT(),
4016            false, // is_inverse
4017            2,     // price_precision
4018            8,     // size_precision
4019            Price::from("0.01"),
4020            Quantity::from("0.00000001"),
4021            None,
4022            None,
4023            None,
4024            None,
4025            None,
4026            None,
4027            None,
4028            None,
4029            None,
4030            None,
4031            None,
4032            None,
4033            None,
4034            0.into(), // ts_event
4035            0.into(), // ts_init
4036        );
4037        instruments.insert(
4038            Ustr::from("BTC-USDT-SWAP"),
4039            InstrumentAny::CryptoPerpetual(instrument),
4040        );
4041
4042        let result = parse_algo_order_msg(msg, account_id, &instruments, UnixNanos::default());
4043
4044        let report = result.unwrap().unwrap();
4045
4046        if let ExecutionReport::Order(status_report) = report {
4047            assert_eq!(status_report.order_type, OrderType::StopMarket);
4048            assert_eq!(status_report.order_side, OrderSide::Sell);
4049            assert_eq!(status_report.quantity, Quantity::from("0.01000000"));
4050            assert_eq!(status_report.trigger_price, Some(Price::from("95000.00")));
4051            assert_eq!(status_report.trigger_type, Some(TriggerType::LastPrice));
4052            assert_eq!(status_report.price, None); // No limit price for market orders
4053        } else {
4054            panic!("Expected Order report");
4055        }
4056    }
4057
4058    #[rstest]
4059    fn test_parse_algo_order_msg_stop_limit() {
4060        let json_data = load_test_json("ws_orders_algo.json");
4061        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4062        let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
4063
4064        // Test second algo order (stop limit buy)
4065        let msg = &data[1];
4066        assert_eq!(msg.algo_id, "706620792746729473");
4067        assert_eq!(msg.state, OKXOrderStatus::Live);
4068        assert_eq!(msg.ord_px, "106000"); // Limit price
4069
4070        let account_id = AccountId::new("OKX-001");
4071        let mut instruments = AHashMap::new();
4072
4073        // Create mock instrument
4074        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
4075        let instrument = CryptoPerpetual::new(
4076            instrument_id,
4077            Symbol::from("BTC-USDT-SWAP"),
4078            Currency::BTC(),
4079            Currency::USDT(),
4080            Currency::USDT(),
4081            false, // is_inverse
4082            2,     // price_precision
4083            8,     // size_precision
4084            Price::from("0.01"),
4085            Quantity::from("0.00000001"),
4086            None,
4087            None,
4088            None,
4089            None,
4090            None,
4091            None,
4092            None,
4093            None,
4094            None,
4095            None,
4096            None,
4097            None,
4098            None,
4099            0.into(), // ts_event
4100            0.into(), // ts_init
4101        );
4102        instruments.insert(
4103            Ustr::from("BTC-USDT-SWAP"),
4104            InstrumentAny::CryptoPerpetual(instrument),
4105        );
4106
4107        let result = parse_algo_order_msg(msg, account_id, &instruments, UnixNanos::default());
4108
4109        let report = result.unwrap().unwrap();
4110
4111        if let ExecutionReport::Order(status_report) = report {
4112            assert_eq!(status_report.order_type, OrderType::StopLimit);
4113            assert_eq!(status_report.order_side, OrderSide::Buy);
4114            assert_eq!(status_report.quantity, Quantity::from("0.02000000"));
4115            assert_eq!(status_report.trigger_price, Some(Price::from("105000.00")));
4116            assert_eq!(status_report.trigger_type, Some(TriggerType::MarkPrice));
4117            assert_eq!(status_report.price, Some(Price::from("106000.00"))); // Has limit price
4118        } else {
4119            panic!("Expected Order report");
4120        }
4121    }
4122
4123    #[rstest]
4124    fn test_parse_trigger_order_from_regular_channel() {
4125        let json_data = load_test_json("ws_orders_trigger.json");
4126        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4127        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
4128
4129        // Test triggered order that came through regular orders channel
4130        let msg = &data[0];
4131        assert_eq!(msg.ord_type, OKXOrderType::Trigger);
4132        assert_eq!(msg.state, OKXOrderStatus::Filled);
4133
4134        let account_id = AccountId::new("OKX-001");
4135        let mut instruments = AHashMap::new();
4136
4137        // Create mock instrument
4138        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
4139        let instrument = CryptoPerpetual::new(
4140            instrument_id,
4141            Symbol::from("BTC-USDT-SWAP"),
4142            Currency::BTC(),
4143            Currency::USDT(),
4144            Currency::USDT(),
4145            false, // is_inverse
4146            2,     // price_precision
4147            8,     // size_precision
4148            Price::from("0.01"),
4149            Quantity::from("0.00000001"),
4150            None,
4151            None,
4152            None,
4153            None,
4154            None,
4155            None,
4156            None,
4157            None,
4158            None,
4159            None,
4160            None,
4161            None,
4162            None,
4163            0.into(), // ts_event
4164            0.into(), // ts_init
4165        );
4166        instruments.insert(
4167            Ustr::from("BTC-USDT-SWAP"),
4168            InstrumentAny::CryptoPerpetual(instrument),
4169        );
4170
4171        let mut fee_cache = AHashMap::new();
4172        let mut filled_qty_cache = AHashMap::new();
4173
4174        let result = parse_order_msg_vec(
4175            std::slice::from_ref(msg),
4176            account_id,
4177            &instruments,
4178            &mut fee_cache,
4179            &mut filled_qty_cache,
4180            UnixNanos::default(),
4181        );
4182
4183        assert!(result.is_ok());
4184        let reports = result.unwrap();
4185        assert_eq!(reports.len(), 1);
4186
4187        if let ExecutionReport::Fill(fill_report) = &reports[0] {
4188            assert_eq!(fill_report.order_side, OrderSide::Sell);
4189            assert_eq!(fill_report.last_qty, Quantity::from("0.01000000"));
4190            assert_eq!(fill_report.last_px, Price::from("101950.00"));
4191        } else {
4192            panic!("Expected Fill report for filled trigger order");
4193        }
4194    }
4195
4196    #[rstest]
4197    fn test_parse_liquidation_order() {
4198        let json_data = load_test_json("ws_orders_liquidation.json");
4199        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4200        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
4201
4202        // Test liquidation order
4203        let msg = &data[0];
4204        assert_eq!(msg.category, OKXOrderCategory::FullLiquidation);
4205        assert_eq!(msg.state, OKXOrderStatus::Filled);
4206        assert_eq!(msg.inst_id.as_str(), "BTC-USDT-SWAP");
4207
4208        let account_id = AccountId::new("OKX-001");
4209        let mut instruments = AHashMap::new();
4210
4211        // Create mock instrument
4212        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
4213        let instrument = CryptoPerpetual::new(
4214            instrument_id,
4215            Symbol::from("BTC-USDT-SWAP"),
4216            Currency::BTC(),
4217            Currency::USDT(),
4218            Currency::USDT(),
4219            false, // is_inverse
4220            2,     // price_precision
4221            8,     // size_precision
4222            Price::from("0.01"),
4223            Quantity::from("0.00000001"),
4224            None,
4225            None,
4226            None,
4227            None,
4228            None,
4229            None,
4230            None,
4231            None,
4232            None,
4233            None,
4234            None,
4235            None,
4236            None,
4237            0.into(), // ts_event
4238            0.into(), // ts_init
4239        );
4240        instruments.insert(
4241            Ustr::from("BTC-USDT-SWAP"),
4242            InstrumentAny::CryptoPerpetual(instrument),
4243        );
4244        let mut fee_cache = AHashMap::new();
4245        let mut filled_qty_cache = AHashMap::new();
4246
4247        let result = parse_order_msg_vec(
4248            std::slice::from_ref(msg),
4249            account_id,
4250            &instruments,
4251            &mut fee_cache,
4252            &mut filled_qty_cache,
4253            UnixNanos::default(),
4254        );
4255
4256        assert!(result.is_ok());
4257        let reports = result.unwrap();
4258        assert_eq!(reports.len(), 1);
4259
4260        // Verify it's a fill report for a liquidation
4261        if let ExecutionReport::Fill(fill_report) = &reports[0] {
4262            assert_eq!(fill_report.order_side, OrderSide::Sell);
4263            assert_eq!(fill_report.last_qty, Quantity::from("0.50000000"));
4264            assert_eq!(fill_report.last_px, Price::from("40000.00"));
4265            assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
4266        } else {
4267            panic!("Expected Fill report for liquidation order");
4268        }
4269    }
4270
4271    #[rstest]
4272    fn test_parse_adl_order() {
4273        let json_data = load_test_json("ws_orders_adl.json");
4274        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4275        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
4276
4277        // Test ADL order
4278        let msg = &data[0];
4279        assert_eq!(msg.category, OKXOrderCategory::Adl);
4280        assert_eq!(msg.state, OKXOrderStatus::Filled);
4281        assert_eq!(msg.inst_id.as_str(), "ETH-USDT-SWAP");
4282
4283        let account_id = AccountId::new("OKX-001");
4284        let mut instruments = AHashMap::new();
4285
4286        // Create mock instrument
4287        let instrument_id = InstrumentId::from("ETH-USDT-SWAP.OKX");
4288        let instrument = CryptoPerpetual::new(
4289            instrument_id,
4290            Symbol::from("ETH-USDT-SWAP"),
4291            Currency::ETH(),
4292            Currency::USDT(),
4293            Currency::USDT(),
4294            false, // is_inverse
4295            2,     // price_precision
4296            8,     // size_precision
4297            Price::from("0.01"),
4298            Quantity::from("0.00000001"),
4299            None,
4300            None,
4301            None,
4302            None,
4303            None,
4304            None,
4305            None,
4306            None,
4307            None,
4308            None,
4309            None,
4310            None,
4311            None,
4312            0.into(), // ts_event
4313            0.into(), // ts_init
4314        );
4315        instruments.insert(
4316            Ustr::from("ETH-USDT-SWAP"),
4317            InstrumentAny::CryptoPerpetual(instrument),
4318        );
4319
4320        let mut fee_cache = AHashMap::new();
4321        let mut filled_qty_cache = AHashMap::new();
4322
4323        let result = parse_order_msg_vec(
4324            std::slice::from_ref(msg),
4325            account_id,
4326            &instruments,
4327            &mut fee_cache,
4328            &mut filled_qty_cache,
4329            UnixNanos::default(),
4330        );
4331
4332        assert!(result.is_ok());
4333        let reports = result.unwrap();
4334        assert_eq!(reports.len(), 1);
4335
4336        // Verify it's a fill report for ADL
4337        if let ExecutionReport::Fill(fill_report) = &reports[0] {
4338            assert_eq!(fill_report.order_side, OrderSide::Buy);
4339            assert_eq!(fill_report.last_qty, Quantity::from("0.30000000"));
4340            assert_eq!(fill_report.last_px, Price::from("41000.00"));
4341            assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
4342        } else {
4343            panic!("Expected Fill report for ADL order");
4344        }
4345    }
4346
4347    #[rstest]
4348    fn test_parse_unknown_category_graceful_fallback() {
4349        // Test that unknown/future category values deserialize as Other instead of failing
4350        let json_with_unknown_category = r#"{
4351            "category": "some_future_category_we_dont_know"
4352        }"#;
4353
4354        let result: Result<serde_json::Value, _> = serde_json::from_str(json_with_unknown_category);
4355        assert!(result.is_ok());
4356
4357        // Test deserialization of the category field directly
4358        let category_result: Result<OKXOrderCategory, _> =
4359            serde_json::from_str(r#""some_future_category""#);
4360        assert!(category_result.is_ok());
4361        assert_eq!(category_result.unwrap(), OKXOrderCategory::Other);
4362
4363        // Verify known categories still work
4364        let normal: OKXOrderCategory = serde_json::from_str(r#""normal""#).unwrap();
4365        assert_eq!(normal, OKXOrderCategory::Normal);
4366
4367        let twap: OKXOrderCategory = serde_json::from_str(r#""twap""#).unwrap();
4368        assert_eq!(twap, OKXOrderCategory::Twap);
4369    }
4370
4371    #[rstest]
4372    fn test_parse_partial_liquidation_order() {
4373        // Create a test message with partial liquidation category
4374        let account_id = AccountId::new("OKX-001");
4375        let mut instruments = AHashMap::new();
4376
4377        let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
4378        let instrument = CryptoPerpetual::new(
4379            instrument_id,
4380            Symbol::from("BTC-USDT-SWAP"),
4381            Currency::BTC(),
4382            Currency::USDT(),
4383            Currency::USDT(),
4384            false,
4385            2,
4386            8,
4387            Price::from("0.01"),
4388            Quantity::from("0.00000001"),
4389            None,
4390            None,
4391            None,
4392            None,
4393            None,
4394            None,
4395            None,
4396            None,
4397            None,
4398            None,
4399            None,
4400            None,
4401            None,
4402            0.into(),
4403            0.into(),
4404        );
4405        instruments.insert(
4406            Ustr::from("BTC-USDT-SWAP"),
4407            InstrumentAny::CryptoPerpetual(instrument),
4408        );
4409
4410        let partial_liq_msg = OKXOrderMsg {
4411            acc_fill_sz: Some("0.25".to_string()),
4412            algo_id: None,
4413            avg_px: "39000.0".to_string(),
4414            c_time: 1746947317401,
4415            cancel_source: None,
4416            cancel_source_reason: None,
4417            category: OKXOrderCategory::PartialLiquidation,
4418            ccy: Ustr::from("USDT"),
4419            cl_ord_id: String::new(),
4420            algo_cl_ord_id: None,
4421            attach_algo_cl_ord_id: None,
4422            attach_algo_ords: Vec::new(),
4423            fee: Some("-9.75".to_string()),
4424            fee_ccy: Ustr::from("USDT"),
4425            fill_fee: None,
4426            fill_fee_ccy: None,
4427            fill_mark_px: None,
4428            fill_mark_vol: None,
4429            fill_px_vol: None,
4430            fill_px_usd: None,
4431            fill_fwd_px: None,
4432            fill_notional_usd: None,
4433            fill_pnl: None,
4434            fill_px: "39000.0".to_string(),
4435            fill_sz: "0.25".to_string(),
4436            fill_time: 1746947317402,
4437            inst_id: Ustr::from("BTC-USDT-SWAP"),
4438            inst_type: OKXInstrumentType::Swap,
4439            is_tp_limit: None,
4440            lever: "10.0".to_string(),
4441            linked_algo_ord: None,
4442            notional_usd: None,
4443            ord_id: Ustr::from("2497956918703120888"),
4444            ord_type: OKXOrderType::Market,
4445            pnl: "-2500".to_string(),
4446            pos_side: OKXPositionSide::Long,
4447            px: String::new(),
4448            px_type: OKXPriceType::None,
4449            px_usd: None,
4450            px_vol: None,
4451            quick_mgn_type: OKXQuickMarginType::None,
4452            rebate: None,
4453            rebate_ccy: None,
4454            reduce_only: "false".to_string(),
4455            side: OKXSide::Sell,
4456            sl_ord_px: None,
4457            sl_trigger_px: None,
4458            sl_trigger_px_type: None,
4459            source: None,
4460            state: OKXOrderStatus::Filled,
4461            stp_id: None,
4462            stp_mode: OKXSelfTradePreventionMode::None,
4463            exec_type: OKXExecType::Taker,
4464            sz: "0.25".to_string(),
4465            tag: None,
4466            td_mode: OKXTradeMode::Isolated,
4467            tgt_ccy: None,
4468            tp_ord_px: None,
4469            tp_trigger_px: None,
4470            tp_trigger_px_type: None,
4471            trade_id: "1518905888".to_string(),
4472            u_time: 1746947317402,
4473            amend_result: None,
4474            req_id: None,
4475            code: None,
4476            msg: None,
4477        };
4478
4479        let fee_cache = AHashMap::new();
4480        let filled_qty_cache = AHashMap::new();
4481        let result = parse_order_msg(
4482            &partial_liq_msg,
4483            account_id,
4484            &instruments,
4485            &fee_cache,
4486            &filled_qty_cache,
4487            UnixNanos::default(),
4488        );
4489
4490        assert!(result.is_ok());
4491        let report = result.unwrap();
4492
4493        // Verify it's a fill report for partial liquidation
4494        if let ExecutionReport::Fill(fill_report) = report {
4495            assert_eq!(fill_report.order_side, OrderSide::Sell);
4496            assert_eq!(fill_report.last_qty, Quantity::from("0.25000000"));
4497            assert_eq!(fill_report.last_px, Price::from("39000.00"));
4498        } else {
4499            panic!("Expected Fill report for partial liquidation order");
4500        }
4501    }
4502
4503    #[rstest]
4504    fn test_websocket_instrument_update_preserves_cached_fees() {
4505        use nautilus_model::{identifiers::InstrumentId, instruments::InstrumentAny};
4506
4507        use crate::common::{models::OKXInstrument, parse::parse_instrument_any};
4508
4509        let ts_init = UnixNanos::default();
4510
4511        // Create initial instrument with fees (simulating HTTP load)
4512        // These values are already in Nautilus format (HTTP client negates OKX values)
4513        let initial_fees = (
4514            Some(Decimal::new(8, 4)),  // Nautilus: 0.0008 (commission)
4515            Some(Decimal::new(10, 4)), // Nautilus: 0.0010 (commission)
4516        );
4517
4518        // Deserialize initial instrument from JSON
4519        let initial_inst_json = serde_json::json!({
4520            "instType": "SPOT",
4521            "instId": "BTC-USD",
4522            "baseCcy": "BTC",
4523            "quoteCcy": "USD",
4524            "settleCcy": "",
4525            "ctVal": "",
4526            "ctMult": "",
4527            "ctValCcy": "",
4528            "optType": "",
4529            "stk": "",
4530            "listTime": "1733454000000",
4531            "expTime": "",
4532            "lever": "",
4533            "tickSz": "0.1",
4534            "lotSz": "0.00000001",
4535            "minSz": "0.00001",
4536            "ctType": "linear",
4537            "alias": "",
4538            "state": "live",
4539            "maxLmtSz": "9999999999",
4540            "maxMktSz": "1000000",
4541            "maxTwapSz": "9999999999.0000000000000000",
4542            "maxIcebergSz": "9999999999.0000000000000000",
4543            "maxTriggerSz": "9999999999.0000000000000000",
4544            "maxStopSz": "1000000",
4545            "uly": "",
4546            "instFamily": "",
4547            "ruleType": "normal",
4548            "maxLmtAmt": "20000000",
4549            "maxMktAmt": "1000000"
4550        });
4551
4552        let initial_inst: OKXInstrument = serde_json::from_value(initial_inst_json)
4553            .expect("Failed to deserialize initial instrument");
4554
4555        // Parse initial instrument with fees
4556        let parsed_initial = parse_instrument_any(
4557            &initial_inst,
4558            None,
4559            None,
4560            initial_fees.0,
4561            initial_fees.1,
4562            ts_init,
4563        )
4564        .expect("Failed to parse initial instrument")
4565        .expect("Initial instrument should not be None");
4566
4567        // Verify fees were applied
4568        if let InstrumentAny::CurrencyPair(ref pair) = parsed_initial {
4569            assert_eq!(pair.maker_fee, dec!(0.0008));
4570            assert_eq!(pair.taker_fee, dec!(0.0010));
4571        } else {
4572            panic!("Expected CurrencyPair instrument");
4573        }
4574
4575        // Build instrument cache with the initial instrument
4576        let mut instruments_cache = AHashMap::new();
4577        instruments_cache.insert(Ustr::from("BTC-USD"), parsed_initial);
4578
4579        // Create WebSocket update message (same structure as initial, simulating a WebSocket update)
4580        let ws_update = serde_json::json!({
4581            "instType": "SPOT",
4582            "instId": "BTC-USD",
4583            "baseCcy": "BTC",
4584            "quoteCcy": "USD",
4585            "settleCcy": "",
4586            "ctVal": "",
4587            "ctMult": "",
4588            "ctValCcy": "",
4589            "optType": "",
4590            "stk": "",
4591            "listTime": "1733454000000",
4592            "expTime": "",
4593            "lever": "",
4594            "tickSz": "0.1",
4595            "lotSz": "0.00000001",
4596            "minSz": "0.00001",
4597            "ctType": "linear",
4598            "alias": "",
4599            "state": "live",
4600            "maxLmtSz": "9999999999",
4601            "maxMktSz": "1000000",
4602            "maxTwapSz": "9999999999.0000000000000000",
4603            "maxIcebergSz": "9999999999.0000000000000000",
4604            "maxTriggerSz": "9999999999.0000000000000000",
4605            "maxStopSz": "1000000",
4606            "uly": "",
4607            "instFamily": "",
4608            "ruleType": "normal",
4609            "maxLmtAmt": "20000000",
4610            "maxMktAmt": "1000000"
4611        });
4612
4613        let instrument_id = InstrumentId::from("BTC-USD.OKX");
4614        let mut funding_cache = AHashMap::new();
4615
4616        // Parse WebSocket update with cache
4617        let result = parse_ws_message_data(
4618            &OKXWsChannel::Instruments,
4619            ws_update,
4620            &instrument_id,
4621            2,
4622            8,
4623            ts_init,
4624            &mut funding_cache,
4625            &instruments_cache,
4626        )
4627        .expect("Failed to parse WebSocket instrument update");
4628
4629        // Verify the update preserves the cached fees
4630        if let Some(NautilusWsMessage::Instrument(boxed_inst, _status)) = result {
4631            if let InstrumentAny::CurrencyPair(pair) = *boxed_inst {
4632                assert_eq!(
4633                    pair.maker_fee,
4634                    Decimal::new(8, 4),
4635                    "Maker fee should be preserved from cache"
4636                );
4637                assert_eq!(
4638                    pair.taker_fee,
4639                    Decimal::new(10, 4),
4640                    "Taker fee should be preserved from cache"
4641                );
4642            } else {
4643                panic!("Expected CurrencyPair instrument from WebSocket update");
4644            }
4645        } else {
4646            panic!("Expected Instrument message from WebSocket update");
4647        }
4648    }
4649
4650    #[rstest]
4651    #[case::fok_order(OKXOrderType::Fok, TimeInForce::Fok)]
4652    #[case::ioc_order(OKXOrderType::Ioc, TimeInForce::Ioc)]
4653    #[case::optimal_limit_ioc_order(OKXOrderType::OptimalLimitIoc, TimeInForce::Ioc)]
4654    #[case::market_order(OKXOrderType::Market, TimeInForce::Gtc)]
4655    #[case::limit_order(OKXOrderType::Limit, TimeInForce::Gtc)]
4656    fn test_parse_time_in_force_from_ord_type(
4657        #[case] okx_ord_type: OKXOrderType,
4658        #[case] expected_tif: TimeInForce,
4659    ) {
4660        let time_in_force = match okx_ord_type {
4661            OKXOrderType::Fok | OKXOrderType::OpFok => TimeInForce::Fok,
4662            OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => TimeInForce::Ioc,
4663            _ => TimeInForce::Gtc,
4664        };
4665
4666        assert_eq!(
4667            time_in_force, expected_tif,
4668            "OKXOrderType::{okx_ord_type:?} should parse to TimeInForce::{expected_tif:?}"
4669        );
4670    }
4671
4672    #[rstest]
4673    fn test_deserialize_fok_order_message() {
4674        let json_data = load_test_json("ws_orders_fok.json");
4675        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4676        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
4677
4678        assert!(!data.is_empty());
4679        assert_eq!(data[0].ord_type, OKXOrderType::Fok);
4680        assert_eq!(data[0].cl_ord_id, "FOK-TEST-001");
4681        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT"));
4682    }
4683
4684    #[rstest]
4685    fn test_deserialize_ioc_order_message() {
4686        let json_data = load_test_json("ws_orders_ioc.json");
4687        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4688        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
4689
4690        assert!(!data.is_empty());
4691        assert_eq!(data[0].ord_type, OKXOrderType::Ioc);
4692        assert_eq!(data[0].cl_ord_id, "IOC-TEST-001");
4693        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT"));
4694    }
4695
4696    #[rstest]
4697    fn test_deserialize_optimal_limit_ioc_order_message() {
4698        let json_data = load_test_json("ws_orders_optimal_limit_ioc.json");
4699        let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4700        let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
4701
4702        assert!(!data.is_empty());
4703        assert_eq!(data[0].ord_type, OKXOrderType::OptimalLimitIoc);
4704        assert_eq!(data[0].cl_ord_id, "OPTIMAL-IOC-TEST-001");
4705        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT-SWAP"));
4706    }
4707
4708    #[rstest]
4709    fn test_deserialize_regular_order_message() {
4710        let json_data = load_test_json("ws_orders.json");
4711        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4712        let data: Vec<OKXOrderMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4713
4714        assert!(!data.is_empty());
4715        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT-SWAP"));
4716        assert_eq!(data[0].state, OKXOrderStatus::Filled);
4717        assert_eq!(data[0].category, OKXOrderCategory::Normal);
4718        assert_eq!(data[0].rebate.as_deref(), Some("0"));
4719        assert_eq!(data[0].rebate_ccy.as_deref(), Some("USDT"));
4720        assert_eq!(data[0].stp_mode, OKXSelfTradePreventionMode::CancelMaker);
4721        assert!(data[0].linked_algo_ord.is_some());
4722        assert_eq!(data[0].tag.as_deref(), Some(""));
4723        assert_eq!(data[0].source.as_deref(), Some(""));
4724    }
4725
4726    #[rstest]
4727    fn test_deserialize_algo_order_message() {
4728        let json_data = load_test_json("ws_orders_algo.json");
4729        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4730        let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4731
4732        assert!(!data.is_empty());
4733        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT-SWAP"));
4734    }
4735
4736    #[rstest]
4737    fn test_deserialize_algo_order_missing_trigger_px_type() {
4738        // algo-advance channel messages omit triggerPxType
4739        let json = r#"{
4740            "algoId": "123",
4741            "algoClOrdId": "cl_1",
4742            "clOrdId": "",
4743            "ordId": "",
4744            "instId": "BTC-USDT-SWAP",
4745            "instType": "SWAP",
4746            "ordType": "move_order_stop",
4747            "state": "live",
4748            "side": "sell",
4749            "posSide": "long",
4750            "sz": "0.01",
4751            "triggerPx": "95000",
4752            "ordPx": "-1",
4753            "tdMode": "cross",
4754            "lever": "",
4755            "reduceOnly": "false",
4756            "actualPx": "",
4757            "actualSz": "",
4758            "notionalUsd": "",
4759            "cTime": "1706000000000",
4760            "uTime": "1706000001000",
4761            "triggerTime": "",
4762            "tag": "",
4763            "callbackRatio": "0.01",
4764            "callbackSpread": "",
4765            "activePx": ""
4766        }"#;
4767
4768        let msg: OKXAlgoOrderMsg = serde_json::from_str(json).unwrap();
4769
4770        assert_eq!(msg.trigger_px_type, OKXTriggerType::None);
4771        assert_eq!(msg.ord_type, OKXAlgoOrderType::MoveOrderStop);
4772    }
4773
4774    #[rstest]
4775    fn test_deserialize_liquidation_order_message() {
4776        let json_data = load_test_json("ws_orders_liquidation.json");
4777        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4778        let data: Vec<OKXOrderMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4779
4780        assert!(!data.is_empty());
4781        assert_eq!(data[0].category, OKXOrderCategory::FullLiquidation);
4782    }
4783
4784    #[rstest]
4785    fn test_deserialize_adl_order_message() {
4786        let json_data = load_test_json("ws_orders_adl.json");
4787        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4788        let data: Vec<OKXOrderMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4789
4790        assert!(!data.is_empty());
4791        assert_eq!(data[0].category, OKXOrderCategory::Adl);
4792    }
4793
4794    #[rstest]
4795    fn test_deserialize_trigger_order_message() {
4796        let json_data = load_test_json("ws_orders_trigger.json");
4797        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4798        let data: Vec<OKXOrderMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4799
4800        assert!(!data.is_empty());
4801        assert_eq!(data[0].ord_type, OKXOrderType::Trigger);
4802        assert_eq!(data[0].category, OKXOrderCategory::Normal);
4803    }
4804
4805    #[rstest]
4806    fn test_deserialize_book_snapshot_message() {
4807        let json_data = load_test_json("ws_books_snapshot.json");
4808        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4809        let action: Option<OKXBookAction> =
4810            serde_json::from_value(payload["action"].clone()).unwrap();
4811        let data: Vec<OKXBookMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4812
4813        assert!(!data.is_empty());
4814        assert_eq!(action, Some(OKXBookAction::Snapshot));
4815        assert!(!data[0].asks.is_empty());
4816        assert!(!data[0].bids.is_empty());
4817    }
4818
4819    #[rstest]
4820    fn test_deserialize_book_update_message() {
4821        let json_data = load_test_json("ws_books_update.json");
4822        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4823        let action: Option<OKXBookAction> =
4824            serde_json::from_value(payload["action"].clone()).unwrap();
4825        let data: Vec<OKXBookMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4826
4827        assert!(!data.is_empty());
4828        assert_eq!(action, Some(OKXBookAction::Update));
4829        assert!(!data[0].asks.is_empty());
4830        assert!(!data[0].bids.is_empty());
4831    }
4832
4833    #[rstest]
4834    fn test_deserialize_ticker_message() {
4835        let json_data = load_test_json("ws_tickers.json");
4836        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4837        let data: Vec<OKXTickerMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4838
4839        assert!(!data.is_empty());
4840        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT"));
4841        assert_eq!(data[0].last_px, "9999.99");
4842    }
4843
4844    #[rstest]
4845    fn test_deserialize_candle_message() {
4846        let json_data = load_test_json("ws_candle.json");
4847        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4848        let data: Vec<OKXCandleMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4849
4850        assert!(!data.is_empty());
4851        assert!(!data[0].o.is_empty());
4852        assert!(!data[0].h.is_empty());
4853        assert!(!data[0].l.is_empty());
4854        assert!(!data[0].c.is_empty());
4855    }
4856
4857    #[rstest]
4858    fn test_deserialize_funding_rate_message() {
4859        let json_data = load_test_json("ws_funding_rate.json");
4860        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4861        let data: Vec<OKXFundingRateMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4862
4863        assert!(!data.is_empty());
4864        assert_eq!(data[0].inst_id, Ustr::from("BTC-USDT-SWAP"));
4865    }
4866
4867    #[rstest]
4868    fn test_deserialize_bbo_tbt_message() {
4869        let json_data = load_test_json("ws_bbo_tbt.json");
4870        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4871        let data: Vec<OKXBookMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4872
4873        assert!(!data.is_empty());
4874        assert!(!data[0].asks.is_empty());
4875        assert!(!data[0].bids.is_empty());
4876    }
4877
4878    #[rstest]
4879    fn test_deserialize_trade_message() {
4880        let json_data = load_test_json("ws_trades.json");
4881        let payload: serde_json::Value = serde_json::from_str(&json_data).unwrap();
4882        let data: Vec<OKXTradeMsg> = serde_json::from_value(payload["data"].clone()).unwrap();
4883
4884        assert!(!data.is_empty());
4885        assert_eq!(data[0].inst_id, Ustr::from("BTC-USD"));
4886    }
4887
4888    fn create_order_msg_for_event_test(
4889        state: OKXOrderStatus,
4890        cl_ord_id: &str,
4891        ord_id: &str,
4892        px: &str,
4893        sz: &str,
4894    ) -> OKXOrderMsg {
4895        OKXOrderMsg {
4896            acc_fill_sz: Some("0".to_string()),
4897            algo_id: None,
4898            avg_px: "50000.0".to_string(),
4899            c_time: 1746947317401,
4900            cancel_source: None,
4901            cancel_source_reason: None,
4902            category: OKXOrderCategory::Normal,
4903            ccy: Ustr::from("USDT"),
4904            cl_ord_id: cl_ord_id.to_string(),
4905            algo_cl_ord_id: None,
4906            attach_algo_cl_ord_id: None,
4907            attach_algo_ords: Vec::new(),
4908            fee: Some("0".to_string()),
4909            fee_ccy: Ustr::from("USDT"),
4910            fill_fee: None,
4911            fill_fee_ccy: None,
4912            fill_mark_px: None,
4913            fill_mark_vol: None,
4914            fill_px_vol: None,
4915            fill_px_usd: None,
4916            fill_fwd_px: None,
4917            fill_notional_usd: None,
4918            fill_pnl: None,
4919            fill_px: String::new(),
4920            fill_sz: String::new(),
4921            fill_time: 0,
4922            inst_id: Ustr::from("BTC-USDT-SWAP"),
4923            inst_type: OKXInstrumentType::Swap,
4924            is_tp_limit: None,
4925            lever: "2.0".to_string(),
4926            linked_algo_ord: None,
4927            notional_usd: None,
4928            ord_id: Ustr::from(ord_id),
4929            ord_type: OKXOrderType::Limit,
4930            pnl: "0".to_string(),
4931            pos_side: OKXPositionSide::Long,
4932            px: px.to_string(),
4933            px_type: OKXPriceType::None,
4934            px_usd: None,
4935            px_vol: None,
4936            quick_mgn_type: OKXQuickMarginType::None,
4937            rebate: None,
4938            rebate_ccy: None,
4939            reduce_only: "false".to_string(),
4940            side: OKXSide::Buy,
4941            sl_ord_px: None,
4942            sl_trigger_px: None,
4943            sl_trigger_px_type: None,
4944            source: None,
4945            state,
4946            stp_id: None,
4947            stp_mode: OKXSelfTradePreventionMode::None,
4948            exec_type: OKXExecType::Taker,
4949            sz: sz.to_string(),
4950            tag: None,
4951            td_mode: OKXTradeMode::Isolated,
4952            tgt_ccy: None,
4953            tp_ord_px: None,
4954            tp_trigger_px: None,
4955            tp_trigger_px_type: None,
4956            trade_id: String::new(),
4957            u_time: 1746947317402,
4958            amend_result: None,
4959            req_id: None,
4960            code: None,
4961            msg: None,
4962        }
4963    }
4964
4965    #[rstest]
4966    fn test_synthesize_trade_id_is_deterministic_and_under_36_chars() {
4967        let mut msg = create_order_msg_for_event_test(
4968            OKXOrderStatus::Filled,
4969            "client-1",
4970            "venue-1",
4971            "50000.0",
4972            "0.001",
4973        );
4974        msg.fill_px = "50000.0".to_string();
4975        msg.fill_sz = "0.001".to_string();
4976        msg.fill_time = 1_746_947_317_500;
4977        msg.acc_fill_sz = Some("0.001".to_string());
4978
4979        let id1 = synthesize_trade_id(&msg);
4980        let id2 = synthesize_trade_id(&msg);
4981
4982        assert_eq!(id1, id2, "synthesized id must be deterministic");
4983        assert!(
4984            id1.len() <= 36,
4985            "synthesized id must fit in TradeId, was {}",
4986            id1.len()
4987        );
4988        assert!(id1.starts_with("synth-"));
4989    }
4990
4991    #[rstest]
4992    fn test_synthesize_trade_id_changes_with_fill_fields() {
4993        let mut msg = create_order_msg_for_event_test(
4994            OKXOrderStatus::Filled,
4995            "client-1",
4996            "venue-1",
4997            "50000.0",
4998            "0.001",
4999        );
5000        msg.fill_px = "50000.0".to_string();
5001        msg.fill_sz = "0.001".to_string();
5002        msg.fill_time = 1_746_947_317_500;
5003        msg.acc_fill_sz = Some("0.001".to_string());
5004
5005        let baseline = synthesize_trade_id(&msg);
5006
5007        msg.fill_sz = "0.002".to_string();
5008        let different_size = synthesize_trade_id(&msg);
5009        assert_ne!(baseline, different_size);
5010
5011        msg.fill_sz = "0.001".to_string();
5012        msg.fill_time = 1_746_947_317_999;
5013        let different_time = synthesize_trade_id(&msg);
5014        assert_ne!(baseline, different_time);
5015    }
5016
5017    #[rstest]
5018    fn test_empty_trade_id_fill_deduped_across_replays() {
5019        use crate::websocket::dispatch::WsDispatchState;
5020
5021        // Two identical fill messages with no venue trade_id — the dedup in
5022        // `WsDispatchState::check_and_insert_trade` must suppress the replay.
5023        // Regression lock for the empty-trade_id UUID fabrication bug: if
5024        // `synthesize_trade_id` drifts back to a non-deterministic id, the
5025        // second `check_and_insert_trade` would return false (not a dupe)
5026        // and this test fails.
5027        let mut msg = create_order_msg_for_event_test(
5028            OKXOrderStatus::Filled,
5029            "client-1",
5030            "venue-1",
5031            "50000.0",
5032            "0.001",
5033        );
5034        msg.trade_id = String::new();
5035        msg.fill_px = "50000.0".to_string();
5036        msg.fill_sz = "0.001".to_string();
5037        msg.fill_time = 1_746_947_317_500;
5038        msg.acc_fill_sz = Some("0.001".to_string());
5039
5040        let first_id = TradeId::new(synthesize_trade_id(&msg));
5041        let second_id = TradeId::new(synthesize_trade_id(&msg));
5042        assert_eq!(first_id, second_id, "synthesized id must survive replay");
5043
5044        let state = WsDispatchState::default();
5045        assert!(
5046            !state.check_and_insert_trade(first_id),
5047            "first insert is not a duplicate"
5048        );
5049        assert!(
5050            state.check_and_insert_trade(second_id),
5051            "replayed fill with empty trade_id must dedup"
5052        );
5053    }
5054
5055    #[rstest]
5056    fn test_parse_order_event_live_returns_accepted() {
5057        let instrument = create_stub_instrument();
5058        let msg = create_order_msg_for_event_test(
5059            OKXOrderStatus::Live,
5060            "test_client_123",
5061            "venue_456",
5062            "50000.0",
5063            "0.01",
5064        );
5065
5066        let client_order_id = ClientOrderId::new("test_client_123");
5067        let account_id = AccountId::new("OKX-001");
5068        let trader_id = TraderId::new("TRADER-001");
5069        let strategy_id = StrategyId::new("STRATEGY-001");
5070        let ts_init = UnixNanos::from(1000000000);
5071
5072        let result = parse_order_event(
5073            &msg,
5074            client_order_id,
5075            account_id,
5076            trader_id,
5077            strategy_id,
5078            &InstrumentAny::CryptoPerpetual(instrument),
5079            None,
5080            None,
5081            None,
5082            ts_init,
5083        );
5084
5085        assert!(result.is_ok());
5086        match result.unwrap() {
5087            ParsedOrderEvent::Accepted(accepted) => {
5088                assert_eq!(accepted.client_order_id, client_order_id);
5089                assert_eq!(accepted.venue_order_id, VenueOrderId::new("venue_456"));
5090                assert_eq!(accepted.trader_id, trader_id);
5091                assert_eq!(accepted.strategy_id, strategy_id);
5092            }
5093            other => panic!("Expected Accepted, was {other:?}"),
5094        }
5095    }
5096
5097    #[rstest]
5098    fn test_parse_order_event_live_with_price_change_returns_updated() {
5099        let instrument = create_stub_instrument();
5100        let msg = create_order_msg_for_event_test(
5101            OKXOrderStatus::Live,
5102            "test_client_123",
5103            "venue_456",
5104            "51000.0",
5105            "0.01",
5106        );
5107
5108        let client_order_id = ClientOrderId::new("test_client_123");
5109        let account_id = AccountId::new("OKX-001");
5110        let trader_id = TraderId::new("TRADER-001");
5111        let strategy_id = StrategyId::new("STRATEGY-001");
5112        let ts_init = UnixNanos::from(1000000000);
5113
5114        let previous_state = OrderStateSnapshot {
5115            venue_order_id: VenueOrderId::new("venue_456"),
5116            quantity: Quantity::from("0.01000000"),
5117            price: Some(Price::from("50000.00")),
5118        };
5119
5120        let result = parse_order_event(
5121            &msg,
5122            client_order_id,
5123            account_id,
5124            trader_id,
5125            strategy_id,
5126            &InstrumentAny::CryptoPerpetual(instrument),
5127            None,
5128            None,
5129            Some(&previous_state),
5130            ts_init,
5131        );
5132
5133        assert!(result.is_ok());
5134        match result.unwrap() {
5135            ParsedOrderEvent::Updated(updated) => {
5136                assert_eq!(updated.client_order_id, client_order_id);
5137                assert_eq!(updated.price, Some(Price::from("51000.00")));
5138            }
5139            other => panic!("Expected Updated, was {other:?}"),
5140        }
5141    }
5142
5143    #[rstest]
5144    fn test_parse_order_event_live_with_quantity_change_returns_updated() {
5145        let instrument = create_stub_instrument();
5146        let msg = create_order_msg_for_event_test(
5147            OKXOrderStatus::Live,
5148            "test_client_123",
5149            "venue_456",
5150            "50000.0",
5151            "0.02",
5152        );
5153
5154        let client_order_id = ClientOrderId::new("test_client_123");
5155        let account_id = AccountId::new("OKX-001");
5156        let trader_id = TraderId::new("TRADER-001");
5157        let strategy_id = StrategyId::new("STRATEGY-001");
5158        let ts_init = UnixNanos::from(1000000000);
5159        let previous_state = OrderStateSnapshot {
5160            venue_order_id: VenueOrderId::new("venue_456"),
5161            quantity: Quantity::from("0.01000000"),
5162            price: Some(Price::from("50000.00")),
5163        };
5164
5165        let result = parse_order_event(
5166            &msg,
5167            client_order_id,
5168            account_id,
5169            trader_id,
5170            strategy_id,
5171            &InstrumentAny::CryptoPerpetual(instrument),
5172            None,
5173            None,
5174            Some(&previous_state),
5175            ts_init,
5176        );
5177
5178        assert!(result.is_ok());
5179        match result.unwrap() {
5180            ParsedOrderEvent::Updated(updated) => {
5181                assert_eq!(updated.client_order_id, client_order_id);
5182                assert_eq!(updated.quantity, Quantity::from("0.02000000"));
5183            }
5184            other => panic!("Expected Updated, was {other:?}"),
5185        }
5186    }
5187
5188    #[rstest]
5189    fn test_parse_order_event_canceled_returns_canceled() {
5190        let instrument = create_stub_instrument();
5191        let msg = create_order_msg_for_event_test(
5192            OKXOrderStatus::Canceled,
5193            "test_client_123",
5194            "venue_456",
5195            "50000.0",
5196            "0.01",
5197        );
5198
5199        let client_order_id = ClientOrderId::new("test_client_123");
5200        let account_id = AccountId::new("OKX-001");
5201        let trader_id = TraderId::new("TRADER-001");
5202        let strategy_id = StrategyId::new("STRATEGY-001");
5203        let ts_init = UnixNanos::from(1000000000);
5204
5205        let result = parse_order_event(
5206            &msg,
5207            client_order_id,
5208            account_id,
5209            trader_id,
5210            strategy_id,
5211            &InstrumentAny::CryptoPerpetual(instrument),
5212            None,
5213            None,
5214            None,
5215            ts_init,
5216        );
5217
5218        assert!(result.is_ok());
5219        match result.unwrap() {
5220            ParsedOrderEvent::Canceled(canceled) => {
5221                assert_eq!(canceled.client_order_id, client_order_id);
5222                assert_eq!(
5223                    canceled.venue_order_id,
5224                    Some(VenueOrderId::new("venue_456"))
5225                );
5226            }
5227            other => panic!("Expected Canceled, was {other:?}"),
5228        }
5229    }
5230
5231    #[rstest]
5232    fn test_parse_order_event_canceled_with_expiry_reason_returns_expired() {
5233        let instrument = create_stub_instrument();
5234        let mut msg = create_order_msg_for_event_test(
5235            OKXOrderStatus::Canceled,
5236            "test_client_123",
5237            "venue_456",
5238            "50000.0",
5239            "0.01",
5240        );
5241        msg.cancel_source_reason = Some("GTD order expired".to_string());
5242
5243        let client_order_id = ClientOrderId::new("test_client_123");
5244        let account_id = AccountId::new("OKX-001");
5245        let trader_id = TraderId::new("TRADER-001");
5246        let strategy_id = StrategyId::new("STRATEGY-001");
5247        let ts_init = UnixNanos::from(1000000000);
5248
5249        let result = parse_order_event(
5250            &msg,
5251            client_order_id,
5252            account_id,
5253            trader_id,
5254            strategy_id,
5255            &InstrumentAny::CryptoPerpetual(instrument),
5256            None,
5257            None,
5258            None,
5259            ts_init,
5260        );
5261
5262        assert!(result.is_ok());
5263        match result.unwrap() {
5264            ParsedOrderEvent::Expired(expired) => {
5265                assert_eq!(expired.client_order_id, client_order_id);
5266                assert_eq!(expired.venue_order_id, Some(VenueOrderId::new("venue_456")));
5267            }
5268            other => panic!("Expected Expired, was {other:?}"),
5269        }
5270    }
5271
5272    #[rstest]
5273    fn test_parse_order_event_effective_returns_triggered() {
5274        let instrument = create_stub_instrument();
5275        let msg = create_order_msg_for_event_test(
5276            OKXOrderStatus::Effective,
5277            "test_client_123",
5278            "venue_456",
5279            "50000.0",
5280            "0.01",
5281        );
5282
5283        let client_order_id = ClientOrderId::new("test_client_123");
5284        let account_id = AccountId::new("OKX-001");
5285        let trader_id = TraderId::new("TRADER-001");
5286        let strategy_id = StrategyId::new("STRATEGY-001");
5287        let ts_init = UnixNanos::from(1000000000);
5288
5289        let result = parse_order_event(
5290            &msg,
5291            client_order_id,
5292            account_id,
5293            trader_id,
5294            strategy_id,
5295            &InstrumentAny::CryptoPerpetual(instrument),
5296            None,
5297            None,
5298            None,
5299            ts_init,
5300        );
5301
5302        assert!(result.is_ok());
5303        match result.unwrap() {
5304            ParsedOrderEvent::Triggered(triggered) => {
5305                assert_eq!(triggered.client_order_id, client_order_id);
5306                assert_eq!(
5307                    triggered.venue_order_id,
5308                    Some(VenueOrderId::new("venue_456"))
5309                );
5310            }
5311            other => panic!("Expected Triggered, was {other:?}"),
5312        }
5313    }
5314
5315    #[rstest]
5316    fn test_parse_order_event_filled_with_fill_data_returns_fill() {
5317        let instrument = create_stub_instrument();
5318        let mut msg = create_order_msg_for_event_test(
5319            OKXOrderStatus::Filled,
5320            "test_client_123",
5321            "venue_456",
5322            "50000.0",
5323            "0.01",
5324        );
5325        msg.fill_sz = "0.01".to_string();
5326        msg.fill_px = "50000.0".to_string();
5327        msg.trade_id = "trade_789".to_string();
5328        msg.acc_fill_sz = Some("0.01".to_string());
5329
5330        let client_order_id = ClientOrderId::new("test_client_123");
5331        let account_id = AccountId::new("OKX-001");
5332        let trader_id = TraderId::new("TRADER-001");
5333        let strategy_id = StrategyId::new("STRATEGY-001");
5334        let ts_init = UnixNanos::from(1000000000);
5335
5336        let result = parse_order_event(
5337            &msg,
5338            client_order_id,
5339            account_id,
5340            trader_id,
5341            strategy_id,
5342            &InstrumentAny::CryptoPerpetual(instrument),
5343            None,
5344            None,
5345            None,
5346            ts_init,
5347        );
5348
5349        assert!(result.is_ok());
5350        match result.unwrap() {
5351            ParsedOrderEvent::Fill(fill) => {
5352                assert_eq!(fill.client_order_id, Some(client_order_id));
5353                assert_eq!(fill.venue_order_id, VenueOrderId::new("venue_456"));
5354                assert_eq!(fill.trade_id, TradeId::from("trade_789"));
5355            }
5356            other => panic!("Expected Fill, was {other:?}"),
5357        }
5358    }
5359
5360    #[rstest]
5361    fn test_is_order_expired_by_reason_gtd_in_reason() {
5362        let mut msg =
5363            create_order_msg_for_event_test(OKXOrderStatus::Canceled, "test", "123", "100", "1");
5364        msg.cancel_source_reason = Some("GTD order expired".to_string());
5365        assert!(is_order_expired_by_reason(&msg));
5366    }
5367
5368    #[rstest]
5369    fn test_is_order_expired_by_reason_timeout_in_reason() {
5370        let mut msg =
5371            create_order_msg_for_event_test(OKXOrderStatus::Canceled, "test", "123", "100", "1");
5372        msg.cancel_source_reason = Some("Order timeout".to_string());
5373        assert!(is_order_expired_by_reason(&msg));
5374    }
5375
5376    #[rstest]
5377    fn test_is_order_expired_by_reason_expir_in_reason() {
5378        let mut msg =
5379            create_order_msg_for_event_test(OKXOrderStatus::Canceled, "test", "123", "100", "1");
5380        msg.cancel_source_reason = Some("Expiration reached".to_string());
5381        assert!(is_order_expired_by_reason(&msg));
5382    }
5383
5384    #[rstest]
5385    fn test_is_order_expired_by_reason_source_code_5() {
5386        let mut msg =
5387            create_order_msg_for_event_test(OKXOrderStatus::Canceled, "test", "123", "100", "1");
5388        msg.cancel_source = Some("5".to_string());
5389        assert!(is_order_expired_by_reason(&msg));
5390    }
5391
5392    #[rstest]
5393    fn test_is_order_expired_by_reason_source_time_expired() {
5394        let mut msg =
5395            create_order_msg_for_event_test(OKXOrderStatus::Canceled, "test", "123", "100", "1");
5396        msg.cancel_source = Some("time_expired".to_string());
5397        assert!(is_order_expired_by_reason(&msg));
5398    }
5399
5400    #[rstest]
5401    fn test_is_order_expired_by_reason_false_for_user_cancel() {
5402        let mut msg =
5403            create_order_msg_for_event_test(OKXOrderStatus::Canceled, "test", "123", "100", "1");
5404        msg.cancel_source_reason = Some("User canceled".to_string());
5405        msg.cancel_source = Some("1".to_string());
5406        assert!(!is_order_expired_by_reason(&msg));
5407    }
5408
5409    #[rstest]
5410    fn test_is_order_expired_by_reason_false_when_no_reason() {
5411        let msg =
5412            create_order_msg_for_event_test(OKXOrderStatus::Canceled, "test", "123", "100", "1");
5413        assert!(!is_order_expired_by_reason(&msg));
5414    }
5415
5416    // Regression test: PartiallyFilled order with price change should emit Updated, not StatusOnly
5417    #[rstest]
5418    fn test_parse_order_event_partially_filled_with_price_change_returns_updated() {
5419        let instrument = create_stub_instrument();
5420        let msg = create_order_msg_for_event_test(
5421            OKXOrderStatus::PartiallyFilled,
5422            "test_client_123",
5423            "venue_456",
5424            "51000.0",
5425            "0.01",
5426        );
5427
5428        let client_order_id = ClientOrderId::new("test_client_123");
5429        let account_id = AccountId::new("OKX-001");
5430        let trader_id = TraderId::new("TRADER-001");
5431        let strategy_id = StrategyId::new("STRATEGY-001");
5432        let ts_init = UnixNanos::from(1000000000);
5433
5434        let previous_state = OrderStateSnapshot {
5435            venue_order_id: VenueOrderId::new("venue_456"),
5436            quantity: Quantity::from("0.01000000"),
5437            price: Some(Price::from("50000.00")),
5438        };
5439
5440        let result = parse_order_event(
5441            &msg,
5442            client_order_id,
5443            account_id,
5444            trader_id,
5445            strategy_id,
5446            &InstrumentAny::CryptoPerpetual(instrument),
5447            None,
5448            None,
5449            Some(&previous_state),
5450            ts_init,
5451        );
5452
5453        assert!(result.is_ok());
5454        match result.unwrap() {
5455            ParsedOrderEvent::Updated(updated) => {
5456                assert_eq!(updated.client_order_id, client_order_id);
5457                assert_eq!(updated.price, Some(Price::from("51000.00")));
5458            }
5459            other => {
5460                panic!("Expected Updated for PartiallyFilled with price change, was {other:?}")
5461            }
5462        }
5463    }
5464
5465    #[rstest]
5466    fn test_is_order_updated_price_change() {
5467        let instrument = create_stub_instrument();
5468        let msg = create_order_msg_for_event_test(
5469            OKXOrderStatus::Live,
5470            "test",
5471            "venue_123",
5472            "51000.0",
5473            "0.01",
5474        );
5475
5476        let previous = OrderStateSnapshot {
5477            venue_order_id: VenueOrderId::new("venue_123"),
5478            quantity: Quantity::from("0.01000000"),
5479            price: Some(Price::from("50000.00")),
5480        };
5481
5482        let result = is_order_updated(&msg, &previous, &InstrumentAny::CryptoPerpetual(instrument));
5483        assert!(result.is_ok());
5484        assert!(result.unwrap());
5485    }
5486
5487    #[rstest]
5488    fn test_is_order_updated_quantity_change() {
5489        let instrument = create_stub_instrument();
5490        let msg = create_order_msg_for_event_test(
5491            OKXOrderStatus::Live,
5492            "test",
5493            "venue_123",
5494            "50000.0",
5495            "0.02", // New quantity
5496        );
5497
5498        let previous = OrderStateSnapshot {
5499            venue_order_id: VenueOrderId::new("venue_123"),
5500            quantity: Quantity::from("0.01000000"), // Old quantity
5501            price: Some(Price::from("50000.00")),
5502        };
5503
5504        let result = is_order_updated(&msg, &previous, &InstrumentAny::CryptoPerpetual(instrument));
5505        assert!(result.is_ok());
5506        assert!(result.unwrap());
5507    }
5508
5509    #[rstest]
5510    fn test_is_order_updated_venue_id_change() {
5511        let instrument = create_stub_instrument();
5512        let msg = create_order_msg_for_event_test(
5513            OKXOrderStatus::Live,
5514            "test",
5515            "venue_456", // New venue ID
5516            "50000.0",
5517            "0.01",
5518        );
5519
5520        let previous = OrderStateSnapshot {
5521            venue_order_id: VenueOrderId::new("venue_123"), // Old venue ID
5522            quantity: Quantity::from("0.01000000"),
5523            price: Some(Price::from("50000.00")),
5524        };
5525
5526        let result = is_order_updated(&msg, &previous, &InstrumentAny::CryptoPerpetual(instrument));
5527        assert!(result.is_ok());
5528        assert!(result.unwrap());
5529    }
5530
5531    #[rstest]
5532    fn test_is_order_updated_no_change() {
5533        let instrument = create_stub_instrument();
5534        let msg = create_order_msg_for_event_test(
5535            OKXOrderStatus::Live,
5536            "test",
5537            "venue_123",
5538            "50000.0",
5539            "0.01",
5540        );
5541
5542        let previous = OrderStateSnapshot {
5543            venue_order_id: VenueOrderId::new("venue_123"),
5544            quantity: Quantity::from("0.01000000"),
5545            price: Some(Price::from("50000.00")),
5546        };
5547
5548        let result = is_order_updated(&msg, &previous, &InstrumentAny::CryptoPerpetual(instrument));
5549        assert!(result.is_ok());
5550        assert!(!result.unwrap());
5551    }
5552
5553    #[rstest]
5554    fn test_parse_order_status_report_ts_last_and_ts_init_ordering() {
5555        let instrument = create_stub_instrument();
5556        let inst = InstrumentAny::CryptoPerpetual(instrument);
5557        let account_id = AccountId::new("OKX-001");
5558        let ts_init = UnixNanos::from(999_000_000_000u64);
5559
5560        let msg = OKXOrderMsg {
5561            acc_fill_sz: Some("0".to_string()),
5562            algo_id: None,
5563            avg_px: String::new(),
5564            c_time: 1706000000000, // ~2024-01-23 in ms
5565            cancel_source: None,
5566            cancel_source_reason: None,
5567            category: OKXOrderCategory::Normal,
5568            ccy: Ustr::from("USDT"),
5569            cl_ord_id: "test_ts_order".to_string(),
5570            algo_cl_ord_id: None,
5571            attach_algo_cl_ord_id: None,
5572            attach_algo_ords: Vec::new(),
5573            fee: None,
5574            fee_ccy: Ustr::from("USDT"),
5575            fill_fee: None,
5576            fill_fee_ccy: None,
5577            fill_mark_px: None,
5578            fill_mark_vol: None,
5579            fill_px_vol: None,
5580            fill_px_usd: None,
5581            fill_fwd_px: None,
5582            fill_notional_usd: None,
5583            fill_pnl: None,
5584            fill_px: String::new(),
5585            fill_sz: String::new(),
5586            fill_time: 0,
5587            inst_id: Ustr::from("BTC-USDT-SWAP"),
5588            inst_type: OKXInstrumentType::Swap,
5589            is_tp_limit: None,
5590            lever: String::new(),
5591            linked_algo_ord: None,
5592            notional_usd: None,
5593            ord_id: Ustr::from("123456"),
5594            ord_type: OKXOrderType::Limit,
5595            pnl: String::new(),
5596            pos_side: OKXPositionSide::Long,
5597            px: "50000.00".to_string(),
5598            px_type: OKXPriceType::None,
5599            px_usd: None,
5600            px_vol: None,
5601            quick_mgn_type: OKXQuickMarginType::None,
5602            rebate: None,
5603            rebate_ccy: None,
5604            reduce_only: "false".to_string(),
5605            side: OKXSide::Buy,
5606            sl_ord_px: None,
5607            sl_trigger_px: None,
5608            sl_trigger_px_type: None,
5609            source: None,
5610            state: OKXOrderStatus::Live,
5611            stp_id: None,
5612            stp_mode: OKXSelfTradePreventionMode::None,
5613            exec_type: OKXExecType::Taker,
5614            sz: "0.01".to_string(),
5615            tag: None,
5616            td_mode: OKXTradeMode::Cross,
5617            tgt_ccy: None,
5618            tp_ord_px: None,
5619            tp_trigger_px: None,
5620            tp_trigger_px_type: None,
5621            trade_id: String::new(),
5622            u_time: 1706000001000, // 1 second later in ms
5623            amend_result: None,
5624            req_id: None,
5625            code: None,
5626            msg: None,
5627        };
5628
5629        let report = parse_order_status_report(&msg, &inst, account_id, ts_init).unwrap();
5630
5631        assert_eq!(
5632            report.ts_accepted,
5633            UnixNanos::from(1706000000000u64 * 1_000_000)
5634        );
5635        assert_eq!(
5636            report.ts_last,
5637            UnixNanos::from(1706000001000u64 * 1_000_000)
5638        );
5639        assert_eq!(report.ts_init, ts_init);
5640    }
5641
5642    #[rstest]
5643    fn test_parse_order_status_report_preserves_attached_tp_sl_child_ids() {
5644        let instrument = create_stub_instrument();
5645        let inst = InstrumentAny::CryptoPerpetual(instrument);
5646        let account_id = AccountId::new("OKX-001");
5647        let ts_init = UnixNanos::default();
5648
5649        let msg = OKXOrderMsg {
5650            acc_fill_sz: Some("0".to_string()),
5651            algo_id: None,
5652            avg_px: String::new(),
5653            c_time: 1706000000000,
5654            cancel_source: None,
5655            cancel_source_reason: None,
5656            category: OKXOrderCategory::Normal,
5657            ccy: Ustr::from("USDT"),
5658            cl_ord_id: "O-attached-entry".to_string(),
5659            algo_cl_ord_id: None,
5660            attach_algo_cl_ord_id: Some("O-attached-sl".to_string()),
5661            attach_algo_ords: vec![
5662                OKXAttachedAlgoOrd {
5663                    attach_algo_id: "algo-sl".to_string(),
5664                    attach_algo_cl_ord_id: "O-attached-sl".to_string(),
5665                    sl_trigger_px: "1500".to_string(),
5666                    sl_ord_px: "-1".to_string(),
5667                    sl_trigger_px_type: Some(OKXTriggerType::Last),
5668                    tp_trigger_px: String::new(),
5669                    tp_ord_px: String::new(),
5670                    tp_trigger_px_type: None,
5671                },
5672                OKXAttachedAlgoOrd {
5673                    attach_algo_id: "algo-tp".to_string(),
5674                    attach_algo_cl_ord_id: "O-attached-tp".to_string(),
5675                    sl_trigger_px: String::new(),
5676                    sl_ord_px: String::new(),
5677                    sl_trigger_px_type: None,
5678                    tp_trigger_px: "2500".to_string(),
5679                    tp_ord_px: "-1".to_string(),
5680                    tp_trigger_px_type: Some(OKXTriggerType::Last),
5681                },
5682            ],
5683            fee: None,
5684            fee_ccy: Ustr::from("USDT"),
5685            fill_fee: None,
5686            fill_fee_ccy: None,
5687            fill_mark_px: None,
5688            fill_mark_vol: None,
5689            fill_px_vol: None,
5690            fill_px_usd: None,
5691            fill_fwd_px: None,
5692            fill_notional_usd: None,
5693            fill_pnl: None,
5694            fill_px: String::new(),
5695            fill_sz: String::new(),
5696            fill_time: 0,
5697            inst_id: Ustr::from("BTC-USDT-SWAP"),
5698            inst_type: OKXInstrumentType::Swap,
5699            is_tp_limit: None,
5700            lever: String::new(),
5701            linked_algo_ord: None,
5702            notional_usd: None,
5703            ord_id: Ustr::from("123456"),
5704            ord_type: OKXOrderType::Limit,
5705            pnl: String::new(),
5706            pos_side: OKXPositionSide::Long,
5707            px: "2000.00".to_string(),
5708            px_type: OKXPriceType::None,
5709            px_usd: None,
5710            px_vol: None,
5711            quick_mgn_type: OKXQuickMarginType::None,
5712            rebate: None,
5713            rebate_ccy: None,
5714            reduce_only: "false".to_string(),
5715            side: OKXSide::Buy,
5716            sl_ord_px: None,
5717            sl_trigger_px: None,
5718            sl_trigger_px_type: None,
5719            source: None,
5720            state: OKXOrderStatus::Live,
5721            stp_id: None,
5722            stp_mode: OKXSelfTradePreventionMode::None,
5723            exec_type: OKXExecType::Taker,
5724            sz: "0.01".to_string(),
5725            tag: None,
5726            td_mode: OKXTradeMode::Cross,
5727            tgt_ccy: None,
5728            tp_ord_px: None,
5729            tp_trigger_px: None,
5730            tp_trigger_px_type: None,
5731            trade_id: String::new(),
5732            u_time: 1706000001000,
5733            amend_result: None,
5734            req_id: None,
5735            code: None,
5736            msg: None,
5737        };
5738
5739        let report = parse_order_status_report(&msg, &inst, account_id, ts_init).unwrap();
5740        let linked_order_ids = report
5741            .linked_order_ids
5742            .expect("expected linked child order ids");
5743
5744        assert_eq!(linked_order_ids.len(), 2);
5745        assert!(linked_order_ids.contains(&ClientOrderId::from("O-attached-sl")));
5746        assert!(linked_order_ids.contains(&ClientOrderId::from("O-attached-tp")));
5747    }
5748
5749    #[rstest]
5750    fn test_parse_algo_order_timestamps_converted_from_ms_to_ns() {
5751        let instrument = create_stub_instrument();
5752        let inst = InstrumentAny::CryptoPerpetual(instrument);
5753        let account_id = AccountId::new("OKX-001");
5754        let ts_init = UnixNanos::from(999_000_000_000u64);
5755
5756        let msg = OKXAlgoOrderMsg {
5757            algo_id: "algo_1".to_string(),
5758            algo_cl_ord_id: "algo_cl_1".to_string(),
5759            cl_ord_id: String::new(),
5760            ord_id: String::new(),
5761            inst_id: Ustr::from("BTC-USDT-SWAP"),
5762            inst_type: OKXInstrumentType::Swap,
5763            ord_type: OKXAlgoOrderType::Trigger,
5764            state: OKXOrderStatus::Live,
5765            side: OKXSide::Buy,
5766            pos_side: OKXPositionSide::Long,
5767            sz: "0.01".to_string(),
5768            trigger_px: "45000.00".to_string(),
5769            trigger_px_type: OKXTriggerType::Last,
5770            sl_trigger_px: String::new(),
5771            sl_ord_px: String::new(),
5772            sl_trigger_px_type: OKXTriggerType::None,
5773            tp_trigger_px: String::new(),
5774            tp_ord_px: String::new(),
5775            tp_trigger_px_type: OKXTriggerType::None,
5776            ord_px: "-1".to_string(),
5777            td_mode: OKXTradeMode::Cross,
5778            lever: String::new(),
5779            reduce_only: "false".to_string(),
5780            close_fraction: String::new(),
5781            actual_px: String::new(),
5782            actual_sz: String::new(),
5783            notional_usd: String::new(),
5784            c_time: 1706000000000,
5785            u_time: 1706000001000,
5786            trigger_time: String::new(),
5787            tag: String::new(),
5788            callback_ratio: String::new(),
5789            callback_spread: String::new(),
5790            active_px: String::new(),
5791            ccy: None,
5792            tgt_ccy: None,
5793            fee: None,
5794            fee_ccy: None,
5795            advance_ord_type: None,
5796        };
5797
5798        let report = parse_algo_order_status_report(&msg, &inst, account_id, ts_init).unwrap();
5799
5800        let expected_accepted_ns = 1706000000000u64 * 1_000_000;
5801        let expected_last_ns = 1706000001000u64 * 1_000_000;
5802        assert_eq!(report.ts_accepted, UnixNanos::from(expected_accepted_ns));
5803        assert_eq!(report.ts_last, UnixNanos::from(expected_last_ns));
5804        assert_eq!(report.ts_init, ts_init);
5805    }
5806
5807    fn stub_algo_order_msg(ord_type: OKXAlgoOrderType) -> OKXAlgoOrderMsg {
5808        OKXAlgoOrderMsg {
5809            algo_id: "algo_1".to_string(),
5810            algo_cl_ord_id: "algo_cl_1".to_string(),
5811            cl_ord_id: String::new(),
5812            ord_id: String::new(),
5813            inst_id: Ustr::from("BTC-USDT-SWAP"),
5814            inst_type: OKXInstrumentType::Swap,
5815            ord_type,
5816            state: OKXOrderStatus::Live,
5817            side: OKXSide::Sell,
5818            pos_side: OKXPositionSide::Long,
5819            sz: "0.01".to_string(),
5820            trigger_px: "95000.00".to_string(),
5821            trigger_px_type: OKXTriggerType::Last,
5822            sl_trigger_px: String::new(),
5823            sl_ord_px: String::new(),
5824            sl_trigger_px_type: OKXTriggerType::None,
5825            tp_trigger_px: String::new(),
5826            tp_ord_px: String::new(),
5827            tp_trigger_px_type: OKXTriggerType::None,
5828            ord_px: "-1".to_string(),
5829            td_mode: OKXTradeMode::Cross,
5830            lever: String::new(),
5831            reduce_only: "false".to_string(),
5832            close_fraction: String::new(),
5833            actual_px: String::new(),
5834            actual_sz: String::new(),
5835            notional_usd: String::new(),
5836            c_time: 1706000000000,
5837            u_time: 1706000001000,
5838            trigger_time: String::new(),
5839            tag: String::new(),
5840            callback_ratio: String::new(),
5841            callback_spread: String::new(),
5842            active_px: String::new(),
5843            ccy: None,
5844            tgt_ccy: None,
5845            fee: None,
5846            fee_ccy: None,
5847            advance_ord_type: None,
5848        }
5849    }
5850
5851    #[rstest]
5852    fn test_parse_algo_order_trailing_stop_with_callback_ratio() {
5853        let instrument = create_stub_instrument();
5854        let inst = InstrumentAny::CryptoPerpetual(instrument);
5855        let account_id = AccountId::new("OKX-001");
5856
5857        let mut msg = stub_algo_order_msg(OKXAlgoOrderType::MoveOrderStop);
5858        msg.callback_ratio = "0.01".to_string(); // 1% = 100 basis points
5859
5860        let report =
5861            parse_algo_order_status_report(&msg, &inst, account_id, UnixNanos::default()).unwrap();
5862
5863        assert_eq!(report.order_type, OrderType::TrailingStopMarket);
5864        assert_eq!(report.trailing_offset, Some(dec!(100)));
5865        assert_eq!(report.trailing_offset_type, TrailingOffsetType::BasisPoints,);
5866        assert_eq!(report.trigger_price, Some(Price::from("95000.00")));
5867    }
5868
5869    #[rstest]
5870    fn test_parse_algo_order_trailing_stop_with_callback_spread() {
5871        let instrument = create_stub_instrument();
5872        let inst = InstrumentAny::CryptoPerpetual(instrument);
5873        let account_id = AccountId::new("OKX-001");
5874
5875        let mut msg = stub_algo_order_msg(OKXAlgoOrderType::MoveOrderStop);
5876        msg.callback_spread = "50.5".to_string();
5877
5878        let report =
5879            parse_algo_order_status_report(&msg, &inst, account_id, UnixNanos::default()).unwrap();
5880
5881        assert_eq!(report.order_type, OrderType::TrailingStopMarket);
5882        assert_eq!(report.trailing_offset, Some(dec!(50.5)));
5883        assert_eq!(report.trailing_offset_type, TrailingOffsetType::Price);
5884    }
5885
5886    #[rstest]
5887    fn test_parse_algo_order_unsupported_type_skipped() {
5888        let instrument = create_stub_instrument();
5889        let account_id = AccountId::new("OKX-001");
5890        let mut instruments = AHashMap::new();
5891        instruments.insert(
5892            Ustr::from("BTC-USDT-SWAP"),
5893            InstrumentAny::CryptoPerpetual(instrument),
5894        );
5895
5896        let msg = stub_algo_order_msg(OKXAlgoOrderType::Iceberg);
5897
5898        let result = parse_algo_order_msg(&msg, account_id, &instruments, UnixNanos::default());
5899
5900        assert!(result.unwrap().is_none());
5901    }
5902
5903    #[rstest]
5904    fn test_parse_algo_order_missing_trigger_px_type_defaults() {
5905        let instrument = create_stub_instrument();
5906        let inst = InstrumentAny::CryptoPerpetual(instrument);
5907        let account_id = AccountId::new("OKX-001");
5908
5909        let mut msg = stub_algo_order_msg(OKXAlgoOrderType::MoveOrderStop);
5910        msg.trigger_px_type = OKXTriggerType::None;
5911        msg.callback_ratio = "0.005".to_string();
5912
5913        let report =
5914            parse_algo_order_status_report(&msg, &inst, account_id, UnixNanos::default()).unwrap();
5915
5916        assert_eq!(report.trigger_type, Some(TriggerType::Default));
5917        assert_eq!(report.order_type, OrderType::TrailingStopMarket);
5918    }
5919
5920    #[rstest]
5921    fn test_parse_algo_order_close_fraction_stop_market_without_sz() {
5922        let instrument = create_stub_instrument();
5923        let inst = InstrumentAny::CryptoPerpetual(instrument);
5924        let account_id = AccountId::new("OKX-001");
5925
5926        let mut msg = stub_algo_order_msg(OKXAlgoOrderType::Conditional);
5927        msg.sz = String::new();
5928        msg.trigger_px = String::new();
5929        msg.trigger_px_type = OKXTriggerType::None;
5930        msg.ord_px = String::new();
5931        msg.sl_trigger_px = "50000".to_string();
5932        msg.sl_ord_px = "-1".to_string();
5933        msg.sl_trigger_px_type = OKXTriggerType::Last;
5934        msg.close_fraction = "1".to_string();
5935        msg.reduce_only = "true".to_string();
5936
5937        let report =
5938            parse_algo_order_status_report(&msg, &inst, account_id, UnixNanos::default()).unwrap();
5939
5940        assert_eq!(report.order_type, OrderType::StopMarket);
5941        assert_eq!(report.trigger_price, Some(Price::from("50000.00")));
5942        assert_eq!(report.trigger_type, Some(TriggerType::LastPrice));
5943        assert_eq!(report.price, None);
5944        assert_eq!(report.quantity, Quantity::zero(inst.size_precision()));
5945        assert!(report.reduce_only);
5946    }
5947
5948    #[rstest]
5949    fn test_parse_algo_order_close_fraction_market_if_touched_without_sz() {
5950        let instrument = create_stub_instrument();
5951        let inst = InstrumentAny::CryptoPerpetual(instrument);
5952        let account_id = AccountId::new("OKX-001");
5953
5954        let mut msg = stub_algo_order_msg(OKXAlgoOrderType::Conditional);
5955        msg.sz = String::new();
5956        msg.trigger_px = String::new();
5957        msg.trigger_px_type = OKXTriggerType::None;
5958        msg.ord_px = String::new();
5959        msg.sl_trigger_px = String::new();
5960        msg.sl_ord_px = String::new();
5961        msg.tp_trigger_px = "50000".to_string();
5962        msg.tp_ord_px = "-1".to_string();
5963        msg.tp_trigger_px_type = OKXTriggerType::Last;
5964        msg.close_fraction = "1".to_string();
5965        msg.reduce_only = "true".to_string();
5966        msg.side = OKXSide::Buy;
5967
5968        let report =
5969            parse_algo_order_status_report(&msg, &inst, account_id, UnixNanos::default()).unwrap();
5970
5971        assert_eq!(report.order_type, OrderType::MarketIfTouched);
5972        assert_eq!(report.trigger_price, Some(Price::from("50000.00")));
5973        assert_eq!(report.trigger_type, Some(TriggerType::LastPrice));
5974        assert_eq!(report.price, None);
5975        assert_eq!(report.quantity, Quantity::zero(inst.size_precision()));
5976        assert!(report.reduce_only);
5977    }
5978
5979    fn stub_book_entry(price: &str, size: &str) -> OrderBookEntry {
5980        OrderBookEntry {
5981            price: price.to_string(),
5982            size: size.to_string(),
5983            liquidated_orders_count: "0".to_string(),
5984            orders_count: "1".to_string(),
5985        }
5986    }
5987
5988    fn stub_book_msg(bids: Vec<OrderBookEntry>, asks: Vec<OrderBookEntry>) -> OKXBookMsg {
5989        OKXBookMsg {
5990            bids,
5991            asks,
5992            ts: 1706000000000,
5993            seq_id: 1,
5994            prev_seq_id: Some(0),
5995            checksum: None,
5996        }
5997    }
5998
5999    #[rstest]
6000    fn test_parse_quote_msg_empty_bids_returns_error() {
6001        let msg = stub_book_msg(vec![], vec![stub_book_entry("50000.00", "1.0")]);
6002
6003        let result = parse_quote_msg(
6004            &msg,
6005            InstrumentId::from("BTC-USDT.OKX"),
6006            2,
6007            8,
6008            UnixNanos::default(),
6009        );
6010        assert!(result.is_err());
6011        assert!(result.unwrap_err().to_string().contains("Empty bids"));
6012    }
6013
6014    #[rstest]
6015    fn test_parse_quote_msg_empty_asks_returns_error() {
6016        let msg = stub_book_msg(vec![stub_book_entry("50000.00", "1.0")], vec![]);
6017
6018        let result = parse_quote_msg(
6019            &msg,
6020            InstrumentId::from("BTC-USDT.OKX"),
6021            2,
6022            8,
6023            UnixNanos::default(),
6024        );
6025        assert!(result.is_err());
6026        assert!(result.unwrap_err().to_string().contains("Empty asks"));
6027    }
6028
6029    #[rstest]
6030    fn test_quote_cache_complete_bbo_tbt_message() {
6031        use nautilus_common::cache::quote::QuoteCache;
6032
6033        let mut cache = QuoteCache::new();
6034        let instrument_id = InstrumentId::from("BTC-USD-260327-75000-C.OKX");
6035        let msg = stub_book_msg(
6036            vec![stub_book_entry("0.0035", "100")],
6037            vec![stub_book_entry("0.0040", "200")],
6038        );
6039
6040        let bid_price = Some(parse_price(&msg.bids[0].price, 4).unwrap());
6041        let bid_size = Some(parse_quantity(&msg.bids[0].size, 0).unwrap());
6042        let ask_price = Some(parse_price(&msg.asks[0].price, 4).unwrap());
6043        let ask_size = Some(parse_quantity(&msg.asks[0].size, 0).unwrap());
6044        let ts_event = parse_millisecond_timestamp(msg.ts);
6045
6046        let quote = cache
6047            .process(
6048                instrument_id,
6049                bid_price,
6050                ask_price,
6051                bid_size,
6052                ask_size,
6053                ts_event,
6054                UnixNanos::default(),
6055            )
6056            .unwrap();
6057
6058        assert_eq!(quote.bid_price, Price::from("0.0035"));
6059        assert_eq!(quote.ask_price, Price::from("0.0040"));
6060        assert_eq!(quote.bid_size, Quantity::from(100));
6061        assert_eq!(quote.ask_size, Quantity::from(200));
6062    }
6063
6064    #[rstest]
6065    fn test_quote_cache_empty_bids_uses_cached_value() {
6066        use nautilus_common::cache::quote::QuoteCache;
6067
6068        let mut cache = QuoteCache::new();
6069        let instrument_id = InstrumentId::from("BTC-USD-260327-80000-C.OKX");
6070
6071        cache
6072            .process(
6073                instrument_id,
6074                Some(Price::from("0.0010")),
6075                Some(Price::from("0.0015")),
6076                Some(Quantity::from(50)),
6077                Some(Quantity::from(75)),
6078                UnixNanos::default(),
6079                UnixNanos::default(),
6080            )
6081            .unwrap();
6082
6083        let msg = stub_book_msg(vec![], vec![stub_book_entry("0.0020", "100")]);
6084        let ask_price = Some(parse_price(&msg.asks[0].price, 4).unwrap());
6085        let ask_size = Some(parse_quantity(&msg.asks[0].size, 0).unwrap());
6086        let ts_event = parse_millisecond_timestamp(msg.ts);
6087
6088        let quote = cache
6089            .process(
6090                instrument_id,
6091                None,
6092                ask_price,
6093                None,
6094                ask_size,
6095                ts_event,
6096                UnixNanos::default(),
6097            )
6098            .unwrap();
6099
6100        assert_eq!(quote.bid_price, Price::from("0.0010"));
6101        assert_eq!(quote.bid_size, Quantity::from(50));
6102        assert_eq!(quote.ask_price, Price::from("0.0020"));
6103        assert_eq!(quote.ask_size, Quantity::from(100));
6104    }
6105
6106    #[rstest]
6107    fn test_quote_cache_empty_asks_uses_cached_value() {
6108        use nautilus_common::cache::quote::QuoteCache;
6109
6110        let mut cache = QuoteCache::new();
6111        let instrument_id = InstrumentId::from("BTC-USD-260327-79000-P.OKX");
6112
6113        cache
6114            .process(
6115                instrument_id,
6116                Some(Price::from("0.0010")),
6117                Some(Price::from("0.0015")),
6118                Some(Quantity::from(50)),
6119                Some(Quantity::from(75)),
6120                UnixNanos::default(),
6121                UnixNanos::default(),
6122            )
6123            .unwrap();
6124
6125        let msg = stub_book_msg(vec![stub_book_entry("0.0012", "60")], vec![]);
6126        let bid_price = Some(parse_price(&msg.bids[0].price, 4).unwrap());
6127        let bid_size = Some(parse_quantity(&msg.bids[0].size, 0).unwrap());
6128        let ts_event = parse_millisecond_timestamp(msg.ts);
6129
6130        let quote = cache
6131            .process(
6132                instrument_id,
6133                bid_price,
6134                None,
6135                bid_size,
6136                None,
6137                ts_event,
6138                UnixNanos::default(),
6139            )
6140            .unwrap();
6141
6142        assert_eq!(quote.bid_price, Price::from("0.0012"));
6143        assert_eq!(quote.bid_size, Quantity::from(60));
6144        assert_eq!(quote.ask_price, Price::from("0.0015"));
6145        assert_eq!(quote.ask_size, Quantity::from(75));
6146    }
6147
6148    #[rstest]
6149    fn test_quote_cache_both_sides_empty_no_cache_returns_error() {
6150        use nautilus_common::cache::quote::QuoteCache;
6151
6152        let mut cache = QuoteCache::new();
6153        let instrument_id = InstrumentId::from("BTC-USD-260327-80000-C.OKX");
6154
6155        let result = cache.process(
6156            instrument_id,
6157            None,
6158            None,
6159            None,
6160            None,
6161            UnixNanos::default(),
6162            UnixNanos::default(),
6163        );
6164
6165        assert!(result.is_err());
6166    }
6167
6168    #[rstest]
6169    fn test_quote_cache_both_sides_empty_with_cache_returns_cached() {
6170        use nautilus_common::cache::quote::QuoteCache;
6171
6172        let mut cache = QuoteCache::new();
6173        let instrument_id = InstrumentId::from("BTC-USD-260327-80000-C.OKX");
6174
6175        cache
6176            .process(
6177                instrument_id,
6178                Some(Price::from("0.0010")),
6179                Some(Price::from("0.0015")),
6180                Some(Quantity::from(50)),
6181                Some(Quantity::from(75)),
6182                UnixNanos::default(),
6183                UnixNanos::default(),
6184            )
6185            .unwrap();
6186
6187        let quote = cache
6188            .process(
6189                instrument_id,
6190                None,
6191                None,
6192                None,
6193                None,
6194                UnixNanos::from(1706000000000000000u64),
6195                UnixNanos::from(1706000000000000000u64),
6196            )
6197            .unwrap();
6198
6199        assert_eq!(quote.bid_price, Price::from("0.0010"));
6200        assert_eq!(quote.ask_price, Price::from("0.0015"));
6201        assert_eq!(quote.ts_event, UnixNanos::from(1706000000000000000u64));
6202    }
6203
6204    #[rstest]
6205    fn test_parse_instruments_channel_produces_status() {
6206        use nautilus_model::{enums::MarketStatusAction, identifiers::InstrumentId};
6207
6208        use crate::common::{models::OKXInstrument, parse::parse_instrument_any};
6209
6210        let ts_init = UnixNanos::default();
6211
6212        // Build a cached instrument with fees
6213        let inst_json = serde_json::json!({
6214            "instType": "SPOT",
6215            "instId": "BTC-USD",
6216            "baseCcy": "BTC",
6217            "quoteCcy": "USD",
6218            "settleCcy": "",
6219            "ctVal": "",
6220            "ctMult": "",
6221            "ctValCcy": "",
6222            "optType": "",
6223            "stk": "",
6224            "listTime": "1733454000000",
6225            "expTime": "",
6226            "lever": "",
6227            "tickSz": "0.1",
6228            "lotSz": "0.00000001",
6229            "minSz": "0.00001",
6230            "ctType": "",
6231            "state": "live",
6232            "ruleType": "normal",
6233            "maxLmtSz": "9999999999",
6234            "maxMktSz": "1000000",
6235            "maxLmtAmt": "20000000",
6236            "maxMktAmt": "1000000",
6237            "maxTwapSz": "9999999999",
6238            "maxIcebergSz": "9999999999",
6239            "maxTriggerSz": "9999999999",
6240            "maxStopSz": "1000000",
6241            "uly": "",
6242            "instFamily": ""
6243        });
6244        let initial: OKXInstrument = serde_json::from_value(inst_json).unwrap();
6245        let parsed = parse_instrument_any(&initial, None, None, None, None, ts_init)
6246            .unwrap()
6247            .unwrap();
6248
6249        let mut instruments_cache = AHashMap::new();
6250        instruments_cache.insert(Ustr::from("BTC-USD"), parsed);
6251
6252        let ws_data = serde_json::json!({
6253            "instType": "SPOT",
6254            "instId": "BTC-USD",
6255            "baseCcy": "BTC",
6256            "quoteCcy": "USD",
6257            "settleCcy": "",
6258            "ctVal": "",
6259            "ctMult": "",
6260            "ctValCcy": "",
6261            "optType": "",
6262            "stk": "",
6263            "listTime": "1733454000000",
6264            "expTime": "",
6265            "lever": "",
6266            "tickSz": "0.1",
6267            "lotSz": "0.00000001",
6268            "minSz": "0.00001",
6269            "ctType": "",
6270            "state": "live",
6271            "ruleType": "normal",
6272            "maxLmtSz": "9999999999",
6273            "maxMktSz": "1000000",
6274            "maxLmtAmt": "20000000",
6275            "maxMktAmt": "1000000",
6276            "maxTwapSz": "9999999999",
6277            "maxIcebergSz": "9999999999",
6278            "maxTriggerSz": "9999999999",
6279            "maxStopSz": "1000000",
6280            "uly": "",
6281            "instFamily": ""
6282        });
6283
6284        let instrument_id = InstrumentId::from("BTC-USD.OKX");
6285        let mut funding_cache = AHashMap::new();
6286
6287        let result = parse_ws_message_data(
6288            &OKXWsChannel::Instruments,
6289            ws_data,
6290            &instrument_id,
6291            2,
6292            8,
6293            ts_init,
6294            &mut funding_cache,
6295            &instruments_cache,
6296        )
6297        .expect("Failed to parse instruments channel");
6298
6299        match result {
6300            Some(NautilusWsMessage::Instrument(inst, status)) => {
6301                assert_eq!(inst.id(), InstrumentId::from("BTC-USD.OKX"));
6302                let status = status.expect("Expected InstrumentStatus");
6303                assert_eq!(status.action, MarketStatusAction::Trading);
6304                assert_eq!(status.is_trading, Some(true));
6305            }
6306            other => panic!("Expected Instrument with status, was {other:?}"),
6307        }
6308    }
6309
6310    #[rstest]
6311    fn test_parse_instruments_channel_suspend_status() {
6312        use nautilus_model::{enums::MarketStatusAction, identifiers::InstrumentId};
6313
6314        use crate::common::{models::OKXInstrument, parse::parse_instrument_any};
6315
6316        let ts_init = UnixNanos::default();
6317
6318        let inst_json = serde_json::json!({
6319            "instType": "SPOT",
6320            "instId": "BTC-USD",
6321            "baseCcy": "BTC",
6322            "quoteCcy": "USD",
6323            "settleCcy": "",
6324            "ctVal": "",
6325            "ctMult": "",
6326            "ctValCcy": "",
6327            "optType": "",
6328            "stk": "",
6329            "listTime": "1733454000000",
6330            "expTime": "",
6331            "lever": "",
6332            "tickSz": "0.1",
6333            "lotSz": "0.00000001",
6334            "minSz": "0.00001",
6335            "ctType": "",
6336            "state": "live",
6337            "ruleType": "normal",
6338            "maxLmtSz": "9999999999",
6339            "maxMktSz": "1000000",
6340            "maxLmtAmt": "20000000",
6341            "maxMktAmt": "1000000",
6342            "maxTwapSz": "9999999999",
6343            "maxIcebergSz": "9999999999",
6344            "maxTriggerSz": "9999999999",
6345            "maxStopSz": "1000000",
6346            "uly": "",
6347            "instFamily": ""
6348        });
6349        let initial: OKXInstrument = serde_json::from_value(inst_json).unwrap();
6350        let parsed = parse_instrument_any(&initial, None, None, None, None, ts_init)
6351            .unwrap()
6352            .unwrap();
6353
6354        let mut instruments_cache = AHashMap::new();
6355        instruments_cache.insert(Ustr::from("BTC-USD"), parsed);
6356
6357        // WS update with suspend state
6358        let ws_data = serde_json::json!({
6359            "instType": "SPOT",
6360            "instId": "BTC-USD",
6361            "baseCcy": "BTC",
6362            "quoteCcy": "USD",
6363            "settleCcy": "",
6364            "ctVal": "",
6365            "ctMult": "",
6366            "ctValCcy": "",
6367            "optType": "",
6368            "stk": "",
6369            "listTime": "1733454000000",
6370            "expTime": "",
6371            "lever": "",
6372            "tickSz": "0.1",
6373            "lotSz": "0.00000001",
6374            "minSz": "0.00001",
6375            "ctType": "",
6376            "state": "suspend",
6377            "ruleType": "normal",
6378            "maxLmtSz": "9999999999",
6379            "maxMktSz": "1000000",
6380            "maxLmtAmt": "20000000",
6381            "maxMktAmt": "1000000",
6382            "maxTwapSz": "9999999999",
6383            "maxIcebergSz": "9999999999",
6384            "maxTriggerSz": "9999999999",
6385            "maxStopSz": "1000000",
6386            "uly": "",
6387            "instFamily": ""
6388        });
6389
6390        let instrument_id = InstrumentId::from("BTC-USD.OKX");
6391        let mut funding_cache = AHashMap::new();
6392
6393        let result = parse_ws_message_data(
6394            &OKXWsChannel::Instruments,
6395            ws_data,
6396            &instrument_id,
6397            2,
6398            8,
6399            ts_init,
6400            &mut funding_cache,
6401            &instruments_cache,
6402        )
6403        .expect("Failed to parse instruments channel");
6404
6405        match result {
6406            Some(NautilusWsMessage::Instrument(_, status)) => {
6407                let status = status.expect("Expected InstrumentStatus");
6408                assert_eq!(status.action, MarketStatusAction::Suspend);
6409                assert_eq!(status.is_trading, Some(false));
6410            }
6411            other => panic!("Expected Instrument with status, was {other:?}"),
6412        }
6413    }
6414
6415    #[rstest]
6416    fn test_parse_option_summary_greeks() {
6417        let json_str = load_test_json("ws_opt_summary.json");
6418        let msgs: Vec<OKXOptionSummaryMsg> =
6419            serde_json::from_str(&json_str).expect("Failed to deserialize opt-summary fixture");
6420        assert_eq!(msgs.len(), 2);
6421
6422        let instrument_id = InstrumentId::from("BTC-USD-250328-92000-C.OKX");
6423        let ts_init = UnixNanos::from(1_711_612_900_000_000_000u64);
6424        let greeks =
6425            parse_option_summary_greeks(&msgs[0], &instrument_id, OKXGreeksType::Bs, ts_init)
6426                .expect("parse failed");
6427
6428        assert_eq!(greeks.instrument_id, instrument_id);
6429        assert!((greeks.greeks.delta - 0.5312).abs() < 1e-10);
6430        assert!((greeks.greeks.gamma - 0.0000134).abs() < 1e-15);
6431        assert!((greeks.greeks.vega - 0.0038).abs() < 1e-10);
6432        assert!((greeks.greeks.theta - (-0.0015)).abs() < 1e-10);
6433        assert!((greeks.greeks.rho - 0.0).abs() < 1e-10);
6434        assert!((greeks.mark_iv.unwrap() - 0.53).abs() < 1e-10);
6435        assert!((greeks.bid_iv.unwrap() - 0.52).abs() < 1e-10);
6436        assert!((greeks.ask_iv.unwrap() - 0.55).abs() < 1e-10);
6437        assert!((greeks.underlying_price.unwrap() - 92150.50).abs() < 1e-10);
6438        assert!(greeks.open_interest.is_none());
6439        assert_eq!(greeks.convention, GreeksConvention::BlackScholes);
6440        assert_eq!(
6441            greeks.ts_event,
6442            UnixNanos::from(1_711_612_800_000_000_000u64)
6443        );
6444        assert_eq!(greeks.ts_init, ts_init);
6445    }
6446
6447    #[rstest]
6448    fn test_option_summary_msg_deserializes_with_uppercase_bs_alias() {
6449        let json = r#"{
6450            "instId": "BTC-USD-250328-92000-C",
6451            "uly": "BTC-USD",
6452            "delta": "0.52",
6453            "gamma": "0.00001",
6454            "theta": "-0.001",
6455            "vega": "0.003",
6456            "deltaBS": "0.53",
6457            "gammaBS": "0.00002",
6458            "thetaBS": "-0.002",
6459            "vegaBS": "0.004",
6460            "realVol": "0.45",
6461            "bidVol": "0.50",
6462            "askVol": "0.55",
6463            "markVol": "0.52",
6464            "lever": "10.0",
6465            "ts": "1711612800000"
6466        }"#;
6467        let msg: OKXOptionSummaryMsg =
6468            serde_json::from_str(json).expect("deltaBS alias failed to deserialize");
6469        assert_eq!(msg.delta_bs, "0.53");
6470        assert_eq!(msg.gamma_bs, "0.00002");
6471        assert_eq!(msg.theta_bs, "-0.002");
6472        assert_eq!(msg.vega_bs, "0.004");
6473    }
6474
6475    #[rstest]
6476    fn test_parse_option_summary_greeks_put() {
6477        let json_str = load_test_json("ws_opt_summary.json");
6478        let msgs: Vec<OKXOptionSummaryMsg> =
6479            serde_json::from_str(&json_str).expect("Failed to deserialize opt-summary fixture");
6480
6481        let instrument_id = InstrumentId::from("BTC-USD-250328-92000-P.OKX");
6482        let ts_init = UnixNanos::from(1_711_612_900_000_000_000u64);
6483        let greeks =
6484            parse_option_summary_greeks(&msgs[1], &instrument_id, OKXGreeksType::Bs, ts_init)
6485                .expect("parse failed");
6486
6487        assert!((greeks.greeks.delta - (-0.4688)).abs() < 1e-10);
6488    }
6489
6490    #[rstest]
6491    fn test_parse_option_summary_greeks_pa() {
6492        let json_str = load_test_json("ws_opt_summary.json");
6493        let msgs: Vec<OKXOptionSummaryMsg> =
6494            serde_json::from_str(&json_str).expect("Failed to deserialize opt-summary fixture");
6495        assert_eq!(msgs.len(), 2);
6496
6497        let instrument_id = InstrumentId::from("BTC-USD-250328-92000-C.OKX");
6498        let ts_init = UnixNanos::from(1_711_612_900_000_000_000u64);
6499        let greeks =
6500            parse_option_summary_greeks(&msgs[0], &instrument_id, OKXGreeksType::Pa, ts_init)
6501                .expect("parse failed");
6502
6503        assert_eq!(greeks.instrument_id, instrument_id);
6504        assert!((greeks.greeks.delta - 0.5234).abs() < 1e-10);
6505        assert!((greeks.greeks.gamma - 0.0000123).abs() < 1e-15);
6506        assert!((greeks.greeks.vega - 0.0034).abs() < 1e-10);
6507        assert!((greeks.greeks.theta - (-0.0012)).abs() < 1e-10);
6508        assert!((greeks.greeks.rho - 0.0).abs() < 1e-10);
6509        assert!((greeks.mark_iv.unwrap() - 0.53).abs() < 1e-10);
6510        assert!((greeks.bid_iv.unwrap() - 0.52).abs() < 1e-10);
6511        assert!((greeks.ask_iv.unwrap() - 0.55).abs() < 1e-10);
6512        assert!((greeks.underlying_price.unwrap() - 92150.50).abs() < 1e-10);
6513        assert_eq!(greeks.convention, GreeksConvention::PriceAdjusted);
6514    }
6515
6516    #[rstest]
6517    fn test_parse_option_summary_greeks_pa_put() {
6518        let json_str = load_test_json("ws_opt_summary.json");
6519        let msgs: Vec<OKXOptionSummaryMsg> =
6520            serde_json::from_str(&json_str).expect("Failed to deserialize opt-summary fixture");
6521
6522        let instrument_id = InstrumentId::from("BTC-USD-250328-92000-P.OKX");
6523        let ts_init = UnixNanos::from(1_711_612_900_000_000_000u64);
6524        let greeks =
6525            parse_option_summary_greeks(&msgs[1], &instrument_id, OKXGreeksType::Pa, ts_init)
6526                .expect("parse failed");
6527
6528        assert!((greeks.greeks.delta - (-0.4766)).abs() < 1e-10);
6529    }
6530
6531    #[rstest]
6532    fn test_option_greeks_filtering_only_subscribed_instruments() {
6533        use ahash::AHashSet;
6534
6535        let json_str = load_test_json("ws_opt_summary.json");
6536        let msgs: Vec<OKXOptionSummaryMsg> =
6537            serde_json::from_str(&json_str).expect("Failed to deserialize");
6538
6539        let call_id = InstrumentId::from("BTC-USD-250328-92000-C.OKX");
6540        let put_id = InstrumentId::from("BTC-USD-250328-92000-P.OKX");
6541        let ts_init = UnixNanos::from(1_711_612_900_000_000_000u64);
6542
6543        // Subscribe to CALL only
6544        let mut subs = AHashSet::new();
6545        subs.insert(call_id);
6546
6547        let mut results = Vec::new();
6548
6549        for msg in &msgs {
6550            let inst_id_str = format!("{}.OKX", msg.inst_id);
6551            let instrument_id = InstrumentId::from(inst_id_str.as_str());
6552            if !subs.contains(&instrument_id) {
6553                continue;
6554            }
6555
6556            if let Ok(greeks) =
6557                parse_option_summary_greeks(msg, &instrument_id, OKXGreeksType::Bs, ts_init)
6558            {
6559                results.push(greeks);
6560            }
6561        }
6562
6563        assert_eq!(results.len(), 1);
6564        assert_eq!(results[0].instrument_id, call_id);
6565        assert!((results[0].greeks.delta - 0.5312).abs() < 1e-10);
6566
6567        // Now subscribe to both
6568        subs.insert(put_id);
6569
6570        let mut results = Vec::new();
6571
6572        for msg in &msgs {
6573            let inst_id_str = format!("{}.OKX", msg.inst_id);
6574            let instrument_id = InstrumentId::from(inst_id_str.as_str());
6575            if !subs.contains(&instrument_id) {
6576                continue;
6577            }
6578
6579            if let Ok(greeks) =
6580                parse_option_summary_greeks(msg, &instrument_id, OKXGreeksType::Bs, ts_init)
6581            {
6582                results.push(greeks);
6583            }
6584        }
6585
6586        assert_eq!(results.len(), 2);
6587    }
6588
6589    #[rstest]
6590    fn test_option_greeks_unsubscribed_instrument_filtered_out() {
6591        use ahash::AHashSet;
6592
6593        let json_str = load_test_json("ws_opt_summary.json");
6594        let msgs: Vec<OKXOptionSummaryMsg> =
6595            serde_json::from_str(&json_str).expect("Failed to deserialize");
6596
6597        let ts_init = UnixNanos::default();
6598
6599        // Empty subscription set
6600        let subs: AHashSet<InstrumentId> = AHashSet::new();
6601
6602        let mut results = Vec::new();
6603
6604        for msg in &msgs {
6605            let inst_id_str = format!("{}.OKX", msg.inst_id);
6606            let instrument_id = InstrumentId::from(inst_id_str.as_str());
6607            if !subs.contains(&instrument_id) {
6608                continue;
6609            }
6610
6611            if let Ok(greeks) =
6612                parse_option_summary_greeks(msg, &instrument_id, OKXGreeksType::Bs, ts_init)
6613            {
6614                results.push(greeks);
6615            }
6616        }
6617
6618        assert!(results.is_empty());
6619    }
6620
6621    #[rstest]
6622    fn test_option_greeks_family_dedup_subscribe_count() {
6623        use crate::common::parse::extract_inst_family;
6624
6625        let mut family_subs: AHashMap<Ustr, usize> = AHashMap::new();
6626
6627        let call_id = InstrumentId::from("BTC-USD-250328-92000-C.OKX");
6628        let put_id = InstrumentId::from("BTC-USD-250328-92000-P.OKX");
6629        let other_id = InstrumentId::from("BTC-USD-250328-80000-C.OKX");
6630
6631        // Subscribe first instrument: count goes to 1 (triggers WS subscribe)
6632        let family = extract_inst_family(call_id.symbol.inner().as_str()).unwrap();
6633        let count = family_subs.entry(family).or_default();
6634        *count += 1;
6635        assert_eq!(*count, 1);
6636        let should_subscribe_ws = *count == 1;
6637        assert!(should_subscribe_ws);
6638
6639        // Subscribe second instrument in same family: count goes to 2 (no WS subscribe)
6640        let family = extract_inst_family(put_id.symbol.inner().as_str()).unwrap();
6641        let count = family_subs.entry(family).or_default();
6642        *count += 1;
6643        assert_eq!(*count, 2);
6644        let should_subscribe_ws = *count == 1;
6645        assert!(!should_subscribe_ws);
6646
6647        // Subscribe third instrument in same family: count goes to 3
6648        let family = extract_inst_family(other_id.symbol.inner().as_str()).unwrap();
6649        let count = family_subs.entry(family).or_default();
6650        *count += 1;
6651        assert_eq!(*count, 3);
6652
6653        // Unsubscribe one: count goes to 2 (no WS unsubscribe)
6654        let family = extract_inst_family(call_id.symbol.inner().as_str()).unwrap();
6655        if let Some(count) = family_subs.get_mut(&family) {
6656            *count = count.saturating_sub(1);
6657            assert_eq!(*count, 2);
6658            let should_unsubscribe_ws = *count == 0;
6659            assert!(!should_unsubscribe_ws);
6660        }
6661
6662        // Unsubscribe second: count goes to 1
6663        let family = extract_inst_family(put_id.symbol.inner().as_str()).unwrap();
6664        if let Some(count) = family_subs.get_mut(&family) {
6665            *count = count.saturating_sub(1);
6666            assert_eq!(*count, 1);
6667        }
6668
6669        // Unsubscribe last: count goes to 0 (triggers WS unsubscribe)
6670        let family = extract_inst_family(other_id.symbol.inner().as_str()).unwrap();
6671        if let Some(count) = family_subs.get_mut(&family) {
6672            *count = count.saturating_sub(1);
6673            assert_eq!(*count, 0);
6674            let should_unsubscribe_ws = *count == 0;
6675            assert!(should_unsubscribe_ws);
6676        }
6677    }
6678}