Skip to main content

nautilus_data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Base data client functionality.
17//!
18//! Provides the `DataClientAdapter` for managing subscriptions and requests,
19//! and utilities for constructing data responses.
20
21use std::{
22    fmt::Debug,
23    ops::{Deref, DerefMut},
24};
25
26use ahash::AHashSet;
27use nautilus_common::{
28    clients::{DataClient, log_command_error},
29    messages::data::{
30        RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData,
31        RequestForwardPrices, RequestFundingRates, RequestInstrument, RequestInstruments,
32        RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
33        SubscribeCommand, SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices,
34        SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus,
35        SubscribeInstruments, SubscribeMarkPrices, SubscribeOptionGreeks, SubscribeQuotes,
36        SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
37        UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
38        UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
39        UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeOptionGreeks, UnsubscribeQuotes,
40        UnsubscribeTrades,
41    },
42};
43#[cfg(feature = "defi")]
44use nautilus_model::defi::Blockchain;
45use nautilus_model::{
46    data::{BarType, DataType},
47    identifiers::{ClientId, InstrumentId, Venue},
48};
49
50#[cfg(feature = "defi")]
51#[allow(unused_imports)] // Brings DeFi impl blocks into scope
52use crate::defi::client as _;
53
54/// Wraps a [`DataClient`], managing subscription state and forwarding commands.
55pub struct DataClientAdapter {
56    pub(crate) client: Box<dyn DataClient>,
57    pub client_id: ClientId,
58    pub venue: Option<Venue>,
59    pub handles_book_deltas: bool,
60    pub handles_book_snapshots: bool,
61    pub subscriptions_custom: AHashSet<DataType>,
62    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
63    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
64    pub subscriptions_quotes: AHashSet<InstrumentId>,
65    pub subscriptions_trades: AHashSet<InstrumentId>,
66    pub subscriptions_bars: AHashSet<BarType>,
67    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
68    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
69    pub subscriptions_instrument: AHashSet<InstrumentId>,
70    pub subscriptions_instrument_venue: AHashSet<Venue>,
71    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
72    pub subscriptions_index_prices: AHashSet<InstrumentId>,
73    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
74    pub subscriptions_option_greeks: AHashSet<InstrumentId>,
75    #[cfg(feature = "defi")]
76    pub subscriptions_blocks: AHashSet<Blockchain>,
77    #[cfg(feature = "defi")]
78    pub subscriptions_pools: AHashSet<InstrumentId>,
79    #[cfg(feature = "defi")]
80    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
81    #[cfg(feature = "defi")]
82    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
83    #[cfg(feature = "defi")]
84    pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
85    #[cfg(feature = "defi")]
86    pub subscriptions_pool_flash: AHashSet<InstrumentId>,
87}
88
89impl Deref for DataClientAdapter {
90    type Target = Box<dyn DataClient>;
91
92    fn deref(&self) -> &Self::Target {
93        &self.client
94    }
95}
96
97impl DerefMut for DataClientAdapter {
98    fn deref_mut(&mut self) -> &mut Self::Target {
99        &mut self.client
100    }
101}
102
103impl Debug for DataClientAdapter {
104    #[rustfmt::skip]
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct(stringify!(DataClientAdapter))
107            .field("client_id", &self.client_id)
108            .field("venue", &self.venue)
109            .field("handles_book_deltas", &self.handles_book_deltas)
110            .field("handles_book_snapshots", &self.handles_book_snapshots)
111            .field("subscriptions_custom", &self.subscriptions_custom)
112            .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
113            .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
114            .field("subscriptions_quotes", &self.subscriptions_quotes)
115            .field("subscriptions_trades", &self.subscriptions_trades)
116            .field("subscriptions_bars", &self.subscriptions_bars)
117            .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
118            .field("subscriptions_index_prices", &self.subscriptions_index_prices)
119            .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
120            .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
121            .field("subscriptions_instrument", &self.subscriptions_instrument)
122            .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
123            .finish()
124    }
125}
126
127impl DataClientAdapter {
128    /// Creates a new [`DataClientAdapter`] with the given client and clock.
129    #[must_use]
130    pub fn new(
131        client_id: ClientId,
132        venue: Option<Venue>,
133        handles_order_book_deltas: bool,
134        handles_order_book_snapshots: bool,
135        client: Box<dyn DataClient>,
136    ) -> Self {
137        Self {
138            client,
139            client_id,
140            venue,
141            handles_book_deltas: handles_order_book_deltas,
142            handles_book_snapshots: handles_order_book_snapshots,
143            subscriptions_custom: AHashSet::new(),
144            subscriptions_book_deltas: AHashSet::new(),
145            subscriptions_book_depth10: AHashSet::new(),
146            subscriptions_quotes: AHashSet::new(),
147            subscriptions_trades: AHashSet::new(),
148            subscriptions_mark_prices: AHashSet::new(),
149            subscriptions_index_prices: AHashSet::new(),
150            subscriptions_funding_rates: AHashSet::new(),
151            subscriptions_option_greeks: AHashSet::new(),
152            subscriptions_bars: AHashSet::new(),
153            subscriptions_instrument_status: AHashSet::new(),
154            subscriptions_instrument_close: AHashSet::new(),
155            subscriptions_instrument: AHashSet::new(),
156            subscriptions_instrument_venue: AHashSet::new(),
157            #[cfg(feature = "defi")]
158            subscriptions_blocks: AHashSet::new(),
159            #[cfg(feature = "defi")]
160            subscriptions_pools: AHashSet::new(),
161            #[cfg(feature = "defi")]
162            subscriptions_pool_swaps: AHashSet::new(),
163            #[cfg(feature = "defi")]
164            subscriptions_pool_liquidity_updates: AHashSet::new(),
165            #[cfg(feature = "defi")]
166            subscriptions_pool_fee_collects: AHashSet::new(),
167            #[cfg(feature = "defi")]
168            subscriptions_pool_flash: AHashSet::new(),
169        }
170    }
171
172    #[expect(clippy::borrowed_box)]
173    #[must_use]
174    pub fn get_client(&self) -> &Box<dyn DataClient> {
175        &self.client
176    }
177
178    /// Connects the underlying client to the data provider.
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if the connection fails.
183    pub async fn connect(&mut self) -> anyhow::Result<()> {
184        self.client.connect().await
185    }
186
187    /// Disconnects the underlying client from the data provider.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if the disconnection fails.
192    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
193        self.client.disconnect().await
194    }
195
196    #[inline]
197    pub fn execute_subscribe(&mut self, cmd: SubscribeCommand) {
198        let cmd_debug = format!("{cmd:?}");
199        if let Err(e) = match cmd {
200            SubscribeCommand::Data(cmd) => self.subscribe(cmd),
201            SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
202            SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
203            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
204            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
205            SubscribeCommand::BookSnapshots(_) => Ok(()), // Handled internally by engine
206            SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
207            SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
208            SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
209            SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
210            SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
211            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
212            SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
213            SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
214            SubscribeCommand::OptionGreeks(cmd) => self.subscribe_option_greeks(cmd),
215            SubscribeCommand::OptionChain(_) => Ok(()), // Handled internally by engine
216        } {
217            log_command_error(&cmd_debug, &e);
218        }
219    }
220
221    #[inline]
222    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
223        if let Err(e) = match cmd {
224            UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
225            UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
226            UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
227            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
228            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
229            UnsubscribeCommand::BookSnapshots(_) => Ok(()), // Handled internally by engine
230            UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
231            UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
232            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
233            UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
234            UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
235            UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
236            UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
237            UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
238            UnsubscribeCommand::OptionGreeks(cmd) => self.unsubscribe_option_greeks(cmd),
239            UnsubscribeCommand::OptionChain(_) => Ok(()), // Handled internally by engine
240        } {
241            log_command_error(&cmd, &e);
242        }
243    }
244
245    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
246
247    /// Subscribes to a custom data type, updating internal state and forwarding to the client.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if the underlying client subscribe operation fails.
252    pub fn subscribe(&mut self, cmd: SubscribeCustomData) -> anyhow::Result<()> {
253        if !self.subscriptions_custom.contains(&cmd.data_type) {
254            self.subscriptions_custom.insert(cmd.data_type.clone());
255            self.client.subscribe(cmd)?;
256        }
257        Ok(())
258    }
259
260    /// Unsubscribes from a custom data type, updating internal state and forwarding to the client.
261    ///
262    /// # Errors
263    ///
264    /// Returns an error if the underlying client unsubscribe operation fails.
265    pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
266        if self.subscriptions_custom.contains(&cmd.data_type) {
267            self.subscriptions_custom.remove(&cmd.data_type);
268            self.client.unsubscribe(cmd)?;
269        }
270        Ok(())
271    }
272
273    /// Subscribes to instrument definitions for a venue, updating internal state and forwarding to the client.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the underlying client subscribe operation fails.
278    fn subscribe_instruments(&mut self, cmd: SubscribeInstruments) -> anyhow::Result<()> {
279        if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
280            self.subscriptions_instrument_venue.insert(cmd.venue);
281            self.client.subscribe_instruments(cmd)?;
282        }
283
284        Ok(())
285    }
286
287    /// Unsubscribes from instrument definition updates for a venue, updating internal state and forwarding to the client.
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if the underlying client unsubscribe operation fails.
292    fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
293        if self.subscriptions_instrument_venue.contains(&cmd.venue) {
294            self.subscriptions_instrument_venue.remove(&cmd.venue);
295            self.client.unsubscribe_instruments(cmd)?;
296        }
297
298        Ok(())
299    }
300
301    /// Subscribes to instrument definitions for a single instrument, updating internal state and forwarding to the client.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if the underlying client subscribe operation fails.
306    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
307        if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
308            self.subscriptions_instrument.insert(cmd.instrument_id);
309            self.client.subscribe_instrument(cmd)?;
310        }
311
312        Ok(())
313    }
314
315    /// Unsubscribes from instrument definition updates for a single instrument, updating internal state and forwarding to the client.
316    ///
317    /// # Errors
318    ///
319    /// Returns an error if the underlying client unsubscribe operation fails.
320    fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
321        if self.subscriptions_instrument.contains(&cmd.instrument_id) {
322            self.subscriptions_instrument.remove(&cmd.instrument_id);
323            self.client.unsubscribe_instrument(cmd)?;
324        }
325
326        Ok(())
327    }
328
329    /// Subscribes to book deltas updates for an instrument, updating internal state and forwarding to the client.
330    ///
331    /// # Errors
332    ///
333    /// Returns an error if the underlying client subscribe operation fails.
334    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
335        if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
336            self.subscriptions_book_deltas.insert(cmd.instrument_id);
337            self.client.subscribe_book_deltas(cmd)?;
338        }
339
340        Ok(())
341    }
342
343    /// Unsubscribes from book deltas for an instrument, updating internal state and forwarding to the client.
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if the underlying client unsubscribe operation fails.
348    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
349        if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
350            self.subscriptions_book_deltas.remove(&cmd.instrument_id);
351            self.client.unsubscribe_book_deltas(cmd)?;
352        }
353
354        Ok(())
355    }
356
357    /// Subscribes to book depth updates for an instrument, updating internal state and forwarding to the client.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if the underlying client subscribe operation fails.
362    fn subscribe_book_depth10(&mut self, cmd: SubscribeBookDepth10) -> anyhow::Result<()> {
363        if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
364            self.subscriptions_book_depth10.insert(cmd.instrument_id);
365            self.client.subscribe_book_depth10(cmd)?;
366        }
367
368        Ok(())
369    }
370
371    /// Unsubscribes from book depth updates for an instrument, updating internal state and forwarding to the client.
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if the underlying client unsubscribe operation fails.
376    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
377        if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
378            self.subscriptions_book_depth10.remove(&cmd.instrument_id);
379            self.client.unsubscribe_book_depth10(cmd)?;
380        }
381
382        Ok(())
383    }
384
385    /// Subscribes to quotes for an instrument, updating internal state and forwarding to the client.
386    ///
387    /// # Errors
388    ///
389    /// Returns an error if the underlying client subscribe operation fails.
390    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
391        if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
392            self.subscriptions_quotes.insert(cmd.instrument_id);
393            self.client.subscribe_quotes(cmd)?;
394        }
395        Ok(())
396    }
397
398    /// Unsubscribes from quotes for an instrument, updating internal state and forwarding to the client.
399    ///
400    /// # Errors
401    ///
402    /// Returns an error if the underlying client unsubscribe operation fails.
403    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
404        if self.subscriptions_quotes.contains(&cmd.instrument_id) {
405            self.subscriptions_quotes.remove(&cmd.instrument_id);
406            self.client.unsubscribe_quotes(cmd)?;
407        }
408        Ok(())
409    }
410
411    /// Subscribes to trades for an instrument, updating internal state and forwarding to the client.
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if the underlying client subscribe operation fails.
416    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
417        if !self.subscriptions_trades.contains(&cmd.instrument_id) {
418            self.subscriptions_trades.insert(cmd.instrument_id);
419            self.client.subscribe_trades(cmd)?;
420        }
421        Ok(())
422    }
423
424    /// Unsubscribes from trades for an instrument, updating internal state and forwarding to the client.
425    ///
426    /// # Errors
427    ///
428    /// Returns an error if the underlying client unsubscribe operation fails.
429    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
430        if self.subscriptions_trades.contains(&cmd.instrument_id) {
431            self.subscriptions_trades.remove(&cmd.instrument_id);
432            self.client.unsubscribe_trades(cmd)?;
433        }
434        Ok(())
435    }
436
437    /// Subscribes to bars for a bar type, updating internal state and forwarding to the client.
438    ///
439    /// # Errors
440    ///
441    /// Returns an error if the underlying client subscribe operation fails.
442    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
443        if !self.subscriptions_bars.contains(&cmd.bar_type) {
444            self.subscriptions_bars.insert(cmd.bar_type);
445            self.client.subscribe_bars(cmd)?;
446        }
447        Ok(())
448    }
449
450    /// Unsubscribes from bars for a bar type, updating internal state and forwarding to the client.
451    ///
452    /// # Errors
453    ///
454    /// Returns an error if the underlying client unsubscribe operation fails.
455    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
456        if self.subscriptions_bars.contains(&cmd.bar_type) {
457            self.subscriptions_bars.remove(&cmd.bar_type);
458            self.client.unsubscribe_bars(cmd)?;
459        }
460        Ok(())
461    }
462
463    /// Subscribes to mark price updates for an instrument, updating internal state and forwarding to the client.
464    ///
465    /// # Errors
466    ///
467    /// Returns an error if the underlying client subscribe operation fails.
468    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
469        if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
470            self.subscriptions_mark_prices.insert(cmd.instrument_id);
471            self.client.subscribe_mark_prices(cmd)?;
472        }
473        Ok(())
474    }
475
476    /// Unsubscribes from mark price updates for an instrument, updating internal state and forwarding to the client.
477    ///
478    /// # Errors
479    ///
480    /// Returns an error if the underlying client unsubscribe operation fails.
481    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
482        if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
483            self.subscriptions_mark_prices.remove(&cmd.instrument_id);
484            self.client.unsubscribe_mark_prices(cmd)?;
485        }
486        Ok(())
487    }
488
489    /// Subscribes to index price updates for an instrument, updating internal state and forwarding to the client.
490    ///
491    /// # Errors
492    ///
493    /// Returns an error if the underlying client subscribe operation fails.
494    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
495        if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
496            self.subscriptions_index_prices.insert(cmd.instrument_id);
497            self.client.subscribe_index_prices(cmd)?;
498        }
499        Ok(())
500    }
501
502    /// Unsubscribes from index price updates for an instrument, updating internal state and forwarding to the client.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if the underlying client unsubscribe operation fails.
507    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
508        if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
509            self.subscriptions_index_prices.remove(&cmd.instrument_id);
510            self.client.unsubscribe_index_prices(cmd)?;
511        }
512        Ok(())
513    }
514
515    /// Subscribes to funding rate updates for an instrument, updating internal state and forwarding to the client.
516    ///
517    /// # Errors
518    ///
519    /// Returns an error if the underlying client subscribe operation fails.
520    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
521        if !self
522            .subscriptions_funding_rates
523            .contains(&cmd.instrument_id)
524        {
525            self.subscriptions_funding_rates.insert(cmd.instrument_id);
526            self.client.subscribe_funding_rates(cmd)?;
527        }
528        Ok(())
529    }
530
531    /// Unsubscribes from funding rate updates for an instrument, updating internal state and forwarding to the client.
532    ///
533    /// # Errors
534    ///
535    /// Returns an error if the underlying client unsubscribe operation fails.
536    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
537        if self
538            .subscriptions_funding_rates
539            .contains(&cmd.instrument_id)
540        {
541            self.subscriptions_funding_rates.remove(&cmd.instrument_id);
542            self.client.unsubscribe_funding_rates(cmd)?;
543        }
544        Ok(())
545    }
546
547    /// Subscribes to instrument status updates for the specified instrument.
548    ///
549    /// # Errors
550    ///
551    /// Returns an error if the underlying client subscribe operation fails.
552    fn subscribe_instrument_status(
553        &mut self,
554        cmd: SubscribeInstrumentStatus,
555    ) -> anyhow::Result<()> {
556        if !self
557            .subscriptions_instrument_status
558            .contains(&cmd.instrument_id)
559        {
560            self.subscriptions_instrument_status
561                .insert(cmd.instrument_id);
562            self.client.subscribe_instrument_status(cmd)?;
563        }
564        Ok(())
565    }
566
567    /// Unsubscribes from instrument status updates for the specified instrument.
568    ///
569    /// # Errors
570    ///
571    /// Returns an error if the underlying client unsubscribe operation fails.
572    fn unsubscribe_instrument_status(
573        &mut self,
574        cmd: &UnsubscribeInstrumentStatus,
575    ) -> anyhow::Result<()> {
576        if self
577            .subscriptions_instrument_status
578            .contains(&cmd.instrument_id)
579        {
580            self.subscriptions_instrument_status
581                .remove(&cmd.instrument_id);
582            self.client.unsubscribe_instrument_status(cmd)?;
583        }
584        Ok(())
585    }
586
587    /// Subscribes to instrument close events for the specified instrument.
588    ///
589    /// # Errors
590    ///
591    /// Returns an error if the underlying client subscribe operation fails.
592    fn subscribe_instrument_close(&mut self, cmd: SubscribeInstrumentClose) -> anyhow::Result<()> {
593        if !self
594            .subscriptions_instrument_close
595            .contains(&cmd.instrument_id)
596        {
597            self.subscriptions_instrument_close
598                .insert(cmd.instrument_id);
599            self.client.subscribe_instrument_close(cmd)?;
600        }
601        Ok(())
602    }
603
604    /// Unsubscribes from instrument close events for the specified instrument.
605    ///
606    /// # Errors
607    ///
608    /// Returns an error if the underlying client unsubscribe operation fails.
609    fn unsubscribe_instrument_close(
610        &mut self,
611        cmd: &UnsubscribeInstrumentClose,
612    ) -> anyhow::Result<()> {
613        if self
614            .subscriptions_instrument_close
615            .contains(&cmd.instrument_id)
616        {
617            self.subscriptions_instrument_close
618                .remove(&cmd.instrument_id);
619            self.client.unsubscribe_instrument_close(cmd)?;
620        }
621        Ok(())
622    }
623
624    /// Subscribes to option greeks for an instrument, updating internal state and forwarding to the client.
625    ///
626    /// # Errors
627    ///
628    /// Returns an error if the underlying client subscribe operation fails.
629    fn subscribe_option_greeks(&mut self, cmd: SubscribeOptionGreeks) -> anyhow::Result<()> {
630        if !self
631            .subscriptions_option_greeks
632            .contains(&cmd.instrument_id)
633        {
634            self.subscriptions_option_greeks.insert(cmd.instrument_id);
635            self.client.subscribe_option_greeks(cmd)?;
636        }
637        Ok(())
638    }
639
640    /// Unsubscribes from option greeks for an instrument, updating internal state and forwarding to the client.
641    ///
642    /// # Errors
643    ///
644    /// Returns an error if the underlying client unsubscribe operation fails.
645    fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
646        if self
647            .subscriptions_option_greeks
648            .contains(&cmd.instrument_id)
649        {
650            self.subscriptions_option_greeks.remove(&cmd.instrument_id);
651            self.client.unsubscribe_option_greeks(cmd)?;
652        }
653        Ok(())
654    }
655
656    // -- REQUEST HANDLERS ------------------------------------------------------------------------
657
658    /// Sends a data request to the underlying client.
659    ///
660    /// # Errors
661    ///
662    /// Returns an error if the client request fails.
663    pub fn request_data(&self, req: RequestCustomData) -> anyhow::Result<()> {
664        self.client.request_data(req)
665    }
666
667    /// Sends a single instrument request to the client.
668    ///
669    /// # Errors
670    ///
671    /// Returns an error if the client fails to process the request.
672    pub fn request_instrument(&self, req: RequestInstrument) -> anyhow::Result<()> {
673        self.client.request_instrument(req)
674    }
675
676    /// Sends a batch instruments request to the client.
677    ///
678    /// # Errors
679    ///
680    /// Returns an error if the client fails to process the request.
681    pub fn request_instruments(&self, req: RequestInstruments) -> anyhow::Result<()> {
682        self.client.request_instruments(req)
683    }
684
685    /// Sends a book snapshot request for a given instrument.
686    ///
687    /// # Errors
688    ///
689    /// Returns an error if the client fails to process the book snapshot request.
690    pub fn request_book_snapshot(&self, req: RequestBookSnapshot) -> anyhow::Result<()> {
691        self.client.request_book_snapshot(req)
692    }
693
694    /// Sends a quotes request for a given instrument.
695    ///
696    /// # Errors
697    ///
698    /// Returns an error if the client fails to process the quotes request.
699    pub fn request_quotes(&self, req: RequestQuotes) -> anyhow::Result<()> {
700        self.client.request_quotes(req)
701    }
702
703    /// Sends a trades request for a given instrument.
704    ///
705    /// # Errors
706    ///
707    /// Returns an error if the client fails to process the trades request.
708    pub fn request_trades(&self, req: RequestTrades) -> anyhow::Result<()> {
709        self.client.request_trades(req)
710    }
711
712    /// Sends a funding rates request for a given instrument.
713    ///
714    /// # Errors
715    ///
716    /// Returns an error if the client fails to process the trades request.
717    pub fn request_funding_rates(&self, req: RequestFundingRates) -> anyhow::Result<()> {
718        self.client.request_funding_rates(req)
719    }
720
721    /// Sends a forward prices request for derivatives instruments.
722    ///
723    /// # Errors
724    ///
725    /// Returns an error if the client fails to process the forward prices request.
726    pub fn request_forward_prices(&self, req: RequestForwardPrices) -> anyhow::Result<()> {
727        self.client.request_forward_prices(req)
728    }
729
730    /// Sends a bars request for a given instrument and bar type.
731    ///
732    /// # Errors
733    ///
734    /// Returns an error if the client fails to process the bars request.
735    pub fn request_bars(&self, req: RequestBars) -> anyhow::Result<()> {
736        self.client.request_bars(req)
737    }
738
739    /// Sends an order book depths request for a given instrument.
740    ///
741    /// # Errors
742    ///
743    /// Returns an error if the client fails to process the order book depths request.
744    pub fn request_book_depth(&self, req: RequestBookDepth) -> anyhow::Result<()> {
745        self.client.request_book_depth(req)
746    }
747}