1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 cache::quote::QuoteCache,
28 clients::DataClient,
29 live::{runner::get_data_event_sender, runtime::get_runtime},
30 messages::{
31 DataEvent,
32 data::{
33 BarsResponse, BookResponse, DataResponse, ForwardPricesResponse, FundingRatesResponse,
34 InstrumentResponse, InstrumentsResponse, RequestBars, RequestBookSnapshot,
35 RequestForwardPrices, RequestFundingRates, RequestInstrument, RequestInstruments,
36 RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates,
37 SubscribeIndexPrices, SubscribeInstrument, SubscribeInstrumentStatus,
38 SubscribeInstruments, SubscribeMarkPrices, SubscribeOptionGreeks, SubscribeQuotes,
39 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
40 UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrumentStatus,
41 UnsubscribeMarkPrices, UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
42 },
43 },
44};
45use nautilus_core::{
46 AtomicMap, Params, UnixNanos,
47 datetime::datetime_to_unix_nanos,
48 time::{AtomicTime, get_atomic_clock_realtime},
49};
50use nautilus_model::{
51 data::{Data, FundingRateUpdate, InstrumentStatus, OrderBookDeltas_API},
52 enums::{BookType, GreeksConvention, MarketStatusAction},
53 identifiers::{ClientId, InstrumentId, Venue},
54 instruments::{Instrument, InstrumentAny},
55};
56use tokio::{task::JoinHandle, time::Duration};
57use tokio_util::sync::CancellationToken;
58use ustr::Ustr;
59
60use crate::{
61 common::{
62 consts::{
63 OKX_VENUE, OKX_WS_HEARTBEAT_SECS, resolve_book_depth, resolve_instrument_families,
64 },
65 enums::{
66 OKXBookChannel, OKXContractType, OKXGreeksType, OKXInstrumentStatus, OKXInstrumentType,
67 OKXVipLevel,
68 },
69 parse::{
70 extract_inst_family, okx_instrument_type_from_symbol, okx_status_to_market_action,
71 parse_base_quote_from_symbol, parse_instrument_any, parse_instrument_id,
72 parse_millisecond_timestamp, parse_price, parse_quantity,
73 },
74 },
75 config::OKXDataClientConfig,
76 http::client::OKXHttpClient,
77 websocket::{
78 client::OKXWebSocketClient,
79 enums::OKXWsChannel,
80 messages::{NautilusWsMessage, OKXBookMsg, OKXOptionSummaryMsg, OKXWsMessage},
81 parse::{
82 extract_fees_from_cached_instrument, parse_book_msg_vec, parse_index_price_msg_vec,
83 parse_option_summary_greeks, parse_ws_message_data,
84 },
85 },
86};
87
88pub(crate) fn parse_greeks_conventions_from_params(
96 params: &Option<Params>,
97) -> AHashSet<OKXGreeksType> {
98 let default_set: AHashSet<OKXGreeksType> =
99 [OKXGreeksType::Bs, OKXGreeksType::Pa].into_iter().collect();
100
101 let Some(value) = params.as_ref().and_then(|p| p.get("greeks_convention")) else {
102 return default_set;
103 };
104
105 let mut out = AHashSet::new();
106 match value {
107 serde_json::Value::String(s) => push_convention_str(&mut out, s),
108 serde_json::Value::Array(items) => {
109 for item in items {
110 if let Some(s) = item.as_str() {
111 push_convention_str(&mut out, s);
112 } else {
113 log::warn!("Ignoring non-string greeks_convention entry {item:?}");
114 }
115 }
116 }
117 other => {
118 log::warn!(
119 "Unsupported greeks_convention value {other:?}, defaulting to both conventions"
120 );
121 }
122 }
123
124 if out.is_empty() { default_set } else { out }
125}
126
127fn push_convention_str(out: &mut AHashSet<OKXGreeksType>, raw: &str) {
128 match raw.parse::<GreeksConvention>() {
129 Ok(convention) => {
130 out.insert(convention.into());
131 }
132 Err(_) => log::warn!("Unrecognized greeks_convention {raw:?}, skipping"),
133 }
134}
135
136#[derive(Debug)]
137pub struct OKXDataClient {
138 client_id: ClientId,
139 config: OKXDataClientConfig,
140 http_client: OKXHttpClient,
141 ws_public: Option<OKXWebSocketClient>,
142 ws_business: Option<OKXWebSocketClient>,
143 is_connected: AtomicBool,
144 cancellation_token: CancellationToken,
145 tasks: Vec<JoinHandle<()>>,
146 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
147 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
148 book_channels: Arc<AtomicMap<InstrumentId, OKXBookChannel>>,
149 index_ticker_map: Arc<AtomicMap<Ustr, AHashSet<Ustr>>>,
150 option_greeks_subs: Arc<AtomicMap<InstrumentId, AHashSet<OKXGreeksType>>>,
151 option_summary_family_subs: Arc<std::sync::Mutex<AHashMap<Ustr, usize>>>,
155 clock: &'static AtomicTime,
156}
157
158impl OKXDataClient {
159 pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
165 let clock = get_atomic_clock_realtime();
166 let data_sender = get_data_event_sender();
167
168 let http_client = if config.has_api_credentials() {
169 OKXHttpClient::with_credentials(
170 config.api_key.clone(),
171 config.api_secret.clone(),
172 config.api_passphrase.clone(),
173 config.base_url_http.clone(),
174 config.http_timeout_secs,
175 config.max_retries,
176 config.retry_delay_initial_ms,
177 config.retry_delay_max_ms,
178 config.environment,
179 config.proxy_url.clone(),
180 )?
181 } else {
182 OKXHttpClient::new(
183 config.base_url_http.clone(),
184 config.http_timeout_secs,
185 config.max_retries,
186 config.retry_delay_initial_ms,
187 config.retry_delay_max_ms,
188 config.environment,
189 config.proxy_url.clone(),
190 )?
191 };
192
193 let ws_public = OKXWebSocketClient::new(
194 Some(config.ws_public_url()),
195 None,
196 None,
197 None,
198 None,
199 Some(OKX_WS_HEARTBEAT_SECS),
200 None,
201 config.transport_backend,
202 config.proxy_url.clone(),
203 )
204 .context("failed to construct OKX public websocket client")?;
205
206 let ws_business = if config.requires_business_ws() {
207 let ws = OKXWebSocketClient::new(
208 Some(config.ws_business_url()),
209 None, None,
211 None,
212 None,
213 Some(OKX_WS_HEARTBEAT_SECS),
214 None,
215 config.transport_backend,
216 config.proxy_url.clone(),
217 )
218 .context("failed to construct OKX business websocket client")?;
219 Some(ws)
220 } else {
221 None
222 };
223
224 if let Some(vip_level) = config.vip_level {
225 ws_public.set_vip_level(vip_level);
226
227 if let Some(ref ws) = ws_business {
228 ws.set_vip_level(vip_level);
229 }
230 }
231
232 Ok(Self {
233 client_id,
234 config,
235 http_client,
236 ws_public: Some(ws_public),
237 ws_business,
238 is_connected: AtomicBool::new(false),
239 cancellation_token: CancellationToken::new(),
240 tasks: Vec::new(),
241 data_sender,
242 instruments: Arc::new(AtomicMap::new()),
243 book_channels: Arc::new(AtomicMap::new()),
244 index_ticker_map: Arc::new(AtomicMap::new()),
245 option_greeks_subs: Arc::new(AtomicMap::new()),
246 option_summary_family_subs: Arc::new(std::sync::Mutex::new(AHashMap::new())),
247 clock,
248 })
249 }
250
251 fn venue(&self) -> Venue {
252 *OKX_VENUE
253 }
254
255 fn vip_level(&self) -> Option<OKXVipLevel> {
256 self.ws_public.as_ref().map(|ws| ws.vip_level())
257 }
258
259 fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
260 self.ws_public
261 .as_ref()
262 .context("public websocket client not initialized")
263 }
264
265 fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
266 self.ws_business
267 .as_ref()
268 .context("business websocket client not available (credentials required)")
269 }
270
271 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
272 if let Err(e) = sender.send(DataEvent::Data(data)) {
273 log::error!("Failed to emit data event: {e}");
274 }
275 }
276
277 fn spawn_ws<F>(&self, fut: F, context: &'static str)
278 where
279 F: Future<Output = anyhow::Result<()>> + Send + 'static,
280 {
281 get_runtime().spawn(async move {
282 if let Err(e) = fut.await {
283 log::error!("{context}: {e:?}");
284 }
285 });
286 }
287
288 #[expect(clippy::too_many_arguments)]
289 fn handle_ws_message(
290 message: OKXWsMessage,
291 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
292 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
293 instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
294 quote_cache: &mut QuoteCache,
295 funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
296 index_ticker_map: &Arc<AtomicMap<Ustr, AHashSet<Ustr>>>,
297 option_greeks_subs: &Arc<AtomicMap<InstrumentId, AHashSet<OKXGreeksType>>>,
298 clock: &AtomicTime,
299 ) {
300 match message {
301 OKXWsMessage::BookData { arg, action, data } => {
302 let Some(inst_id) = arg.inst_id else {
303 log::warn!("Book data without inst_id");
304 return;
305 };
306 let Some(instrument) = instruments_by_symbol.get(&inst_id) else {
307 log::warn!("No cached instrument for book data: {inst_id}");
308 return;
309 };
310 let ts_init = clock.get_time_ns();
311
312 match parse_book_msg_vec(
313 data,
314 &instrument.id(),
315 instrument.price_precision(),
316 instrument.size_precision(),
317 action,
318 ts_init,
319 ) {
320 Ok(data_vec) => {
321 for data in data_vec {
322 Self::send_data(data_sender, data);
323 }
324 }
325 Err(e) => log::error!("Failed to parse book data: {e}"),
326 }
327 }
328 OKXWsMessage::ChannelData {
329 channel,
330 inst_id,
331 data,
332 } => {
333 if matches!(channel, OKXWsChannel::OptionSummary) {
337 let ts_init = clock.get_time_ns();
338
339 match serde_json::from_value::<Vec<OKXOptionSummaryMsg>>(data) {
340 Ok(msgs) => {
341 let subs = option_greeks_subs.load();
342
343 for msg in &msgs {
344 let Some(instrument) = instruments_by_symbol.get(&msg.inst_id)
345 else {
346 continue;
347 };
348 let instrument_id = instrument.id();
349 let Some(conventions) = subs.get(&instrument_id) else {
350 continue;
351 };
352
353 for greeks_type in conventions {
354 match parse_option_summary_greeks(
355 msg,
356 &instrument_id,
357 *greeks_type,
358 ts_init,
359 ) {
360 Ok(greeks) => {
361 if let Err(e) =
362 data_sender.send(DataEvent::OptionGreeks(greeks))
363 {
364 log::error!(
365 "Failed to emit option greeks event: {e}"
366 );
367 }
368 }
369 Err(e) => {
370 log::error!(
371 "Failed to parse option summary for {} ({greeks_type:?}): {e}",
372 msg.inst_id
373 );
374 }
375 }
376 }
377 }
378 }
379 Err(e) => {
380 log::error!("Failed to deserialize option summary data: {e}");
381 }
382 }
383 return;
384 }
385
386 let Some(inst_id) = inst_id else {
387 log::debug!("Channel data without inst_id: {channel:?}");
388 return;
389 };
390
391 if matches!(channel, OKXWsChannel::IndexTickers) {
395 let ts_init = clock.get_time_ns();
396 let map_guard = index_ticker_map.load();
397 let Some(subscribed_symbols) = map_guard.get(&inst_id) else {
398 log::debug!("No subscribed instruments for index ticker: {inst_id}");
399 return;
400 };
401 let symbols: Vec<Ustr> = subscribed_symbols.iter().copied().collect();
402 drop(map_guard);
403
404 for sym in &symbols {
405 let Some(instrument) = instruments_by_symbol.get(sym) else {
406 log::warn!("No cached instrument for index ticker symbol: {sym}");
407 continue;
408 };
409
410 match parse_index_price_msg_vec(
411 data.clone(),
412 &instrument.id(),
413 instrument.price_precision(),
414 ts_init,
415 ) {
416 Ok(data_vec) => {
417 for d in data_vec {
418 Self::send_data(data_sender, d);
419 }
420 }
421 Err(e) => log::error!("Failed to parse index price data: {e}"),
422 }
423 }
424 return;
425 }
426
427 let Some(instrument) = instruments_by_symbol.get(&inst_id) else {
428 log::warn!("No cached instrument for {channel:?}: {inst_id}");
429 return;
430 };
431 let instrument_id = instrument.id();
432 let price_precision = instrument.price_precision();
433 let size_precision = instrument.size_precision();
434 let ts_init = clock.get_time_ns();
435
436 if matches!(channel, OKXWsChannel::BboTbt) {
437 let msgs: Vec<OKXBookMsg> = match serde_json::from_value(data) {
438 Ok(m) => m,
439 Err(e) => {
440 log::error!("Failed to deserialize BboTbt data: {e}");
441 return;
442 }
443 };
444
445 for msg in &msgs {
446 let bid = msg.bids.first();
447 let ask = msg.asks.first();
448 let bid_price =
449 bid.and_then(|e| parse_price(&e.price, price_precision).ok());
450 let bid_size =
451 bid.and_then(|e| parse_quantity(&e.size, size_precision).ok());
452 let ask_price =
453 ask.and_then(|e| parse_price(&e.price, price_precision).ok());
454 let ask_size =
455 ask.and_then(|e| parse_quantity(&e.size, size_precision).ok());
456 let ts_event = parse_millisecond_timestamp(msg.ts);
457
458 match quote_cache.process(
459 instrument_id,
460 bid_price,
461 ask_price,
462 bid_size,
463 ask_size,
464 ts_event,
465 ts_init,
466 ) {
467 Ok(quote) => Self::send_data(data_sender, Data::Quote(quote)),
468 Err(e) => {
469 log::debug!("Skipping partial BboTbt for {instrument_id}: {e}");
470 }
471 }
472 }
473
474 return;
475 }
476
477 match parse_ws_message_data(
478 &channel,
479 data,
480 &instrument_id,
481 price_precision,
482 size_precision,
483 ts_init,
484 funding_cache,
485 instruments_by_symbol,
486 ) {
487 Ok(Some(ws_msg)) => {
488 dispatch_parsed_data(
489 ws_msg,
490 data_sender,
491 instruments,
492 instruments_by_symbol,
493 );
494 }
495 Ok(None) => {}
496 Err(e) => log::error!("Failed to parse {channel:?} data: {e}"),
497 }
498 }
499 OKXWsMessage::Instruments(okx_instruments) => {
500 let ts_init = clock.get_time_ns();
501
502 for okx_inst in okx_instruments {
503 let inst_key = Ustr::from(&okx_inst.inst_id);
504 let (margin_init, margin_maint, maker_fee, taker_fee) =
505 instruments_by_symbol.get(&inst_key).map_or(
506 (None, None, None, None),
507 extract_fees_from_cached_instrument,
508 );
509 let status_action = okx_status_to_market_action(okx_inst.state);
510 let is_live = matches!(okx_inst.state, OKXInstrumentStatus::Live);
511 match parse_instrument_any(
512 &okx_inst,
513 margin_init,
514 margin_maint,
515 maker_fee,
516 taker_fee,
517 ts_init,
518 ) {
519 Ok(Some(inst_any)) => {
520 let instrument_id = inst_any.id();
521 instruments_by_symbol
522 .insert(inst_any.symbol().inner(), inst_any.clone());
523 upsert_instrument(instruments, inst_any);
524 emit_instrument_status(
525 data_sender,
526 instrument_id,
527 status_action,
528 is_live,
529 ts_init,
530 );
531 }
532 Ok(None) => {
533 let instrument_id = instruments_by_symbol
534 .get(&inst_key)
535 .map_or_else(|| parse_instrument_id(inst_key), |i| i.id());
536 emit_instrument_status(
537 data_sender,
538 instrument_id,
539 status_action,
540 is_live,
541 ts_init,
542 );
543 }
544 Err(e) => {
545 log::error!("Failed to parse instrument: {e}");
546 let instrument_id = instruments_by_symbol
547 .get(&inst_key)
548 .map_or_else(|| parse_instrument_id(inst_key), |i| i.id());
549 emit_instrument_status(
550 data_sender,
551 instrument_id,
552 status_action,
553 is_live,
554 ts_init,
555 );
556 }
557 }
558 }
559 }
560 OKXWsMessage::Orders(_)
561 | OKXWsMessage::AlgoOrders(_)
562 | OKXWsMessage::OrderResponse { .. }
563 | OKXWsMessage::Account(_)
564 | OKXWsMessage::Positions(_)
565 | OKXWsMessage::SendFailed { .. } => {
566 log::debug!("Ignoring execution message on data client");
567 }
568 OKXWsMessage::Error(e) => {
569 log::error!("OKX websocket error: {e:?}");
570 }
571 OKXWsMessage::Reconnected => {
572 log::info!("Websocket reconnected");
573 }
574 OKXWsMessage::Authenticated => {
575 log::debug!("Websocket authenticated");
576 }
577 }
578 }
579}
580
581fn dispatch_parsed_data(
582 msg: NautilusWsMessage,
583 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
584 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
585 instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
586) {
587 match msg {
588 NautilusWsMessage::Data(payloads) => {
589 for data in payloads {
590 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
591 log::error!("Failed to emit data event: {e}");
592 }
593 }
594 }
595 NautilusWsMessage::Deltas(deltas) => {
596 let data = Data::Deltas(OrderBookDeltas_API::new(deltas));
597 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
598 log::error!("Failed to emit data event: {e}");
599 }
600 }
601 NautilusWsMessage::FundingRates(updates) => {
602 emit_funding_rates(data_sender, updates);
603 }
604 NautilusWsMessage::Instrument(instrument, status) => {
605 instruments_by_symbol.insert(instrument.symbol().inner(), (*instrument).clone());
606 upsert_instrument(instruments, *instrument);
607
608 if let Some(status) = status
609 && let Err(e) = data_sender.send(DataEvent::InstrumentStatus(status))
610 {
611 log::error!("Failed to emit instrument status event: {e}");
612 }
613 }
614 _ => {}
615 }
616}
617
618fn emit_funding_rates(
619 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
620 updates: Vec<FundingRateUpdate>,
621) {
622 for update in updates {
623 if let Err(e) = sender.send(DataEvent::FundingRate(update)) {
624 log::error!("Failed to emit funding rate event: {e}");
625 }
626 }
627}
628
629fn emit_instrument_status(
630 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
631 instrument_id: InstrumentId,
632 status_action: MarketStatusAction,
633 is_live: bool,
634 ts_init: UnixNanos,
635) {
636 let status = InstrumentStatus::new(
637 instrument_id,
638 status_action,
639 ts_init,
640 ts_init,
641 None,
642 None,
643 Some(is_live),
644 None,
645 None,
646 );
647
648 if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
649 log::error!("Failed to emit instrument status event: {e}");
650 }
651}
652
653fn upsert_instrument(
654 cache: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
655 instrument: InstrumentAny,
656) {
657 cache.insert(instrument.id(), instrument);
658}
659
660fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
661 contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
662}
663
664fn contract_filter_with_config_types(
665 contract_types: Option<&Vec<OKXContractType>>,
666 instrument: &InstrumentAny,
667) -> bool {
668 match contract_types {
669 None => true,
670 Some(filter) if filter.is_empty() => true,
671 Some(filter) => {
672 let is_inverse = instrument.is_inverse();
673 (is_inverse && filter.contains(&OKXContractType::Inverse))
674 || (!is_inverse && filter.contains(&OKXContractType::Linear))
675 }
676 }
677}
678
679#[async_trait::async_trait(?Send)]
680impl DataClient for OKXDataClient {
681 fn client_id(&self) -> ClientId {
682 self.client_id
683 }
684
685 fn venue(&self) -> Option<Venue> {
686 Some(self.venue())
687 }
688
689 fn start(&mut self) -> anyhow::Result<()> {
690 log::info!(
691 "Started: client_id={}, vip_level={:?}, instrument_types={:?}, environment={}, proxy_url={:?}",
692 self.client_id,
693 self.vip_level(),
694 self.config.instrument_types,
695 self.config.environment,
696 self.config.proxy_url,
697 );
698 Ok(())
699 }
700
701 fn stop(&mut self) -> anyhow::Result<()> {
702 log::info!("Stopping {id}", id = self.client_id);
703 self.cancellation_token.cancel();
704 self.is_connected.store(false, Ordering::Relaxed);
705 Ok(())
706 }
707
708 fn reset(&mut self) -> anyhow::Result<()> {
709 log::debug!("Resetting {id}", id = self.client_id);
710 self.is_connected.store(false, Ordering::Relaxed);
711 self.cancellation_token = CancellationToken::new();
712 self.tasks.clear();
713 self.book_channels.store(AHashMap::new());
714 self.option_greeks_subs
715 .store(AHashMap::<InstrumentId, AHashSet<OKXGreeksType>>::new());
716 self.option_summary_family_subs
717 .lock()
718 .expect("option_summary_family_subs mutex poisoned")
719 .clear();
720 Ok(())
721 }
722
723 fn dispose(&mut self) -> anyhow::Result<()> {
724 log::debug!("Disposing {id}", id = self.client_id);
725 self.stop()
726 }
727
728 async fn connect(&mut self) -> anyhow::Result<()> {
729 if self.is_connected() {
730 return Ok(());
731 }
732
733 self.cancellation_token = CancellationToken::new();
736
737 let instrument_types = if self.config.instrument_types.is_empty() {
738 vec![OKXInstrumentType::Spot]
739 } else {
740 self.config.instrument_types.clone()
741 };
742
743 let mut all_instruments = Vec::new();
744
745 for inst_type in &instrument_types {
746 let Some(families) =
747 resolve_instrument_families(&self.config.instrument_families, *inst_type)
748 else {
749 continue;
750 };
751
752 if families.is_empty() {
753 let (mut fetched, _inst_id_codes) = self
754 .http_client
755 .request_instruments(*inst_type, None)
756 .await
757 .with_context(|| {
758 format!("failed to request OKX instruments for {inst_type:?}")
759 })?;
760
761 fetched.retain(|instrument| contract_filter_with_config(&self.config, instrument));
762 self.http_client.cache_instruments(&fetched);
763
764 self.instruments.rcu(|m| {
765 for instrument in &fetched {
766 m.insert(instrument.id(), instrument.clone());
767 }
768 });
769
770 all_instruments.extend(fetched);
771 } else {
772 for family in &families {
773 let (mut fetched, _inst_id_codes) = self
774 .http_client
775 .request_instruments(*inst_type, Some(family.clone()))
776 .await
777 .with_context(|| {
778 format!(
779 "failed to request OKX instruments for {inst_type:?} family {family}"
780 )
781 })?;
782
783 fetched
784 .retain(|instrument| contract_filter_with_config(&self.config, instrument));
785 self.http_client.cache_instruments(&fetched);
786
787 self.instruments.rcu(|m| {
788 for instrument in &fetched {
789 m.insert(instrument.id(), instrument.clone());
790 }
791 });
792
793 all_instruments.extend(fetched);
794 }
795 }
796 }
797
798 for instrument in all_instruments {
799 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
800 log::warn!("Failed to send instrument: {e}");
801 }
802 }
803
804 if let Some(ref mut ws) = self.ws_public {
805 let instruments: Vec<_> = self.instruments.load().values().cloned().collect();
807 ws.cache_instruments(&instruments);
808
809 ws.connect()
810 .await
811 .context("failed to connect OKX public websocket")?;
812 ws.wait_until_active(10.0)
813 .await
814 .context("public websocket did not become active")?;
815
816 let stream = ws.stream();
817 let sender = self.data_sender.clone();
818 let insts = self.instruments.clone();
819 let idx_map = self.index_ticker_map.clone();
820 let greeks_subs = self.option_greeks_subs.clone();
821 let cancel = self.cancellation_token.clone();
822 let clock = self.clock;
823
824 let handle = get_runtime().spawn(async move {
825 let mut instruments_by_symbol: AHashMap<Ustr, InstrumentAny> = insts
826 .load()
827 .values()
828 .map(|i| (i.symbol().inner(), i.clone()))
829 .collect();
830 let mut quote_cache = QuoteCache::new();
831 let mut funding_cache: AHashMap<Ustr, (Ustr, u64)> = AHashMap::new();
832 pin_mut!(stream);
833
834 loop {
835 tokio::select! {
836 Some(message) = stream.next() => {
837 Self::handle_ws_message(
838 message,
839 &sender,
840 &insts,
841 &mut instruments_by_symbol,
842 &mut quote_cache,
843 &mut funding_cache,
844 &idx_map,
845 &greeks_subs,
846 clock,
847 );
848 }
849 () = cancel.cancelled() => {
850 log::debug!("Public websocket stream task cancelled");
851 break;
852 }
853 }
854 }
855 });
856 self.tasks.push(handle);
857
858 for inst_type in &instrument_types {
859 ws.subscribe_instruments(*inst_type)
860 .await
861 .with_context(|| {
862 format!("failed to subscribe to instrument type {inst_type:?}")
863 })?;
864 }
865 }
866
867 if let Some(ref mut ws) = self.ws_business {
868 let instruments: Vec<_> = self.instruments.load().values().cloned().collect();
870 ws.cache_instruments(&instruments);
871
872 ws.connect()
873 .await
874 .context("failed to connect OKX business websocket")?;
875 ws.wait_until_active(10.0)
876 .await
877 .context("business websocket did not become active")?;
878
879 let stream = ws.stream();
880 let sender = self.data_sender.clone();
881 let insts = self.instruments.clone();
882 let idx_map = self.index_ticker_map.clone();
883 let greeks_subs = self.option_greeks_subs.clone();
884 let cancel = self.cancellation_token.clone();
885 let clock = self.clock;
886
887 let handle = get_runtime().spawn(async move {
888 let mut instruments_by_symbol: AHashMap<Ustr, InstrumentAny> = insts
889 .load()
890 .values()
891 .map(|i| (i.symbol().inner(), i.clone()))
892 .collect();
893 let mut quote_cache = QuoteCache::new();
894 let mut funding_cache: AHashMap<Ustr, (Ustr, u64)> = AHashMap::new();
895 pin_mut!(stream);
896
897 loop {
898 tokio::select! {
899 Some(message) = stream.next() => {
900 Self::handle_ws_message(
901 message,
902 &sender,
903 &insts,
904 &mut instruments_by_symbol,
905 &mut quote_cache,
906 &mut funding_cache,
907 &idx_map,
908 &greeks_subs,
909 clock,
910 );
911 }
912 () = cancel.cancelled() => {
913 log::debug!("Business websocket stream task cancelled");
914 break;
915 }
916 }
917 }
918 });
919 self.tasks.push(handle);
920 }
921
922 self.is_connected.store(true, Ordering::Release);
923 log::info!("Connected: client_id={}", self.client_id);
924 Ok(())
925 }
926
927 async fn disconnect(&mut self) -> anyhow::Result<()> {
928 if self.is_disconnected() {
929 return Ok(());
930 }
931
932 self.cancellation_token.cancel();
933
934 if let Some(ref ws) = self.ws_public
935 && let Err(e) = ws.unsubscribe_all().await
936 {
937 log::warn!("Failed to unsubscribe all from public websocket: {e:?}");
938 }
939
940 if let Some(ref ws) = self.ws_business
941 && let Err(e) = ws.unsubscribe_all().await
942 {
943 log::warn!("Failed to unsubscribe all from business websocket: {e:?}");
944 }
945
946 tokio::time::sleep(Duration::from_millis(500)).await;
948
949 if let Some(ref mut ws) = self.ws_public {
950 let _ = ws.close().await;
951 }
952
953 if let Some(ref mut ws) = self.ws_business {
954 let _ = ws.close().await;
955 }
956
957 let handles: Vec<_> = self.tasks.drain(..).collect();
958
959 for handle in handles {
960 if let Err(e) = handle.await {
961 log::error!("Error joining websocket task: {e}");
962 }
963 }
964
965 self.book_channels.store(AHashMap::new());
966 self.option_greeks_subs
967 .store(AHashMap::<InstrumentId, AHashSet<OKXGreeksType>>::new());
968 self.option_summary_family_subs
969 .lock()
970 .expect("option_summary_family_subs mutex poisoned")
971 .clear();
972 self.is_connected.store(false, Ordering::Release);
973 log::info!("Disconnected: client_id={}", self.client_id);
974 Ok(())
975 }
976
977 fn is_connected(&self) -> bool {
978 self.is_connected.load(Ordering::Relaxed)
979 }
980
981 fn is_disconnected(&self) -> bool {
982 !self.is_connected()
983 }
984
985 fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
986 for inst_type in &self.config.instrument_types {
987 let ws = self.public_ws()?.clone();
988 let inst_type = *inst_type;
989
990 self.spawn_ws(
991 async move {
992 ws.subscribe_instruments(inst_type)
993 .await
994 .context("instruments subscription")?;
995 Ok(())
996 },
997 "subscribe_instruments",
998 );
999 }
1000 Ok(())
1001 }
1002
1003 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
1004 let instrument_id = cmd.instrument_id;
1007 let ws = self.public_ws()?.clone();
1008
1009 self.spawn_ws(
1010 async move {
1011 ws.subscribe_instrument(instrument_id)
1012 .await
1013 .context("instrument type subscription")?;
1014 Ok(())
1015 },
1016 "subscribe_instrument",
1017 );
1018 Ok(())
1019 }
1020
1021 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
1022 if cmd.book_type != BookType::L2_MBP {
1023 anyhow::bail!("OKX only supports L2_MBP order book deltas");
1024 }
1025
1026 let raw_depth = cmd.depth.map_or(0, |d| d.get());
1027 let depth = resolve_book_depth(raw_depth);
1028 if depth != raw_depth {
1029 log::info!("Clamped book depth {raw_depth} to {depth} (OKX supports 50 or 400)");
1030 }
1031
1032 let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
1033 let channel = match depth {
1034 50 => {
1035 if vip < OKXVipLevel::Vip4 {
1036 log::info!(
1037 "VIP level {vip} insufficient for 50-depth channel, falling back to default"
1038 );
1039 OKXBookChannel::Book
1040 } else {
1041 OKXBookChannel::Books50L2Tbt
1042 }
1043 }
1044 0 | 400 => {
1045 if vip >= OKXVipLevel::Vip5 {
1046 OKXBookChannel::BookL2Tbt
1047 } else {
1048 OKXBookChannel::Book
1049 }
1050 }
1051 _ => unreachable!(),
1052 };
1053
1054 let instrument_id = cmd.instrument_id;
1055 let ws = self.public_ws()?.clone();
1056 let book_channels = Arc::clone(&self.book_channels);
1057
1058 self.spawn_ws(
1059 async move {
1060 match channel {
1061 OKXBookChannel::Books50L2Tbt => ws
1062 .subscribe_book50_l2_tbt(instrument_id)
1063 .await
1064 .context("books50-l2-tbt subscription")?,
1065 OKXBookChannel::BookL2Tbt => ws
1066 .subscribe_book_l2_tbt(instrument_id)
1067 .await
1068 .context("books-l2-tbt subscription")?,
1069 OKXBookChannel::Book => ws
1070 .subscribe_books_channel(instrument_id)
1071 .await
1072 .context("books subscription")?,
1073 }
1074 book_channels.insert(instrument_id, channel);
1075 Ok(())
1076 },
1077 "order book delta subscription",
1078 );
1079
1080 Ok(())
1081 }
1082
1083 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
1084 let ws = self.public_ws()?.clone();
1085 let instrument_id = cmd.instrument_id;
1086
1087 self.spawn_ws(
1088 async move {
1089 ws.subscribe_quotes(instrument_id)
1090 .await
1091 .context("quotes subscription")
1092 },
1093 "quote subscription",
1094 );
1095 Ok(())
1096 }
1097
1098 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
1099 let ws = self.public_ws()?.clone();
1100 let instrument_id = cmd.instrument_id;
1101
1102 self.spawn_ws(
1103 async move {
1104 ws.subscribe_trades(instrument_id, false)
1105 .await
1106 .context("trades subscription")
1107 },
1108 "trade subscription",
1109 );
1110 Ok(())
1111 }
1112
1113 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
1114 let ws = self.public_ws()?.clone();
1115 let instrument_id = cmd.instrument_id;
1116
1117 self.spawn_ws(
1118 async move {
1119 ws.subscribe_mark_prices(instrument_id)
1120 .await
1121 .context("mark price subscription")
1122 },
1123 "mark price subscription",
1124 );
1125 Ok(())
1126 }
1127
1128 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
1129 let ws = self.public_ws()?.clone();
1130 let instrument_id = cmd.instrument_id;
1131 let symbol = instrument_id.symbol.inner();
1132
1133 let (base, quote) = parse_base_quote_from_symbol(symbol.as_str())?;
1134 let base_pair = Ustr::from(&format!("{base}-{quote}"));
1135 self.index_ticker_map.rcu(|m| {
1136 m.entry(base_pair).or_default().insert(symbol);
1137 });
1138
1139 self.spawn_ws(
1140 async move {
1141 ws.subscribe_index_prices(instrument_id)
1142 .await
1143 .context("index price subscription")
1144 },
1145 "index price subscription",
1146 );
1147 Ok(())
1148 }
1149
1150 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
1151 let ws = self.business_ws()?.clone();
1152 let bar_type = cmd.bar_type;
1153
1154 self.spawn_ws(
1155 async move {
1156 ws.subscribe_bars(bar_type)
1157 .await
1158 .context("bars subscription")
1159 },
1160 "bar subscription",
1161 );
1162 Ok(())
1163 }
1164
1165 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
1166 let ws = self.public_ws()?.clone();
1167 let instrument_id = cmd.instrument_id;
1168
1169 self.spawn_ws(
1170 async move {
1171 ws.subscribe_funding_rates(instrument_id)
1172 .await
1173 .context("funding rate subscription")
1174 },
1175 "funding rate subscription",
1176 );
1177 Ok(())
1178 }
1179
1180 fn subscribe_option_greeks(&mut self, cmd: SubscribeOptionGreeks) -> anyhow::Result<()> {
1181 let instrument_id = cmd.instrument_id;
1182 let conventions = parse_greeks_conventions_from_params(&cmd.params);
1183 self.option_greeks_subs.insert(instrument_id, conventions);
1184
1185 let family = extract_inst_family(instrument_id.symbol.inner().as_str())?;
1186 let is_first = {
1187 let mut family_subs = self
1188 .option_summary_family_subs
1189 .lock()
1190 .expect("option_summary_family_subs mutex poisoned");
1191 let count = family_subs.entry(family).or_default();
1192 *count += 1;
1193 *count == 1
1194 };
1195
1196 if is_first {
1197 let ws = self.public_ws()?.clone();
1198 let family_subs = self.option_summary_family_subs.clone();
1199 self.spawn_ws(
1200 async move {
1201 let result = ws
1202 .subscribe_option_summary(family)
1203 .await
1204 .context("opt-summary subscription");
1205
1206 if result.is_err() {
1207 let mut subs = family_subs
1210 .lock()
1211 .expect("option_summary_family_subs mutex poisoned");
1212
1213 if let Some(count) = subs.get_mut(&family) {
1214 *count = count.saturating_sub(1);
1215 if *count == 0 {
1216 subs.remove(&family);
1217 }
1218 }
1219 }
1220 result
1221 },
1222 "option greeks subscription",
1223 );
1224 }
1225 Ok(())
1226 }
1227
1228 fn subscribe_instrument_status(
1229 &mut self,
1230 cmd: SubscribeInstrumentStatus,
1231 ) -> anyhow::Result<()> {
1232 let ws = self.public_ws()?.clone();
1233 let instrument_id = cmd.instrument_id;
1234
1235 self.spawn_ws(
1236 async move {
1237 ws.subscribe_instrument(instrument_id)
1238 .await
1239 .context("instrument status subscription")
1240 },
1241 "instrument status subscription",
1242 );
1243 Ok(())
1244 }
1245
1246 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1247 let ws = self.public_ws()?.clone();
1248 let instrument_id = cmd.instrument_id;
1249 let channel = self.book_channels.get_cloned(&instrument_id);
1250 self.book_channels.remove(&instrument_id);
1251
1252 self.spawn_ws(
1253 async move {
1254 match channel {
1255 Some(OKXBookChannel::Books50L2Tbt) => ws
1256 .unsubscribe_book50_l2_tbt(instrument_id)
1257 .await
1258 .context("books50-l2-tbt unsubscribe")?,
1259 Some(OKXBookChannel::BookL2Tbt) => ws
1260 .unsubscribe_book_l2_tbt(instrument_id)
1261 .await
1262 .context("books-l2-tbt unsubscribe")?,
1263 Some(OKXBookChannel::Book) => ws
1264 .unsubscribe_book(instrument_id)
1265 .await
1266 .context("book unsubscribe")?,
1267 None => {
1268 log::warn!(
1269 "Book channel not found for {instrument_id}; unsubscribing fallback channel"
1270 );
1271 ws.unsubscribe_book(instrument_id)
1272 .await
1273 .context("book fallback unsubscribe")?;
1274 }
1275 }
1276 Ok(())
1277 },
1278 "order book unsubscribe",
1279 );
1280 Ok(())
1281 }
1282
1283 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1284 let ws = self.public_ws()?.clone();
1285 let instrument_id = cmd.instrument_id;
1286
1287 self.spawn_ws(
1288 async move {
1289 ws.unsubscribe_quotes(instrument_id)
1290 .await
1291 .context("quotes unsubscribe")
1292 },
1293 "quote unsubscribe",
1294 );
1295 Ok(())
1296 }
1297
1298 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1299 let ws = self.public_ws()?.clone();
1300 let instrument_id = cmd.instrument_id;
1301
1302 self.spawn_ws(
1303 async move {
1304 ws.unsubscribe_trades(instrument_id, false) .await
1306 .context("trades unsubscribe")
1307 },
1308 "trade unsubscribe",
1309 );
1310 Ok(())
1311 }
1312
1313 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1314 let ws = self.public_ws()?.clone();
1315 let instrument_id = cmd.instrument_id;
1316
1317 self.spawn_ws(
1318 async move {
1319 ws.unsubscribe_mark_prices(instrument_id)
1320 .await
1321 .context("mark price unsubscribe")
1322 },
1323 "mark price unsubscribe",
1324 );
1325 Ok(())
1326 }
1327
1328 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1329 let ws = self.public_ws()?.clone();
1330 let instrument_id = cmd.instrument_id;
1331 let symbol = instrument_id.symbol.inner();
1332
1333 if let Ok((base, quote)) = parse_base_quote_from_symbol(symbol.as_str()) {
1340 let base_pair = Ustr::from(&format!("{base}-{quote}"));
1341 self.index_ticker_map.rcu(|m| {
1342 if let Some(set) = m.get_mut(&base_pair) {
1343 set.remove(&symbol);
1344 if set.is_empty() {
1345 m.remove(&base_pair);
1346 }
1347 }
1348 });
1349 }
1350
1351 self.spawn_ws(
1352 async move {
1353 ws.unsubscribe_index_prices(instrument_id)
1354 .await
1355 .context("index price unsubscribe")
1356 },
1357 "index price unsubscribe",
1358 );
1359 Ok(())
1360 }
1361
1362 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1363 let ws = self.business_ws()?.clone();
1364 let bar_type = cmd.bar_type;
1365
1366 self.spawn_ws(
1367 async move {
1368 ws.unsubscribe_bars(bar_type)
1369 .await
1370 .context("bars unsubscribe")
1371 },
1372 "bar unsubscribe",
1373 );
1374 Ok(())
1375 }
1376
1377 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1378 let ws = self.public_ws()?.clone();
1379 let instrument_id = cmd.instrument_id;
1380
1381 self.spawn_ws(
1382 async move {
1383 ws.unsubscribe_funding_rates(instrument_id)
1384 .await
1385 .context("funding rate unsubscribe")
1386 },
1387 "funding rate unsubscribe",
1388 );
1389 Ok(())
1390 }
1391
1392 fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
1393 let instrument_id = cmd.instrument_id;
1394 self.option_greeks_subs.remove(&instrument_id);
1395
1396 let family = extract_inst_family(instrument_id.symbol.inner().as_str())?;
1397 let should_unsubscribe = {
1398 let mut family_subs = self
1399 .option_summary_family_subs
1400 .lock()
1401 .expect("option_summary_family_subs mutex poisoned");
1402
1403 if let Some(count) = family_subs.get_mut(&family) {
1404 *count = count.saturating_sub(1);
1405 if *count == 0 {
1406 family_subs.remove(&family);
1407 true
1408 } else {
1409 false
1410 }
1411 } else {
1412 false
1413 }
1414 };
1415
1416 if should_unsubscribe {
1417 let ws = self.public_ws()?.clone();
1418 self.spawn_ws(
1419 async move {
1420 ws.unsubscribe_option_summary(family)
1421 .await
1422 .context("opt-summary unsubscription")
1423 },
1424 "option greeks unsubscription",
1425 );
1426 }
1427 Ok(())
1428 }
1429
1430 fn unsubscribe_instrument_status(
1431 &mut self,
1432 cmd: &UnsubscribeInstrumentStatus,
1433 ) -> anyhow::Result<()> {
1434 let ws = self.public_ws()?.clone();
1435 let instrument_id = cmd.instrument_id;
1436
1437 self.spawn_ws(
1438 async move {
1439 ws.unsubscribe_instrument(instrument_id)
1440 .await
1441 .context("instrument status unsubscription")
1442 },
1443 "instrument status unsubscription",
1444 );
1445 Ok(())
1446 }
1447
1448 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1449 let http = self.http_client.clone();
1450 let sender = self.data_sender.clone();
1451 let instruments_cache = self.instruments.clone();
1452 let request_id = request.request_id;
1453 let client_id = request.client_id.unwrap_or(self.client_id);
1454 let venue = self.venue();
1455 let start = request.start;
1456 let end = request.end;
1457 let params = request.params;
1458 let clock = self.clock;
1459 let start_nanos = datetime_to_unix_nanos(start);
1460 let end_nanos = datetime_to_unix_nanos(end);
1461 let instrument_types = if self.config.instrument_types.is_empty() {
1462 vec![OKXInstrumentType::Spot]
1463 } else {
1464 self.config.instrument_types.clone()
1465 };
1466 let contract_types = self.config.contract_types.clone();
1467 let instrument_families = self.config.instrument_families.clone();
1468
1469 get_runtime().spawn(async move {
1470 let mut all_instruments = Vec::new();
1471
1472 for inst_type in instrument_types {
1473 let Some(families) =
1474 resolve_instrument_families(&instrument_families, inst_type)
1475 else {
1476 continue;
1477 };
1478
1479 if families.is_empty() {
1480 match http.request_instruments(inst_type, None).await {
1481 Ok((instruments, _inst_id_codes)) => {
1482 for instrument in instruments {
1483 if !contract_filter_with_config_types(
1484 contract_types.as_ref(),
1485 &instrument,
1486 ) {
1487 continue;
1488 }
1489
1490 upsert_instrument(&instruments_cache, instrument.clone());
1491 all_instruments.push(instrument);
1492 }
1493 }
1494 Err(e) => {
1495 log::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
1496 }
1497 }
1498 } else {
1499 for family in families {
1500 match http
1501 .request_instruments(inst_type, Some(family.clone()))
1502 .await
1503 {
1504 Ok((instruments, _inst_id_codes)) => {
1505 for instrument in instruments {
1506 if !contract_filter_with_config_types(
1507 contract_types.as_ref(),
1508 &instrument,
1509 ) {
1510 continue;
1511 }
1512
1513 upsert_instrument(&instruments_cache, instrument.clone());
1514 all_instruments.push(instrument);
1515 }
1516 }
1517 Err(e) => {
1518 log::error!(
1519 "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
1520 );
1521 }
1522 }
1523 }
1524 }
1525 }
1526
1527 let response = DataResponse::Instruments(InstrumentsResponse::new(
1528 request_id,
1529 client_id,
1530 venue,
1531 all_instruments,
1532 start_nanos,
1533 end_nanos,
1534 clock.get_time_ns(),
1535 params,
1536 ));
1537
1538 if let Err(e) = sender.send(DataEvent::Response(response)) {
1539 log::error!("Failed to send instruments response: {e}");
1540 }
1541 });
1542
1543 Ok(())
1544 }
1545
1546 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1547 let http = self.http_client.clone();
1548 let sender = self.data_sender.clone();
1549 let instruments = self.instruments.clone();
1550 let instrument_id = request.instrument_id;
1551 let request_id = request.request_id;
1552 let client_id = request.client_id.unwrap_or(self.client_id);
1553 let start = request.start;
1554 let end = request.end;
1555 let params = request.params;
1556 let clock = self.clock;
1557 let start_nanos = datetime_to_unix_nanos(start);
1558 let end_nanos = datetime_to_unix_nanos(end);
1559 let instrument_types = if self.config.instrument_types.is_empty() {
1560 vec![OKXInstrumentType::Spot]
1561 } else {
1562 self.config.instrument_types.clone()
1563 };
1564 let contract_types = self.config.contract_types.clone();
1565
1566 get_runtime().spawn(async move {
1567 match http
1568 .request_instrument(instrument_id)
1569 .await
1570 .context("fetch instrument from API")
1571 {
1572 Ok(instrument) => {
1573 let inst_id = instrument.id();
1574 let symbol = inst_id.symbol.as_str();
1575 let inst_type = okx_instrument_type_from_symbol(symbol);
1576 if !instrument_types.contains(&inst_type) {
1577 log::error!(
1578 "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
1579 );
1580 return;
1581 }
1582
1583 if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
1584 log::error!(
1585 "Instrument {instrument_id} filtered out by contract_types config"
1586 );
1587 return;
1588 }
1589
1590 upsert_instrument(&instruments, instrument.clone());
1591
1592 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1593 request_id,
1594 client_id,
1595 instrument.id(),
1596 instrument,
1597 start_nanos,
1598 end_nanos,
1599 clock.get_time_ns(),
1600 params,
1601 )));
1602
1603 if let Err(e) = sender.send(DataEvent::Response(response)) {
1604 log::error!("Failed to send instrument response: {e}");
1605 }
1606 }
1607 Err(e) => log::error!("Instrument request failed: {e:?}"),
1608 }
1609 });
1610
1611 Ok(())
1612 }
1613
1614 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1615 let http = self.http_client.clone();
1616 let sender = self.data_sender.clone();
1617 let instrument_id = request.instrument_id;
1618 let depth = request.depth.map(|n| n.get() as u32);
1619 let request_id = request.request_id;
1620 let client_id = request.client_id.unwrap_or(self.client_id);
1621 let params = request.params;
1622 let clock = self.clock;
1623
1624 get_runtime().spawn(async move {
1625 match http
1626 .request_book_snapshot(instrument_id, depth)
1627 .await
1628 .context("failed to request book snapshot from OKX")
1629 {
1630 Ok(book) => {
1631 let response = DataResponse::Book(BookResponse::new(
1632 request_id,
1633 client_id,
1634 instrument_id,
1635 book,
1636 None,
1637 None,
1638 clock.get_time_ns(),
1639 params,
1640 ));
1641
1642 if let Err(e) = sender.send(DataEvent::Response(response)) {
1643 log::error!("Failed to send book snapshot response: {e}");
1644 }
1645 }
1646 Err(e) => log::error!("Book snapshot request failed: {e:?}"),
1647 }
1648 });
1649
1650 Ok(())
1651 }
1652
1653 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1654 let http = self.http_client.clone();
1655 let sender = self.data_sender.clone();
1656 let instrument_id = request.instrument_id;
1657 let start = request.start;
1658 let end = request.end;
1659 let limit = request.limit.map(|n| n.get() as u32);
1660 let request_id = request.request_id;
1661 let client_id = request.client_id.unwrap_or(self.client_id);
1662 let params = request.params;
1663 let clock = self.clock;
1664 let start_nanos = datetime_to_unix_nanos(start);
1665 let end_nanos = datetime_to_unix_nanos(end);
1666
1667 get_runtime().spawn(async move {
1668 match http
1669 .request_trades(instrument_id, start, end, limit)
1670 .await
1671 .context("failed to request trades from OKX")
1672 {
1673 Ok(trades) => {
1674 let response = DataResponse::Trades(TradesResponse::new(
1675 request_id,
1676 client_id,
1677 instrument_id,
1678 trades,
1679 start_nanos,
1680 end_nanos,
1681 clock.get_time_ns(),
1682 params,
1683 ));
1684
1685 if let Err(e) = sender.send(DataEvent::Response(response)) {
1686 log::error!("Failed to send trades response: {e}");
1687 }
1688 }
1689 Err(e) => log::error!("Trade request failed: {e:?}"),
1690 }
1691 });
1692
1693 Ok(())
1694 }
1695
1696 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1697 let http = self.http_client.clone();
1698 let sender = self.data_sender.clone();
1699 let bar_type = request.bar_type;
1700 let start = request.start;
1701 let end = request.end;
1702 let limit = request.limit.map(|n| n.get() as u32);
1703 let request_id = request.request_id;
1704 let client_id = request.client_id.unwrap_or(self.client_id);
1705 let params = request.params;
1706 let clock = self.clock;
1707 let start_nanos = datetime_to_unix_nanos(start);
1708 let end_nanos = datetime_to_unix_nanos(end);
1709
1710 get_runtime().spawn(async move {
1711 match http
1712 .request_bars(bar_type, start, end, limit)
1713 .await
1714 .context("failed to request bars from OKX")
1715 {
1716 Ok(bars) => {
1717 let response = DataResponse::Bars(BarsResponse::new(
1718 request_id,
1719 client_id,
1720 bar_type,
1721 bars,
1722 start_nanos,
1723 end_nanos,
1724 clock.get_time_ns(),
1725 params,
1726 ));
1727
1728 if let Err(e) = sender.send(DataEvent::Response(response)) {
1729 log::error!("Failed to send bars response: {e}");
1730 }
1731 }
1732 Err(e) => log::error!("Bar request failed: {e:?}"),
1733 }
1734 });
1735
1736 Ok(())
1737 }
1738
1739 fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1740 let http = self.http_client.clone();
1741 let sender = self.data_sender.clone();
1742 let instrument_id = request.instrument_id;
1743 let start = request.start;
1744 let end = request.end;
1745 let limit = request.limit.map(|n| n.get() as u32);
1746 let request_id = request.request_id;
1747 let client_id = request.client_id.unwrap_or(self.client_id);
1748 let params = request.params;
1749 let clock = self.clock;
1750 let start_nanos = datetime_to_unix_nanos(start);
1751 let end_nanos = datetime_to_unix_nanos(end);
1752
1753 get_runtime().spawn(async move {
1754 match http
1755 .request_funding_rates(instrument_id, start, end, limit)
1756 .await
1757 .context("failed to request funding rates from OKX")
1758 {
1759 Ok(funding_rates) => {
1760 let response = DataResponse::FundingRates(FundingRatesResponse::new(
1761 request_id,
1762 client_id,
1763 instrument_id,
1764 funding_rates,
1765 start_nanos,
1766 end_nanos,
1767 clock.get_time_ns(),
1768 params,
1769 ));
1770
1771 if let Err(e) = sender.send(DataEvent::Response(response)) {
1772 log::error!("Failed to send funding rates response: {e}");
1773 }
1774 }
1775 Err(e) => log::error!("Funding rates request failed: {e:?}"),
1776 }
1777 });
1778
1779 Ok(())
1780 }
1781
1782 fn request_forward_prices(&self, request: RequestForwardPrices) -> anyhow::Result<()> {
1783 let http = self.http_client.clone();
1784 let sender = self.data_sender.clone();
1785 let underlying = request.underlying.to_string();
1786 let instrument_id = request.instrument_id;
1787 let request_id = request.request_id;
1788 let client_id = request.client_id.unwrap_or(self.client_id);
1789 let params = request.params;
1790 let clock = self.clock;
1791 let venue = *OKX_VENUE;
1792
1793 get_runtime().spawn(async move {
1794 match http
1795 .request_forward_prices(&underlying, instrument_id)
1796 .await
1797 .context("failed to request forward prices from OKX")
1798 {
1799 Ok(forward_prices) => {
1800 let response = DataResponse::ForwardPrices(ForwardPricesResponse::new(
1801 request_id,
1802 client_id,
1803 venue,
1804 forward_prices,
1805 clock.get_time_ns(),
1806 params,
1807 ));
1808
1809 if let Err(e) = sender.send(DataEvent::Response(response)) {
1810 log::error!("Failed to send forward prices response: {e}");
1811 }
1812 }
1813 Err(e) => {
1814 log::error!("Forward prices request failed for {underlying}: {e:?}");
1815 let response = DataResponse::ForwardPrices(ForwardPricesResponse::new(
1816 request_id,
1817 client_id,
1818 venue,
1819 Vec::new(),
1820 clock.get_time_ns(),
1821 params,
1822 ));
1823
1824 if let Err(e) = sender.send(DataEvent::Response(response)) {
1825 log::error!("Failed to send forward prices response: {e}");
1826 }
1827 }
1828 }
1829 });
1830
1831 Ok(())
1832 }
1833}
1834
1835#[cfg(test)]
1836mod tests {
1837 use rstest::rstest;
1838 use serde_json::json;
1839
1840 use super::*;
1841
1842 fn both() -> AHashSet<OKXGreeksType> {
1843 [OKXGreeksType::Bs, OKXGreeksType::Pa].into_iter().collect()
1844 }
1845
1846 fn only(greeks_type: OKXGreeksType) -> AHashSet<OKXGreeksType> {
1847 [greeks_type].into_iter().collect()
1848 }
1849
1850 #[rstest]
1851 fn parse_conventions_returns_both_when_params_missing() {
1852 let result = parse_greeks_conventions_from_params(&None);
1853 assert_eq!(result, both());
1854 }
1855
1856 #[rstest]
1857 fn parse_conventions_returns_both_when_key_absent() {
1858 let mut params = Params::new();
1859 params.insert("other_key".to_string(), json!("value"));
1860 let result = parse_greeks_conventions_from_params(&Some(params));
1861 assert_eq!(result, both());
1862 }
1863
1864 #[rstest]
1865 #[case("BLACK_SCHOLES", OKXGreeksType::Bs)]
1866 #[case("PRICE_ADJUSTED", OKXGreeksType::Pa)]
1867 #[case("black_scholes", OKXGreeksType::Bs)]
1868 #[case("price_adjusted", OKXGreeksType::Pa)]
1869 fn parse_conventions_accepts_single_string(#[case] raw: &str, #[case] expected: OKXGreeksType) {
1870 let mut params = Params::new();
1871 params.insert("greeks_convention".to_string(), json!(raw));
1872 let result = parse_greeks_conventions_from_params(&Some(params));
1873 assert_eq!(result, only(expected));
1874 }
1875
1876 #[rstest]
1877 fn parse_conventions_accepts_list_of_strings() {
1878 let mut params = Params::new();
1879 params.insert(
1880 "greeks_convention".to_string(),
1881 json!(["BLACK_SCHOLES", "PRICE_ADJUSTED"]),
1882 );
1883 let result = parse_greeks_conventions_from_params(&Some(params));
1884 assert_eq!(result, both());
1885 }
1886
1887 #[rstest]
1888 fn parse_conventions_accepts_single_entry_list() {
1889 let mut params = Params::new();
1890 params.insert("greeks_convention".to_string(), json!(["PRICE_ADJUSTED"]));
1891 let result = parse_greeks_conventions_from_params(&Some(params));
1892 assert_eq!(result, only(OKXGreeksType::Pa));
1893 }
1894
1895 #[rstest]
1896 fn parse_conventions_deduplicates_list_entries() {
1897 let mut params = Params::new();
1898 params.insert(
1899 "greeks_convention".to_string(),
1900 json!(["BLACK_SCHOLES", "black_scholes"]),
1901 );
1902 let result = parse_greeks_conventions_from_params(&Some(params));
1903 assert_eq!(result, only(OKXGreeksType::Bs));
1904 }
1905
1906 #[rstest]
1907 fn parse_conventions_skips_unknown_list_entries() {
1908 let mut params = Params::new();
1909 params.insert(
1910 "greeks_convention".to_string(),
1911 json!(["BOGUS", "PRICE_ADJUSTED"]),
1912 );
1913 let result = parse_greeks_conventions_from_params(&Some(params));
1914 assert_eq!(result, only(OKXGreeksType::Pa));
1915 }
1916
1917 #[rstest]
1918 fn parse_conventions_falls_back_to_both_on_all_unknown() {
1919 let mut params = Params::new();
1920 params.insert("greeks_convention".to_string(), json!(["BOGUS"]));
1921 let result = parse_greeks_conventions_from_params(&Some(params));
1922 assert_eq!(result, both());
1923 }
1924
1925 #[rstest]
1926 #[case(json!(1))]
1927 #[case(json!(null))]
1928 #[case(json!(true))]
1929 #[case(json!({"nested": "object"}))]
1930 fn parse_conventions_falls_back_on_non_string_value(#[case] value: serde_json::Value) {
1931 let mut params = Params::new();
1932 params.insert("greeks_convention".to_string(), value);
1933 let result = parse_greeks_conventions_from_params(&Some(params));
1934 assert_eq!(result, both());
1935 }
1936
1937 #[rstest]
1938 fn parse_conventions_falls_back_on_unknown_single_string() {
1939 let mut params = Params::new();
1940 params.insert("greeks_convention".to_string(), json!("BOGUS"));
1941 let result = parse_greeks_conventions_from_params(&Some(params));
1942 assert_eq!(result, both());
1943 }
1944}