1use std::{
22 fmt::Debug,
23 ops::{Deref, DerefMut},
24};
25
26use ahash::AHashSet;
27use nautilus_common::{
28 clients::{DataClient, log_command_error},
29 messages::data::{
30 RequestBars, RequestBookDepth, RequestBookSnapshot, RequestCustomData,
31 RequestForwardPrices, RequestFundingRates, RequestInstrument, RequestInstruments,
32 RequestQuotes, RequestTrades, SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10,
33 SubscribeCommand, SubscribeCustomData, SubscribeFundingRates, SubscribeIndexPrices,
34 SubscribeInstrument, SubscribeInstrumentClose, SubscribeInstrumentStatus,
35 SubscribeInstruments, SubscribeMarkPrices, SubscribeOptionGreeks, SubscribeQuotes,
36 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
37 UnsubscribeCommand, UnsubscribeCustomData, UnsubscribeFundingRates, UnsubscribeIndexPrices,
38 UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
39 UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeOptionGreeks, UnsubscribeQuotes,
40 UnsubscribeTrades,
41 },
42};
43#[cfg(feature = "defi")]
44use nautilus_model::defi::Blockchain;
45use nautilus_model::{
46 data::{BarType, DataType},
47 identifiers::{ClientId, InstrumentId, Venue},
48};
49
50#[cfg(feature = "defi")]
51#[allow(unused_imports)] use crate::defi::client as _;
53
/// Wraps a boxed [`DataClient`] together with per-category sets recording which
/// subscriptions have been forwarded to it.
///
/// The subscribe/unsubscribe methods on this type consult these sets so that a
/// key already present is not forwarded to the client a second time.
pub struct DataClientAdapter {
    /// The wrapped client that every command and request is delegated to.
    pub(crate) client: Box<dyn DataClient>,
    /// Identifier supplied for this client at construction.
    pub client_id: ClientId,
    /// Venue supplied at construction, if any.
    pub venue: Option<Venue>,
    /// Flag supplied at construction as `handles_order_book_deltas`.
    // NOTE(review): presumably tells the caller whether the client maintains book
    // deltas itself; nothing in this type reads it — confirm against the caller.
    pub handles_book_deltas: bool,
    /// Flag supplied at construction as `handles_order_book_snapshots`.
    // NOTE(review): same caveat as `handles_book_deltas`.
    pub handles_book_snapshots: bool,
    /// Custom data types currently tracked as subscribed.
    pub subscriptions_custom: AHashSet<DataType>,
    /// Instruments tracked as subscribed for order book deltas.
    pub subscriptions_book_deltas: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for depth-10 order book data.
    pub subscriptions_book_depth10: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for quotes.
    pub subscriptions_quotes: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for trades.
    pub subscriptions_trades: AHashSet<InstrumentId>,
    /// Bar types tracked as subscribed.
    pub subscriptions_bars: AHashSet<BarType>,
    /// Instruments tracked as subscribed for instrument status updates.
    pub subscriptions_instrument_status: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for instrument close updates.
    pub subscriptions_instrument_close: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for single-instrument updates.
    pub subscriptions_instrument: AHashSet<InstrumentId>,
    /// Venues tracked as subscribed for venue-wide instrument updates.
    pub subscriptions_instrument_venue: AHashSet<Venue>,
    /// Instruments tracked as subscribed for mark prices.
    pub subscriptions_mark_prices: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for index prices.
    pub subscriptions_index_prices: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for funding rates.
    pub subscriptions_funding_rates: AHashSet<InstrumentId>,
    /// Instruments tracked as subscribed for option greeks.
    pub subscriptions_option_greeks: AHashSet<InstrumentId>,
    /// Blockchains tracked for block subscriptions (`defi` feature only).
    // NOTE(review): the code maintaining the `defi` sets is not visible in this
    // section; presumably it lives in `crate::defi::client` — confirm.
    #[cfg(feature = "defi")]
    pub subscriptions_blocks: AHashSet<Blockchain>,
    /// Instruments tracked for pool subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pools: AHashSet<InstrumentId>,
    /// Instruments tracked for pool swap subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_swaps: AHashSet<InstrumentId>,
    /// Instruments tracked for pool liquidity update subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_liquidity_updates: AHashSet<InstrumentId>,
    /// Instruments tracked for pool fee collect subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_fee_collects: AHashSet<InstrumentId>,
    /// Instruments tracked for pool flash subscriptions (`defi` feature only).
    #[cfg(feature = "defi")]
    pub subscriptions_pool_flash: AHashSet<InstrumentId>,
}
88
/// Exposes the wrapped boxed client through shared dereferencing.
impl Deref for DataClientAdapter {
    /// The boxed client stored in the `client` field.
    type Target = Box<dyn DataClient>;

