Skip to main content

nautilus_data/defi/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! DeFi-specific data client functionality.
17//!
18//! This module provides DeFi subscription and request helper methods
19//! for the `DataClientAdapter`. All code in this module requires the `defi` feature flag.
20
21use nautilus_common::{
22    clients::log_command_error,
23    messages::defi::{
24        DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
25        SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
26        SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
27        UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents, UnsubscribePoolLiquidityUpdates,
28        UnsubscribePoolSwaps,
29    },
30};
31
32use crate::client::DataClientAdapter;
33
34impl DataClientAdapter {
35    #[inline]
36    pub fn execute_defi_subscribe(&mut self, cmd: DefiSubscribeCommand) {
37        let cmd_debug = format!("{cmd:?}");
38        if let Err(e) = match cmd {
39            DefiSubscribeCommand::Blocks(cmd) => self.subscribe_blocks(cmd),
40            DefiSubscribeCommand::Pool(cmd) => self.subscribe_pool(cmd),
41            DefiSubscribeCommand::PoolSwaps(cmd) => self.subscribe_pool_swaps(cmd),
42            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
43                self.subscribe_pool_liquidity_updates(cmd)
44            }
45            DefiSubscribeCommand::PoolFeeCollects(cmd) => self.subscribe_pool_fee_collects(cmd),
46            DefiSubscribeCommand::PoolFlashEvents(cmd) => self.subscribe_pool_flash_events(cmd),
47        } {
48            log_command_error(&cmd_debug, &e);
49        }
50    }
51
52    #[inline]
53    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) {
54        if let Err(e) = match cmd {
55            DefiUnsubscribeCommand::Blocks(cmd) => self.unsubscribe_blocks(cmd),
56            DefiUnsubscribeCommand::Pool(cmd) => self.unsubscribe_pool(cmd),
57            DefiUnsubscribeCommand::PoolSwaps(cmd) => self.unsubscribe_pool_swaps(cmd),
58            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
59                self.unsubscribe_pool_liquidity_updates(cmd)
60            }
61            DefiUnsubscribeCommand::PoolFeeCollects(cmd) => self.unsubscribe_pool_fee_collects(cmd),
62            DefiUnsubscribeCommand::PoolFlashEvents(cmd) => self.unsubscribe_pool_flash_events(cmd),
63        } {
64            log_command_error(&cmd, &e);
65        }
66    }
67
68    /// Executes a DeFi data request command by dispatching to the appropriate handler.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the underlying client request fails.
73    #[inline]
74    pub fn execute_defi_request(&self, cmd: DefiRequestCommand) -> anyhow::Result<()> {
75        match cmd {
76            DefiRequestCommand::PoolSnapshot(cmd) => self.request_pool_snapshot(cmd),
77        }
78    }
79
80    /// Subscribes to block events for the specified blockchain.
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the underlying client subscribe operation fails.
85    fn subscribe_blocks(&mut self, cmd: SubscribeBlocks) -> anyhow::Result<()> {
86        if !self.subscriptions_blocks.contains(&cmd.chain) {
87            self.subscriptions_blocks.insert(cmd.chain);
88            self.client.subscribe_blocks(cmd)?;
89        }
90        Ok(())
91    }
92
93    /// Unsubscribes from block events for the specified blockchain.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the underlying client unsubscribe operation fails.
98    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
99        if self.subscriptions_blocks.contains(&cmd.chain) {
100            self.subscriptions_blocks.remove(&cmd.chain);
101            self.client.unsubscribe_blocks(cmd)?;
102        }
103        Ok(())
104    }
105
106    /// Subscribes to pool definition updates for the specified AMM pool.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if the underlying client subscribe operation fails.
111    fn subscribe_pool(&mut self, cmd: SubscribePool) -> anyhow::Result<()> {
112        if !self.subscriptions_pools.contains(&cmd.instrument_id) {
113            self.subscriptions_pools.insert(cmd.instrument_id);
114            self.client.subscribe_pool(cmd)?;
115        }
116        Ok(())
117    }
118
119    /// Subscribes to pool swap events for the specified AMM pool.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if the underlying client subscribe operation fails.
124    fn subscribe_pool_swaps(&mut self, cmd: SubscribePoolSwaps) -> anyhow::Result<()> {
125        if !self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
126            self.subscriptions_pool_swaps.insert(cmd.instrument_id);
127            self.client.subscribe_pool_swaps(cmd)?;
128        }
129        Ok(())
130    }
131
132    /// Subscribes to pool liquidity update events for the specified AMM pool.
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if the underlying client subscribe operation fails.
137    fn subscribe_pool_liquidity_updates(
138        &mut self,
139        cmd: SubscribePoolLiquidityUpdates,
140    ) -> anyhow::Result<()> {
141        if !self
142            .subscriptions_pool_liquidity_updates
143            .contains(&cmd.instrument_id)
144        {
145            self.subscriptions_pool_liquidity_updates
146                .insert(cmd.instrument_id);
147            self.client.subscribe_pool_liquidity_updates(cmd)?;
148        }
149        Ok(())
150    }
151
152    /// Subscribes to pool fee collect events for the specified AMM pool.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if the underlying client subscribe operation fails.
157    fn subscribe_pool_fee_collects(&mut self, cmd: SubscribePoolFeeCollects) -> anyhow::Result<()> {
158        if !self
159            .subscriptions_pool_fee_collects
160            .contains(&cmd.instrument_id)
161        {
162            self.subscriptions_pool_fee_collects
163                .insert(cmd.instrument_id);
164            self.client.subscribe_pool_fee_collects(cmd)?;
165        }
166        Ok(())
167    }
168
169    /// Subscribes to pool flash loan events for the specified AMM pool.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the underlying client subscribe operation fails.
174    fn subscribe_pool_flash_events(&mut self, cmd: SubscribePoolFlashEvents) -> anyhow::Result<()> {
175        if !self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
176            self.subscriptions_pool_flash.insert(cmd.instrument_id);
177            self.client.subscribe_pool_flash_events(cmd)?;
178        }
179        Ok(())
180    }
181
182    /// Unsubscribes from pool definition updates for the specified AMM pool.
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if the underlying client unsubscribe operation fails.
187    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
188        if self.subscriptions_pools.contains(&cmd.instrument_id) {
189            self.subscriptions_pools.remove(&cmd.instrument_id);
190            self.client.unsubscribe_pool(cmd)?;
191        }
192        Ok(())
193    }
194
195    /// Unsubscribes from swap events for the specified AMM pool.
196    ///
197    /// # Errors
198    ///
199    /// Returns an error if the underlying client unsubscribe operation fails.
200    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
201        if self.subscriptions_pool_swaps.contains(&cmd.instrument_id) {
202            self.subscriptions_pool_swaps.remove(&cmd.instrument_id);
203            self.client.unsubscribe_pool_swaps(cmd)?;
204        }
205        Ok(())
206    }
207
208    /// Unsubscribes from pool liquidity update events for the specified AMM pool.
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if the underlying client unsubscribe operation fails.
213    fn unsubscribe_pool_liquidity_updates(
214        &mut self,
215        cmd: &UnsubscribePoolLiquidityUpdates,
216    ) -> anyhow::Result<()> {
217        if self
218            .subscriptions_pool_liquidity_updates
219            .contains(&cmd.instrument_id)
220        {
221            self.subscriptions_pool_liquidity_updates
222                .remove(&cmd.instrument_id);
223            self.client.unsubscribe_pool_liquidity_updates(cmd)?;
224        }
225        Ok(())
226    }
227
228    /// Unsubscribes from pool fee collect events for the specified AMM pool.
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if the underlying client unsubscribe operation fails.
233    fn unsubscribe_pool_fee_collects(
234        &mut self,
235        cmd: &UnsubscribePoolFeeCollects,
236    ) -> anyhow::Result<()> {
237        if self
238            .subscriptions_pool_fee_collects
239            .contains(&cmd.instrument_id)
240        {
241            self.subscriptions_pool_fee_collects
242                .remove(&cmd.instrument_id);
243            self.client.unsubscribe_pool_fee_collects(cmd)?;
244        }
245        Ok(())
246    }
247
248    /// Unsubscribes from pool flash loan events for the specified AMM pool.
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if the underlying client unsubscribe operation fails.
253    fn unsubscribe_pool_flash_events(
254        &mut self,
255        cmd: &UnsubscribePoolFlashEvents,
256    ) -> anyhow::Result<()> {
257        if self.subscriptions_pool_flash.contains(&cmd.instrument_id) {
258            self.subscriptions_pool_flash.remove(&cmd.instrument_id);
259            self.client.unsubscribe_pool_flash_events(cmd)?;
260        }
261        Ok(())
262    }
263
264    /// Sends a pool snapshot request for a given AMM pool.
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if the client fails to process the pool snapshot request.
269    pub fn request_pool_snapshot(&self, req: RequestPoolSnapshot) -> anyhow::Result<()> {
270        self.client.request_pool_snapshot(req)
271    }
272}