1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::StreamExt;
27use nautilus_common::{
28 clients::DataClient,
29 live::{runner::get_data_event_sender, runtime::get_runtime},
30 log_info,
31 messages::{
32 DataEvent, DataResponse,
33 data::{
34 BarsResponse, BookResponse, ForwardPricesResponse, InstrumentResponse,
35 InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestForwardPrices,
36 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
37 SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
38 SubscribeInstrument, SubscribeInstrumentStatus, SubscribeInstruments,
39 SubscribeMarkPrices, SubscribeOptionGreeks, SubscribeQuotes, SubscribeTrades,
40 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
41 UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
42 UnsubscribeInstrumentStatus, UnsubscribeInstruments, UnsubscribeMarkPrices,
43 UnsubscribeOptionGreeks, UnsubscribeQuotes, UnsubscribeTrades,
44 },
45 },
46};
47use nautilus_core::{
48 AtomicMap, AtomicSet, Params,
49 datetime::datetime_to_unix_nanos,
50 time::{AtomicTime, get_atomic_clock_realtime},
51};
52use nautilus_model::{
53 data::{Data, ForwardPrice, OrderBookDeltas_API},
54 enums::BookType,
55 identifiers::{ClientId, InstrumentId, Symbol, Venue},
56 instruments::{Instrument, InstrumentAny},
57};
58use tokio::task::JoinHandle;
59use tokio_util::sync::CancellationToken;
60
61use crate::{
62 common::{
63 consts::{
64 DERIBIT_BOOK_DEFAULT_DEPTH, DERIBIT_BOOK_DEFAULT_GROUP, DERIBIT_BOOK_VALID_DEPTHS,
65 DERIBIT_VENUE,
66 },
67 parse::{bar_spec_to_resolution, parse_instrument_kind_currency},
68 },
69 config::DeribitDataClientConfig,
70 http::{
71 client::DeribitHttpClient,
72 models::{DeribitCurrency, DeribitProductType},
73 },
74 websocket::{
75 auth::DERIBIT_DATA_SESSION_NAME, client::DeribitWebSocketClient,
76 enums::DeribitUpdateInterval, messages::NautilusWsMessage,
77 },
78};
79
/// Data client for the Deribit venue.
///
/// Holds an HTTP client (used to fetch instruments on connect), an optional WebSocket
/// client (used for streaming subscriptions), and the shared caches that the stream
/// task and the subscription methods read and write.
#[derive(Debug)]
pub struct DeribitDataClient {
    /// Identifier reported by `client_id()` and included in log output.
    client_id: ClientId,
    /// Configuration the HTTP/WebSocket clients were built from; also consulted on
    /// connect for the product types to load.
    config: DeribitDataClientConfig,
    /// HTTP client used to request instruments.
    http_client: DeribitHttpClient,
    /// WebSocket client; set to `Some` by `new()`. Methods return an error when it is `None`.
    ws_client: Option<DeribitWebSocketClient>,
    /// Connection flag: set to `true` at the end of `connect()`, cleared by
    /// `stop()`, `reset()` and `disconnect()`.
    is_connected: AtomicBool,
    /// Token observed by the stream task; cancelling it ends that task's loop.
    cancellation_token: CancellationToken,
    /// Join handles of the tasks spawned by `spawn_stream_task`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which `DataEvent`s are published.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, filled on connect and by instrument
    /// messages from the WebSocket stream.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Instruments with an active option greeks subscription (shared with the WebSocket client).
    option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active mark price subscription (shared with the WebSocket client).
    mark_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active index price subscription (shared with the WebSocket client).
    index_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Clock obtained from `get_atomic_clock_realtime()` at construction.
    clock: &'static AtomicTime,
}
97
98impl DeribitDataClient {
99 pub fn new(client_id: ClientId, config: DeribitDataClientConfig) -> anyhow::Result<Self> {
105 let clock = get_atomic_clock_realtime();
106 let data_sender = get_data_event_sender();
107
108 let http_client = if config.has_api_credentials() {
109 DeribitHttpClient::new_with_env(
110 config.api_key.clone(),
111 config.api_secret.clone(),
112 config.base_url_http.clone(),
113 config.environment,
114 config.http_timeout_secs,
115 config.max_retries,
116 config.retry_delay_initial_ms,
117 config.retry_delay_max_ms,
118 config.proxy_url.clone(),
119 )?
120 } else {
121 DeribitHttpClient::new(
122 config.base_url_http.clone(),
123 config.environment,
124 config.http_timeout_secs,
125 config.max_retries,
126 config.retry_delay_initial_ms,
127 config.retry_delay_max_ms,
128 config.proxy_url.clone(),
129 )?
130 };
131
132 let ws_client = DeribitWebSocketClient::new(
133 Some(config.ws_url()),
134 config.api_key.clone(),
135 config.api_secret.clone(),
136 config.heartbeat_interval_secs,
137 config.environment,
138 config.transport_backend,
139 config.proxy_url.clone(),
140 )?;
141
142 Ok(Self {
143 client_id,
144 config,
145 http_client,
146 ws_client: Some(ws_client),
147 is_connected: AtomicBool::new(false),
148 cancellation_token: CancellationToken::new(),
149 tasks: Vec::new(),
150 data_sender,
151 instruments: Arc::new(AtomicMap::new()),
152 option_greeks_subs: Arc::new(AtomicSet::new()),
153 mark_price_subs: Arc::new(AtomicSet::new()),
154 index_price_subs: Arc::new(AtomicSet::new()),
155 clock,
156 })
157 }
158
159 fn ws_client_mut(&mut self) -> anyhow::Result<&mut DeribitWebSocketClient> {
161 self.ws_client
162 .as_mut()
163 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))
164 }
165
166 fn get_interval(&self, params: &Option<Params>) -> Option<DeribitUpdateInterval> {
171 if let Some(interval) = params
172 .as_ref()
173 .and_then(|p| p.get_str("interval"))
174 .and_then(|s| s.parse::<DeribitUpdateInterval>().ok())
175 {
176 return Some(interval);
177 }
178
179 if let Some(ws) = self.ws_client.as_ref()
181 && ws.is_authenticated()
182 {
183 return Some(DeribitUpdateInterval::Raw);
184 }
185 None
186 }
187
188 fn spawn_stream_task(
190 &mut self,
191 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
192 ) {
193 let data_sender = self.data_sender.clone();
194 let instruments = Arc::clone(&self.instruments);
195 let cancellation = self.cancellation_token.clone();
196
197 let handle = get_runtime().spawn(async move {
198 tokio::pin!(stream);
199
200 loop {
201 tokio::select! {
202 maybe_msg = stream.next() => {
203 match maybe_msg {
204 Some(msg) => Self::handle_ws_message(msg, &data_sender, &instruments),
205 None => {
206 log::debug!("WebSocket stream ended");
207 break;
208 }
209 }
210 }
211 () = cancellation.cancelled() => {
212 log::debug!("WebSocket stream task cancelled");
213 break;
214 }
215 }
216 }
217 });
218
219 self.tasks.push(handle);
220 }
221
222 fn handle_ws_message(
224 message: NautilusWsMessage,
225 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
226 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
227 ) {
228 match message {
229 NautilusWsMessage::Data(payloads) => {
230 for data in payloads {
231 Self::send_data(sender, data);
232 }
233 }
234 NautilusWsMessage::Deltas(deltas) => {
235 Self::send_data(sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
236 }
237 NautilusWsMessage::Instrument(instrument) => {
238 let instrument_any = *instrument;
239 instruments.insert(instrument_any.id(), instrument_any.clone());
240
241 if let Err(e) = sender.send(DataEvent::Instrument(instrument_any)) {
242 log::warn!("Failed to send instrument update: {e}");
243 }
244 }
245 NautilusWsMessage::OptionGreeks(greeks) => {
246 if let Err(e) = sender.send(DataEvent::OptionGreeks(greeks)) {
247 log::error!("Failed to send option greeks: {e}");
248 }
249 }
250 NautilusWsMessage::Error(e) => {
251 log::error!("WebSocket error: {e:?}");
252 }
253 NautilusWsMessage::Raw(value) => {
254 log::debug!("Unhandled raw message: {value}");
255 }
256 NautilusWsMessage::Reconnected => {
257 log::info!("WebSocket reconnected");
258 }
259 NautilusWsMessage::Authenticated(auth) => {
260 log::debug!("WebSocket authenticated: expires_in={}s", auth.expires_in);
261 }
262 NautilusWsMessage::FundingRates(funding_rates) => {
263 log::info!(
264 "Received {} funding rate update(s) from WebSocket",
265 funding_rates.len()
266 );
267
268 for funding_rate in funding_rates {
269 log::debug!("Sending funding rate: {funding_rate:?}");
270 if let Err(e) = sender.send(DataEvent::FundingRate(funding_rate)) {
271 log::error!("Failed to send funding rate: {e}");
272 }
273 }
274 }
275 NautilusWsMessage::InstrumentStatus(status) => {
276 if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
277 log::error!("Failed to send instrument status event: {e}");
278 }
279 }
280 NautilusWsMessage::OrderStatusReports(reports) => {
281 log::warn!(
282 "Data client received OrderStatusReports message (should be handled by execution client): {} reports",
283 reports.len()
284 );
285 }
286 NautilusWsMessage::FillReports(reports) => {
287 log::warn!(
288 "Data client received FillReports message (should be handled by execution client): {} reports",
289 reports.len()
290 );
291 }
292 NautilusWsMessage::OrderRejected(order) => {
293 log::warn!(
294 "Data client received OrderRejected message (should be handled by execution client): {order:?}"
295 );
296 }
297 NautilusWsMessage::OrderAccepted(order) => {
298 log::warn!(
299 "Data client received OrderAccepted message (should be handled by execution client): {order:?}"
300 );
301 }
302 NautilusWsMessage::OrderCanceled(order) => {
303 log::warn!(
304 "Data client received OrderCanceled message (should be handled by execution client): {order:?}"
305 );
306 }
307 NautilusWsMessage::OrderExpired(order) => {
308 log::warn!(
309 "Data client received OrderExpired message (should be handled by execution client): {order:?}"
310 );
311 }
312 NautilusWsMessage::OrderUpdated(order) => {
313 log::warn!(
314 "Data client received OrderUpdated message (should be handled by execution client): {order:?}"
315 );
316 }
317 NautilusWsMessage::OrderCancelRejected(order) => {
318 log::warn!(
319 "Data client received OrderCancelRejected message (should be handled by execution client): {order:?}"
320 );
321 }
322 NautilusWsMessage::OrderModifyRejected(order) => {
323 log::warn!(
324 "Data client received OrderModifyRejected message (should be handled by execution client): {order:?}"
325 );
326 }
327 NautilusWsMessage::AccountState(state) => {
328 log::warn!(
329 "Data client received AccountState message (should be handled by execution client): {state:?}"
330 );
331 }
332 NautilusWsMessage::AuthenticationFailed(reason) => {
333 log::error!("Authentication failed in data client: {reason}");
334 }
335 }
336 }
337
338 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
340 if let Err(e) = sender.send(DataEvent::Data(data)) {
341 log::error!("Failed to send data: {e}");
342 }
343 }
344}
345
346#[async_trait(?Send)]
347impl DataClient for DeribitDataClient {
/// Returns the identifier this client was constructed with.
fn client_id(&self) -> ClientId {
    self.client_id
}
351
/// Returns the venue served by this client, which is always `DERIBIT_VENUE`.
fn venue(&self) -> Option<Venue> {
    Some(*DERIBIT_VENUE)
}
355
356 fn start(&mut self) -> anyhow::Result<()> {
357 log::info!(
358 "Starting data client: client_id={}, environment={}",
359 self.client_id,
360 self.config.environment
361 );
362 Ok(())
363 }
364
365 fn stop(&mut self) -> anyhow::Result<()> {
366 log::info!("Stopping data client: {}", self.client_id);
367 self.cancellation_token.cancel();
368 self.is_connected.store(false, Ordering::Relaxed);
369 Ok(())
370 }
371
372 fn reset(&mut self) -> anyhow::Result<()> {
373 log::info!("Resetting data client: {}", self.client_id);
374 self.is_connected.store(false, Ordering::Relaxed);
375
376 self.cancellation_token.cancel();
378
379 for handle in self.tasks.drain(..) {
380 handle.abort();
381 }
382 self.cancellation_token = CancellationToken::new();
383
384 self.instruments.store(AHashMap::new());
385 Ok(())
386 }
387
/// Disposes of the client by logging and then delegating to `stop()`.
///
/// # Errors
///
/// Returns whatever `stop()` returns.
fn dispose(&mut self) -> anyhow::Result<()> {
    log::info!("Disposing data client: {}", self.client_id);
    self.stop()
}
392
/// Returns the current value of the connected flag.
fn is_connected(&self) -> bool {
    self.is_connected.load(Ordering::SeqCst)
}
396
/// Returns `true` when the client is not connected (the negation of `is_connected()`).
fn is_disconnected(&self) -> bool {
    !self.is_connected()
}
400
401 async fn connect(&mut self) -> anyhow::Result<()> {
402 if self.is_connected() {
403 return Ok(());
404 }
405
406 let product_types = if self.config.product_types.is_empty() {
408 vec![DeribitProductType::Future]
409 } else {
410 self.config.product_types.clone()
411 };
412
413 let mut all_instruments = Vec::new();
414
415 for product_type in &product_types {
416 let fetched = self
417 .http_client
418 .request_instruments(DeribitCurrency::ANY, Some(*product_type))
419 .await
420 .with_context(|| format!("failed to request instruments for {product_type:?}"))?;
421
422 self.http_client.cache_instruments(&fetched);
424
425 self.instruments.rcu(|m| {
427 for instrument in &fetched {
428 m.insert(instrument.id(), instrument.clone());
429 }
430 });
431
432 all_instruments.extend(fetched);
433 }
434
435 log::info!(
436 "Cached instruments: client_id={}, total={}",
437 self.client_id,
438 all_instruments.len()
439 );
440
441 for instrument in &all_instruments {
442 if let Err(e) = self
443 .data_sender
444 .send(DataEvent::Instrument(instrument.clone()))
445 {
446 log::warn!("Failed to send instrument: {e}");
447 }
448 }
449
450 let option_greeks_subs = self.option_greeks_subs.clone();
452 let mark_price_subs = self.mark_price_subs.clone();
453 let index_price_subs = self.index_price_subs.clone();
454 let ws = self.ws_client_mut()?;
455 ws.cache_instruments(&all_instruments);
456 ws.set_option_greeks_subs(option_greeks_subs);
457 ws.set_mark_price_subs(mark_price_subs);
458 ws.set_index_price_subs(index_price_subs);
459
460 ws.connect().await.context("failed to connect WebSocket")?;
462 ws.wait_until_active(10.0)
463 .await
464 .context("WebSocket failed to become active")?;
465
466 if ws.has_credentials() {
468 ws.authenticate_session(DERIBIT_DATA_SESSION_NAME)
469 .await
470 .context("failed to authenticate WebSocket")?;
471 log_info!("WebSocket authenticated");
472 }
473
474 let stream = self.ws_client_mut()?.stream()?;
476 self.spawn_stream_task(stream);
477
478 self.is_connected.store(true, Ordering::Release);
479 log_info!("Connected ({})", self.config.environment);
480 Ok(())
481 }
482
483 async fn disconnect(&mut self) -> anyhow::Result<()> {
484 if self.is_disconnected() {
485 return Ok(());
486 }
487
488 self.cancellation_token.cancel();
490
491 if let Some(ws) = self.ws_client.as_ref()
493 && let Err(e) = ws.close().await
494 {
495 log::warn!("Error while closing WebSocket: {e:?}");
496 }
497
498 for handle in self.tasks.drain(..) {
500 if let Err(e) = handle.await {
501 log::error!("Error joining WebSocket task: {e:?}");
502 }
503 }
504
505 self.cancellation_token = CancellationToken::new();
507 self.is_connected.store(false, Ordering::Relaxed);
508
509 log_info!("Disconnected");
510 Ok(())
511 }
512
513 fn subscribe_instruments(&mut self, cmd: SubscribeInstruments) -> anyhow::Result<()> {
514 let kind = cmd
516 .params
517 .as_ref()
518 .and_then(|p| p.get_str("kind"))
519 .unwrap_or("any")
520 .to_string();
521 let currency = cmd
522 .params
523 .as_ref()
524 .and_then(|p| p.get_str("currency"))
525 .unwrap_or("any")
526 .to_string();
527
528 let ws = self
529 .ws_client
530 .as_ref()
531 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
532 .clone();
533
534 log::debug!("Subscribing to instrument state changes for {kind}.{currency}");
535
536 get_runtime().spawn(async move {
537 if let Err(e) = ws.subscribe_instrument_status(&kind, ¤cy).await {
538 log::error!("Failed to subscribe to instrument status for {kind}.{currency}: {e}");
539 }
540 });
541
542 Ok(())
543 }
544
545 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
546 let instrument_id = cmd.instrument_id;
547
548 if !self.instruments.contains_key(&instrument_id) {
550 log::warn!(
551 "Instrument {instrument_id} not in cache - it may have been created after connect()"
552 );
553 }
554
555 let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
557
558 let ws = self
559 .ws_client
560 .as_ref()
561 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
562 .clone();
563
564 log::debug!(
565 "Subscribing to instrument state for {instrument_id} (channel: {kind}.{currency})"
566 );
567
568 get_runtime().spawn(async move {
570 if let Err(e) = ws.subscribe_instrument_status(&kind, ¤cy).await {
571 log::error!("Failed to subscribe to instrument status for {instrument_id}: {e}");
572 }
573 });
574
575 Ok(())
576 }
577
578 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
579 if cmd.book_type != BookType::L2_MBP {
580 anyhow::bail!("Deribit only supports L2_MBP order book deltas");
581 }
582
583 let ws = self
584 .ws_client
585 .as_ref()
586 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
587 .clone();
588 let instrument_id = cmd.instrument_id;
589 let interval = self.get_interval(&cmd.params);
590
591 let depth = cmd
592 .depth
593 .map(|d| d.get() as u32)
594 .or_else(|| {
595 cmd.params
596 .as_ref()
597 .and_then(|p| p.get_u64("depth"))
598 .map(|n| n as u32)
599 })
600 .unwrap_or(DERIBIT_BOOK_DEFAULT_DEPTH);
601
602 if !DERIBIT_BOOK_VALID_DEPTHS.contains(&depth) {
603 anyhow::bail!("invalid depth {depth}; supported depths: {DERIBIT_BOOK_VALID_DEPTHS:?}");
604 }
605
606 let group = cmd
607 .params
608 .as_ref()
609 .and_then(|p| p.get_str("group"))
610 .unwrap_or(DERIBIT_BOOK_DEFAULT_GROUP)
611 .to_string();
612
613 log::debug!(
614 "Subscribing to book deltas for {} (group: {}, depth: {}, interval: {}, book_type: {:?})",
615 instrument_id,
616 group,
617 depth,
618 interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
619 cmd.book_type
620 );
621
622 get_runtime().spawn(async move {
623 let result = if interval == Some(DeribitUpdateInterval::Raw) {
624 ws.subscribe_book(instrument_id, interval).await
625 } else {
626 ws.subscribe_book_grouped(instrument_id, &group, depth, interval)
627 .await
628 };
629
630 if let Err(e) = result {
631 log::error!("Failed to subscribe to book deltas for {instrument_id}: {e}");
632 }
633 });
634
635 Ok(())
636 }
637
638 fn subscribe_book_depth10(&mut self, cmd: SubscribeBookDepth10) -> anyhow::Result<()> {
639 if cmd.book_type != BookType::L2_MBP {
640 anyhow::bail!("Deribit only supports L2_MBP order book depth");
641 }
642
643 let ws = self
644 .ws_client
645 .as_ref()
646 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
647 .clone();
648 let instrument_id = cmd.instrument_id;
649 let interval = self.get_interval(&cmd.params);
650 let group = cmd
651 .params
652 .as_ref()
653 .and_then(|p| p.get_str("group"))
654 .unwrap_or(DERIBIT_BOOK_DEFAULT_GROUP)
655 .to_string();
656
657 log::debug!(
658 "Subscribing to book depth10 for {} (group: {}, interval: {}, book_type: {:?})",
659 instrument_id,
660 group,
661 interval.map_or("100ms (default)".to_string(), |i| i.to_string()),
662 cmd.book_type
663 );
664
665 get_runtime().spawn(async move {
666 if let Err(e) = ws
667 .subscribe_book_grouped(instrument_id, &group, 10, interval)
668 .await
669 {
670 log::error!("Failed to subscribe to book depth10 for {instrument_id}: {e}");
671 }
672 });
673
674 Ok(())
675 }
676
677 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
678 let ws = self
679 .ws_client
680 .as_ref()
681 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
682 .clone();
683 let instrument_id = cmd.instrument_id;
684
685 log::debug!("Subscribing to quotes for {instrument_id}");
686
687 get_runtime().spawn(async move {
688 if let Err(e) = ws.subscribe_quotes(instrument_id).await {
689 log::error!("Failed to subscribe to quotes for {instrument_id}: {e}");
690 }
691 });
692
693 Ok(())
694 }
695
696 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
697 let ws = self
698 .ws_client
699 .as_ref()
700 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
701 .clone();
702 let instrument_id = cmd.instrument_id;
703 let interval = self.get_interval(&cmd.params);
704
705 log::debug!(
706 "Subscribing to trades for {} (interval: {})",
707 instrument_id,
708 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
709 );
710
711 get_runtime().spawn(async move {
712 if let Err(e) = ws.subscribe_trades(instrument_id, interval).await {
713 log::error!("Failed to subscribe to trades for {instrument_id}: {e}");
714 }
715 });
716
717 Ok(())
718 }
719
720 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
721 let ws = self
722 .ws_client
723 .as_ref()
724 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
725 .clone();
726 let instrument_id = cmd.instrument_id;
727 let interval = self.get_interval(&cmd.params);
728
729 self.mark_price_subs.insert(instrument_id);
731
732 log::debug!(
733 "Subscribing to mark prices for {} (via ticker channel, interval: {})",
734 instrument_id,
735 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
736 );
737
738 get_runtime().spawn(async move {
739 if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
740 log::error!("Failed to subscribe to mark prices for {instrument_id}: {e}");
741 }
742 });
743
744 Ok(())
745 }
746
747 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
748 let ws = self
749 .ws_client
750 .as_ref()
751 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
752 .clone();
753 let instrument_id = cmd.instrument_id;
754 let interval = self.get_interval(&cmd.params);
755
756 self.index_price_subs.insert(instrument_id);
758
759 log::debug!(
760 "Subscribing to index prices for {} (via ticker channel, interval: {})",
761 instrument_id,
762 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
763 );
764
765 get_runtime().spawn(async move {
766 if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
767 log::error!("Failed to subscribe to index prices for {instrument_id}: {e}");
768 }
769 });
770
771 Ok(())
772 }
773
774 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
775 let ws = self
776 .ws_client
777 .as_ref()
778 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
779 .clone();
780 let instrument_id = cmd.bar_type.instrument_id();
781 let resolution = bar_spec_to_resolution(&cmd.bar_type);
782
783 get_runtime().spawn(async move {
784 if let Err(e) = ws.subscribe_chart(instrument_id, &resolution).await {
785 log::error!("Failed to subscribe to bars for {instrument_id}: {e}");
786 }
787 });
788
789 Ok(())
790 }
791
792 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
793 let instrument_id = cmd.instrument_id;
794
795 let is_perpetual = self
797 .instruments
798 .load()
799 .get(&instrument_id)
800 .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
801
802 if !is_perpetual {
803 log::warn!(
804 "Funding rates subscription rejected for {instrument_id}: only available for perpetual instruments"
805 );
806 return Ok(());
807 }
808
809 let ws = self
810 .ws_client
811 .as_ref()
812 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
813 .clone();
814 let interval = self.get_interval(&cmd.params);
815
816 log::debug!(
817 "Subscribing to funding rates for {} (perpetual channel, interval: {})",
818 instrument_id,
819 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
820 );
821
822 get_runtime().spawn(async move {
823 if let Err(e) = ws
824 .subscribe_perpetual_interests_rates_updates(instrument_id, interval)
825 .await
826 {
827 log::error!("Failed to subscribe to funding rates for {instrument_id}: {e}");
828 }
829 });
830
831 Ok(())
832 }
833
834 fn subscribe_instrument_status(
835 &mut self,
836 cmd: SubscribeInstrumentStatus,
837 ) -> anyhow::Result<()> {
838 let instrument_id = cmd.instrument_id;
839 let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
840
841 let ws = self
842 .ws_client
843 .as_ref()
844 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
845 .clone();
846
847 log::info!("Subscribing to instrument status for {instrument_id} ({kind}.{currency})");
848
849 get_runtime().spawn(async move {
850 if let Err(e) = ws.subscribe_instrument_status(&kind, ¤cy).await {
851 log::error!("Failed to subscribe to instrument status for {instrument_id}: {e}");
852 }
853 });
854
855 Ok(())
856 }
857
858 fn subscribe_option_greeks(&mut self, cmd: SubscribeOptionGreeks) -> anyhow::Result<()> {
859 let ws = self
860 .ws_client
861 .as_ref()
862 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
863 .clone();
864 let instrument_id = cmd.instrument_id;
865 let interval = self.get_interval(&cmd.params);
866
867 self.option_greeks_subs.insert(instrument_id);
869
870 log::debug!(
871 "Subscribing to option greeks for {} (via ticker channel, interval: {})",
872 instrument_id,
873 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
874 );
875
876 get_runtime().spawn(async move {
877 if let Err(e) = ws.subscribe_ticker(instrument_id, interval).await {
878 log::error!("Failed to subscribe to option greeks for {instrument_id}: {e}");
879 }
880 });
881
882 Ok(())
883 }
884
885 fn unsubscribe_instrument_status(
886 &mut self,
887 cmd: &UnsubscribeInstrumentStatus,
888 ) -> anyhow::Result<()> {
889 let instrument_id = cmd.instrument_id;
890 let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
891
892 let ws = self
893 .ws_client
894 .as_ref()
895 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
896 .clone();
897
898 log::info!("Unsubscribing from instrument status for {instrument_id} ({kind}.{currency})");
899
900 get_runtime().spawn(async move {
901 if let Err(e) = ws.unsubscribe_instrument_status(&kind, ¤cy).await {
902 log::error!(
903 "Failed to unsubscribe from instrument status for {instrument_id}: {e}"
904 );
905 }
906 });
907
908 Ok(())
909 }
910
911 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
912 let kind = cmd
913 .params
914 .as_ref()
915 .and_then(|p| p.get_str("kind"))
916 .unwrap_or("any")
917 .to_string();
918 let currency = cmd
919 .params
920 .as_ref()
921 .and_then(|p| p.get_str("currency"))
922 .unwrap_or("any")
923 .to_string();
924
925 let ws = self
926 .ws_client
927 .as_ref()
928 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
929 .clone();
930
931 log::debug!("Unsubscribing from instrument state changes for {kind}.{currency}");
932
933 get_runtime().spawn(async move {
934 if let Err(e) = ws.unsubscribe_instrument_status(&kind, ¤cy).await {
935 log::error!(
936 "Failed to unsubscribe from instrument status for {kind}.{currency}: {e}"
937 );
938 }
939 });
940
941 Ok(())
942 }
943
944 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
945 let instrument_id = cmd.instrument_id;
946
947 let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
949
950 let ws = self
951 .ws_client
952 .as_ref()
953 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
954 .clone();
955
956 log::debug!(
957 "Unsubscribing from instrument state for {instrument_id} (channel: {kind}.{currency})"
958 );
959
960 get_runtime().spawn(async move {
961 if let Err(e) = ws.unsubscribe_instrument_status(&kind, ¤cy).await {
962 log::error!(
963 "Failed to unsubscribe from instrument status for {instrument_id}: {e}"
964 );
965 }
966 });
967
968 Ok(())
969 }
970
971 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
972 let ws = self
973 .ws_client
974 .as_ref()
975 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
976 .clone();
977 let instrument_id = cmd.instrument_id;
978 let interval = self.get_interval(&cmd.params);
979
980 let depth = cmd
981 .params
982 .as_ref()
983 .and_then(|p| p.get_u64("depth"))
984 .map_or(DERIBIT_BOOK_DEFAULT_DEPTH, |n| n as u32);
985
986 if !DERIBIT_BOOK_VALID_DEPTHS.contains(&depth) {
987 anyhow::bail!("invalid depth {depth}; supported depths: {DERIBIT_BOOK_VALID_DEPTHS:?}");
988 }
989
990 let group = cmd
991 .params
992 .as_ref()
993 .and_then(|p| p.get_str("group"))
994 .unwrap_or(DERIBIT_BOOK_DEFAULT_GROUP)
995 .to_string();
996
997 log::debug!(
998 "Unsubscribing from book deltas for {} (group: {}, depth: {}, interval: {})",
999 instrument_id,
1000 group,
1001 depth,
1002 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1003 );
1004
1005 get_runtime().spawn(async move {
1006 let result = if interval == Some(DeribitUpdateInterval::Raw) {
1007 ws.unsubscribe_book(instrument_id, interval).await
1008 } else {
1009 ws.unsubscribe_book_grouped(instrument_id, &group, depth, interval)
1010 .await
1011 };
1012
1013 if let Err(e) = result {
1014 log::error!("Failed to unsubscribe from book deltas for {instrument_id}: {e}");
1015 }
1016 });
1017
1018 Ok(())
1019 }
1020
1021 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1022 let ws = self
1023 .ws_client
1024 .as_ref()
1025 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1026 .clone();
1027 let instrument_id = cmd.instrument_id;
1028 let interval = self.get_interval(&cmd.params);
1029 let group = cmd
1030 .params
1031 .as_ref()
1032 .and_then(|p| p.get_str("group"))
1033 .unwrap_or(DERIBIT_BOOK_DEFAULT_GROUP)
1034 .to_string();
1035
1036 log::debug!(
1037 "Unsubscribing from book depth10 for {} (group: {}, interval: {})",
1038 instrument_id,
1039 group,
1040 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1041 );
1042
1043 get_runtime().spawn(async move {
1044 if let Err(e) = ws
1045 .unsubscribe_book_grouped(instrument_id, &group, 10, interval)
1046 .await
1047 {
1048 log::error!("Failed to unsubscribe from book depth10 for {instrument_id}: {e}");
1049 }
1050 });
1051
1052 Ok(())
1053 }
1054
1055 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1056 let ws = self
1057 .ws_client
1058 .as_ref()
1059 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1060 .clone();
1061 let instrument_id = cmd.instrument_id;
1062
1063 log::debug!("Unsubscribing from quotes for {instrument_id}");
1064
1065 get_runtime().spawn(async move {
1066 if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
1067 log::error!("Failed to unsubscribe from quotes for {instrument_id}: {e}");
1068 }
1069 });
1070
1071 Ok(())
1072 }
1073
1074 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1075 let ws = self
1076 .ws_client
1077 .as_ref()
1078 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1079 .clone();
1080 let instrument_id = cmd.instrument_id;
1081 let interval = self.get_interval(&cmd.params);
1082
1083 log::debug!(
1084 "Unsubscribing from trades for {} (interval: {})",
1085 instrument_id,
1086 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1087 );
1088
1089 get_runtime().spawn(async move {
1090 if let Err(e) = ws.unsubscribe_trades(instrument_id, interval).await {
1091 log::error!("Failed to unsubscribe from trades for {instrument_id}: {e}");
1092 }
1093 });
1094
1095 Ok(())
1096 }
1097
1098 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1099 let ws = self
1100 .ws_client
1101 .as_ref()
1102 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1103 .clone();
1104 let instrument_id = cmd.instrument_id;
1105 let interval = self.get_interval(&cmd.params);
1106
1107 self.mark_price_subs.remove(&instrument_id);
1109
1110 log::debug!(
1111 "Unsubscribing from mark prices for {} (via ticker channel, interval: {})",
1112 instrument_id,
1113 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1114 );
1115
1116 get_runtime().spawn(async move {
1117 if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
1118 log::error!("Failed to unsubscribe from mark prices for {instrument_id}: {e}");
1119 }
1120 });
1121
1122 Ok(())
1123 }
1124
1125 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1126 let ws = self
1127 .ws_client
1128 .as_ref()
1129 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1130 .clone();
1131 let instrument_id = cmd.instrument_id;
1132 let interval = self.get_interval(&cmd.params);
1133
1134 self.index_price_subs.remove(&instrument_id);
1136
1137 log::debug!(
1138 "Unsubscribing from index prices for {} (via ticker channel, interval: {})",
1139 instrument_id,
1140 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1141 );
1142
1143 get_runtime().spawn(async move {
1144 if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
1145 log::error!("Failed to unsubscribe from index prices for {instrument_id}: {e}");
1146 }
1147 });
1148
1149 Ok(())
1150 }
1151
1152 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1153 let ws = self
1154 .ws_client
1155 .as_ref()
1156 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1157 .clone();
1158 let instrument_id = cmd.bar_type.instrument_id();
1159 let resolution = bar_spec_to_resolution(&cmd.bar_type);
1160
1161 get_runtime().spawn(async move {
1162 if let Err(e) = ws.unsubscribe_chart(instrument_id, &resolution).await {
1163 log::error!("Failed to unsubscribe from bars for {instrument_id}: {e}");
1164 }
1165 });
1166
1167 Ok(())
1168 }
1169
1170 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1171 let instrument_id = cmd.instrument_id;
1172
1173 let is_perpetual = self
1175 .instruments
1176 .load()
1177 .get(&instrument_id)
1178 .is_some_and(|inst| matches!(inst, InstrumentAny::CryptoPerpetual(_)));
1179
1180 if !is_perpetual {
1181 log::warn!(
1182 "Funding rates unsubscription rejected for {instrument_id}: only available for perpetual instruments"
1183 );
1184 return Ok(());
1185 }
1186
1187 let ws = self
1188 .ws_client
1189 .as_ref()
1190 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1191 .clone();
1192 let interval = self.get_interval(&cmd.params);
1193
1194 log::debug!(
1195 "Unsubscribing from funding rates for {} (perpetual channel, interval: {})",
1196 instrument_id,
1197 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1198 );
1199
1200 get_runtime().spawn(async move {
1201 if let Err(e) = ws
1202 .unsubscribe_perpetual_interest_rates_updates(instrument_id, interval)
1203 .await
1204 {
1205 log::error!("Failed to unsubscribe from funding rates for {instrument_id}: {e}");
1206 }
1207 });
1208
1209 Ok(())
1210 }
1211
1212 fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
1213 let ws = self
1214 .ws_client
1215 .as_ref()
1216 .ok_or_else(|| anyhow::anyhow!("WebSocket client not initialized"))?
1217 .clone();
1218 let instrument_id = cmd.instrument_id;
1219 let interval = self.get_interval(&cmd.params);
1220
1221 self.option_greeks_subs.remove(&instrument_id);
1223
1224 log::debug!(
1225 "Unsubscribing from option greeks for {} (via ticker channel, interval: {})",
1226 instrument_id,
1227 interval.map_or("100ms (default)".to_string(), |i| i.to_string())
1228 );
1229
1230 get_runtime().spawn(async move {
1231 if let Err(e) = ws.unsubscribe_ticker(instrument_id, interval).await {
1232 log::error!("Failed to unsubscribe from option greeks for {instrument_id}: {e}");
1233 }
1234 });
1235
1236 Ok(())
1237 }
1238
1239 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1240 if request.start.is_some() {
1241 log::warn!(
1242 "Requesting instruments for {:?} with specified `start` which has no effect",
1243 request.venue
1244 );
1245 }
1246
1247 if request.end.is_some() {
1248 log::warn!(
1249 "Requesting instruments for {:?} with specified `end` which has no effect",
1250 request.venue
1251 );
1252 }
1253
1254 let http_client = self.http_client.clone();
1255 let ws_client = self.ws_client.clone();
1256 let instruments_cache = Arc::clone(&self.instruments);
1257 let sender = self.data_sender.clone();
1258 let request_id = request.request_id;
1259 let client_id = request.client_id.unwrap_or(self.client_id);
1260 let start_nanos = datetime_to_unix_nanos(request.start);
1261 let end_nanos = datetime_to_unix_nanos(request.end);
1262 let params = request.params;
1263 let clock = self.clock;
1264 let venue = *DERIBIT_VENUE;
1265
1266 let product_types = if self.config.product_types.is_empty() {
1268 vec![crate::http::models::DeribitProductType::Future]
1269 } else {
1270 self.config.product_types.clone()
1271 };
1272
1273 get_runtime().spawn(async move {
1274 let mut all_instruments = Vec::new();
1275
1276 for product_type in &product_types {
1277 log::debug!(
1278 "Requesting instruments for currency=ANY, product_type={product_type:?}"
1279 );
1280
1281 match http_client
1282 .request_instruments(DeribitCurrency::ANY, Some(*product_type))
1283 .await
1284 {
1285 Ok(instruments) => {
1286 log::info!(
1287 "Fetched {} instruments for ANY/{:?}",
1288 instruments.len(),
1289 product_type
1290 );
1291
1292 instruments_cache.rcu(|m| {
1293 for instrument in &instruments {
1294 m.insert(instrument.id(), instrument.clone());
1295 }
1296 });
1297 all_instruments.extend(instruments);
1298 }
1299 Err(e) => {
1300 log::error!("Failed to fetch instruments for ANY/{product_type:?}: {e:?}");
1301 }
1302 }
1303 }
1304
1305 if !all_instruments.is_empty() {
1308 http_client.cache_instruments(&all_instruments);
1309
1310 if let Some(ws) = &ws_client {
1311 ws.cache_instruments(&all_instruments);
1312 }
1313 }
1314
1315 let response = DataResponse::Instruments(InstrumentsResponse::new(
1317 request_id,
1318 client_id,
1319 venue,
1320 all_instruments,
1321 start_nanos,
1322 end_nanos,
1323 clock.get_time_ns(),
1324 params,
1325 ));
1326
1327 if let Err(e) = sender.send(DataEvent::Response(response)) {
1328 log::error!("Failed to send instruments response: {e}");
1329 }
1330 });
1331
1332 Ok(())
1333 }
1334
1335 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1336 if request.start.is_some() {
1337 log::warn!(
1338 "Requesting instrument {} with specified `start` which has no effect",
1339 request.instrument_id
1340 );
1341 }
1342
1343 if request.end.is_some() {
1344 log::warn!(
1345 "Requesting instrument {} with specified `end` which has no effect",
1346 request.instrument_id
1347 );
1348 }
1349
1350 if let Some(instrument) = self.instruments.get_cloned(&request.instrument_id) {
1352 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1353 request.request_id,
1354 request.client_id.unwrap_or(self.client_id),
1355 instrument.id(),
1356 instrument,
1357 datetime_to_unix_nanos(request.start),
1358 datetime_to_unix_nanos(request.end),
1359 self.clock.get_time_ns(),
1360 request.params,
1361 )));
1362
1363 if let Err(e) = self.data_sender.send(DataEvent::Response(response)) {
1364 log::error!("Failed to send instrument response: {e}");
1365 }
1366 return Ok(());
1367 }
1368
1369 log::debug!(
1370 "Instrument {} not in cache, fetching from API",
1371 request.instrument_id
1372 );
1373
1374 let http_client = self.http_client.clone();
1375 let ws_client = self.ws_client.clone();
1376 let instruments_cache = Arc::clone(&self.instruments);
1377 let sender = self.data_sender.clone();
1378 let instrument_id = request.instrument_id;
1379 let request_id = request.request_id;
1380 let client_id = request.client_id.unwrap_or(self.client_id);
1381 let start_nanos = datetime_to_unix_nanos(request.start);
1382 let end_nanos = datetime_to_unix_nanos(request.end);
1383 let params = request.params;
1384 let clock = self.clock;
1385
1386 get_runtime().spawn(async move {
1387 match http_client
1388 .request_instrument(instrument_id)
1389 .await
1390 .context("failed to request instrument from Deribit")
1391 {
1392 Ok(instrument) => {
1393 log::info!("Successfully fetched instrument: {instrument_id}");
1394
1395 instruments_cache.insert(instrument.id(), instrument.clone());
1396 http_client.cache_instruments(std::slice::from_ref(&instrument));
1397
1398 if let Some(ws) = &ws_client {
1399 ws.cache_instruments(std::slice::from_ref(&instrument));
1400 }
1401
1402 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1404 request_id,
1405 client_id,
1406 instrument.id(),
1407 instrument,
1408 start_nanos,
1409 end_nanos,
1410 clock.get_time_ns(),
1411 params,
1412 )));
1413
1414 if let Err(e) = sender.send(DataEvent::Response(response)) {
1415 log::error!("Failed to send instrument response: {e}");
1416 }
1417 }
1418 Err(e) => {
1419 log::error!("Instrument request failed for {instrument_id}: {e:?}");
1420 }
1421 }
1422 });
1423
1424 Ok(())
1425 }
1426
1427 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1428 let http_client = self.http_client.clone();
1429 let sender = self.data_sender.clone();
1430 let instrument_id = request.instrument_id;
1431 let start = request.start;
1432 let end = request.end;
1433 let limit = request.limit.map(|n| n.get() as u32);
1434 let request_id = request.request_id;
1435 let client_id = request.client_id.unwrap_or(self.client_id);
1436 let params = request.params;
1437 let clock = self.clock;
1438 let start_nanos = datetime_to_unix_nanos(start);
1439 let end_nanos = datetime_to_unix_nanos(end);
1440
1441 get_runtime().spawn(async move {
1442 match http_client
1443 .request_trades(instrument_id, start, end, limit)
1444 .await
1445 .context("failed to request trades from Deribit")
1446 {
1447 Ok(trades) => {
1448 let response = DataResponse::Trades(TradesResponse::new(
1449 request_id,
1450 client_id,
1451 instrument_id,
1452 trades,
1453 start_nanos,
1454 end_nanos,
1455 clock.get_time_ns(),
1456 params,
1457 ));
1458
1459 if let Err(e) = sender.send(DataEvent::Response(response)) {
1460 log::error!("Failed to send trades response: {e}");
1461 }
1462 }
1463 Err(e) => log::error!("Trades request failed for {instrument_id}: {e:?}"),
1464 }
1465 });
1466
1467 Ok(())
1468 }
1469
1470 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1471 let http_client = self.http_client.clone();
1472 let sender = self.data_sender.clone();
1473 let bar_type = request.bar_type;
1474 let start = request.start;
1475 let end = request.end;
1476 let limit = request.limit.map(|n| n.get() as u32);
1477 let request_id = request.request_id;
1478 let client_id = request.client_id.unwrap_or(self.client_id);
1479 let params = request.params;
1480 let clock = self.clock;
1481 let start_nanos = datetime_to_unix_nanos(start);
1482 let end_nanos = datetime_to_unix_nanos(end);
1483
1484 get_runtime().spawn(async move {
1485 match http_client
1486 .request_bars(bar_type, start, end, limit)
1487 .await
1488 .context("failed to request bars from Deribit")
1489 {
1490 Ok(bars) => {
1491 let response = DataResponse::Bars(BarsResponse::new(
1492 request_id,
1493 client_id,
1494 bar_type,
1495 bars,
1496 start_nanos,
1497 end_nanos,
1498 clock.get_time_ns(),
1499 params,
1500 ));
1501
1502 if let Err(e) = sender.send(DataEvent::Response(response)) {
1503 log::error!("Failed to send bars response: {e}");
1504 }
1505 }
1506 Err(e) => log::error!("Bars request failed for {bar_type}: {e:?}"),
1507 }
1508 });
1509
1510 Ok(())
1511 }
1512
1513 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1514 let http_client = self.http_client.clone();
1515 let sender = self.data_sender.clone();
1516 let instrument_id = request.instrument_id;
1517 let depth = request.depth.map(|n| n.get() as u32);
1518 let request_id = request.request_id;
1519 let client_id = request.client_id.unwrap_or(self.client_id);
1520 let params = request.params;
1521 let clock = self.clock;
1522
1523 get_runtime().spawn(async move {
1524 match http_client
1525 .request_book_snapshot(instrument_id, depth)
1526 .await
1527 .context("failed to request book snapshot from Deribit")
1528 {
1529 Ok(book) => {
1530 let response = DataResponse::Book(BookResponse::new(
1531 request_id,
1532 client_id,
1533 instrument_id,
1534 book,
1535 None,
1536 None,
1537 clock.get_time_ns(),
1538 params,
1539 ));
1540
1541 if let Err(e) = sender.send(DataEvent::Response(response)) {
1542 log::error!("Failed to send book snapshot response: {e}");
1543 }
1544 }
1545 Err(e) => {
1546 log::error!("Book snapshot request failed for {instrument_id}: {e:?}");
1547 }
1548 }
1549 });
1550
1551 Ok(())
1552 }
1553
1554 fn request_forward_prices(&self, request: RequestForwardPrices) -> anyhow::Result<()> {
1555 let currency = request.underlying.to_string();
1556 let instrument_id = request.instrument_id;
1557 let http_client = self.http_client.clone();
1558 let sender = self.data_sender.clone();
1559 let request_id = request.request_id;
1560 let client_id = request.client_id.unwrap_or(self.client_id());
1561 let params = request.params;
1562 let clock = self.clock;
1563 let venue = *DERIBIT_VENUE;
1564
1565 get_runtime().spawn(async move {
1566 let result = if let Some(inst_id) = instrument_id {
1567 let instrument_name = inst_id.symbol.to_string();
1569 log::info!(
1570 "Requesting forward price for {currency} (single instrument: {instrument_name})"
1571 );
1572
1573 match http_client.request_ticker(&instrument_name).await {
1574 Ok(ticker) => {
1575 let ts = clock.get_time_ns();
1576 let forward_prices: Vec<ForwardPrice> = ticker
1577 .underlying_price
1578 .map(|up| {
1579 vec![ForwardPrice::new(
1580 inst_id,
1581 up,
1582 ticker.underlying_index.filter(|s| !s.is_empty()),
1583 ts,
1584 ts,
1585 )]
1586 })
1587 .unwrap_or_default();
1588
1589 log::info!(
1590 "Fetched {} forward price for {currency} (single instrument: {instrument_name})",
1591 forward_prices.len(),
1592 );
1593 Ok((forward_prices, ts))
1594 }
1595 Err(e) => Err(e),
1596 }
1597 } else {
1598 log::info!("Requesting option forward prices for currency={currency} (bulk)");
1600
1601 match http_client.request_book_summaries(¤cy).await {
1602 Ok(summaries) => {
1603 let ts = clock.get_time_ns();
1604
1605 let mut seen_indices = std::collections::HashSet::new();
1608 let forward_prices: Vec<ForwardPrice> = summaries
1609 .into_iter()
1610 .filter_map(|s| {
1611 let up = s.underlying_price?;
1612 let idx = s.underlying_index.clone().unwrap_or_default();
1613 if !seen_indices.insert(idx.clone()) {
1614 return None;
1615 }
1616 Some(ForwardPrice::new(
1617 InstrumentId::new(
1618 Symbol::new(&s.instrument_name),
1619 *DERIBIT_VENUE,
1620 ),
1621 up,
1622 Some(idx).filter(|s| !s.is_empty()),
1623 ts,
1624 ts,
1625 ))
1626 })
1627 .collect();
1628
1629 log::info!(
1630 "Fetched {} forward prices (per-expiry) for {currency}",
1631 forward_prices.len(),
1632 );
1633 Ok((forward_prices, ts))
1634 }
1635 Err(e) => Err(e),
1636 }
1637 };
1638
1639 match result {
1640 Ok((forward_prices, ts)) => {
1641 let response = DataResponse::ForwardPrices(ForwardPricesResponse::new(
1642 request_id,
1643 client_id,
1644 venue,
1645 forward_prices,
1646 ts,
1647 params,
1648 ));
1649
1650 if let Err(e) = sender.send(DataEvent::Response(response)) {
1651 log::error!("Failed to send forward prices response: {e}");
1652 }
1653 }
1654 Err(e) => {
1655 log::error!("Forward prices request failed for {currency}: {e:?}");
1656 }
1657 }
1658 });
1659
1660 Ok(())
1661 }
1662}