    /// Returns a shared reference to the boxed client.
    fn deref(&self) -> &Self::Target {
        &self.client
    }
}
96
/// Exposes the wrapped boxed client through mutable dereferencing.
impl DerefMut for DataClientAdapter {
    /// Returns a mutable reference to the boxed client.
    ///
    /// The reference points at the client only, so calls made through it do not
    /// touch the adapter's subscription sets.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.client
    }
}
102
103impl Debug for DataClientAdapter {
104 #[rustfmt::skip]
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct(stringify!(DataClientAdapter))
107 .field("client_id", &self.client_id)
108 .field("venue", &self.venue)
109 .field("handles_book_deltas", &self.handles_book_deltas)
110 .field("handles_book_snapshots", &self.handles_book_snapshots)
111 .field("subscriptions_custom", &self.subscriptions_custom)
112 .field("subscriptions_book_deltas", &self.subscriptions_book_deltas)
113 .field("subscriptions_book_depth10", &self.subscriptions_book_depth10)
114 .field("subscriptions_quotes", &self.subscriptions_quotes)
115 .field("subscriptions_trades", &self.subscriptions_trades)
116 .field("subscriptions_bars", &self.subscriptions_bars)
117 .field("subscriptions_mark_prices", &self.subscriptions_mark_prices)
118 .field("subscriptions_index_prices", &self.subscriptions_index_prices)
119 .field("subscriptions_instrument_status", &self.subscriptions_instrument_status)
120 .field("subscriptions_instrument_close", &self.subscriptions_instrument_close)
121 .field("subscriptions_instrument", &self.subscriptions_instrument)
122 .field("subscriptions_instrument_venue", &self.subscriptions_instrument_venue)
123 .finish()
124 }
125}
126
127impl DataClientAdapter {
128 #[must_use]
130 pub fn new(
131 client_id: ClientId,
132 venue: Option<Venue>,
133 handles_order_book_deltas: bool,
134 handles_order_book_snapshots: bool,
135 client: Box<dyn DataClient>,
136 ) -> Self {
137 Self {
138 client,
139 client_id,
140 venue,
141 handles_book_deltas: handles_order_book_deltas,
142 handles_book_snapshots: handles_order_book_snapshots,
143 subscriptions_custom: AHashSet::new(),
144 subscriptions_book_deltas: AHashSet::new(),
145 subscriptions_book_depth10: AHashSet::new(),
146 subscriptions_quotes: AHashSet::new(),
147 subscriptions_trades: AHashSet::new(),
148 subscriptions_mark_prices: AHashSet::new(),
149 subscriptions_index_prices: AHashSet::new(),
150 subscriptions_funding_rates: AHashSet::new(),
151 subscriptions_option_greeks: AHashSet::new(),
152 subscriptions_bars: AHashSet::new(),
153 subscriptions_instrument_status: AHashSet::new(),
154 subscriptions_instrument_close: AHashSet::new(),
155 subscriptions_instrument: AHashSet::new(),
156 subscriptions_instrument_venue: AHashSet::new(),
157 #[cfg(feature = "defi")]
158 subscriptions_blocks: AHashSet::new(),
159 #[cfg(feature = "defi")]
160 subscriptions_pools: AHashSet::new(),
161 #[cfg(feature = "defi")]
162 subscriptions_pool_swaps: AHashSet::new(),
163 #[cfg(feature = "defi")]
164 subscriptions_pool_liquidity_updates: AHashSet::new(),
165 #[cfg(feature = "defi")]
166 subscriptions_pool_fee_collects: AHashSet::new(),
167 #[cfg(feature = "defi")]
168 subscriptions_pool_flash: AHashSet::new(),
169 }
170 }
171
172 #[expect(clippy::borrowed_box)]
173 #[must_use]
174 pub fn get_client(&self) -> &Box<dyn DataClient> {
175 &self.client
176 }
177
178 pub async fn connect(&mut self) -> anyhow::Result<()> {
184 self.client.connect().await
185 }
186
187 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
193 self.client.disconnect().await
194 }
195
196 #[inline]
197 pub fn execute_subscribe(&mut self, cmd: SubscribeCommand) {
198 let cmd_debug = format!("{cmd:?}");
199 if let Err(e) = match cmd {
200 SubscribeCommand::Data(cmd) => self.subscribe(cmd),
201 SubscribeCommand::Instrument(cmd) => self.subscribe_instrument(cmd),
202 SubscribeCommand::Instruments(cmd) => self.subscribe_instruments(cmd),
203 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd),
204 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd),
205 SubscribeCommand::BookSnapshots(_) => Ok(()), SubscribeCommand::Quotes(cmd) => self.subscribe_quotes(cmd),
207 SubscribeCommand::Trades(cmd) => self.subscribe_trades(cmd),
208 SubscribeCommand::MarkPrices(cmd) => self.subscribe_mark_prices(cmd),
209 SubscribeCommand::IndexPrices(cmd) => self.subscribe_index_prices(cmd),
210 SubscribeCommand::FundingRates(cmd) => self.subscribe_funding_rates(cmd),
211 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd),
212 SubscribeCommand::InstrumentStatus(cmd) => self.subscribe_instrument_status(cmd),
213 SubscribeCommand::InstrumentClose(cmd) => self.subscribe_instrument_close(cmd),
214 SubscribeCommand::OptionGreeks(cmd) => self.subscribe_option_greeks(cmd),
215 SubscribeCommand::OptionChain(_) => Ok(()), } {
217 log_command_error(&cmd_debug, &e);
218 }
219 }
220
221 #[inline]
222 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) {
223 if let Err(e) = match cmd {
224 UnsubscribeCommand::Data(cmd) => self.unsubscribe(cmd),
225 UnsubscribeCommand::Instrument(cmd) => self.unsubscribe_instrument(cmd),
226 UnsubscribeCommand::Instruments(cmd) => self.unsubscribe_instruments(cmd),
227 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
228 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
229 UnsubscribeCommand::BookSnapshots(_) => Ok(()), UnsubscribeCommand::Quotes(cmd) => self.unsubscribe_quotes(cmd),
231 UnsubscribeCommand::Trades(cmd) => self.unsubscribe_trades(cmd),
232 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
233 UnsubscribeCommand::MarkPrices(cmd) => self.unsubscribe_mark_prices(cmd),
234 UnsubscribeCommand::IndexPrices(cmd) => self.unsubscribe_index_prices(cmd),
235 UnsubscribeCommand::FundingRates(cmd) => self.unsubscribe_funding_rates(cmd),
236 UnsubscribeCommand::InstrumentStatus(cmd) => self.unsubscribe_instrument_status(cmd),
237 UnsubscribeCommand::InstrumentClose(cmd) => self.unsubscribe_instrument_close(cmd),
238 UnsubscribeCommand::OptionGreeks(cmd) => self.unsubscribe_option_greeks(cmd),
239 UnsubscribeCommand::OptionChain(_) => Ok(()), } {
241 log_command_error(&cmd, &e);
242 }
243 }
244
245 pub fn subscribe(&mut self, cmd: SubscribeCustomData) -> anyhow::Result<()> {
253 if !self.subscriptions_custom.contains(&cmd.data_type) {
254 self.subscriptions_custom.insert(cmd.data_type.clone());
255 self.client.subscribe(cmd)?;
256 }
257 Ok(())
258 }
259
260 pub fn unsubscribe(&mut self, cmd: &UnsubscribeCustomData) -> anyhow::Result<()> {
266 if self.subscriptions_custom.contains(&cmd.data_type) {
267 self.subscriptions_custom.remove(&cmd.data_type);
268 self.client.unsubscribe(cmd)?;
269 }
270 Ok(())
271 }
272
273 fn subscribe_instruments(&mut self, cmd: SubscribeInstruments) -> anyhow::Result<()> {
279 if !self.subscriptions_instrument_venue.contains(&cmd.venue) {
280 self.subscriptions_instrument_venue.insert(cmd.venue);
281 self.client.subscribe_instruments(cmd)?;
282 }
283
284 Ok(())
285 }
286
287 fn unsubscribe_instruments(&mut self, cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
293 if self.subscriptions_instrument_venue.contains(&cmd.venue) {
294 self.subscriptions_instrument_venue.remove(&cmd.venue);
295 self.client.unsubscribe_instruments(cmd)?;
296 }
297
298 Ok(())
299 }
300
301 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
307 if !self.subscriptions_instrument.contains(&cmd.instrument_id) {
308 self.subscriptions_instrument.insert(cmd.instrument_id);
309 self.client.subscribe_instrument(cmd)?;
310 }
311
312 Ok(())
313 }
314
315 fn unsubscribe_instrument(&mut self, cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
321 if self.subscriptions_instrument.contains(&cmd.instrument_id) {
322 self.subscriptions_instrument.remove(&cmd.instrument_id);
323 self.client.unsubscribe_instrument(cmd)?;
324 }
325
326 Ok(())
327 }
328
329 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
335 if !self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
336 self.subscriptions_book_deltas.insert(cmd.instrument_id);
337 self.client.subscribe_book_deltas(cmd)?;
338 }
339
340 Ok(())
341 }
342
343 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
349 if self.subscriptions_book_deltas.contains(&cmd.instrument_id) {
350 self.subscriptions_book_deltas.remove(&cmd.instrument_id);
351 self.client.unsubscribe_book_deltas(cmd)?;
352 }
353
354 Ok(())
355 }
356
357 fn subscribe_book_depth10(&mut self, cmd: SubscribeBookDepth10) -> anyhow::Result<()> {
363 if !self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
364 self.subscriptions_book_depth10.insert(cmd.instrument_id);
365 self.client.subscribe_book_depth10(cmd)?;
366 }
367
368 Ok(())
369 }
370
371 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
377 if self.subscriptions_book_depth10.contains(&cmd.instrument_id) {
378 self.subscriptions_book_depth10.remove(&cmd.instrument_id);
379 self.client.unsubscribe_book_depth10(cmd)?;
380 }
381
382 Ok(())
383 }
384
385 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
391 if !self.subscriptions_quotes.contains(&cmd.instrument_id) {
392 self.subscriptions_quotes.insert(cmd.instrument_id);
393 self.client.subscribe_quotes(cmd)?;
394 }
395 Ok(())
396 }
397
398 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
404 if self.subscriptions_quotes.contains(&cmd.instrument_id) {
405 self.subscriptions_quotes.remove(&cmd.instrument_id);
406 self.client.unsubscribe_quotes(cmd)?;
407 }
408 Ok(())
409 }
410
411 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
417 if !self.subscriptions_trades.contains(&cmd.instrument_id) {
418 self.subscriptions_trades.insert(cmd.instrument_id);
419 self.client.subscribe_trades(cmd)?;
420 }
421 Ok(())
422 }
423
424 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
430 if self.subscriptions_trades.contains(&cmd.instrument_id) {
431 self.subscriptions_trades.remove(&cmd.instrument_id);
432 self.client.unsubscribe_trades(cmd)?;
433 }
434 Ok(())
435 }
436
437 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
443 if !self.subscriptions_bars.contains(&cmd.bar_type) {
444 self.subscriptions_bars.insert(cmd.bar_type);
445 self.client.subscribe_bars(cmd)?;
446 }
447 Ok(())
448 }
449
450 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
456 if self.subscriptions_bars.contains(&cmd.bar_type) {
457 self.subscriptions_bars.remove(&cmd.bar_type);
458 self.client.unsubscribe_bars(cmd)?;
459 }
460 Ok(())
461 }
462
463 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
469 if !self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
470 self.subscriptions_mark_prices.insert(cmd.instrument_id);
471 self.client.subscribe_mark_prices(cmd)?;
472 }
473 Ok(())
474 }
475
476 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
482 if self.subscriptions_mark_prices.contains(&cmd.instrument_id) {
483 self.subscriptions_mark_prices.remove(&cmd.instrument_id);
484 self.client.unsubscribe_mark_prices(cmd)?;
485 }
486 Ok(())
487 }
488
489 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
495 if !self.subscriptions_index_prices.contains(&cmd.instrument_id) {
496 self.subscriptions_index_prices.insert(cmd.instrument_id);
497 self.client.subscribe_index_prices(cmd)?;
498 }
499 Ok(())
500 }
501
502 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
508 if self.subscriptions_index_prices.contains(&cmd.instrument_id) {
509 self.subscriptions_index_prices.remove(&cmd.instrument_id);
510 self.client.unsubscribe_index_prices(cmd)?;
511 }
512 Ok(())
513 }
514
515 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
521 if !self
522 .subscriptions_funding_rates
523 .contains(&cmd.instrument_id)
524 {
525 self.subscriptions_funding_rates.insert(cmd.instrument_id);
526 self.client.subscribe_funding_rates(cmd)?;
527 }
528 Ok(())
529 }
530
531 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
537 if self
538 .subscriptions_funding_rates
539 .contains(&cmd.instrument_id)
540 {
541 self.subscriptions_funding_rates.remove(&cmd.instrument_id);
542 self.client.unsubscribe_funding_rates(cmd)?;
543 }
544 Ok(())
545 }
546
547 fn subscribe_instrument_status(
553 &mut self,
554 cmd: SubscribeInstrumentStatus,
555 ) -> anyhow::Result<()> {
556 if !self
557 .subscriptions_instrument_status
558 .contains(&cmd.instrument_id)
559 {
560 self.subscriptions_instrument_status
561 .insert(cmd.instrument_id);
562 self.client.subscribe_instrument_status(cmd)?;
563 }
564 Ok(())
565 }
566
567 fn unsubscribe_instrument_status(
573 &mut self,
574 cmd: &UnsubscribeInstrumentStatus,
575 ) -> anyhow::Result<()> {
576 if self
577 .subscriptions_instrument_status
578 .contains(&cmd.instrument_id)
579 {
580 self.subscriptions_instrument_status
581 .remove(&cmd.instrument_id);
582 self.client.unsubscribe_instrument_status(cmd)?;
583 }
584 Ok(())
585 }
586
587 fn subscribe_instrument_close(&mut self, cmd: SubscribeInstrumentClose) -> anyhow::Result<()> {
593 if !self
594 .subscriptions_instrument_close
595 .contains(&cmd.instrument_id)
596 {
597 self.subscriptions_instrument_close
598 .insert(cmd.instrument_id);
599 self.client.subscribe_instrument_close(cmd)?;
600 }
601 Ok(())
602 }
603
604 fn unsubscribe_instrument_close(
610 &mut self,
611 cmd: &UnsubscribeInstrumentClose,
612 ) -> anyhow::Result<()> {
613 if self
614 .subscriptions_instrument_close
615 .contains(&cmd.instrument_id)
616 {
617 self.subscriptions_instrument_close
618 .remove(&cmd.instrument_id);
619 self.client.unsubscribe_instrument_close(cmd)?;
620 }
621 Ok(())
622 }
623
624 fn subscribe_option_greeks(&mut self, cmd: SubscribeOptionGreeks) -> anyhow::Result<()> {
630 if !self
631 .subscriptions_option_greeks
632 .contains(&cmd.instrument_id)
633 {
634 self.subscriptions_option_greeks.insert(cmd.instrument_id);
635 self.client.subscribe_option_greeks(cmd)?;
636 }
637 Ok(())
638 }
639
640 fn unsubscribe_option_greeks(&mut self, cmd: &UnsubscribeOptionGreeks) -> anyhow::Result<()> {
646 if self
647 .subscriptions_option_greeks
648 .contains(&cmd.instrument_id)
649 {
650 self.subscriptions_option_greeks.remove(&cmd.instrument_id);
651 self.client.unsubscribe_option_greeks(cmd)?;
652 }
653 Ok(())
654 }
655
656 pub fn request_data(&self, req: RequestCustomData) -> anyhow::Result<()> {
664 self.client.request_data(req)
665 }
666
667 pub fn request_instrument(&self, req: RequestInstrument) -> anyhow::Result<()> {
673 self.client.request_instrument(req)
674 }
675
676 pub fn request_instruments(&self, req: RequestInstruments) -> anyhow::Result<()> {
682 self.client.request_instruments(req)
683 }
684
685 pub fn request_book_snapshot(&self, req: RequestBookSnapshot) -> anyhow::Result<()> {
691 self.client.request_book_snapshot(req)
692 }
693
694 pub fn request_quotes(&self, req: RequestQuotes) -> anyhow::Result<()> {
700 self.client.request_quotes(req)
701 }
702
703 pub fn request_trades(&self, req: RequestTrades) -> anyhow::Result<()> {
709 self.client.request_trades(req)
710 }
711
712 pub fn request_funding_rates(&self, req: RequestFundingRates) -> anyhow::Result<()> {
718 self.client.request_funding_rates(req)
719 }
720
721 pub fn request_forward_prices(&self, req: RequestForwardPrices) -> anyhow::Result<()> {
727 self.client.request_forward_prices(req)
728 }
729
730 pub fn request_bars(&self, req: RequestBars) -> anyhow::Result<()> {
736 self.client.request_bars(req)
737 }
738
739 pub fn request_book_depth(&self, req: RequestBookDepth) -> anyhow::Result<()> {
745 self.client.request_book_depth(req)
746 }
747}