Skip to main content

nautilus_deribit/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Deribit adapter.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::StreamExt;
27use nautilus_common::{
28    clients::DataClient,
29    live::{runner::get_data_event_sender, runtime::get_runtime},
30    log_info,
31    messages::{
32        DataEvent, DataResponse,
33        data::{
34            BarsResponse, BookResponse, ForwardPricesResponse, InstrumentResponse,
35            InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestForwardPrices,
36            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
37            SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
38            SubscribeInstrument, SubscribeInstrumentStatus, SubscribeInstruments,
39            SubscribeMarkPrices, SubscribeOptionGreeks, SubscribeQuotes, SubscribeTrades,
40            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
41            UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
42            UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices,
43            UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
44        },
45    },
46};
47use nautilus_core::{
48    AtomicMap, AtomicSet, Params,
49    datetime::datetime_to_unix_nanos,
50    time::{AtomicTime, get_atomic_clock_realtime},
51};
52use nautilus_model::{
53    data::{Data, ForwardPrice, OrderBookDeltas_API},
54    enums::BookType,
55    identifiers::{ClientId, InstrumentId, Symbol, Venue},
56    instruments::{Instrument, InstrumentAny},
57};
58use tokio::task::JoinHandle;
59use tokio_util::sync::CancellationToken;
60
61use crate::{
62    common::{
63        consts::{
64            DERIBIT_BOOK_DEFAULT_DEPTH, DERIBIT_BOOK_DEFAULT_GROUP, DERIBIT_BOOK_VALID_DEPTHS,
65            DERIBIT_VENUE,
66        },
67        parse::{bar_spec_to_resolution, parse_instrument_kind_currency},
68    },
69    config::DeribitDataClientConfig,
70    http::{
71        client::DeribitHttpClient,
72        models::{DeribitCurrency, DeribitProductType},
73    },
74    websocket::{
75        auth::DERIBIT_DATA_SESSION_NAME, client::DeribitWebSocketClient,
76        enums::DeribitUpdateInterval, messages::NautilusWsMessage,
77    },
78};
79
/// Deribit live data client.
///
/// Owns the HTTP and WebSocket clients used to talk to Deribit, an instrument
/// cache, and the handles of the background tasks that forward WebSocket
/// messages to the data event channel.
#[derive(Debug)]
pub struct DeribitDataClient {
    /// Identifier reported by `client_id()`.
    client_id: ClientId,
    /// Configuration supplied at construction time.
    config: DeribitDataClientConfig,
    /// HTTP client used to request instruments while connecting.
    http_client: DeribitHttpClient,
    /// WebSocket client; populated with `Some` by `new`.
    ws_client: Option<DeribitWebSocketClient>,
    /// Connection flag: set at the end of `connect`, cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token observed by the spawned stream task; replaced with a fresh one on `reset` and `disconnect`.
    cancellation_token: CancellationToken,
    /// Join handles of the spawned stream-processing tasks.
    tasks: Vec<JoinHandle<()>>,
    /// Channel over which data events are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, shared with the stream task.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Instruments with an option greeks subscription; handed to the WebSocket client in `connect`.
    option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with a mark price subscription; handed to the WebSocket client in `connect`.
    mark_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an index price subscription; handed to the WebSocket client in `connect`.
    index_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Realtime atomic clock obtained in `new`.
    // NOTE(review): no use of this field is visible in this part of the file - confirm it is read elsewhere.
    clock: &'static AtomicTime,
}
97
98impl DeribitDataClient {
99    /// Creates a new [`DeribitDataClient`] instance.
100    ///
101    /// # Errors
102    ///
103    /// Returns an error if the client fails to initialize.
104    pub fn new(client_id: ClientId, config: DeribitDataClientConfig) -> anyhow::Result<Self> {
105        let clock = get_atomic_clock_realtime();
106        let data_sender = get_data_event_sender();
107
108        let http_client = if config.has_api_credentials() {
109            DeribitHttpClient::new_with_env(
110                config.api_key.clone(),
111                config.api_secret.clone(),
112                config.base_url_http.clone(),
113                config.environment,
114                config.http_timeout_secs,
115                config.max_retries,
116                config.retry_delay_initial_ms,
117                config.retry_delay_max_ms,
118                config.proxy_url.clone(),
119            )?
120        } else {
121            DeribitHttpClient::new(
122                config.base_url_http.clone(),
123                config.environment,
124                config.http_timeout_secs,
125                config.max_retries,
126                config.retry_delay_initial_ms,
127                config.retry_delay_max_ms,
128                config.proxy_url.clone(),
129            )?
130        };
131
132        let ws_client = DeribitWebSocketClient::new(
133            Some(config.ws_url()),
134            config.api_key.clone(),
135            config.api_secret.clone(),
136            config.heartbeat_interval_secs,
137            config.environment,
138            config.transport_backend,
139            config.proxy_url.clone(),
140        )?;
141
142        Ok(Self {
143            client_id,
144            config,
145            http_client,
146            ws_client: Some(ws_client),
147            is_connected: AtomicBool::new(false),
148            cancellation_token: CancellationToken::new(),
149            tasks: Vec::new(),
150            data_sender,
151            instruments: Arc::new(AtomicMap::new()),
152            option_greeks_subs: Arc::new(AtomicSet::new()),
153            mark_price_subs: Arc::new(AtomicSet::new()),
154            index_price_subs: Arc::new(AtomicSet::new()),
155            clock,
156        })
157    }
158
159    /// Returns a mutable reference to the WebSocket client.
160    fn ws_client_mut(&mut self) -> anyhow::Result<&mut DeribitWebSocketClient> {
161        self.ws_client
162            .as_mut()
163            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))
164    }
165
166    /// Gets the interval from params, defaulting to Raw if authenticated.
167    ///
168    /// If authenticated, we prefer Raw interval for best data quality.
169    /// Users can still override via params if they want 100ms or agg2.
170    fn get_interval(&self, params: &Option<Params>) -> Option<DeribitUpdateInterval> {
171        if let Some(interval) = params
172            .as_ref()
173            .and_then(|p| p.get_str("interval"))
174            .and_then(|s| s.parse::<DeribitUpdateInterval>().ok())
175        {
176            return Some(interval);
177        }
178
179        // Default to Raw if authenticated, otherwise None (100ms default)
180        if let Some(ws) = self.ws_client.as_ref()
181            && ws.is_authenticated()
182        {
183            return Some(DeribitUpdateInterval::Raw);
184        }
185        None
186    }
187
188    /// Spawns a task to process WebSocket messages.
189    fn spawn_stream_task(
190        &mut self,
191        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
192    ) {
193        let data_sender = self.data_sender.clone();
194        let instruments = Arc::clone(&self.instruments);
195        let cancellation = self.cancellation_token.clone();
196
197        let handle = get_runtime().spawn(async move {
198            tokio::pin!(stream);
199
200            loop {
201                tokio::select! {
202                    maybe_msg = stream.next() => {
203                        match maybe_msg {
204                            Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
205                            None => {
206                                log::debug!("WebSocket stream ended");
207                                break;
208                            }
209                        }
210                    }
211                    () = cancellation.cancelled() => {
212                        log::debug!("WebSocket stream task cancelled");
213                        break;
214                    }
215                }
216            }
217        });
218
219        self.tasks.push(handle);
220    }
221
222    /// Handles incoming WebSocket messages.
223    fn handle_ws_message(
224        message: NautilusWsMessage,
225        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
226        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
227    ) {
228        match message {
229            NautilusWsMessage::Data(payloads) => {
230                for data in payloads {
231                    Self::send_data(sender, data);
232                }
233            }
234            NautilusWsMessage::Deltas(deltas) => {
235                Self::send_data(sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
236            }
237            NautilusWsMessage::Instrument(instrument) => {
238                let instrument_any = *instrument;
239                instruments.insert(instrument_any.id(), instrument_any.clone());
240
241                if let Err(e) = sender.send(DataEvent::Instrument(instrument_any)) {
242                    log::warn!("Failed to send instrument update: {e}");
243                }
244            }
245            NautilusWsMessage::OptionGreeks(greeks) => {
246                if let Err(e) = sender.send(DataEvent::OptionGreeks(greeks)) {
247                    log::error!("Failed to send option greeks: {e}");
248                }
249            }
250            NautilusWsMessage::Error(e) => {
251                log::error!("WebSocket error: {e:?}");
252            }
253            NautilusWsMessage::Raw(value) => {
254                log::debug!("Unhandled raw message: {value}");
255            }
256            NautilusWsMessage::Reconnected => {
257                log::info!("WebSocket reconnected");
258            }
259            NautilusWsMessage::Authenticated(auth) => {
260                log::debug!("WebSocket authenticated: expires_in={}s", auth.expires_in);
261            }
262            NautilusWsMessage::FundingRates(funding_rates) => {
263                log::info!(
264                    "Received {} funding rate update(s) from WebSocket",
265                    funding_rates.len()
266                );
267
268                for funding_rate in funding_rates {
269                    log::debug!("Sending funding rate: {funding_rate:?}");
270                    if let Err(e) = sender.send(DataEvent::FundingRate(funding_rate)) {
271                        log::error!("Failed to send funding rate: {e}");
272                    }
273                }
274            }
275            NautilusWsMessage::InstrumentStatus(status) => {
276                if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
277                    log::error!("Failed to send instrument status event: {e}");
278                }
279            }
280            NautilusWsMessage::OrderStatusReports(reports) => {
281                log::warn!(
282                    "Data client received OrderStatusReports message (should be handled by execution client): {} reports",
283                    reports.len()
284                );
285            }
286            NautilusWsMessage::FillReports(reports) => {
287                log::warn!(
288                    "Data client received FillReports message (should be handled by execution client): {} reports",
289                    reports.len()
290                );
291            }
292            NautilusWsMessage::OrderRejected(order) => {
293                log::warn!(
294                    "Data client received OrderRejected message (should be handled by execution client): {order:?}"
295                );
296            }
297            NautilusWsMessage::OrderAccepted(order) => {
298                log::warn!(
299                    "Data client received OrderAccepted message (should be handled by execution client): {order:?}"
300                );
301            }
302            NautilusWsMessage::OrderCanceled(order) => {
303                log::warn!(
304                    "Data client received OrderCanceled message (should be handled by execution client): {order:?}"
305                );
306            }
307            NautilusWsMessage::OrderExpired(order) => {
308                log::warn!(
309                    "Data client received OrderExpired message (should be handled by execution client): {order:?}"
310                );
311            }
312            NautilusWsMessage::OrderUpdated(order) => {
313                log::warn!(
314                    "Data client received OrderUpdated message (should be handled by execution client): {order:?}"
315                );
316            }
317            NautilusWsMessage::OrderCancelRejected(order) => {
318                log::warn!(
319                    "Data client received OrderCancelRejected message (should be handled by execution client): {order:?}"
320                );
321            }
322            NautilusWsMessage::OrderModifyRejected(order) => {
323                log::warn!(
324                    "Data client received OrderModifyRejected message (should be handled by execution client): {order:?}"
325                );
326            }
327            NautilusWsMessage::AccountState(state) => {
328                log::warn!(
329                    "Data client received AccountState message (should be handled by execution client): {state:?}"
330                );
331            }
332            NautilusWsMessage::AuthenticationFailed(reason) => {
333                log::error!("Authentication failed in data client: {reason}");
334            }
335        }
336    }
337
338    /// Sends data to the data channel.
339    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
340        if let Err(e) = sender.send(DataEvent::Data(data)) {
341            log::error!("Failed to send data: {e}");
342        }
343    }
344}
345
346#[async_trait(?Send)]
347impl DataClient for DeribitDataClient {
348    fn client_id(&self) -> ClientId {
349        self.client_id
350    }
351
352    fn venue(&self) -> Option<Venue> {
353        Some(*DERIBIT_VENUE)
354    }
355
356    fn start(&mut self) -> anyhow::Result<()> {
357        log::info!(
358            "Starting data client: client_id={}, environment={}",
359            self.client_id,
360            self.config.environment
361        );
362        Ok(())
363    }
364
365    fn stop(&mut self) -> anyhow::Result<()> {
366        log::info!("Stopping data client: {}", self.client_id);
367        self.cancellation_token.cancel();
368        self.is_connected.store(false, Ordering::Relaxed);
369        Ok(())
370    }
371
372    fn reset(&mut self) -> anyhow::Result<()> {
373        log::info!("Resetting data client: {}", self.client_id);
374        self.is_connected.store(false, Ordering::Relaxed);
375
376        // Cancel running stream tasks before replacing the token
377        self.cancellation_token.cancel();
378
379        for handle in self.tasks.drain(..) {
380            handle.abort();
381        }
382        self.cancellation_token = CancellationToken::new();
383
384        self.instruments.store(AHashMap::new());
385        Ok(())
386    }
387
388    fn dispose(&mut self) -> anyhow::Result<()> {
389        log::info!("Disposing data client: {}", self.client_id);
390        self.stop()
391    }
392
393    fn is_connected(&self) -> bool {
394        self.is_connected.load(Ordering::SeqCst)
395    }
396
397    fn is_disconnected(&self) -> bool {
398        !self.is_connected()
399    }
400
401    async fn connect(&mut self) -> anyhow::Result<()> {
402        if self.is_connected() {
403            return Ok(());
404        }
405
406        // Fetch instruments for each configured product type
407        let product_types = if self.config.product_types.is_empty() {
408            vec![DeribitProductType::Future]
409        } else {
410            self.config.product_types.clone()
411        };
412
413        let mut all_instruments = Vec::new();
414
415        for product_type in &product_types {
416            let fetched = self
417                .http_client
418                .request_instruments(DeribitCurrency::ANY, Some(*product_type))
419                .await
420                .with_context(|| format!("failed to request instruments for {product_type:?}"))?;
421
422            // Cache in http client
423            self.http_client.cache_instruments(&fetched);
424
425            // Cache locally
426            self.instruments.rcu(|m| {
427                for instrument in &fetched {
428                    m.insert(instrument.id(), instrument.clone());
429                }
430            });
431
432            all_instruments.extend(fetched);
433        }
434
435        log::info!(
436            "Cached instruments: client_id={}, total={}",
437            self.client_id,
438            all_instruments.len()
439        );
440
441        for instrument in &all_instruments {
442            if let Err(e) = self
443                .data_sender
444                .send(DataEvent::Instrument(instrument.clone()))
445            {
446                log::warn!("Failed to send instrument: {e}");
447            }
448        }
449
450        // Cache instruments and set subscription filters in WebSocket client before connecting
451        let option_greeks_subs = self.option_greeks_subs.clone();
452        let mark_price_subs = self.mark_price_subs.clone();
453        let index_price_subs = self.index_price_subs.clone();
454        let ws = self.ws_client_mut()?;
455        ws.cache_instruments(&all_instruments);
456        ws.set_option_greeks_subs(option_greeks_subs);
457        ws.set_mark_price_subs(mark_price_subs);
458        ws.set_index_price_subs(index_price_subs);
459
460        // Connect WebSocket and wait until active
461        ws.connect().await.context("failed to connect WebSocket")?;
462        ws.wait_until_active(10.0)
463            .await
464            .context("WebSocket failed to become active")?;
465
466        // Authenticate if credentials are configured (required for raw streams)
467        if ws.has_credentials() {
468            ws.authenticate_session(DERIBIT_DATA_SESSION_NAME)
469                .await
470                .context("failed to authenticate WebSocket")?;
471            log_info!("WebSocket authenticated");
472        }
473
474        // Get the stream and spawn processing task
475        let stream = self.ws_client_mut()?.stream()?;
476        self.spawn_stream_task(stream);
477
478        self.is_connected.store(true, Ordering::Release);
479        log_info!("Connected ({})", self.config.environment);
480        Ok(())
481    }
482
483    async fn disconnect(&mut self) -> anyhow::Result<()> {
484        if self.is_disconnected() {
485            return Ok(());
486        }
487
488        // Cancel all tasks
489        self.cancellation_token.cancel();
490
491        // Close WebSocket connection
492        if let Some(ws) = self.ws_client.as_ref()
493            && let Err(e) = ws.close().await
494        {
495            log::warn!("Error while closing WebSocket: {e:?}");
496        }
497
498        // Wait for all tasks to complete
499        for handle in self.tasks.drain(..) {
500            if let Err(e) = handle.await {
501                log::error!("Error joining WebSocket task: {e:?}");
502            }
503        }
504
505        // Reset cancellation token for potential reconnection
506        self.cancellation_token = CancellationToken::new();
507        self.is_connected.store(false, Ordering::Relaxed);
508
509        log_info!("Disconnected");
510        Ok(())
511    }
512
513    fn subscribe_instruments(&mut self, cmd: SubscribeInstruments) -> anyhow::Result<()> {
514        // Extract kind and currency from params, defaulting to "any.any" (all instruments)
515        let kind = cmd
516            .params
517            .as_ref()
518            .and_then(|p| p.get_str("kind"))
519            .unwrap_or("any")
520            .to_string();
521        let currency = cmd
522            .params
523            .as_ref()
524            .and_then(|p| p.get_str("currency"))
525            .unwrap_or("any")
526            .to_string();
527
528        let ws = self
529            .ws_client
530            .as_ref()
531            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
532            .clone();
533
534        log::debug!("Subscribing to instrument state changes for {kind}.{currency}");
535
536        get_runtime().spawn(async move {
537            if let Err(e) = ws.subscribe_instrument_status(&kind, &currency).await {
538                log::error!("Failed to subscribe to instrument status for {kind}.{currency}: {e}");
539            }
540        });
541
542        Ok(())
543    }
544
545    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
546        let instrument_id = cmd.instrument_id;
547
548        // Check if instrument is in cache (should be from connect())
549        if !self.instruments.contains_key(&instrument_id) {
550            log::warn!(
551                "Instrument {instrument_id} not in cache - it may have been created after connect()"
552            );
553        }
554
555        // Determine kind and currency from instrument_id
556        let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
557
558        let ws = self
559            .ws_client
560            .as_ref()
561            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
562            .clone();
563
564        log::debug!(
565            "Subscribing to instrument state for {instrument_id} (channel: {kind}.{currency})"
566        );
567
568        // Subscribe to broader kind/currency channel (filter in handler)
569        get_runtime().spawn(async move {
570            if let Err(e) = ws.subscribe_instrument_status(&kind, &currency).await {
571                log::error!("Failed to subscribe to instrument status for {instrument_id}: {e}");
572            }
573        });
574
575        Ok(())
576    }
577
578    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
579        if cmd.book_type != BookType::L2_MBP {
580            anyhow::bail!("Deribit only supports L2_MBP order book deltas");
581        }
582
583        let ws = self
584            .ws_client
585            .as_ref()
586            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
587            .clone();
588        let instrument_id = cmd.instrument_id;
589        let interval = self.get_interval(&cmd.params);
590
591        let depth = cmd
592            .depth
593            .map(|d| d.get() as u32)
594            .or_else(|| {
595                cmd.params
596                    .as_ref()
597                    .and_then(|p| p.get_u64("depth"))
598                    .map(|n| n as u32)
599            })
600            .unwrap_or(DERIBIT_BOOK_DEFAULT_DEPTH);
601
602        if !DERIBIT_BOOK_VALID_DEPTHS.contains(&depth) {
603            anyhow::bail!("invalid depth {depth}; supported depths: {DERIBIT_BOOK_VALID_DEPTHS:?}");
604        }
605
606        let group = cmd
607            .params
608            .as_ref()
609            .and_then(|p| p.get_str("group"))
610            .unwrap_or(DERIBIT_BOOK_DEFAULT_GROUP)
611            .to_string();
612
613        log::debug!(
614            "Subscribing to book deltas for {} (group: {}, depth: {}, interval: {}, book_type: {:?})",
615            instrument_id,
616            group,
617            depth,
618            interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
619            cmd.book_type
620        );
621
622        get_runtime().spawn(async move {
623            let result = if interval == Some(DeribitUpdateInterval::Raw) {
624                ws.subscribe_book(instrument_id, interval).await
625            } else {
626                ws.subscribe_book_grouped(instrument_id, &group, depth, interval)
627                    .await
628            };
629
630            if let Err(e) = result {
631                log::error!("Failed to subscribe to book deltas for {instrument_id}: {e}");
632            }
633        });
634
635        Ok(())
636    }
637
638    fn subscribe_book_depth10(&mut self, cmd: SubscribeBookDepth10) -> anyhow::Result<()> {
639        if cmd.book_type != BookType::L2_MBP {
640            anyhow::bail!("Deribit only supports L2_MBP order book depth");
641        }
642
643        let ws = self
644            .ws_client
645            .as_ref()
646            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
647            .clone();
648        let instrument_id = cmd.instrument_id;
649        let interval = self.get_interval(&cmd.params);
650        let group = cmd
651            .params
652            .as_ref()
653            .and_then(|p| p.get_str("group"))
654            .unwrap_or(DERIBIT_BOOK_DEFAULT_GROUP)
655            .to_string();
656
657        log::debug!(
658            "Subscribing to book depth10 for {} (group: {}, interval: {}, book_type: {:?})",
659            instrument_id,
660            group,
661            interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
662            cmd.book_type
663        );
664
665        get_runtime().spawn(async move {
666            if let Err(e) = ws
667                .subscribe_book_grouped(instrument_id, &group, 10, interval)
668                .await
669            {
670                log::error!("Failed to subscribe to book depth10 for {instrument_id}: {e}");
671            }
672        });
673
674        Ok(())
675    }
676
677    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
678        let ws = self
679            .ws_client
680            .as_ref()
681            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
682            .clone();
683        let instrument_id = cmd.instrument_id;
684
685        log::debug!("Subscribing to quotes for {instrument_id}");
686
687        get_runtime().spawn(async move {
688            if let Err(e) = ws.subscribe_quotes(instrument_id).await {
689                log::error!("Failed to subscribe to quotes for {instrument_id}: {e}");
690            }
691        });
692
693        Ok(())
694    }
695
696    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
697        let ws = self
698            .ws_client
699            .as_ref()
700            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
701            .clone();
702        let instrument_id = cmd.instrument_id;
703        let interval = self.get_interval(&cmd.params);
704
705        log::debug!(
706            "Subscribing to trades for {} (interval: {})",
707            instrument_id,
708            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
709        );
710
711        get_runtime().spawn(async move {
712            if let Err(e) = ws.subscribe_trades(instrument_id, interval).await {
713                log::error!("Failed to subscribe to trades for {instrument_id}: {e}");
714            }
715        });
716
717        Ok(())
718    }
719
720    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
721        let ws = self
722            .ws_client
723            .as_ref()
724            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
725            .clone();
726        let instrument_id = cmd.instrument_id;
727        let interval = self.get_interval(&cmd.params);
728
729        // Track subscription so handler gates MarkPriceUpdate emission
730        self.mark_price_subs.insert(instrument_id);
731
732        log::debug!(
733            "Subscribing to mark prices for {} (via ticker channel, interval: {})",
734            instrument_id,
735            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
736        );
737
738        get_runtime().spawn(async move {
739            if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
740                log::error!("Failed to subscribe to mark prices for {instrument_id}: {e}");
741            }
742        });
743
744        Ok(())
745    }
746
747    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
748        let ws = self
749            .ws_client
750            .as_ref()
751            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
752            .clone();
753        let instrument_id = cmd.instrument_id;
754        let interval = self.get_interval(&cmd.params);
755
756        // Track subscription so handler gates IndexPriceUpdate emission
757        self.index_price_subs.insert(instrument_id);
758
759        log::debug!(
760            "Subscribing to index prices for {} (via ticker channel, interval: {})",
761            instrument_id,
762            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
763        );
764
765        get_runtime().spawn(async move {
766            if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
767                log::error!("Failed to subscribe to index prices for {instrument_id}: {e}");
768            }
769        });
770
771        Ok(())
772    }
773
774    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
775        let ws = self
776            .ws_client
777            .as_ref()
778            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
779            .clone();
780        let instrument_id = cmd.bar_type.instrument_id();
781        let resolution = bar_spec_to_resolution(&cmd.bar_type);
782
783        get_runtime().spawn(async move {
784            if let Err(e) = ws.subscribe_chart(instrument_id, &resolution).await {
785                log::error!("Failed to subscribe to bars for {instrument_id}: {e}");
786            }
787        });
788
789        Ok(())
790    }
791
792    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
793        let instrument_id = cmd.instrument_id;
794
795        // Validate instrument is a perpetual - funding rates only apply to perpetual contracts
796        let is_perpetual = self
797            .instruments
798            .load()
799            .get(&instrument_id)
800            .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
801
802        if !is_perpetual {
803            log::warn!(
804                "Funding rates subscription rejected for {instrument_id}: only available for perpetual instruments"
805            );
806            return Ok(());
807        }
808
809        let ws = self
810            .ws_client
811            .as_ref()
812            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
813            .clone();
814        let interval = self.get_interval(&cmd.params);
815
816        log::debug!(
817            "Subscribing to funding rates for {} (perpetual channel, interval: {})",
818            instrument_id,
819            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
820        );
821
822        get_runtime().spawn(async move {
823            if let Err(e) = ws
824                .subscribe_perpetual_interests_rates_updates(instrument_id, interval)
825                .await
826            {
827                log::error!("Failed to subscribe to funding rates for {instrument_id}: {e}");
828            }
829        });
830
831        Ok(())
832    }
833
834    fn subscribe_instrument_status(
835        &mut self,
836        cmd: SubscribeInstrumentStatus,
837    ) -> anyhow::Result<()> {
838        let instrument_id = cmd.instrument_id;
839        let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
840
841        let ws = self
842            .ws_client
843            .as_ref()
844            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
845            .clone();
846
847        log::info!("Subscribing to instrument status for {instrument_id} ({kind}.{currency})");
848
849        get_runtime().spawn(async move {
850            if let Err(e) = ws.subscribe_instrument_status(&kind, &currency).await {
851                log::error!("Failed to subscribe to instrument status for {instrument_id}: {e}");
852            }
853        });
854
855        Ok(())
856    }
857
858    fn subscribe_option_greeks(&mut self, cmd: SubscribeOptionGreeks) -> anyhow::Result<()> {
859        let ws = self
860            .ws_client
861            .as_ref()
862            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
863            .clone();
864        let instrument_id = cmd.instrument_id;
865        let interval = self.get_interval(&cmd.params);
866
867        // Track subscription so handler gates OptionGreeks emission
868        self.option_greeks_subs.insert(instrument_id);
869
870        log::debug!(
871            "Subscribing to option greeks for {} (via ticker channel, interval: {})",
872            instrument_id,
873            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
874        );
875
876        get_runtime().spawn(async move {
877            if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
878                log::error!("Failed to subscribe to option greeks for {instrument_id}: {e}");
879            }
880        });
881
882        Ok(())
883    }
884
885    fn unsubscribe_instrument_status(
886        &mut self,
887        cmd: &UnsubscribeInstrumentStatus,
888    ) -> anyhow::Result<()> {
889        let instrument_id = cmd.instrument_id;
890        let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
891
892        let ws = self
893            .ws_client
894            .as_ref()
895            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
896            .clone();
897
898        log::info!("Unsubscribing from instrument status for {instrument_id} ({kind}.{currency})");
899
900        get_runtime().spawn(async move {
901            if let Err(e) = ws.unsubscribe_instrument_status(&kind, &currency).await {
902                log::error!(
903                    "Failed to unsubscribe from instrument status for {instrument_id}: {e}"
904                );
905            }
906        });
907
908        Ok(())
909    }
910
911    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
912        let kind = cmd
913            .params
914            .as_ref()
915            .and_then(|p| p.get_str("kind"))
916            .unwrap_or("any")
917            .to_string();
918        let currency = cmd
919            .params
920            .as_ref()
921            .and_then(|p| p.get_str("currency"))
922            .unwrap_or("any")
923            .to_string();
924
925        let ws = self
926            .ws_client
927            .as_ref()
928            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
929            .clone();
930
931        log::debug!("Unsubscribing from instrument state changes for {kind}.{currency}");
932
933        get_runtime().spawn(async move {
934            if let Err(e) = ws.unsubscribe_instrument_status(&kind, &currency).await {
935                log::error!(
936                    "Failed to unsubscribe from instrument status for {kind}.{currency}: {e}"
937                );
938            }
939        });
940
941        Ok(())
942    }
943
944    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
945        let instrument_id = cmd.instrument_id;
946
947        // Determine kind and currency from instrument_id
948        let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
949
950        let ws = self
951            .ws_client
952            .as_ref()
953            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
954            .clone();
955
956        log::debug!(
957            "Unsubscribing from instrument state for {instrument_id} (channel: {kind}.{currency})"
958        );
959
960        get_runtime().spawn(async move {
961            if let Err(e) = ws.unsubscribe_instrument_status(&kind, &currency).await {
962                log::error!(
963                    "Failed to unsubscribe from instrument status for {instrument_id}: {e}"
964                );
965            }
966        });
967
968        Ok(())
969    }
970
971    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
972        let ws = self
973            .ws_client
974            .as_ref()
975            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
976            .clone();
977        let instrument_id = cmd.instrument_id;
978        let interval = self.get_interval(&cmd.params);
979
980        let depth = cmd
981            .params
982            .as_ref()
983            .and_then(|p| p.get_u64("depth"))
984            .map_or(DERIBIT_BOOK_DEFAULT_DEPTH, |n| n as u32);
985
986        if !DERIBIT_BOOK_VALID_DEPTHS.contains(&depth) {
987            anyhow::bail!("invalid depth {depth}; supported depths: {DERIBIT_BOOK_VALID_DEPTHS:?}");
988        }
989
990        let group = cmd
991            .params
992            .as_ref()
993            .and_then(|p| p.get_str("group"))
994            .unwrap_or(DERIBIT_BOOK_DEFAULT_GROUP)
995            .to_string();
996
997        log::debug!(
998            "Unsubscribing from book deltas for {} (group: {}, depth: {}, interval: {})",
999            instrument_id,
1000            group,
1001            depth,
1002            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1003        );
1004
1005        get_runtime().spawn(async move {
1006            let result = if interval == Some(DeribitUpdateInterval::Raw) {
1007                ws.unsubscribe_book(instrument_id, interval).await
1008            } else {
1009                ws.unsubscribe_book_grouped(instrument_id, &group, depth, interval)
1010                    .await
1011            };
1012
1013            if let Err(e) = result {
1014                log::error!("Failed to unsubscribe from book deltas for {instrument_id}: {e}");
1015            }
1016        });
1017
1018        Ok(())
1019    }
1020
1021    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1022        let ws = self
1023            .ws_client
1024            .as_ref()
1025            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1026            .clone();
1027        let instrument_id = cmd.instrument_id;
1028        let interval = self.get_interval(&cmd.params);
1029        let group = cmd
1030            .params
1031            .as_ref()
1032            .and_then(|p| p.get_str("group"))
1033            .unwrap_or(DERIBIT_BOOK_DEFAULT_GROUP)
1034            .to_string();
1035
1036        log::debug!(
1037            "Unsubscribing from book depth10 for {} (group: {}, interval: {})",
1038            instrument_id,
1039            group,
1040            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1041        );
1042
1043        get_runtime().spawn(async move {
1044            if let Err(e) = ws
1045                .unsubscribe_book_grouped(instrument_id, &group, 10, interval)
1046                .await
1047            {
1048                log::error!("Failed to unsubscribe from book depth10 for {instrument_id}: {e}");
1049            }
1050        });
1051
1052        Ok(())
1053    }
1054
1055    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1056        let ws = self
1057            .ws_client
1058            .as_ref()
1059            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1060            .clone();
1061        let instrument_id = cmd.instrument_id;
1062
1063        log::debug!("Unsubscribing from quotes for {instrument_id}");
1064
1065        get_runtime().spawn(async move {
1066            if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
1067                log::error!("Failed to unsubscribe from quotes for {instrument_id}: {e}");
1068            }
1069        });
1070
1071        Ok(())
1072    }
1073
1074    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1075        let ws = self
1076            .ws_client
1077            .as_ref()
1078            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1079            .clone();
1080        let instrument_id = cmd.instrument_id;
1081        let interval = self.get_interval(&cmd.params);
1082
1083        log::debug!(
1084            "Unsubscribing from trades for {} (interval: {})",
1085            instrument_id,
1086            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1087        );
1088
1089        get_runtime().spawn(async move {
1090            if let Err(e) = ws.unsubscribe_trades(instrument_id, interval).await {
1091                log::error!("Failed to unsubscribe from trades for {instrument_id}: {e}");
1092            }
1093        });
1094
1095        Ok(())
1096    }
1097
1098    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1099        let ws = self
1100            .ws_client
1101            .as_ref()
1102            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1103            .clone();
1104        let instrument_id = cmd.instrument_id;
1105        let interval = self.get_interval(&cmd.params);
1106
1107        // Remove subscription tracking so handler stops emitting MarkPriceUpdate
1108        self.mark_price_subs.remove(&instrument_id);
1109
1110        log::debug!(
1111            "Unsubscribing from mark prices for {} (via ticker channel, interval: {})",
1112            instrument_id,
1113            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1114        );
1115
1116        get_runtime().spawn(async move {
1117            if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
1118                log::error!("Failed to unsubscribe from mark prices for {instrument_id}: {e}");
1119            }
1120        });
1121
1122        Ok(())
1123    }
1124
1125    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1126        let ws = self
1127            .ws_client
1128            .as_ref()
1129            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1130            .clone();
1131        let instrument_id = cmd.instrument_id;
1132        let interval = self.get_interval(&cmd.params);
1133
1134        // Remove subscription tracking so handler stops emitting IndexPriceUpdate
1135        self.index_price_subs.remove(&instrument_id);
1136
1137        log::debug!(
1138            "Unsubscribing from index prices for {} (via ticker channel, interval: {})",
1139            instrument_id,
1140            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1141        );
1142
1143        get_runtime().spawn(async move {
1144            if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
1145                log::error!("Failed to unsubscribe from index prices for {instrument_id}: {e}");
1146            }
1147        });
1148
1149        Ok(())
1150    }
1151
1152    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1153        let ws = self
1154            .ws_client
1155            .as_ref()
1156            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1157            .clone();
1158        let instrument_id = cmd.bar_type.instrument_id();
1159        let resolution = bar_spec_to_resolution(&cmd.bar_type);
1160
1161        get_runtime().spawn(async move {
1162            if let Err(e) = ws.unsubscribe_chart(instrument_id, &resolution).await {
1163                log::error!("Failed to unsubscribe from bars for {instrument_id}: {e}");
1164            }
1165        });
1166
1167        Ok(())
1168    }
1169
1170    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1171        let instrument_id = cmd.instrument_id;
1172
1173        // Validate instrument is a perpetual - funding rates only apply to perpetual contracts
1174        let is_perpetual = self
1175            .instruments
1176            .load()
1177            .get(&instrument_id)
1178            .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
1179
1180        if !is_perpetual {
1181            log::warn!(
1182                "Funding rates unsubscription rejected for {instrument_id}: only available for perpetual instruments"
1183            );
1184            return Ok(());
1185        }
1186
1187        let ws = self
1188            .ws_client
1189            .as_ref()
1190            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1191            .clone();
1192        let interval = self.get_interval(&cmd.params);
1193
1194        log::debug!(
1195            "Unsubscribing from funding rates for {} (perpetual channel, interval: {})",
1196            instrument_id,
1197            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1198        );
1199
1200        get_runtime().spawn(async move {
1201            if let Err(e) = ws
1202                .unsubscribe_perpetual_interest_rates_updates(instrument_id, interval)
1203                .await
1204            {
1205                log::error!("Failed to unsubscribe from funding rates for {instrument_id}: {e}");
1206            }
1207        });
1208
1209        Ok(())
1210    }
1211
1212    fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
1213        let ws = self
1214            .ws_client
1215            .as_ref()
1216            .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1217            .clone();
1218        let instrument_id = cmd.instrument_id;
1219        let interval = self.get_interval(&cmd.params);
1220
1221        // Remove subscription tracking so handler stops emitting OptionGreeks
1222        self.option_greeks_subs.remove(&instrument_id);
1223
1224        log::debug!(
1225            "Unsubscribing from option greeks for {} (via ticker channel, interval: {})",
1226            instrument_id,
1227            interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1228        );
1229
1230        get_runtime().spawn(async move {
1231            if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
1232                log::error!("Failed to unsubscribe from option greeks for {instrument_id}: {e}");
1233            }
1234        });
1235
1236        Ok(())
1237    }
1238
1239    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1240        if request.start.is_some() {
1241            log::warn!(
1242                "Requesting instruments for {:?} with specified `start` which has no effect",
1243                request.venue
1244            );
1245        }
1246
1247        if request.end.is_some() {
1248            log::warn!(
1249                "Requesting instruments for {:?} with specified `end` which has no effect",
1250                request.venue
1251            );
1252        }
1253
1254        let http_client = self.http_client.clone();
1255        let ws_client = self.ws_client.clone();
1256        let instruments_cache = Arc::clone(&self.instruments);
1257        let sender = self.data_sender.clone();
1258        let request_id = request.request_id;
1259        let client_id = request.client_id.unwrap_or(self.client_id);
1260        let start_nanos = datetime_to_unix_nanos(request.start);
1261        let end_nanos = datetime_to_unix_nanos(request.end);
1262        let params = request.params;
1263        let clock = self.clock;
1264        let venue = *DERIBIT_VENUE;
1265
1266        // Get product types from config, default to Future if empty
1267        let product_types = if self.config.product_types.is_empty() {
1268            vec![crate::http::models::DeribitProductType::Future]
1269        } else {
1270            self.config.product_types.clone()
1271        };
1272
1273        get_runtime().spawn(async move {
1274            let mut all_instruments = Vec::new();
1275
1276            for product_type in &product_types {
1277                log::debug!(
1278                    "Requesting instruments for currency=ANY, product_type={product_type:?}"
1279                );
1280
1281                match http_client
1282                    .request_instruments(DeribitCurrency::ANY, Some(*product_type))
1283                    .await
1284                {
1285                    Ok(instruments) => {
1286                        log::info!(
1287                            "Fetched {} instruments for ANY/{:?}",
1288                            instruments.len(),
1289                            product_type
1290                        );
1291
1292                        instruments_cache.rcu(|m| {
1293                            for instrument in &instruments {
1294                                m.insert(instrument.id(), instrument.clone());
1295                            }
1296                        });
1297                        all_instruments.extend(instruments);
1298                    }
1299                    Err(e) => {
1300                        log::error!("Failed to fetch instruments for ANY/{product_type:?}: {e:?}");
1301                    }
1302                }
1303            }
1304
1305            // Propagate to HTTP and WebSocket caches so downstream
1306            // requests use correct precisions.
1307            if !all_instruments.is_empty() {
1308                http_client.cache_instruments(&all_instruments);
1309
1310                if let Some(ws) = &ws_client {
1311                    ws.cache_instruments(&all_instruments);
1312                }
1313            }
1314
1315            // Send response with all collected instruments
1316            let response = DataResponse::Instruments(InstrumentsResponse::new(
1317                request_id,
1318                client_id,
1319                venue,
1320                all_instruments,
1321                start_nanos,
1322                end_nanos,
1323                clock.get_time_ns(),
1324                params,
1325            ));
1326
1327            if let Err(e) = sender.send(DataEvent::Response(response)) {
1328                log::error!("Failed to send instruments response: {e}");
1329            }
1330        });
1331
1332        Ok(())
1333    }
1334
1335    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1336        if request.start.is_some() {
1337            log::warn!(
1338                "Requesting instrument {} with specified `start` which has no effect",
1339                request.instrument_id
1340            );
1341        }
1342
1343        if request.end.is_some() {
1344            log::warn!(
1345                "Requesting instrument {} with specified `end` which has no effect",
1346                request.instrument_id
1347            );
1348        }
1349
1350        // First, check if instrument exists in cache
1351        if let Some(instrument) = self.instruments.get_cloned(&request.instrument_id) {
1352            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1353                request.request_id,
1354                request.client_id.unwrap_or(self.client_id),
1355                instrument.id(),
1356                instrument,
1357                datetime_to_unix_nanos(request.start),
1358                datetime_to_unix_nanos(request.end),
1359                self.clock.get_time_ns(),
1360                request.params,
1361            )));
1362
1363            if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
1364                log::error!("Failed to send instrument response: {e}");
1365            }
1366            return Ok(());
1367        }
1368
1369        log::debug!(
1370            "Instrument {} not in cache, fetching from API",
1371            request.instrument_id
1372        );
1373
1374        let http_client = self.http_client.clone();
1375        let ws_client = self.ws_client.clone();
1376        let instruments_cache = Arc::clone(&self.instruments);
1377        let sender = self.data_sender.clone();
1378        let instrument_id = request.instrument_id;
1379        let request_id = request.request_id;
1380        let client_id = request.client_id.unwrap_or(self.client_id);
1381        let start_nanos = datetime_to_unix_nanos(request.start);
1382        let end_nanos = datetime_to_unix_nanos(request.end);
1383        let params = request.params;
1384        let clock = self.clock;
1385
1386        get_runtime().spawn(async move {
1387            match http_client
1388                .request_instrument(instrument_id)
1389                .await
1390                .context("failed to request instrument from Deribit")
1391            {
1392                Ok(instrument) => {
1393                    log::info!("Successfully fetched instrument: {instrument_id}");
1394
1395                    instruments_cache.insert(instrument.id(), instrument.clone());
1396                    http_client.cache_instruments(std::slice::from_ref(&instrument));
1397
1398                    if let Some(ws) = &ws_client {
1399                        ws.cache_instruments(std::slice::from_ref(&instrument));
1400                    }
1401
1402                    // Send response
1403                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1404                        request_id,
1405                        client_id,
1406                        instrument.id(),
1407                        instrument,
1408                        start_nanos,
1409                        end_nanos,
1410                        clock.get_time_ns(),
1411                        params,
1412                    )));
1413
1414                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1415                        log::error!("Failed to send instrument response: {e}");
1416                    }
1417                }
1418                Err(e) => {
1419                    log::error!("Instrument request failed for {instrument_id}: {e:?}");
1420                }
1421            }
1422        });
1423
1424        Ok(())
1425    }
1426
1427    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1428        let http_client = self.http_client.clone();
1429        let sender = self.data_sender.clone();
1430        let instrument_id = request.instrument_id;
1431        let start = request.start;
1432        let end = request.end;
1433        let limit = request.limit.map(|n| n.get() as u32);
1434        let request_id = request.request_id;
1435        let client_id = request.client_id.unwrap_or(self.client_id);
1436        let params = request.params;
1437        let clock = self.clock;
1438        let start_nanos = datetime_to_unix_nanos(start);
1439        let end_nanos = datetime_to_unix_nanos(end);
1440
1441        get_runtime().spawn(async move {
1442            match http_client
1443                .request_trades(instrument_id, start, end, limit)
1444                .await
1445                .context("failed to request trades from Deribit")
1446            {
1447                Ok(trades) => {
1448                    let response = DataResponse::Trades(TradesResponse::new(
1449                        request_id,
1450                        client_id,
1451                        instrument_id,
1452                        trades,
1453                        start_nanos,
1454                        end_nanos,
1455                        clock.get_time_ns(),
1456                        params,
1457                    ));
1458
1459                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1460                        log::error!("Failed to send trades response: {e}");
1461                    }
1462                }
1463                Err(e) => log::error!("Trades request failed for {instrument_id}: {e:?}"),
1464            }
1465        });
1466
1467        Ok(())
1468    }
1469
1470    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1471        let http_client = self.http_client.clone();
1472        let sender = self.data_sender.clone();
1473        let bar_type = request.bar_type;
1474        let start = request.start;
1475        let end = request.end;
1476        let limit = request.limit.map(|n| n.get() as u32);
1477        let request_id = request.request_id;
1478        let client_id = request.client_id.unwrap_or(self.client_id);
1479        let params = request.params;
1480        let clock = self.clock;
1481        let start_nanos = datetime_to_unix_nanos(start);
1482        let end_nanos = datetime_to_unix_nanos(end);
1483
1484        get_runtime().spawn(async move {
1485            match http_client
1486                .request_bars(bar_type, start, end, limit)
1487                .await
1488                .context("failed to request bars from Deribit")
1489            {
1490                Ok(bars) => {
1491                    let response = DataResponse::Bars(BarsResponse::new(
1492                        request_id,
1493                        client_id,
1494                        bar_type,
1495                        bars,
1496                        start_nanos,
1497                        end_nanos,
1498                        clock.get_time_ns(),
1499                        params,
1500                    ));
1501
1502                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1503                        log::error!("Failed to send bars response: {e}");
1504                    }
1505                }
1506                Err(e) => log::error!("Bars request failed for {bar_type}: {e:?}"),
1507            }
1508        });
1509
1510        Ok(())
1511    }
1512
1513    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1514        let http_client = self.http_client.clone();
1515        let sender = self.data_sender.clone();
1516        let instrument_id = request.instrument_id;
1517        let depth = request.depth.map(|n| n.get() as u32);
1518        let request_id = request.request_id;
1519        let client_id = request.client_id.unwrap_or(self.client_id);
1520        let params = request.params;
1521        let clock = self.clock;
1522
1523        get_runtime().spawn(async move {
1524            match http_client
1525                .request_book_snapshot(instrument_id, depth)
1526                .await
1527                .context("failed to request book snapshot from Deribit")
1528            {
1529                Ok(book) => {
1530                    let response = DataResponse::Book(BookResponse::new(
1531                        request_id,
1532                        client_id,
1533                        instrument_id,
1534                        book,
1535                        None,
1536                        None,
1537                        clock.get_time_ns(),
1538                        params,
1539                    ));
1540
1541                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1542                        log::error!("Failed to send book snapshot response: {e}");
1543                    }
1544                }
1545                Err(e) => {
1546                    log::error!("Book snapshot request failed for {instrument_id}: {e:?}");
1547                }
1548            }
1549        });
1550
1551        Ok(())
1552    }
1553
1554    fn request_forward_prices(&self, request: RequestForwardPrices) -> anyhow::Result<()> {
1555        let currency = request.underlying.to_string();
1556        let instrument_id = request.instrument_id;
1557        let http_client = self.http_client.clone();
1558        let sender = self.data_sender.clone();
1559        let request_id = request.request_id;
1560        let client_id = request.client_id.unwrap_or(self.client_id());
1561        let params = request.params;
1562        let clock = self.clock;
1563        let venue = *DERIBIT_VENUE;
1564
1565        get_runtime().spawn(async move {
1566            let result = if let Some(inst_id) = instrument_id {
1567                // Single-instrument path: 1 HTTP call to public/ticker
1568                let instrument_name = inst_id.symbol.to_string();
1569                log::info!(
1570                    "Requesting forward price for {currency} (single instrument: {instrument_name})"
1571                );
1572
1573                match http_client.request_ticker(&instrument_name).await {
1574                    Ok(ticker) => {
1575                        let ts = clock.get_time_ns();
1576                        let forward_prices: Vec<ForwardPrice> = ticker
1577                            .underlying_price
1578                            .map(|up| {
1579                                vec![ForwardPrice::new(
1580                                    inst_id,
1581                                    up,
1582                                    ticker.underlying_index.filter(|s| !s.is_empty()),
1583                                    ts,
1584                                    ts,
1585                                )]
1586                            })
1587                            .unwrap_or_default();
1588
1589                        log::info!(
1590                            "Fetched {} forward price for {currency} (single instrument: {instrument_name})",
1591                            forward_prices.len(),
1592                        );
1593                        Ok((forward_prices, ts))
1594                    }
1595                    Err(e) => Err(e),
1596                }
1597            } else {
1598                // Bulk path: fetch all book summaries
1599                log::info!("Requesting option forward prices for currency={currency} (bulk)");
1600
1601                match http_client.request_book_summaries(&currency).await {
1602                    Ok(summaries) => {
1603                        let ts = clock.get_time_ns();
1604
1605                        // Deduplicate: all options at the same expiry share the same
1606                        // forward price, so keep only one entry per underlying_index.
1607                        let mut seen_indices = std::collections::HashSet::new();
1608                        let forward_prices: Vec<ForwardPrice> = summaries
1609                            .into_iter()
1610                            .filter_map(|s| {
1611                                let up = s.underlying_price?;
1612                                let idx = s.underlying_index.clone().unwrap_or_default();
1613                                if !seen_indices.insert(idx.clone()) {
1614                                    return None;
1615                                }
1616                                Some(ForwardPrice::new(
1617                                    InstrumentId::new(
1618                                        Symbol::new(&s.instrument_name),
1619                                        *DERIBIT_VENUE,
1620                                    ),
1621                                    up,
1622                                    Some(idx).filter(|s| !s.is_empty()),
1623                                    ts,
1624                                    ts,
1625                                ))
1626                            })
1627                            .collect();
1628
1629                        log::info!(
1630                            "Fetched {} forward prices (per-expiry) for {currency}",
1631                            forward_prices.len(),
1632                        );
1633                        Ok((forward_prices, ts))
1634                    }
1635                    Err(e) => Err(e),
1636                }
1637            };
1638
1639            match result {
1640                Ok((forward_prices, ts)) => {
1641                    let response = DataResponse::ForwardPrices(ForwardPricesResponse::new(
1642                        request_id,
1643                        client_id,
1644                        venue,
1645                        forward_prices,
1646                        ts,
1647                        params,
1648                    ));
1649
1650                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1651                        log::error!("Failed to send forward prices response: {e}");
1652                    }
1653                }
1654                Err(e) => {
1655                    log::error!("Forward prices request failed for {currency}: {e:?}");
1656                }
1657            }
1658        });
1659
1660        Ok(())
1661    }
1662}