nautilus_data/defi/
client.rs1use nautilus_common::{
22 clients::log_command_error,
23 messages::defi::{
24 DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
25 SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
26 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
27 UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates,
28 UnsubscribePoolSwaps,
29 },
30};
31
32use crate::client::DataClientAdapter;
33
34impl DataClientAdapter {
35 #[inline]
36 pub fn execute_defi_subscribe(&mut self, cmd: DefiSubscribeCommand) {
37 let cmd_debug = format!("{cmd:?}");
38 if let Err(e) = match cmd {
39 DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
40 DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
41 DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_pool_swaps(cmd),
42 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
43 self.subscribe_pool_liquidity_updates(cmd)
44 }
45 DefiSubscribeCommand::PoolFeeCollects(cmd) => self.subscribe_pool_fee_collects(cmd),
46 DefiSubscribeCommand::PoolFlashEvents(cmd) => self.subscribe_pool_flash_events(cmd),
47 } {
48 log_command_error(&cmd_debug, &e);
49 }
50 }
51
52 #[inline]
53 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
54 if let Err(e) = match cmd {
55 DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
56 DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
57 DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_pool_swaps(cmd),
58 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
59 self.unsubscribe_pool_liquidity_updates(cmd)
60 }
61 DefiUnsubscribeCommand::PoolFeeCollects(cmd) => self.unsubscribe_pool_fee_collects(cmd),
62 DefiUnsubscribeCommand::PoolFlashEvents(cmd) => self.unsubscribe_pool_flash_events(cmd),
63 } {
64 log_command_error(&cmd, &e);
65 }
66 }
67
68 #[inline]
74 pub fn execute_defi_request(&self, cmd: DefiRequestCommand) -> anyhow::Result<()> {
75 match cmd {
76 DefiRequestCommand::PoolSnapshot(cmd) => self.request_pool_snapshot(cmd),
77 }
78 }
79
80 fn subscribe_blocks(&mut self, cmd: SubscribeBlocks) -> anyhow::Result<()> {
86 if !self.subscriptions_blocks.contains(&cmd.chain) {
87 self.subscriptions_blocks.insert(cmd.chain);
88 self.client.subscribe_blocks(cmd)?;
89 }
90 Ok(())
91 }
92
93 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
99 if self.subscriptions_blocks.contains(&cmd.chain) {
100 self.subscriptions_blocks.remove(&cmd.chain);
101 self.client.unsubscribe_blocks(cmd)?;
102 }
103 Ok(())
104 }
105
106 fn subscribe_pool(&mut self, cmd: SubscribePool) -> anyhow::Result<()> {
112 if !self.subscriptions_pools.contains(&cmd.instrument_id) {
113 self.subscriptions_pools.insert(cmd.instrument_id);
114 self.client.subscribe_pool(cmd)?;
115 }
116 Ok(())
117 }
118
119 fn subscribe_pool_swaps(&mut self, cmd: SubscribePoolSwaps) -> anyhow::Result<()> {
125 if !self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
126 self.subscriptions_pool_swaps.insert(cmd.instrument_id);
127 self.client.subscribe_pool_swaps(cmd)?;
128 }
129 Ok(())
130 }
131
132 fn subscribe_pool_liquidity_updates(
138 &mut self,
139 cmd: SubscribePoolLiquidityUpdates,
140 ) -> anyhow::Result<()> {
141 if !self
142 .subscriptions_pool_liquidity_updates
143 .contains(&cmd.instrument_id)
144 {
145 self.subscriptions_pool_liquidity_updates
146 .insert(cmd.instrument_id);
147 self.client.subscribe_pool_liquidity_updates(cmd)?;
148 }
149 Ok(())
150 }
151
152 fn subscribe_pool_fee_collects(&mut self, cmd: SubscribePoolFeeCollects) -> anyhow::Result<()> {
158 if !self
159 .subscriptions_pool_fee_collects
160 .contains(&cmd.instrument_id)
161 {
162 self.subscriptions_pool_fee_collects
163 .insert(cmd.instrument_id);
164 self.client.subscribe_pool_fee_collects(cmd)?;
165 }
166 Ok(())
167 }
168
169 fn subscribe_pool_flash_events(&mut self, cmd: SubscribePoolFlashEvents) -> anyhow::Result<()> {
175 if !self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
176 self.subscriptions_pool_flash.insert(cmd.instrument_id);
177 self.client.subscribe_pool_flash_events(cmd)?;
178 }
179 Ok(())
180 }
181
182 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
188 if self.subscriptions_pools.contains(&cmd.instrument_id) {
189 self.subscriptions_pools.remove(&cmd.instrument_id);
190 self.client.unsubscribe_pool(cmd)?;
191 }
192 Ok(())
193 }
194
195 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
201 if self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
202 self.subscriptions_pool_swaps.remove(&cmd.instrument_id);
203 self.client.unsubscribe_pool_swaps(cmd)?;
204 }
205 Ok(())
206 }
207
208 fn unsubscribe_pool_liquidity_updates(
214 &mut self,
215 cmd: &UnsubscribePoolLiquidityUpdates,
216 ) -> anyhow::Result<()> {
217 if self
218 .subscriptions_pool_liquidity_updates
219 .contains(&cmd.instrument_id)
220 {
221 self.subscriptions_pool_liquidity_updates
222 .remove(&cmd.instrument_id);
223 self.client.unsubscribe_pool_liquidity_updates(cmd)?;
224 }
225 Ok(())
226 }
227
228 fn unsubscribe_pool_fee_collects(
234 &mut self,
235 cmd: &UnsubscribePoolFeeCollects,
236 ) -> anyhow::Result<()> {
237 if self
238 .subscriptions_pool_fee_collects
239 .contains(&cmd.instrument_id)
240 {
241 self.subscriptions_pool_fee_collects
242 .remove(&cmd.instrument_id);
243 self.client.unsubscribe_pool_fee_collects(cmd)?;
244 }
245 Ok(())
246 }
247
248 fn unsubscribe_pool_flash_events(
254 &mut self,
255 cmd: &UnsubscribePoolFlashEvents,
256 ) -> anyhow::Result<()> {
257 if self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
258 self.subscriptions_pool_flash.remove(&cmd.instrument_id);
259 self.client.unsubscribe_pool_flash_events(cmd)?;
260 }
261 Ok(())
262 }
263
264 pub fn request_pool_snapshot(&self, req: RequestPoolSnapshot) -> anyhow::Result<()> {
270 self.client.request_pool_snapshot(req)
271 }
272}