Skip to main content

nautilus_okx/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the OKX adapter.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    cache::quote::QuoteCache,
28    clients::DataClient,
29    live::{runner::get_data_event_sender, runtime::get_runtime},
30    messages::{
31        DataEvent,
32        data::{
33            BarsResponse, BookResponse, DataResponse, ForwardPricesResponse, FundingRatesResponse,
34            InstrumentResponse, InstrumentsResponse, RequestBars, RequestBookSnapshot,
35            RequestForwardPrices, RequestFundingRates, RequestInstrument, RequestInstruments,
36            RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates,
37            SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentStatus,
38            SubscribeInstruments, SubscribeMarkPrices, SubscribeOptionGreeks, SubscribeQuotes,
39            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
40            UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrumentStatus,
41            UnsubscribeMarkPrices, UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
42        },
43    },
44};
45use nautilus_core::{
46    AtomicMap, Params, UnixNanos,
47    datetime::datetime_to_unix_nanos,
48    time::{AtomicTime, get_atomic_clock_realtime},
49};
50use nautilus_model::{
51    data::{Data, FundingRateUpdate, InstrumentStatus, OrderBookDeltas_API},
52    enums::{BookType, GreeksConvention, MarketStatusAction},
53    identifiers::{ClientId, InstrumentId, Venue},
54    instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58use ustr::Ustr;
59
60use crate::{
61    common::{
62        consts::{
63            OKX_VENUE, OKX_WS_HEARTBEAT_SECS, resolve_book_depth, resolve_instrument_families,
64        },
65        enums::{
66            OKXBookChannel, OKXContractType, OKXGreeksType, OKXInstrumentStatus, OKXInstrumentType,
67            OKXVipLevel,
68        },
69        parse::{
70            extract_inst_family, okx_instrument_type_from_symbol, okx_status_to_market_action,
71            parse_base_quote_from_symbol, parse_instrument_any, parse_instrument_id,
72            parse_millisecond_timestamp, parse_price, parse_quantity,
73        },
74    },
75    config::OKXDataClientConfig,
76    http::client::OKXHttpClient,
77    websocket::{
78        client::OKXWebSocketClient,
79        enums::OKXWsChannel,
80        messages::{NautilusWsMessage, OKXBookMsg, OKXOptionSummaryMsg, OKXWsMessage},
81        parse::{
82            extract_fees_from_cached_instrument, parse_book_msg_vec, parse_index_price_msg_vec,
83            parse_option_summary_greeks, parse_ws_message_data,
84        },
85    },
86};
87
88/// Resolves the set of [`OKXGreeksType`] conventions for an option greeks subscription.
89///
90/// Reads the `greeks_convention` key from `params`, accepting either a single
91/// [`GreeksConvention`] string (e.g. `"BLACK_SCHOLES"` or `"PRICE_ADJUSTED"`) or a
92/// JSON array of such strings. Unrecognized entries log a warning and are skipped.
93/// Returns the default set `{Bs, Pa}` when the key is absent, unparsable, or
94/// yields no valid entries so every subscription defaults to both conventions.
95pub(crate) fn parse_greeks_conventions_from_params(
96    params: &Option<Params>,
97) -> AHashSet<OKXGreeksType> {
98    let default_set: AHashSet<OKXGreeksType> =
99        [OKXGreeksType::Bs, OKXGreeksType::Pa].into_iter().collect();
100
101    let Some(value) = params.as_ref().and_then(|p| p.get("greeks_convention")) else {
102        return default_set;
103    };
104
105    let mut out = AHashSet::new();
106    match value {
107        serde_json::Value::String(s) => push_convention_str(&mut out, s),
108        serde_json::Value::Array(items) => {
109            for item in items {
110                if let Some(s) = item.as_str() {
111                    push_convention_str(&mut out, s);
112                } else {
113                    log::warn!("Ignoring non-string greeks_convention entry {item:?}");
114                }
115            }
116        }
117        other => {
118            log::warn!(
119                "Unsupported greeks_convention value {other:?}, defaulting to both conventions"
120            );
121        }
122    }
123
124    if out.is_empty() { default_set } else { out }
125}
126
127fn push_convention_str(out: &mut AHashSet<OKXGreeksType>, raw: &str) {
128    match raw.parse::<GreeksConvention>() {
129        Ok(convention) => {
130            out.insert(convention.into());
131        }
132        Err(_) => log::warn!("Unrecognized greeks_convention {raw:?}, skipping"),
133    }
134}
135
136#[derive(Debug)]
137pub struct OKXDataClient {
138    client_id: ClientId,
139    config: OKXDataClientConfig,
140    http_client: OKXHttpClient,
141    ws_public: Option<OKXWebSocketClient>,
142    ws_business: Option<OKXWebSocketClient>,
143    is_connected: AtomicBool,
144    cancellation_token: CancellationToken,
145    tasks: Vec<JoinHandle<()>>,
146    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
147    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
148    book_channels: Arc<AtomicMap<InstrumentId, OKXBookChannel>>,
149    index_ticker_map: Arc<AtomicMap<Ustr, AHashSet<Ustr>>>,
150    option_greeks_subs: Arc<AtomicMap<InstrumentId, AHashSet<OKXGreeksType>>>,
151    // `Mutex<AHashMap>` so the spawned subscribe task can roll back the
152    // refcount on failure. A bare `AHashMap` would leave the count
153    // permanently incremented and wedge future Greeks subscribes.
154    option_summary_family_subs: Arc<std::sync::Mutex<AHashMap<Ustr, usize>>>,
155    clock: &'static AtomicTime,
156}
157
158impl OKXDataClient {
159    /// Creates a new [`OKXDataClient`] instance.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if the client fails to initialize.
164    pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
165        let clock = get_atomic_clock_realtime();
166        let data_sender = get_data_event_sender();
167
168        let http_client = if config.has_api_credentials() {
169            OKXHttpClient::with_credentials(
170                config.api_key.clone(),
171                config.api_secret.clone(),
172                config.api_passphrase.clone(),
173                config.base_url_http.clone(),
174                config.http_timeout_secs,
175                config.max_retries,
176                config.retry_delay_initial_ms,
177                config.retry_delay_max_ms,
178                config.environment,
179                config.proxy_url.clone(),
180            )?
181        } else {
182            OKXHttpClient::new(
183                config.base_url_http.clone(),
184                config.http_timeout_secs,
185                config.max_retries,
186                config.retry_delay_initial_ms,
187                config.retry_delay_max_ms,
188                config.environment,
189                config.proxy_url.clone(),
190            )?
191        };
192
193        let ws_public = OKXWebSocketClient::new(
194            Some(config.ws_public_url()),
195            None,
196            None,
197            None,
198            None,
199            Some(OKX_WS_HEARTBEAT_SECS),
200            None,
201            config.transport_backend,
202            config.proxy_url.clone(),
203        )
204        .context("failed to construct OKX public websocket client")?;
205
206        let ws_business = if config.requires_business_ws() {
207            let ws = OKXWebSocketClient::new(
208                Some(config.ws_business_url()),
209                None, // No auth needed for public business channels
210                None,
211                None,
212                None,
213                Some(OKX_WS_HEARTBEAT_SECS),
214                None,
215                config.transport_backend,
216                config.proxy_url.clone(),
217            )
218            .context("failed to construct OKX business websocket client")?;
219            Some(ws)
220        } else {
221            None
222        };
223
224        if let Some(vip_level) = config.vip_level {
225            ws_public.set_vip_level(vip_level);
226
227            if let Some(ref ws) = ws_business {
228                ws.set_vip_level(vip_level);
229            }
230        }
231
232        Ok(Self {
233            client_id,
234            config,
235            http_client,
236            ws_public: Some(ws_public),
237            ws_business,
238            is_connected: AtomicBool::new(false),
239            cancellation_token: CancellationToken::new(),
240            tasks: Vec::new(),
241            data_sender,
242            instruments: Arc::new(AtomicMap::new()),
243            book_channels: Arc::new(AtomicMap::new()),
244            index_ticker_map: Arc::new(AtomicMap::new()),
245            option_greeks_subs: Arc::new(AtomicMap::new()),
246            option_summary_family_subs: Arc::new(std::sync::Mutex::new(AHashMap::new())),
247            clock,
248        })
249    }
250
251    fn venue(&self) -> Venue {
252        *OKX_VENUE
253    }
254
255    fn vip_level(&self) -> Option<OKXVipLevel> {
256        self.ws_public.as_ref().map(|ws| ws.vip_level())
257    }
258
259    fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
260        self.ws_public
261            .as_ref()
262            .context("public websocket client not initialized")
263    }
264
265    fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
266        self.ws_business
267            .as_ref()
268            .context("business websocket client not available (credentials required)")
269    }
270
271    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
272        if let Err(e) = sender.send(DataEvent::Data(data)) {
273            log::error!("Failed to emit data event: {e}");
274        }
275    }
276
277    fn spawn_ws<F>(&self, fut: F, context: &'static str)
278    where
279        F: Future<Output = anyhow::Result<()>> + Send + 'static,
280    {
281        get_runtime().spawn(async move {
282            if let Err(e) = fut.await {
283                log::error!("{context}: {e:?}");
284            }
285        });
286    }
287
288    #[expect(clippy::too_many_arguments)]
289    fn handle_ws_message(
290        message: OKXWsMessage,
291        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
292        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
293        instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
294        quote_cache: &mut QuoteCache,
295        funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
296        index_ticker_map: &Arc<AtomicMap<Ustr, AHashSet<Ustr>>>,
297        option_greeks_subs: &Arc<AtomicMap<InstrumentId, AHashSet<OKXGreeksType>>>,
298        clock: &AtomicTime,
299    ) {
300        match message {
301            OKXWsMessage::BookData { arg, action, data } => {
302                let Some(inst_id) = arg.inst_id else {
303                    log::warn!("Book data without inst_id");
304                    return;
305                };
306                let Some(instrument) = instruments_by_symbol.get(&inst_id) else {
307                    log::warn!("No cached instrument for book data: {inst_id}");
308                    return;
309                };
310                let ts_init = clock.get_time_ns();
311
312                match parse_book_msg_vec(
313                    data,
314                    &instrument.id(),
315                    instrument.price_precision(),
316                    instrument.size_precision(),
317                    action,
318                    ts_init,
319                ) {
320                    Ok(data_vec) => {
321                        for data in data_vec {
322                            Self::send_data(data_sender, data);
323                        }
324                    }
325                    Err(e) => log::error!("Failed to parse book data: {e}"),
326                }
327            }
328            OKXWsMessage::ChannelData {
329                channel,
330                inst_id,
331                data,
332            } => {
333                // Option summary subscriptions use instFamily (not instId), so
334                // the arg has inst_id: None. Each element in the data array carries
335                // its own inst_id that we resolve per-message.
336                if matches!(channel, OKXWsChannel::OptionSummary) {
337                    let ts_init = clock.get_time_ns();
338
339                    match serde_json::from_value::<Vec<OKXOptionSummaryMsg>>(data) {
340                        Ok(msgs) => {
341                            let subs = option_greeks_subs.load();
342
343                            for msg in &msgs {
344                                let Some(instrument) = instruments_by_symbol.get(&msg.inst_id)
345                                else {
346                                    continue;
347                                };
348                                let instrument_id = instrument.id();
349                                let Some(conventions) = subs.get(&instrument_id) else {
350                                    continue;
351                                };
352
353                                for greeks_type in conventions {
354                                    match parse_option_summary_greeks(
355                                        msg,
356                                        &instrument_id,
357                                        *greeks_type,
358                                        ts_init,
359                                    ) {
360                                        Ok(greeks) => {
361                                            if let Err(e) =
362                                                data_sender.send(DataEvent::OptionGreeks(greeks))
363                                            {
364                                                log::error!(
365                                                    "Failed to emit option greeks event: {e}"
366                                                );
367                                            }
368                                        }
369                                        Err(e) => {
370                                            log::error!(
371                                                "Failed to parse option summary for {} ({greeks_type:?}): {e}",
372                                                msg.inst_id
373                                            );
374                                        }
375                                    }
376                                }
377                            }
378                        }
379                        Err(e) => {
380                            log::error!("Failed to deserialize option summary data: {e}");
381                        }
382                    }
383                    return;
384                }
385
386                let Some(inst_id) = inst_id else {
387                    log::debug!("Channel data without inst_id: {channel:?}");
388                    return;
389                };
390
391                // Index tickers use base pair format (e.g., "BTC-USDT") but instruments
392                // are keyed by full symbol (e.g., "BTC-USDT-SWAP"). Dispatch index price
393                // updates only to instruments that subscribed via subscribe_index_prices.
394                if matches!(channel, OKXWsChannel::IndexTickers) {
395                    let ts_init = clock.get_time_ns();
396                    let map_guard = index_ticker_map.load();
397                    let Some(subscribed_symbols) = map_guard.get(&inst_id) else {
398                        log::debug!("No subscribed instruments for index ticker: {inst_id}");
399                        return;
400                    };
401                    let symbols: Vec<Ustr> = subscribed_symbols.iter().copied().collect();
402                    drop(map_guard);
403
404                    for sym in &symbols {
405                        let Some(instrument) = instruments_by_symbol.get(sym) else {
406                            log::warn!("No cached instrument for index ticker symbol: {sym}");
407                            continue;
408                        };
409
410                        match parse_index_price_msg_vec(
411                            data.clone(),
412                            &instrument.id(),
413                            instrument.price_precision(),
414                            ts_init,
415                        ) {
416                            Ok(data_vec) => {
417                                for d in data_vec {
418                                    Self::send_data(data_sender, d);
419                                }
420                            }
421                            Err(e) => log::error!("Failed to parse index price data: {e}"),
422                        }
423                    }
424                    return;
425                }
426
427                let Some(instrument) = instruments_by_symbol.get(&inst_id) else {
428                    log::warn!("No cached instrument for {channel:?}: {inst_id}");
429                    return;
430                };
431                let instrument_id = instrument.id();
432                let price_precision = instrument.price_precision();
433                let size_precision = instrument.size_precision();
434                let ts_init = clock.get_time_ns();
435
436                if matches!(channel, OKXWsChannel::BboTbt) {
437                    let msgs: Vec<OKXBookMsg> = match serde_json::from_value(data) {
438                        Ok(m) => m,
439                        Err(e) => {
440                            log::error!("Failed to deserialize BboTbt data: {e}");
441                            return;
442                        }
443                    };
444
445                    for msg in &msgs {
446                        let bid = msg.bids.first();
447                        let ask = msg.asks.first();
448                        let bid_price =
449                            bid.and_then(|e| parse_price(&e.price, price_precision).ok());
450                        let bid_size =
451                            bid.and_then(|e| parse_quantity(&e.size, size_precision).ok());
452                        let ask_price =
453                            ask.and_then(|e| parse_price(&e.price, price_precision).ok());
454                        let ask_size =
455                            ask.and_then(|e| parse_quantity(&e.size, size_precision).ok());
456                        let ts_event = parse_millisecond_timestamp(msg.ts);
457
458                        match quote_cache.process(
459                            instrument_id,
460                            bid_price,
461                            ask_price,
462                            bid_size,
463                            ask_size,
464                            ts_event,
465                            ts_init,
466                        ) {
467                            Ok(quote) => Self::send_data(data_sender, Data::Quote(quote)),
468                            Err(e) => {
469                                log::debug!("Skipping partial BboTbt for {instrument_id}: {e}");
470                            }
471                        }
472                    }
473
474                    return;
475                }
476
477                match parse_ws_message_data(
478                    &channel,
479                    data,
480                    &instrument_id,
481                    price_precision,
482                    size_precision,
483                    ts_init,
484                    funding_cache,
485                    instruments_by_symbol,
486                ) {
487                    Ok(Some(ws_msg)) => {
488                        dispatch_parsed_data(
489                            ws_msg,
490                            data_sender,
491                            instruments,
492                            instruments_by_symbol,
493                        );
494                    }
495                    Ok(None) => {}
496                    Err(e) => log::error!("Failed to parse {channel:?} data: {e}"),
497                }
498            }
499            OKXWsMessage::Instruments(okx_instruments) => {
500                let ts_init = clock.get_time_ns();
501
502                for okx_inst in okx_instruments {
503                    let inst_key = Ustr::from(&okx_inst.inst_id);
504                    let (margin_init, margin_maint, maker_fee, taker_fee) =
505                        instruments_by_symbol.get(&inst_key).map_or(
506                            (None, None, None, None),
507                            extract_fees_from_cached_instrument,
508                        );
509                    let status_action = okx_status_to_market_action(okx_inst.state);
510                    let is_live = matches!(okx_inst.state, OKXInstrumentStatus::Live);
511                    match parse_instrument_any(
512                        &okx_inst,
513                        margin_init,
514                        margin_maint,
515                        maker_fee,
516                        taker_fee,
517                        ts_init,
518                    ) {
519                        Ok(Some(inst_any)) => {
520                            let instrument_id = inst_any.id();
521                            instruments_by_symbol
522                                .insert(inst_any.symbol().inner(), inst_any.clone());
523                            upsert_instrument(instruments, inst_any);
524                            emit_instrument_status(
525                                data_sender,
526                                instrument_id,
527                                status_action,
528                                is_live,
529                                ts_init,
530                            );
531                        }
532                        Ok(None) => {
533                            let instrument_id = instruments_by_symbol
534                                .get(&inst_key)
535                                .map_or_else(|| parse_instrument_id(inst_key), |i| i.id());
536                            emit_instrument_status(
537                                data_sender,
538                                instrument_id,
539                                status_action,
540                                is_live,
541                                ts_init,
542                            );
543                        }
544                        Err(e) => {
545                            log::error!("Failed to parse instrument: {e}");
546                            let instrument_id = instruments_by_symbol
547                                .get(&inst_key)
548                                .map_or_else(|| parse_instrument_id(inst_key), |i| i.id());
549                            emit_instrument_status(
550                                data_sender,
551                                instrument_id,
552                                status_action,
553                                is_live,
554                                ts_init,
555                            );
556                        }
557                    }
558                }
559            }
560            OKXWsMessage::Orders(_)
561            | OKXWsMessage::AlgoOrders(_)
562            | OKXWsMessage::OrderResponse { .. }
563            | OKXWsMessage::Account(_)
564            | OKXWsMessage::Positions(_)
565            | OKXWsMessage::SendFailed { .. } => {
566                log::debug!("Ignoring execution message on data client");
567            }
568            OKXWsMessage::Error(e) => {
569                log::error!("OKX websocket error: {e:?}");
570            }
571            OKXWsMessage::Reconnected => {
572                log::info!("Websocket reconnected");
573            }
574            OKXWsMessage::Authenticated => {
575                log::debug!("Websocket authenticated");
576            }
577        }
578    }
579}
580
581fn dispatch_parsed_data(
582    msg: NautilusWsMessage,
583    data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
584    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
585    instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
586) {
587    match msg {
588        NautilusWsMessage::Data(payloads) => {
589            for data in payloads {
590                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
591                    log::error!("Failed to emit data event: {e}");
592                }
593            }
594        }
595        NautilusWsMessage::Deltas(deltas) => {
596            let data = Data::Deltas(OrderBookDeltas_API::new(deltas));
597            if let Err(e) = data_sender.send(DataEvent::Data(data)) {
598                log::error!("Failed to emit data event: {e}");
599            }
600        }
601        NautilusWsMessage::FundingRates(updates) => {
602            emit_funding_rates(data_sender, updates);
603        }
604        NautilusWsMessage::Instrument(instrument, status) => {
605            instruments_by_symbol.insert(instrument.symbol().inner(), (*instrument).clone());
606            upsert_instrument(instruments, *instrument);
607
608            if let Some(status) = status
609                && let Err(e) = data_sender.send(DataEvent::InstrumentStatus(status))
610            {
611                log::error!("Failed to emit instrument status event: {e}");
612            }
613        }
614        _ => {}
615    }
616}
617
618fn emit_funding_rates(
619    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
620    updates: Vec<FundingRateUpdate>,
621) {
622    for update in updates {
623        if let Err(e) = sender.send(DataEvent::FundingRate(update)) {
624            log::error!("Failed to emit funding rate event: {e}");
625        }
626    }
627}
628
629fn emit_instrument_status(
630    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
631    instrument_id: InstrumentId,
632    status_action: MarketStatusAction,
633    is_live: bool,
634    ts_init: UnixNanos,
635) {
636    let status = InstrumentStatus::new(
637        instrument_id,
638        status_action,
639        ts_init,
640        ts_init,
641        None,
642        None,
643        Some(is_live),
644        None,
645        None,
646    );
647
648    if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
649        log::error!("Failed to emit instrument status event: {e}");
650    }
651}
652
653fn upsert_instrument(
654    cache: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
655    instrument: InstrumentAny,
656) {
657    cache.insert(instrument.id(), instrument);
658}
659
660fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
661    contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
662}
663
664fn contract_filter_with_config_types(
665    contract_types: Option<&Vec<OKXContractType>>,
666    instrument: &InstrumentAny,
667) -> bool {
668    match contract_types {
669        None => true,
670        Some(filter) if filter.is_empty() => true,
671        Some(filter) => {
672            let is_inverse = instrument.is_inverse();
673            (is_inverse && filter.contains(&OKXContractType::Inverse))
674                || (!is_inverse && filter.contains(&OKXContractType::Linear))
675        }
676    }
677}
678
679#[async_trait::async_trait(?Send)]
680impl DataClient for OKXDataClient {
681    fn client_id(&self) -> ClientId {
682        self.client_id
683    }
684
685    fn venue(&self) -> Option<Venue> {
686        Some(self.venue())
687    }
688
689    fn start(&mut self) -> anyhow::Result<()> {
690        log::info!(
691            "Started: client_id={}, vip_level={:?}, instrument_types={:?}, environment={}, proxy_url={:?}",
692            self.client_id,
693            self.vip_level(),
694            self.config.instrument_types,
695            self.config.environment,
696            self.config.proxy_url,
697        );
698        Ok(())
699    }
700
701    fn stop(&mut self) -> anyhow::Result<()> {
702        log::info!("Stopping {id}", id = self.client_id);
703        self.cancellation_token.cancel();
704        self.is_connected.store(false, Ordering::Relaxed);
705        Ok(())
706    }
707
708    fn reset(&mut self) -> anyhow::Result<()> {
709        log::debug!("Resetting {id}", id = self.client_id);
710        self.is_connected.store(false, Ordering::Relaxed);
711        self.cancellation_token = CancellationToken::new();
712        self.tasks.clear();
713        self.book_channels.store(AHashMap::new());
714        self.option_greeks_subs
715            .store(AHashMap::<InstrumentId, AHashSet<OKXGreeksType>>::new());
716        self.option_summary_family_subs
717            .lock()
718            .expect("option_summary_family_subs mutex poisoned")
719            .clear();
720        Ok(())
721    }
722
723    fn dispose(&mut self) -> anyhow::Result<()> {
724        log::debug!("Disposing {id}", id = self.client_id);
725        self.stop()
726    }
727
728    async fn connect(&mut self) -> anyhow::Result<()> {
729        if self.is_connected() {
730            return Ok(());
731        }
732
733        // Create fresh token so tasks from a previous connection cycle are not
734        // immediately cancelled (the old token may already be in cancelled state)
735        self.cancellation_token = CancellationToken::new();
736
737        let instrument_types = if self.config.instrument_types.is_empty() {
738            vec![OKXInstrumentType::Spot]
739        } else {
740            self.config.instrument_types.clone()
741        };
742
743        let mut all_instruments = Vec::new();
744
745        for inst_type in &instrument_types {
746            let Some(families) =
747                resolve_instrument_families(&self.config.instrument_families, *inst_type)
748            else {
749                continue;
750            };
751
752            if families.is_empty() {
753                let (mut fetched, _inst_id_codes) = self
754                    .http_client
755                    .request_instruments(*inst_type, None)
756                    .await
757                    .with_context(|| {
758                        format!("failed to request OKX instruments for {inst_type:?}")
759                    })?;
760
761                fetched.retain(|instrument| contract_filter_with_config(&self.config, instrument));
762                self.http_client.cache_instruments(&fetched);
763
764                self.instruments.rcu(|m| {
765                    for instrument in &fetched {
766                        m.insert(instrument.id(), instrument.clone());
767                    }
768                });
769
770                all_instruments.extend(fetched);
771            } else {
772                for family in &families {
773                    let (mut fetched, _inst_id_codes) = self
774                        .http_client
775                        .request_instruments(*inst_type, Some(family.clone()))
776                        .await
777                        .with_context(|| {
778                            format!(
779                                "failed to request OKX instruments for {inst_type:?} family {family}"
780                            )
781                        })?;
782
783                    fetched
784                        .retain(|instrument| contract_filter_with_config(&self.config, instrument));
785                    self.http_client.cache_instruments(&fetched);
786
787                    self.instruments.rcu(|m| {
788                        for instrument in &fetched {
789                            m.insert(instrument.id(), instrument.clone());
790                        }
791                    });
792
793                    all_instruments.extend(fetched);
794                }
795            }
796        }
797
798        for instrument in all_instruments {
799            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
800                log::warn!("Failed to send instrument: {e}");
801            }
802        }
803
804        if let Some(ref mut ws) = self.ws_public {
805            // Cache instruments to websocket before connecting so handler has them
806            let instruments: Vec<_> = self.instruments.load().values().cloned().collect();
807            ws.cache_instruments(&instruments);
808
809            ws.connect()
810                .await
811                .context("failed to connect OKX public websocket")?;
812            ws.wait_until_active(10.0)
813                .await
814                .context("public websocket did not become active")?;
815
816            let stream = ws.stream();
817            let sender = self.data_sender.clone();
818            let insts = self.instruments.clone();
819            let idx_map = self.index_ticker_map.clone();
820            let greeks_subs = self.option_greeks_subs.clone();
821            let cancel = self.cancellation_token.clone();
822            let clock = self.clock;
823
824            let handle = get_runtime().spawn(async move {
825                let mut instruments_by_symbol: AHashMap<Ustr, InstrumentAny> = insts
826                    .load()
827                    .values()
828                    .map(|i| (i.symbol().inner(), i.clone()))
829                    .collect();
830                let mut quote_cache = QuoteCache::new();
831                let mut funding_cache: AHashMap<Ustr, (Ustr, u64)> = AHashMap::new();
832                pin_mut!(stream);
833
834                loop {
835                    tokio::select! {
836                        Some(message) = stream.next() => {
837                            Self::handle_ws_message(
838                                message,
839                                &sender,
840                                &insts,
841                                &mut instruments_by_symbol,
842                                &mut quote_cache,
843                                &mut funding_cache,
844                                &idx_map,
845                                &greeks_subs,
846                                clock,
847                            );
848                        }
849                        () = cancel.cancelled() => {
850                            log::debug!("Public websocket stream task cancelled");
851                            break;
852                        }
853                    }
854                }
855            });
856            self.tasks.push(handle);
857
858            for inst_type in &instrument_types {
859                ws.subscribe_instruments(*inst_type)
860                    .await
861                    .with_context(|| {
862                        format!("failed to subscribe to instrument type {inst_type:?}")
863                    })?;
864            }
865        }
866
867        if let Some(ref mut ws) = self.ws_business {
868            // Cache instruments to websocket before connecting so handler has them
869            let instruments: Vec<_> = self.instruments.load().values().cloned().collect();
870            ws.cache_instruments(&instruments);
871
872            ws.connect()
873                .await
874                .context("failed to connect OKX business websocket")?;
875            ws.wait_until_active(10.0)
876                .await
877                .context("business websocket did not become active")?;
878
879            let stream = ws.stream();
880            let sender = self.data_sender.clone();
881            let insts = self.instruments.clone();
882            let idx_map = self.index_ticker_map.clone();
883            let greeks_subs = self.option_greeks_subs.clone();
884            let cancel = self.cancellation_token.clone();
885            let clock = self.clock;
886
887            let handle = get_runtime().spawn(async move {
888                let mut instruments_by_symbol: AHashMap<Ustr, InstrumentAny> = insts
889                    .load()
890                    .values()
891                    .map(|i| (i.symbol().inner(), i.clone()))
892                    .collect();
893                let mut quote_cache = QuoteCache::new();
894                let mut funding_cache: AHashMap<Ustr, (Ustr, u64)> = AHashMap::new();
895                pin_mut!(stream);
896
897                loop {
898                    tokio::select! {
899                        Some(message) = stream.next() => {
900                            Self::handle_ws_message(
901                                message,
902                                &sender,
903                                &insts,
904                                &mut instruments_by_symbol,
905                                &mut quote_cache,
906                                &mut funding_cache,
907                                &idx_map,
908                                &greeks_subs,
909                                clock,
910                            );
911                        }
912                        () = cancel.cancelled() => {
913                            log::debug!("Business websocket stream task cancelled");
914                            break;
915                        }
916                    }
917                }
918            });
919            self.tasks.push(handle);
920        }
921
922        self.is_connected.store(true, Ordering::Release);
923        log::info!("Connected: client_id={}", self.client_id);
924        Ok(())
925    }
926
927    async fn disconnect(&mut self) -> anyhow::Result<()> {
928        if self.is_disconnected() {
929            return Ok(());
930        }
931
932        self.cancellation_token.cancel();
933
934        if let Some(ref ws) = self.ws_public
935            && let Err(e) = ws.unsubscribe_all().await
936        {
937            log::warn!("Failed to unsubscribe all from public websocket: {e:?}");
938        }
939
940        if let Some(ref ws) = self.ws_business
941            && let Err(e) = ws.unsubscribe_all().await
942        {
943            log::warn!("Failed to unsubscribe all from business websocket: {e:?}");
944        }
945
946        // Allow time for unsubscribe confirmations
947        tokio::time::sleep(Duration::from_millis(500)).await;
948
949        if let Some(ref mut ws) = self.ws_public {
950            let _ = ws.close().await;
951        }
952
953        if let Some(ref mut ws) = self.ws_business {
954            let _ = ws.close().await;
955        }
956
957        let handles: Vec<_> = self.tasks.drain(..).collect();
958
959        for handle in handles {
960            if let Err(e) = handle.await {
961                log::error!("Error joining websocket task: {e}");
962            }
963        }
964
965        self.book_channels.store(AHashMap::new());
966        self.option_greeks_subs
967            .store(AHashMap::<InstrumentId, AHashSet<OKXGreeksType>>::new());
968        self.option_summary_family_subs
969            .lock()
970            .expect("option_summary_family_subs mutex poisoned")
971            .clear();
972        self.is_connected.store(false, Ordering::Release);
973        log::info!("Disconnected: client_id={}", self.client_id);
974        Ok(())
975    }
976
977    fn is_connected(&self) -> bool {
978        self.is_connected.load(Ordering::Relaxed)
979    }
980
981    fn is_disconnected(&self) -> bool {
982        !self.is_connected()
983    }
984
985    fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
986        for inst_type in &self.config.instrument_types {
987            let ws = self.public_ws()?.clone();
988            let inst_type = *inst_type;
989
990            self.spawn_ws(
991                async move {
992                    ws.subscribe_instruments(inst_type)
993                        .await
994                        .context("instruments subscription")?;
995                    Ok(())
996                },
997                "subscribe_instruments",
998            );
999        }
1000        Ok(())
1001    }
1002
1003    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
1004        // OKX instruments channel doesn't support subscribing to individual instruments via instId
1005        // Instead, subscribe to the instrument type if not already subscribed
1006        let instrument_id = cmd.instrument_id;
1007        let ws = self.public_ws()?.clone();
1008
1009        self.spawn_ws(
1010            async move {
1011                ws.subscribe_instrument(instrument_id)
1012                    .await
1013                    .context("instrument type subscription")?;
1014                Ok(())
1015            },
1016            "subscribe_instrument",
1017        );
1018        Ok(())
1019    }
1020
1021    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
1022        if cmd.book_type != BookType::L2_MBP {
1023            anyhow::bail!("OKX only supports L2_MBP order book deltas");
1024        }
1025
1026        let raw_depth = cmd.depth.map_or(0, |d| d.get());
1027        let depth = resolve_book_depth(raw_depth);
1028        if depth != raw_depth {
1029            log::info!("Clamped book depth {raw_depth} to {depth} (OKX supports 50 or 400)");
1030        }
1031
1032        let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
1033        let channel = match depth {
1034            50 => {
1035                if vip < OKXVipLevel::Vip4 {
1036                    log::info!(
1037                        "VIP level {vip} insufficient for 50-depth channel, falling back to default"
1038                    );
1039                    OKXBookChannel::Book
1040                } else {
1041                    OKXBookChannel::Books50L2Tbt
1042                }
1043            }
1044            0 | 400 => {
1045                if vip >= OKXVipLevel::Vip5 {
1046                    OKXBookChannel::BookL2Tbt
1047                } else {
1048                    OKXBookChannel::Book
1049                }
1050            }
1051            _ => unreachable!(),
1052        };
1053
1054        let instrument_id = cmd.instrument_id;
1055        let ws = self.public_ws()?.clone();
1056        let book_channels = Arc::clone(&self.book_channels);
1057
1058        self.spawn_ws(
1059            async move {
1060                match channel {
1061                    OKXBookChannel::Books50L2Tbt => ws
1062                        .subscribe_book50_l2_tbt(instrument_id)
1063                        .await
1064                        .context("books50-l2-tbt subscription")?,
1065                    OKXBookChannel::BookL2Tbt => ws
1066                        .subscribe_book_l2_tbt(instrument_id)
1067                        .await
1068                        .context("books-l2-tbt subscription")?,
1069                    OKXBookChannel::Book => ws
1070                        .subscribe_books_channel(instrument_id)
1071                        .await
1072                        .context("books subscription")?,
1073                }
1074                book_channels.insert(instrument_id, channel);
1075                Ok(())
1076            },
1077            "order book delta subscription",
1078        );
1079
1080        Ok(())
1081    }
1082
1083    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
1084        let ws = self.public_ws()?.clone();
1085        let instrument_id = cmd.instrument_id;
1086
1087        self.spawn_ws(
1088            async move {
1089                ws.subscribe_quotes(instrument_id)
1090                    .await
1091                    .context("quotes subscription")
1092            },
1093            "quote subscription",
1094        );
1095        Ok(())
1096    }
1097
1098    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
1099        let ws = self.public_ws()?.clone();
1100        let instrument_id = cmd.instrument_id;
1101
1102        self.spawn_ws(
1103            async move {
1104                ws.subscribe_trades(instrument_id, false)
1105                    .await
1106                    .context("trades subscription")
1107            },
1108            "trade subscription",
1109        );
1110        Ok(())
1111    }
1112
1113    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
1114        let ws = self.public_ws()?.clone();
1115        let instrument_id = cmd.instrument_id;
1116
1117        self.spawn_ws(
1118            async move {
1119                ws.subscribe_mark_prices(instrument_id)
1120                    .await
1121                    .context("mark price subscription")
1122            },
1123            "mark price subscription",
1124        );
1125        Ok(())
1126    }
1127
1128    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
1129        let ws = self.public_ws()?.clone();
1130        let instrument_id = cmd.instrument_id;
1131        let symbol = instrument_id.symbol.inner();
1132
1133        let (base, quote) = parse_base_quote_from_symbol(symbol.as_str())?;
1134        let base_pair = Ustr::from(&format!("{base}-{quote}"));
1135        self.index_ticker_map.rcu(|m| {
1136            m.entry(base_pair).or_default().insert(symbol);
1137        });
1138
1139        self.spawn_ws(
1140            async move {
1141                ws.subscribe_index_prices(instrument_id)
1142                    .await
1143                    .context("index price subscription")
1144            },
1145            "index price subscription",
1146        );
1147        Ok(())
1148    }
1149
1150    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
1151        let ws = self.business_ws()?.clone();
1152        let bar_type = cmd.bar_type;
1153
1154        self.spawn_ws(
1155            async move {
1156                ws.subscribe_bars(bar_type)
1157                    .await
1158                    .context("bars subscription")
1159            },
1160            "bar subscription",
1161        );
1162        Ok(())
1163    }
1164
1165    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
1166        let ws = self.public_ws()?.clone();
1167        let instrument_id = cmd.instrument_id;
1168
1169        self.spawn_ws(
1170            async move {
1171                ws.subscribe_funding_rates(instrument_id)
1172                    .await
1173                    .context("funding rate subscription")
1174            },
1175            "funding rate subscription",
1176        );
1177        Ok(())
1178    }
1179
1180    fn subscribe_option_greeks(&mut self, cmd: SubscribeOptionGreeks) -> anyhow::Result<()> {
1181        let instrument_id = cmd.instrument_id;
1182        let conventions = parse_greeks_conventions_from_params(&cmd.params);
1183        self.option_greeks_subs.insert(instrument_id, conventions);
1184
1185        let family = extract_inst_family(instrument_id.symbol.inner().as_str())?;
1186        let is_first = {
1187            let mut family_subs = self
1188                .option_summary_family_subs
1189                .lock()
1190                .expect("option_summary_family_subs mutex poisoned");
1191            let count = family_subs.entry(family).or_default();
1192            *count += 1;
1193            *count == 1
1194        };
1195
1196        if is_first {
1197            let ws = self.public_ws()?.clone();
1198            let family_subs = self.option_summary_family_subs.clone();
1199            self.spawn_ws(
1200                async move {
1201                    let result = ws
1202                        .subscribe_option_summary(family)
1203                        .await
1204                        .context("opt-summary subscription");
1205
1206                    if result.is_err() {
1207                        // Roll back the refcount so a retry can re-arm the subscribe;
1208                        // otherwise the family wedges and Greeks stay dark.
1209                        let mut subs = family_subs
1210                            .lock()
1211                            .expect("option_summary_family_subs mutex poisoned");
1212
1213                        if let Some(count) = subs.get_mut(&family) {
1214                            *count = count.saturating_sub(1);
1215                            if *count == 0 {
1216                                subs.remove(&family);
1217                            }
1218                        }
1219                    }
1220                    result
1221                },
1222                "option greeks subscription",
1223            );
1224        }
1225        Ok(())
1226    }
1227
1228    fn subscribe_instrument_status(
1229        &mut self,
1230        cmd: SubscribeInstrumentStatus,
1231    ) -> anyhow::Result<()> {
1232        let ws = self.public_ws()?.clone();
1233        let instrument_id = cmd.instrument_id;
1234
1235        self.spawn_ws(
1236            async move {
1237                ws.subscribe_instrument(instrument_id)
1238                    .await
1239                    .context("instrument status subscription")
1240            },
1241            "instrument status subscription",
1242        );
1243        Ok(())
1244    }
1245
1246    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1247        let ws = self.public_ws()?.clone();
1248        let instrument_id = cmd.instrument_id;
1249        let channel = self.book_channels.get_cloned(&instrument_id);
1250        self.book_channels.remove(&instrument_id);
1251
1252        self.spawn_ws(
1253            async move {
1254                match channel {
1255                    Some(OKXBookChannel::Books50L2Tbt) => ws
1256                        .unsubscribe_book50_l2_tbt(instrument_id)
1257                        .await
1258                        .context("books50-l2-tbt unsubscribe")?,
1259                    Some(OKXBookChannel::BookL2Tbt) => ws
1260                        .unsubscribe_book_l2_tbt(instrument_id)
1261                        .await
1262                        .context("books-l2-tbt unsubscribe")?,
1263                    Some(OKXBookChannel::Book) => ws
1264                        .unsubscribe_book(instrument_id)
1265                        .await
1266                        .context("book unsubscribe")?,
1267                    None => {
1268                        log::warn!(
1269                            "Book channel not found for {instrument_id}; unsubscribing fallback channel"
1270                        );
1271                        ws.unsubscribe_book(instrument_id)
1272                            .await
1273                            .context("book fallback unsubscribe")?;
1274                    }
1275                }
1276                Ok(())
1277            },
1278            "order book unsubscribe",
1279        );
1280        Ok(())
1281    }
1282
1283    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1284        let ws = self.public_ws()?.clone();
1285        let instrument_id = cmd.instrument_id;
1286
1287        self.spawn_ws(
1288            async move {
1289                ws.unsubscribe_quotes(instrument_id)
1290                    .await
1291                    .context("quotes unsubscribe")
1292            },
1293            "quote unsubscribe",
1294        );
1295        Ok(())
1296    }
1297
1298    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1299        let ws = self.public_ws()?.clone();
1300        let instrument_id = cmd.instrument_id;
1301
1302        self.spawn_ws(
1303            async move {
1304                ws.unsubscribe_trades(instrument_id, false) // TODO: Aggregated trades?
1305                    .await
1306                    .context("trades unsubscribe")
1307            },
1308            "trade unsubscribe",
1309        );
1310        Ok(())
1311    }
1312
1313    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1314        let ws = self.public_ws()?.clone();
1315        let instrument_id = cmd.instrument_id;
1316
1317        self.spawn_ws(
1318            async move {
1319                ws.unsubscribe_mark_prices(instrument_id)
1320                    .await
1321                    .context("mark price unsubscribe")
1322            },
1323            "mark price unsubscribe",
1324        );
1325        Ok(())
1326    }
1327
1328    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1329        let ws = self.public_ws()?.clone();
1330        let instrument_id = cmd.instrument_id;
1331        let symbol = instrument_id.symbol.inner();
1332
1333        // The OKX index-tickers channel is keyed by base pair, so multiple
1334        // instruments on the same pair share one subscription. Per-base-pair
1335        // refcounting lives on the WS client, so we always forward the
1336        // unsubscribe and let the WS layer fire the venue request only when
1337        // it knows the last subscriber dropped. Local routing in
1338        // `index_ticker_map` is still maintained for downstream emit fan-out.
1339        if let Ok((base, quote)) = parse_base_quote_from_symbol(symbol.as_str()) {
1340            let base_pair = Ustr::from(&format!("{base}-{quote}"));
1341            self.index_ticker_map.rcu(|m| {
1342                if let Some(set) = m.get_mut(&base_pair) {
1343                    set.remove(&symbol);
1344                    if set.is_empty() {
1345                        m.remove(&base_pair);
1346                    }
1347                }
1348            });
1349        }
1350
1351        self.spawn_ws(
1352            async move {
1353                ws.unsubscribe_index_prices(instrument_id)
1354                    .await
1355                    .context("index price unsubscribe")
1356            },
1357            "index price unsubscribe",
1358        );
1359        Ok(())
1360    }
1361
1362    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1363        let ws = self.business_ws()?.clone();
1364        let bar_type = cmd.bar_type;
1365
1366        self.spawn_ws(
1367            async move {
1368                ws.unsubscribe_bars(bar_type)
1369                    .await
1370                    .context("bars unsubscribe")
1371            },
1372            "bar unsubscribe",
1373        );
1374        Ok(())
1375    }
1376
1377    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1378        let ws = self.public_ws()?.clone();
1379        let instrument_id = cmd.instrument_id;
1380
1381        self.spawn_ws(
1382            async move {
1383                ws.unsubscribe_funding_rates(instrument_id)
1384                    .await
1385                    .context("funding rate unsubscribe")
1386            },
1387            "funding rate unsubscribe",
1388        );
1389        Ok(())
1390    }
1391
1392    fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
1393        let instrument_id = cmd.instrument_id;
1394        self.option_greeks_subs.remove(&instrument_id);
1395
1396        let family = extract_inst_family(instrument_id.symbol.inner().as_str())?;
1397        let should_unsubscribe = {
1398            let mut family_subs = self
1399                .option_summary_family_subs
1400                .lock()
1401                .expect("option_summary_family_subs mutex poisoned");
1402
1403            if let Some(count) = family_subs.get_mut(&family) {
1404                *count = count.saturating_sub(1);
1405                if *count == 0 {
1406                    family_subs.remove(&family);
1407                    true
1408                } else {
1409                    false
1410                }
1411            } else {
1412                false
1413            }
1414        };
1415
1416        if should_unsubscribe {
1417            let ws = self.public_ws()?.clone();
1418            self.spawn_ws(
1419                async move {
1420                    ws.unsubscribe_option_summary(family)
1421                        .await
1422                        .context("opt-summary unsubscription")
1423                },
1424                "option greeks unsubscription",
1425            );
1426        }
1427        Ok(())
1428    }
1429
1430    fn unsubscribe_instrument_status(
1431        &mut self,
1432        cmd: &UnsubscribeInstrumentStatus,
1433    ) -> anyhow::Result<()> {
1434        let ws = self.public_ws()?.clone();
1435        let instrument_id = cmd.instrument_id;
1436
1437        self.spawn_ws(
1438            async move {
1439                ws.unsubscribe_instrument(instrument_id)
1440                    .await
1441                    .context("instrument status unsubscription")
1442            },
1443            "instrument status unsubscription",
1444        );
1445        Ok(())
1446    }
1447
1448    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1449        let http = self.http_client.clone();
1450        let sender = self.data_sender.clone();
1451        let instruments_cache = self.instruments.clone();
1452        let request_id = request.request_id;
1453        let client_id = request.client_id.unwrap_or(self.client_id);
1454        let venue = self.venue();
1455        let start = request.start;
1456        let end = request.end;
1457        let params = request.params;
1458        let clock = self.clock;
1459        let start_nanos = datetime_to_unix_nanos(start);
1460        let end_nanos = datetime_to_unix_nanos(end);
1461        let instrument_types = if self.config.instrument_types.is_empty() {
1462            vec![OKXInstrumentType::Spot]
1463        } else {
1464            self.config.instrument_types.clone()
1465        };
1466        let contract_types = self.config.contract_types.clone();
1467        let instrument_families = self.config.instrument_families.clone();
1468
1469        get_runtime().spawn(async move {
1470            let mut all_instruments = Vec::new();
1471
1472            for inst_type in instrument_types {
1473                let Some(families) =
1474                    resolve_instrument_families(&instrument_families, inst_type)
1475                else {
1476                    continue;
1477                };
1478
1479                if families.is_empty() {
1480                    match http.request_instruments(inst_type, None).await {
1481                        Ok((instruments, _inst_id_codes)) => {
1482                            for instrument in instruments {
1483                                if !contract_filter_with_config_types(
1484                                    contract_types.as_ref(),
1485                                    &instrument,
1486                                ) {
1487                                    continue;
1488                                }
1489
1490                                upsert_instrument(&instruments_cache, instrument.clone());
1491                                all_instruments.push(instrument);
1492                            }
1493                        }
1494                        Err(e) => {
1495                            log::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
1496                        }
1497                    }
1498                } else {
1499                    for family in families {
1500                        match http
1501                            .request_instruments(inst_type, Some(family.clone()))
1502                            .await
1503                        {
1504                            Ok((instruments, _inst_id_codes)) => {
1505                                for instrument in instruments {
1506                                    if !contract_filter_with_config_types(
1507                                        contract_types.as_ref(),
1508                                        &instrument,
1509                                    ) {
1510                                        continue;
1511                                    }
1512
1513                                    upsert_instrument(&instruments_cache, instrument.clone());
1514                                    all_instruments.push(instrument);
1515                                }
1516                            }
1517                            Err(e) => {
1518                                log::error!(
1519                                    "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
1520                                );
1521                            }
1522                        }
1523                    }
1524                }
1525            }
1526
1527            let response = DataResponse::Instruments(InstrumentsResponse::new(
1528                request_id,
1529                client_id,
1530                venue,
1531                all_instruments,
1532                start_nanos,
1533                end_nanos,
1534                clock.get_time_ns(),
1535                params,
1536            ));
1537
1538            if let Err(e) = sender.send(DataEvent::Response(response)) {
1539                log::error!("Failed to send instruments response: {e}");
1540            }
1541        });
1542
1543        Ok(())
1544    }
1545
1546    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1547        let http = self.http_client.clone();
1548        let sender = self.data_sender.clone();
1549        let instruments = self.instruments.clone();
1550        let instrument_id = request.instrument_id;
1551        let request_id = request.request_id;
1552        let client_id = request.client_id.unwrap_or(self.client_id);
1553        let start = request.start;
1554        let end = request.end;
1555        let params = request.params;
1556        let clock = self.clock;
1557        let start_nanos = datetime_to_unix_nanos(start);
1558        let end_nanos = datetime_to_unix_nanos(end);
1559        let instrument_types = if self.config.instrument_types.is_empty() {
1560            vec![OKXInstrumentType::Spot]
1561        } else {
1562            self.config.instrument_types.clone()
1563        };
1564        let contract_types = self.config.contract_types.clone();
1565
1566        get_runtime().spawn(async move {
1567            match http
1568                .request_instrument(instrument_id)
1569                .await
1570                .context("fetch instrument from API")
1571            {
1572                Ok(instrument) => {
1573                    let inst_id = instrument.id();
1574                    let symbol = inst_id.symbol.as_str();
1575                    let inst_type = okx_instrument_type_from_symbol(symbol);
1576                    if !instrument_types.contains(&inst_type) {
1577                        log::error!(
1578                            "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
1579                        );
1580                        return;
1581                    }
1582
1583                    if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
1584                        log::error!(
1585                            "Instrument {instrument_id} filtered out by contract_types config"
1586                        );
1587                        return;
1588                    }
1589
1590                    upsert_instrument(&instruments, instrument.clone());
1591
1592                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1593                        request_id,
1594                        client_id,
1595                        instrument.id(),
1596                        instrument,
1597                        start_nanos,
1598                        end_nanos,
1599                        clock.get_time_ns(),
1600                        params,
1601                    )));
1602
1603                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1604                        log::error!("Failed to send instrument response: {e}");
1605                    }
1606                }
1607                Err(e) => log::error!("Instrument request failed: {e:?}"),
1608            }
1609        });
1610
1611        Ok(())
1612    }
1613
1614    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1615        let http = self.http_client.clone();
1616        let sender = self.data_sender.clone();
1617        let instrument_id = request.instrument_id;
1618        let depth = request.depth.map(|n| n.get() as u32);
1619        let request_id = request.request_id;
1620        let client_id = request.client_id.unwrap_or(self.client_id);
1621        let params = request.params;
1622        let clock = self.clock;
1623
1624        get_runtime().spawn(async move {
1625            match http
1626                .request_book_snapshot(instrument_id, depth)
1627                .await
1628                .context("failed to request book snapshot from OKX")
1629            {
1630                Ok(book) => {
1631                    let response = DataResponse::Book(BookResponse::new(
1632                        request_id,
1633                        client_id,
1634                        instrument_id,
1635                        book,
1636                        None,
1637                        None,
1638                        clock.get_time_ns(),
1639                        params,
1640                    ));
1641
1642                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1643                        log::error!("Failed to send book snapshot response: {e}");
1644                    }
1645                }
1646                Err(e) => log::error!("Book snapshot request failed: {e:?}"),
1647            }
1648        });
1649
1650        Ok(())
1651    }
1652
1653    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1654        let http = self.http_client.clone();
1655        let sender = self.data_sender.clone();
1656        let instrument_id = request.instrument_id;
1657        let start = request.start;
1658        let end = request.end;
1659        let limit = request.limit.map(|n| n.get() as u32);
1660        let request_id = request.request_id;
1661        let client_id = request.client_id.unwrap_or(self.client_id);
1662        let params = request.params;
1663        let clock = self.clock;
1664        let start_nanos = datetime_to_unix_nanos(start);
1665        let end_nanos = datetime_to_unix_nanos(end);
1666
1667        get_runtime().spawn(async move {
1668            match http
1669                .request_trades(instrument_id, start, end, limit)
1670                .await
1671                .context("failed to request trades from OKX")
1672            {
1673                Ok(trades) => {
1674                    let response = DataResponse::Trades(TradesResponse::new(
1675                        request_id,
1676                        client_id,
1677                        instrument_id,
1678                        trades,
1679                        start_nanos,
1680                        end_nanos,
1681                        clock.get_time_ns(),
1682                        params,
1683                    ));
1684
1685                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1686                        log::error!("Failed to send trades response: {e}");
1687                    }
1688                }
1689                Err(e) => log::error!("Trade request failed: {e:?}"),
1690            }
1691        });
1692
1693        Ok(())
1694    }
1695
1696    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1697        let http = self.http_client.clone();
1698        let sender = self.data_sender.clone();
1699        let bar_type = request.bar_type;
1700        let start = request.start;
1701        let end = request.end;
1702        let limit = request.limit.map(|n| n.get() as u32);
1703        let request_id = request.request_id;
1704        let client_id = request.client_id.unwrap_or(self.client_id);
1705        let params = request.params;
1706        let clock = self.clock;
1707        let start_nanos = datetime_to_unix_nanos(start);
1708        let end_nanos = datetime_to_unix_nanos(end);
1709
1710        get_runtime().spawn(async move {
1711            match http
1712                .request_bars(bar_type, start, end, limit)
1713                .await
1714                .context("failed to request bars from OKX")
1715            {
1716                Ok(bars) => {
1717                    let response = DataResponse::Bars(BarsResponse::new(
1718                        request_id,
1719                        client_id,
1720                        bar_type,
1721                        bars,
1722                        start_nanos,
1723                        end_nanos,
1724                        clock.get_time_ns(),
1725                        params,
1726                    ));
1727
1728                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1729                        log::error!("Failed to send bars response: {e}");
1730                    }
1731                }
1732                Err(e) => log::error!("Bar request failed: {e:?}"),
1733            }
1734        });
1735
1736        Ok(())
1737    }
1738
1739    fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1740        let http = self.http_client.clone();
1741        let sender = self.data_sender.clone();
1742        let instrument_id = request.instrument_id;
1743        let start = request.start;
1744        let end = request.end;
1745        let limit = request.limit.map(|n| n.get() as u32);
1746        let request_id = request.request_id;
1747        let client_id = request.client_id.unwrap_or(self.client_id);
1748        let params = request.params;
1749        let clock = self.clock;
1750        let start_nanos = datetime_to_unix_nanos(start);
1751        let end_nanos = datetime_to_unix_nanos(end);
1752
1753        get_runtime().spawn(async move {
1754            match http
1755                .request_funding_rates(instrument_id, start, end, limit)
1756                .await
1757                .context("failed to request funding rates from OKX")
1758            {
1759                Ok(funding_rates) => {
1760                    let response = DataResponse::FundingRates(FundingRatesResponse::new(
1761                        request_id,
1762                        client_id,
1763                        instrument_id,
1764                        funding_rates,
1765                        start_nanos,
1766                        end_nanos,
1767                        clock.get_time_ns(),
1768                        params,
1769                    ));
1770
1771                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1772                        log::error!("Failed to send funding rates response: {e}");
1773                    }
1774                }
1775                Err(e) => log::error!("Funding rates request failed: {e:?}"),
1776            }
1777        });
1778
1779        Ok(())
1780    }
1781
1782    fn request_forward_prices(&self, request: RequestForwardPrices) -> anyhow::Result<()> {
1783        let http = self.http_client.clone();
1784        let sender = self.data_sender.clone();
1785        let underlying = request.underlying.to_string();
1786        let instrument_id = request.instrument_id;
1787        let request_id = request.request_id;
1788        let client_id = request.client_id.unwrap_or(self.client_id);
1789        let params = request.params;
1790        let clock = self.clock;
1791        let venue = *OKX_VENUE;
1792
1793        get_runtime().spawn(async move {
1794            match http
1795                .request_forward_prices(&underlying, instrument_id)
1796                .await
1797                .context("failed to request forward prices from OKX")
1798            {
1799                Ok(forward_prices) => {
1800                    let response = DataResponse::ForwardPrices(ForwardPricesResponse::new(
1801                        request_id,
1802                        client_id,
1803                        venue,
1804                        forward_prices,
1805                        clock.get_time_ns(),
1806                        params,
1807                    ));
1808
1809                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1810                        log::error!("Failed to send forward prices response: {e}");
1811                    }
1812                }
1813                Err(e) => {
1814                    log::error!("Forward prices request failed for {underlying}: {e:?}");
1815                    let response = DataResponse::ForwardPrices(ForwardPricesResponse::new(
1816                        request_id,
1817                        client_id,
1818                        venue,
1819                        Vec::new(),
1820                        clock.get_time_ns(),
1821                        params,
1822                    ));
1823
1824                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1825                        log::error!("Failed to send forward prices response: {e}");
1826                    }
1827                }
1828            }
1829        });
1830
1831        Ok(())
1832    }
1833}
1834
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use serde_json::json;

    use super::*;

    /// Params key the tests use to select the greeks convention(s).
    const CONVENTION_KEY: &str = "greeks_convention";

    /// Set containing only `greeks_type`.
    fn only(greeks_type: OKXGreeksType) -> AHashSet<OKXGreeksType> {
        std::iter::once(greeks_type).collect()
    }

    /// Set containing both greeks conventions; the tests expect this as the fallback result.
    fn both() -> AHashSet<OKXGreeksType> {
        let mut conventions = only(OKXGreeksType::Bs);
        conventions.insert(OKXGreeksType::Pa);
        conventions
    }

    /// Builds request params holding a single `key` -> `value` entry.
    fn params_with(key: &str, value: serde_json::Value) -> Option<Params> {
        let mut params = Params::new();
        params.insert(key.to_string(), value);
        Some(params)
    }

    #[rstest]
    fn parse_conventions_returns_both_when_params_missing() {
        assert_eq!(parse_greeks_conventions_from_params(&None), both());
    }

    #[rstest]
    fn parse_conventions_returns_both_when_key_absent() {
        let params = params_with("other_key", json!("value"));
        assert_eq!(parse_greeks_conventions_from_params(&params), both());
    }

    #[rstest]
    #[case("BLACK_SCHOLES", OKXGreeksType::Bs)]
    #[case("PRICE_ADJUSTED", OKXGreeksType::Pa)]
    #[case("black_scholes", OKXGreeksType::Bs)]
    #[case("price_adjusted", OKXGreeksType::Pa)]
    fn parse_conventions_accepts_single_string(#[case] raw: &str, #[case] expected: OKXGreeksType) {
        let params = params_with(CONVENTION_KEY, json!(raw));
        assert_eq!(
            parse_greeks_conventions_from_params(&params),
            only(expected)
        );
    }

    #[rstest]
    fn parse_conventions_accepts_list_of_strings() {
        let params = params_with(CONVENTION_KEY, json!(["BLACK_SCHOLES", "PRICE_ADJUSTED"]));
        assert_eq!(parse_greeks_conventions_from_params(&params), both());
    }

    #[rstest]
    fn parse_conventions_accepts_single_entry_list() {
        let params = params_with(CONVENTION_KEY, json!(["PRICE_ADJUSTED"]));
        assert_eq!(
            parse_greeks_conventions_from_params(&params),
            only(OKXGreeksType::Pa)
        );
    }

    #[rstest]
    fn parse_conventions_deduplicates_list_entries() {
        // The same convention in two spellings must collapse to one entry.
        let params = params_with(CONVENTION_KEY, json!(["BLACK_SCHOLES", "black_scholes"]));
        assert_eq!(
            parse_greeks_conventions_from_params(&params),
            only(OKXGreeksType::Bs)
        );
    }

    #[rstest]
    fn parse_conventions_skips_unknown_list_entries() {
        let params = params_with(CONVENTION_KEY, json!(["BOGUS", "PRICE_ADJUSTED"]));
        assert_eq!(
            parse_greeks_conventions_from_params(&params),
            only(OKXGreeksType::Pa)
        );
    }

    #[rstest]
    fn parse_conventions_falls_back_to_both_on_all_unknown() {
        let params = params_with(CONVENTION_KEY, json!(["BOGUS"]));
        assert_eq!(parse_greeks_conventions_from_params(&params), both());
    }

    #[rstest]
    #[case(json!(1))]
    #[case(json!(null))]
    #[case(json!(true))]
    #[case(json!({"nested": "object"}))]
    fn parse_conventions_falls_back_on_non_string_value(#[case] value: serde_json::Value) {
        let params = params_with(CONVENTION_KEY, value);
        assert_eq!(parse_greeks_conventions_from_params(&params), both());
    }

    #[rstest]
    fn parse_conventions_falls_back_on_unknown_single_string() {
        let params = params_with(CONVENTION_KEY, json!("BOGUS"));
        assert_eq!(parse_greeks_conventions_from_params(&params), both());
    }
}