Skip to main content

nautilus_blockchain/data/
subscription.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::{AHashMap, AHashSet};
17use alloy::primitives::{Address, keccak256};
18use nautilus_core::hex;
19use nautilus_model::defi::DexType;
20
21/// Manages subscriptions to DeFi protocol events (swaps, mints, burns, collects) across different DEXs.
22///
23/// This manager tracks which pool addresses are subscribed for each event type
24/// and maintains the event signature encodings for efficient filtering.
25#[derive(Debug)]
26pub struct DefiDataSubscriptionManager {
27    pool_swap_event_encoded: AHashMap<DexType, String>,
28    pool_mint_event_encoded: AHashMap<DexType, String>,
29    pool_burn_event_encoded: AHashMap<DexType, String>,
30    pool_collect_event_encoded: AHashMap<DexType, String>,
31    pool_flash_event_encoded: AHashMap<DexType, String>,
32    subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
33    subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
34    subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
35    subscribed_pool_collects: AHashMap<DexType, AHashSet<Address>>,
36    subscribed_pool_flashes: AHashMap<DexType, AHashSet<Address>>,
37}
38
39impl Default for DefiDataSubscriptionManager {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl DefiDataSubscriptionManager {
46    /// Creates a new [`DefiDataSubscriptionManager`] instance.
47    #[must_use]
48    pub fn new() -> Self {
49        Self {
50            pool_swap_event_encoded: AHashMap::new(),
51            pool_burn_event_encoded: AHashMap::new(),
52            pool_mint_event_encoded: AHashMap::new(),
53            pool_collect_event_encoded: AHashMap::new(),
54            pool_flash_event_encoded: AHashMap::new(),
55            subscribed_pool_burns: AHashMap::new(),
56            subscribed_pool_mints: AHashMap::new(),
57            subscribed_pool_swaps: AHashMap::new(),
58            subscribed_pool_collects: AHashMap::new(),
59            subscribed_pool_flashes: AHashMap::new(),
60        }
61    }
62
63    /// Gets all unique contract addresses subscribed for any event type for a given DEX.
64    #[must_use]
65    pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
66        let mut unique_addresses = AHashSet::new();
67
68        if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
69            unique_addresses.extend(addresses.iter().copied());
70        }
71
72        if let Some(addresses) = self.subscribed_pool_mints.get(dex) {
73            unique_addresses.extend(addresses.iter().copied());
74        }
75
76        if let Some(addresses) = self.subscribed_pool_burns.get(dex) {
77            unique_addresses.extend(addresses.iter().copied());
78        }
79
80        if let Some(addresses) = self.subscribed_pool_collects.get(dex) {
81            unique_addresses.extend(addresses.iter().copied());
82        }
83
84        if let Some(addresses) = self.subscribed_pool_flashes.get(dex) {
85            unique_addresses.extend(addresses.iter().copied());
86        }
87
88        unique_addresses.into_iter().collect()
89    }
90
91    /// Gets all event signatures (keccak256 hashes) registered for a given DEX.
92    #[must_use]
93    pub fn get_subscribed_dex_event_signatures(&self, dex: &DexType) -> Vec<String> {
94        let mut result = Vec::new();
95
96        if let Some(swap_event_signature) = self.pool_swap_event_encoded.get(dex) {
97            result.push(swap_event_signature.clone());
98        }
99
100        if let Some(mint_event_signature) = self.pool_mint_event_encoded.get(dex) {
101            result.push(mint_event_signature.clone());
102        }
103
104        if let Some(burn_event_signature) = self.pool_burn_event_encoded.get(dex) {
105            result.push(burn_event_signature.clone());
106        }
107
108        if let Some(collect_event_signature) = self.pool_collect_event_encoded.get(dex) {
109            result.push(collect_event_signature.clone());
110        }
111
112        if let Some(flash_event_signature) = self.pool_flash_event_encoded.get(dex) {
113            result.push(flash_event_signature.clone());
114        }
115
116        result
117    }
118
119    /// Gets the swap event signature for a specific DEX.
120    #[must_use]
121    pub fn get_dex_pool_swap_event_signature(&self, dex: &DexType) -> Option<String> {
122        self.pool_swap_event_encoded.get(dex).cloned()
123    }
124
125    /// Gets the mint event signature for a specific DEX.
126    #[must_use]
127    pub fn get_dex_pool_mint_event_signature(&self, dex: &DexType) -> Option<String> {
128        self.pool_mint_event_encoded.get(dex).cloned()
129    }
130    /// Gets the burn event signature for a specific DEX.
131    #[must_use]
132    pub fn get_dex_pool_burn_event_signature(&self, dex: &DexType) -> Option<String> {
133        self.pool_burn_event_encoded.get(dex).cloned()
134    }
135
136    /// Gets the collect event signature for a specific DEX.
137    #[must_use]
138    pub fn get_dex_pool_collect_event_signature(&self, dex: &DexType) -> Option<String> {
139        self.pool_collect_event_encoded.get(dex).cloned()
140    }
141
142    /// Normalizes an event signature to a consistent format.
143    ///
144    /// Accepts:
145    /// - A raw event signature like "Swap(address,address,int256,int256,uint160,uint128,int24)".
146    /// - A pre-encoded topic like "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67".
147    /// - A hex string without 0x prefix.
148    ///
149    /// Returns a normalized "0x..." format string.
150    fn normalize_topic(sig: &str) -> String {
151        let s = sig.trim();
152
153        // Check if it's already a properly formatted hex string with 0x prefix
154        if let Some(rest) = s.strip_prefix("0x")
155            && rest.len() == 64
156            && rest.chars().all(|c| c.is_ascii_hexdigit())
157        {
158            return format!("0x{}", rest.to_ascii_lowercase());
159        }
160
161        // Check if it's a hex string without 0x prefix
162        if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
163            return format!("0x{}", s.to_ascii_lowercase());
164        }
165
166        // Otherwise, it's a raw signature that needs hashing
167        hex::encode_prefixed(keccak256(s.as_bytes()))
168    }
169
170    /// Registers a DEX with its event signatures for subscription management.
171    ///
172    /// This must be called before subscribing to any events for a DEX.
173    /// Event signatures can be either raw signatures or pre-encoded keccak256 hashes.
174    pub fn register_dex_for_subscriptions(
175        &mut self,
176        dex: DexType,
177        swap_event_signature: &str,
178        mint_event_signature: &str,
179        burn_event_signature: &str,
180        collect_event_signature: &str,
181        flash_event_signature: Option<&str>,
182    ) {
183        self.subscribed_pool_swaps.insert(dex, AHashSet::new());
184        self.pool_swap_event_encoded
185            .insert(dex, Self::normalize_topic(swap_event_signature));
186
187        self.subscribed_pool_mints.insert(dex, AHashSet::new());
188        self.pool_mint_event_encoded
189            .insert(dex, Self::normalize_topic(mint_event_signature));
190
191        self.subscribed_pool_burns.insert(dex, AHashSet::new());
192        self.pool_burn_event_encoded
193            .insert(dex, Self::normalize_topic(burn_event_signature));
194
195        self.subscribed_pool_collects.insert(dex, AHashSet::new());
196        self.pool_collect_event_encoded
197            .insert(dex, Self::normalize_topic(collect_event_signature));
198
199        if let Some(flash_event_signature) = flash_event_signature {
200            self.subscribed_pool_flashes.insert(dex, AHashSet::new());
201            self.pool_flash_event_encoded
202                .insert(dex, Self::normalize_topic(flash_event_signature));
203        }
204
205        log::info!("Registered DEX for subscriptions: {dex:?}");
206    }
207
208    /// Subscribes to swap events for a specific pool address on a DEX.
209    pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
210        if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
211            pool_set.insert(address);
212        } else {
213            log::error!("DEX not registered for swap subscriptions: {dex:?}");
214        }
215    }
216
217    /// Subscribes to mint events for a specific pool address on a DEX.
218    pub fn subscribe_mints(&mut self, dex: DexType, address: Address) {
219        if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
220            pool_set.insert(address);
221        } else {
222            log::error!("DEX not registered for mint subscriptions: {dex:?}");
223        }
224    }
225
226    /// Subscribes to burn events for a specific pool address on a DEX.
227    pub fn subscribe_burns(&mut self, dex: DexType, address: Address) {
228        if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
229            pool_set.insert(address);
230        } else {
231            log::warn!("DEX not registered for burn subscriptions: {dex:?}");
232        }
233    }
234
235    /// Unsubscribes from swap events for a specific pool address on a DEX.
236    pub fn unsubscribe_swaps(&mut self, dex: DexType, address: Address) {
237        if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
238            pool_set.remove(&address);
239        } else {
240            log::error!("DEX not registered for swap subscriptions: {dex:?}");
241        }
242    }
243
244    /// Unsubscribes from mint events for a specific pool address on a DEX.
245    pub fn unsubscribe_mints(&mut self, dex: DexType, address: Address) {
246        if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
247            pool_set.remove(&address);
248        } else {
249            log::error!("DEX not registered for mint subscriptions: {dex:?}");
250        }
251    }
252
253    /// Unsubscribes from burn events for a specific pool address on a DEX.
254    pub fn unsubscribe_burns(&mut self, dex: DexType, address: Address) {
255        if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
256            pool_set.remove(&address);
257        } else {
258            log::error!("DEX not registered for burn subscriptions: {dex:?}");
259        }
260    }
261
262    /// Subscribes to collect events for a specific pool address on a DEX.
263    pub fn subscribe_collects(&mut self, dex: DexType, address: Address) {
264        if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
265            pool_set.insert(address);
266        } else {
267            log::error!("DEX not registered for collect subscriptions: {dex:?}");
268        }
269    }
270
271    /// Unsubscribes from collect events for a specific pool address on a DEX.
272    pub fn unsubscribe_collects(&mut self, dex: DexType, address: Address) {
273        if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
274            pool_set.remove(&address);
275        } else {
276            log::error!("DEX not registered for collect subscriptions: {dex:?}");
277        }
278    }
279
280    /// Subscribes to flash events for a specific pool address on a DEX.
281    pub fn subscribe_flashes(&mut self, dex: DexType, address: Address) {
282        if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
283            pool_set.insert(address);
284        } else {
285            log::error!("DEX not registered for flash subscriptions: {dex:?}");
286        }
287    }
288
289    /// Unsubscribes from flash events for a specific pool address on a DEX.
290    pub fn unsubscribe_flashes(&mut self, dex: DexType, address: Address) {
291        if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
292            pool_set.remove(&address);
293        } else {
294            log::error!("DEX not registered for flash subscriptions: {dex:?}");
295        }
296    }
297}
298
299#[cfg(test)]
300mod tests {
301    use alloy::primitives::address;
302    use rstest::{fixture, rstest};
303
304    use super::*;
305
306    #[fixture]
307    fn manager() -> DefiDataSubscriptionManager {
308        DefiDataSubscriptionManager::new()
309    }
310
311    #[fixture]
312    fn registered_manager() -> DefiDataSubscriptionManager {
313        let mut manager = DefiDataSubscriptionManager::new();
314        manager.register_dex_for_subscriptions(
315            DexType::UniswapV3,
316            "Swap(address,address,int256,int256,uint160,uint128,int24)",
317            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
318            "Burn(address,int24,int24,uint128,uint256,uint256)",
319            "Collect(address,address,int24,int24,uint128,uint128)",
320            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
321        );
322        manager
323    }
324
325    #[rstest]
326    fn test_new_creates_empty_manager(manager: DefiDataSubscriptionManager) {
327        assert_eq!(
328            manager
329                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
330                .len(),
331            0
332        );
333        assert_eq!(
334            manager
335                .get_subscribed_dex_event_signatures(&DexType::UniswapV3)
336                .len(),
337            0
338        );
339        assert!(
340            manager
341                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
342                .is_none()
343        );
344        assert!(
345            manager
346                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
347                .is_none()
348        );
349        assert!(
350            manager
351                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
352                .is_none()
353        );
354    }
355
356    #[rstest]
357    fn test_register_dex_for_subscriptions(registered_manager: DefiDataSubscriptionManager) {
358        // Should have all four event signatures
359        let signatures =
360            registered_manager.get_subscribed_dex_event_signatures(&DexType::UniswapV3);
361        assert_eq!(signatures.len(), 5);
362
363        // Each signature should be properly encoded
364        assert!(
365            registered_manager
366                .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
367                .is_some()
368        );
369        assert!(
370            registered_manager
371                .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
372                .is_some()
373        );
374        assert!(
375            registered_manager
376                .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
377                .is_some()
378        );
379    }
380
381    #[rstest]
382    fn test_subscribe_and_get_addresses(mut registered_manager: DefiDataSubscriptionManager) {
383        let pool_address = address!("1234567890123456789012345678901234567890");
384
385        // Subscribe to swap events
386        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
387
388        let addresses =
389            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
390        assert_eq!(addresses.len(), 1);
391        assert_eq!(addresses[0], pool_address);
392    }
393
394    #[rstest]
395    fn test_subscribe_to_unregistered_dex(mut manager: DefiDataSubscriptionManager) {
396        let pool_address = address!("1234567890123456789012345678901234567890");
397
398        // Try to subscribe without registering - should log warning but not panic
399        manager.subscribe_swaps(DexType::UniswapV3, pool_address);
400        manager.subscribe_mints(DexType::UniswapV3, pool_address);
401        manager.subscribe_burns(DexType::UniswapV3, pool_address);
402
403        // Should return empty results
404        let addresses = manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
405        assert_eq!(addresses.len(), 0);
406    }
407
408    #[rstest]
409    fn test_unsubscribe_removes_address(mut registered_manager: DefiDataSubscriptionManager) {
410        let pool_address = address!("1234567890123456789012345678901234567890");
411
412        // Subscribe
413        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
414
415        // Verify subscription
416        assert_eq!(
417            registered_manager
418                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
419                .len(),
420            1
421        );
422
423        // Unsubscribe
424        registered_manager.unsubscribe_swaps(DexType::UniswapV3, pool_address);
425
426        // Verify removal
427        assert_eq!(
428            registered_manager
429                .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
430                .len(),
431            0
432        );
433    }
434
435    #[rstest]
436    fn test_get_event_signatures(registered_manager: DefiDataSubscriptionManager) {
437        let swap_sig = registered_manager.get_dex_pool_swap_event_signature(&DexType::UniswapV3);
438        let mint_sig = registered_manager.get_dex_pool_mint_event_signature(&DexType::UniswapV3);
439        let burn_sig = registered_manager.get_dex_pool_burn_event_signature(&DexType::UniswapV3);
440
441        // All should be Some and start with 0x
442        assert!(swap_sig.is_some() && swap_sig.unwrap().starts_with("0x"));
443        assert!(mint_sig.is_some() && mint_sig.unwrap().starts_with("0x"));
444        assert!(burn_sig.is_some() && burn_sig.unwrap().starts_with("0x"));
445    }
446
447    #[rstest]
448    fn test_multiple_subscriptions_same_pool(mut registered_manager: DefiDataSubscriptionManager) {
449        let pool_address = address!("1234567890123456789012345678901234567890");
450
451        // Subscribe same address multiple times to same event type
452        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
453        registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
454
455        // Should only appear once (HashSet behavior)
456        let addresses =
457            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
458        assert_eq!(addresses.len(), 1);
459    }
460
461    #[rstest]
462    fn test_get_combined_addresses_from_all_events(
463        mut registered_manager: DefiDataSubscriptionManager,
464    ) {
465        let pool1 = address!("1111111111111111111111111111111111111111");
466        let pool2 = address!("2222222222222222222222222222222222222222");
467        let pool3 = address!("3333333333333333333333333333333333333333");
468
469        // Subscribe different pools to different events
470        registered_manager.subscribe_swaps(DexType::UniswapV3, pool1);
471        registered_manager.subscribe_mints(DexType::UniswapV3, pool2);
472        registered_manager.subscribe_burns(DexType::UniswapV3, pool3);
473
474        // Should get all unique addresses
475        let addresses =
476            registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
477        assert_eq!(addresses.len(), 3);
478        assert!(addresses.contains(&pool1));
479        assert!(addresses.contains(&pool2));
480        assert!(addresses.contains(&pool3));
481    }
482
483    #[rstest]
484    fn test_event_signature_encoding(registered_manager: DefiDataSubscriptionManager) {
485        // Known event signature and its expected keccak256 hash
486        // Swap(address,address,int256,int256,uint160,uint128,int24) for UniswapV3
487        let swap_sig = registered_manager
488            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
489            .unwrap();
490
491        // Should be properly formatted hex string
492        assert!(swap_sig.starts_with("0x"));
493        assert_eq!(swap_sig.len(), 66); // 0x + 64 hex chars (32 bytes)
494
495        // Verify it's valid hex
496        let hex_part = &swap_sig[2..];
497        assert!(hex_part.chars().all(|c| c.is_ascii_hexdigit()));
498    }
499
500    #[rstest]
501    #[case(DexType::UniswapV3)]
502    #[case(DexType::UniswapV2)]
503    fn test_complete_subscription_workflow(#[case] dex_type: DexType) {
504        let mut manager = DefiDataSubscriptionManager::new();
505        let pool1 = address!("1111111111111111111111111111111111111111");
506        let pool2 = address!("2222222222222222222222222222222222222222");
507
508        // Step 1: Register DEX
509        manager.register_dex_for_subscriptions(
510            dex_type,
511            "Swap(address,uint256,uint256)",
512            "Mint(address,uint256)",
513            "Burn(address,uint256)",
514            "Collect(address,uint256,uint256)",
515            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
516        );
517
518        // Step 2: Subscribe to events
519        manager.subscribe_swaps(dex_type, pool1);
520        manager.subscribe_swaps(dex_type, pool2);
521        manager.subscribe_mints(dex_type, pool1);
522        manager.subscribe_burns(dex_type, pool2);
523
524        // Step 3: Verify subscriptions
525        let addresses = manager.get_subscribed_dex_contract_addresses(&dex_type);
526        assert_eq!(addresses.len(), 2);
527        assert!(addresses.contains(&pool1));
528        assert!(addresses.contains(&pool2));
529
530        // Step 4: Get event signatures
531        let signatures = manager.get_subscribed_dex_event_signatures(&dex_type);
532        assert_eq!(signatures.len(), 5);
533
534        // Step 5: Unsubscribe from some events
535        manager.unsubscribe_swaps(dex_type, pool1);
536        manager.unsubscribe_burns(dex_type, pool2);
537
538        // Step 6: Verify remaining subscriptions (only pool1 mint remains)
539        let remaining = manager.get_subscribed_dex_contract_addresses(&dex_type);
540        assert!(remaining.contains(&pool1)); // Still has mint subscription
541        assert!(remaining.contains(&pool2)); // Still has swap subscription
542    }
543
544    #[rstest]
545    fn test_register_with_raw_signatures() {
546        let mut manager = DefiDataSubscriptionManager::new();
547
548        // Register with raw event signatures
549        manager.register_dex_for_subscriptions(
550            DexType::UniswapV3,
551            "Swap(address,address,int256,int256,uint160,uint128,int24)",
552            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
553            "Burn(address,int24,int24,uint128,uint256,uint256)",
554            "Collect(address,address,int24,int24,uint128,uint128)",
555            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
556        );
557
558        // Known keccak256 hashes for UniswapV3 events
559        let swap_sig = manager
560            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
561            .unwrap();
562        let mint_sig = manager
563            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
564            .unwrap();
565        let burn_sig = manager
566            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
567            .unwrap();
568
569        // Verify the exact hash values
570        assert_eq!(
571            swap_sig,
572            "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
573        );
574        assert_eq!(
575            mint_sig,
576            "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
577        );
578        assert_eq!(
579            burn_sig,
580            "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
581        );
582    }
583
584    #[rstest]
585    fn test_register_with_pre_encoded_signatures() {
586        let mut manager = DefiDataSubscriptionManager::new();
587
588        // Register with pre-encoded keccak256 hashes (with 0x prefix)
589        manager.register_dex_for_subscriptions(
590            DexType::UniswapV3,
591            "Swap(address,address,int256,int256,uint160,uint128,int24)",
592            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
593            "Burn(address,int24,int24,uint128,uint256,uint256)",
594            "Collect(address,address,int24,int24,uint128,uint128)",
595            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
596        );
597
598        // Should store them unchanged (normalized to lowercase)
599        let swap_sig = manager
600            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
601            .unwrap();
602        let mint_sig = manager
603            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
604            .unwrap();
605        let burn_sig = manager
606            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
607            .unwrap();
608
609        assert_eq!(
610            swap_sig,
611            "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
612        );
613        assert_eq!(
614            mint_sig,
615            "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
616        );
617        assert_eq!(
618            burn_sig,
619            "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
620        );
621    }
622
623    #[rstest]
624    fn test_register_with_pre_encoded_signatures_no_prefix() {
625        let mut manager = DefiDataSubscriptionManager::new();
626
627        // Register with pre-encoded hashes without 0x prefix
628        manager.register_dex_for_subscriptions(
629            DexType::UniswapV3,
630            "Swap(address,address,int256,int256,uint160,uint128,int24)",
631            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
632            "Burn(address,int24,int24,uint128,uint256,uint256)",
633            "Collect(address,address,int24,int24,uint128,uint128)",
634            Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
635        );
636
637        // Should add 0x prefix and normalize to lowercase
638        let swap_sig = manager
639            .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
640            .unwrap();
641        let mint_sig = manager
642            .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
643            .unwrap();
644        let burn_sig = manager
645            .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
646            .unwrap();
647
648        assert_eq!(
649            swap_sig,
650            "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
651        );
652        assert_eq!(
653            mint_sig,
654            "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
655        );
656        assert_eq!(
657            burn_sig,
658            "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
659        );
660    }
661}