1use ahash::{AHashMap, AHashSet};
17use alloy::primitives::{Address, keccak256};
18use nautilus_core::hex;
19use nautilus_model::defi::DexType;
20
21#[derive(Debug)]
26pub struct DefiDataSubscriptionManager {
27 pool_swap_event_encoded: AHashMap<DexType, String>,
28 pool_mint_event_encoded: AHashMap<DexType, String>,
29 pool_burn_event_encoded: AHashMap<DexType, String>,
30 pool_collect_event_encoded: AHashMap<DexType, String>,
31 pool_flash_event_encoded: AHashMap<DexType, String>,
32 subscribed_pool_swaps: AHashMap<DexType, AHashSet<Address>>,
33 subscribed_pool_mints: AHashMap<DexType, AHashSet<Address>>,
34 subscribed_pool_burns: AHashMap<DexType, AHashSet<Address>>,
35 subscribed_pool_collects: AHashMap<DexType, AHashSet<Address>>,
36 subscribed_pool_flashes: AHashMap<DexType, AHashSet<Address>>,
37}
38
39impl Default for DefiDataSubscriptionManager {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45impl DefiDataSubscriptionManager {
46 #[must_use]
48 pub fn new() -> Self {
49 Self {
50 pool_swap_event_encoded: AHashMap::new(),
51 pool_burn_event_encoded: AHashMap::new(),
52 pool_mint_event_encoded: AHashMap::new(),
53 pool_collect_event_encoded: AHashMap::new(),
54 pool_flash_event_encoded: AHashMap::new(),
55 subscribed_pool_burns: AHashMap::new(),
56 subscribed_pool_mints: AHashMap::new(),
57 subscribed_pool_swaps: AHashMap::new(),
58 subscribed_pool_collects: AHashMap::new(),
59 subscribed_pool_flashes: AHashMap::new(),
60 }
61 }
62
63 #[must_use]
65 pub fn get_subscribed_dex_contract_addresses(&self, dex: &DexType) -> Vec<Address> {
66 let mut unique_addresses = AHashSet::new();
67
68 if let Some(addresses) = self.subscribed_pool_swaps.get(dex) {
69 unique_addresses.extend(addresses.iter().copied());
70 }
71
72 if let Some(addresses) = self.subscribed_pool_mints.get(dex) {
73 unique_addresses.extend(addresses.iter().copied());
74 }
75
76 if let Some(addresses) = self.subscribed_pool_burns.get(dex) {
77 unique_addresses.extend(addresses.iter().copied());
78 }
79
80 if let Some(addresses) = self.subscribed_pool_collects.get(dex) {
81 unique_addresses.extend(addresses.iter().copied());
82 }
83
84 if let Some(addresses) = self.subscribed_pool_flashes.get(dex) {
85 unique_addresses.extend(addresses.iter().copied());
86 }
87
88 unique_addresses.into_iter().collect()
89 }
90
91 #[must_use]
93 pub fn get_subscribed_dex_event_signatures(&self, dex: &DexType) -> Vec<String> {
94 let mut result = Vec::new();
95
96 if let Some(swap_event_signature) = self.pool_swap_event_encoded.get(dex) {
97 result.push(swap_event_signature.clone());
98 }
99
100 if let Some(mint_event_signature) = self.pool_mint_event_encoded.get(dex) {
101 result.push(mint_event_signature.clone());
102 }
103
104 if let Some(burn_event_signature) = self.pool_burn_event_encoded.get(dex) {
105 result.push(burn_event_signature.clone());
106 }
107
108 if let Some(collect_event_signature) = self.pool_collect_event_encoded.get(dex) {
109 result.push(collect_event_signature.clone());
110 }
111
112 if let Some(flash_event_signature) = self.pool_flash_event_encoded.get(dex) {
113 result.push(flash_event_signature.clone());
114 }
115
116 result
117 }
118
119 #[must_use]
121 pub fn get_dex_pool_swap_event_signature(&self, dex: &DexType) -> Option<String> {
122 self.pool_swap_event_encoded.get(dex).cloned()
123 }
124
125 #[must_use]
127 pub fn get_dex_pool_mint_event_signature(&self, dex: &DexType) -> Option<String> {
128 self.pool_mint_event_encoded.get(dex).cloned()
129 }
130 #[must_use]
132 pub fn get_dex_pool_burn_event_signature(&self, dex: &DexType) -> Option<String> {
133 self.pool_burn_event_encoded.get(dex).cloned()
134 }
135
136 #[must_use]
138 pub fn get_dex_pool_collect_event_signature(&self, dex: &DexType) -> Option<String> {
139 self.pool_collect_event_encoded.get(dex).cloned()
140 }
141
142 fn normalize_topic(sig: &str) -> String {
151 let s = sig.trim();
152
153 if let Some(rest) = s.strip_prefix("0x")
155 && rest.len() == 64
156 && rest.chars().all(|c| c.is_ascii_hexdigit())
157 {
158 return format!("0x{}", rest.to_ascii_lowercase());
159 }
160
161 if s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) {
163 return format!("0x{}", s.to_ascii_lowercase());
164 }
165
166 hex::encode_prefixed(keccak256(s.as_bytes()))
168 }
169
170 pub fn register_dex_for_subscriptions(
175 &mut self,
176 dex: DexType,
177 swap_event_signature: &str,
178 mint_event_signature: &str,
179 burn_event_signature: &str,
180 collect_event_signature: &str,
181 flash_event_signature: Option<&str>,
182 ) {
183 self.subscribed_pool_swaps.insert(dex, AHashSet::new());
184 self.pool_swap_event_encoded
185 .insert(dex, Self::normalize_topic(swap_event_signature));
186
187 self.subscribed_pool_mints.insert(dex, AHashSet::new());
188 self.pool_mint_event_encoded
189 .insert(dex, Self::normalize_topic(mint_event_signature));
190
191 self.subscribed_pool_burns.insert(dex, AHashSet::new());
192 self.pool_burn_event_encoded
193 .insert(dex, Self::normalize_topic(burn_event_signature));
194
195 self.subscribed_pool_collects.insert(dex, AHashSet::new());
196 self.pool_collect_event_encoded
197 .insert(dex, Self::normalize_topic(collect_event_signature));
198
199 if let Some(flash_event_signature) = flash_event_signature {
200 self.subscribed_pool_flashes.insert(dex, AHashSet::new());
201 self.pool_flash_event_encoded
202 .insert(dex, Self::normalize_topic(flash_event_signature));
203 }
204
205 log::info!("Registered DEX for subscriptions: {dex:?}");
206 }
207
208 pub fn subscribe_swaps(&mut self, dex: DexType, address: Address) {
210 if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
211 pool_set.insert(address);
212 } else {
213 log::error!("DEX not registered for swap subscriptions: {dex:?}");
214 }
215 }
216
217 pub fn subscribe_mints(&mut self, dex: DexType, address: Address) {
219 if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
220 pool_set.insert(address);
221 } else {
222 log::error!("DEX not registered for mint subscriptions: {dex:?}");
223 }
224 }
225
226 pub fn subscribe_burns(&mut self, dex: DexType, address: Address) {
228 if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
229 pool_set.insert(address);
230 } else {
231 log::warn!("DEX not registered for burn subscriptions: {dex:?}");
232 }
233 }
234
235 pub fn unsubscribe_swaps(&mut self, dex: DexType, address: Address) {
237 if let Some(pool_set) = self.subscribed_pool_swaps.get_mut(&dex) {
238 pool_set.remove(&address);
239 } else {
240 log::error!("DEX not registered for swap subscriptions: {dex:?}");
241 }
242 }
243
244 pub fn unsubscribe_mints(&mut self, dex: DexType, address: Address) {
246 if let Some(pool_set) = self.subscribed_pool_mints.get_mut(&dex) {
247 pool_set.remove(&address);
248 } else {
249 log::error!("DEX not registered for mint subscriptions: {dex:?}");
250 }
251 }
252
253 pub fn unsubscribe_burns(&mut self, dex: DexType, address: Address) {
255 if let Some(pool_set) = self.subscribed_pool_burns.get_mut(&dex) {
256 pool_set.remove(&address);
257 } else {
258 log::error!("DEX not registered for burn subscriptions: {dex:?}");
259 }
260 }
261
262 pub fn subscribe_collects(&mut self, dex: DexType, address: Address) {
264 if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
265 pool_set.insert(address);
266 } else {
267 log::error!("DEX not registered for collect subscriptions: {dex:?}");
268 }
269 }
270
271 pub fn unsubscribe_collects(&mut self, dex: DexType, address: Address) {
273 if let Some(pool_set) = self.subscribed_pool_collects.get_mut(&dex) {
274 pool_set.remove(&address);
275 } else {
276 log::error!("DEX not registered for collect subscriptions: {dex:?}");
277 }
278 }
279
280 pub fn subscribe_flashes(&mut self, dex: DexType, address: Address) {
282 if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
283 pool_set.insert(address);
284 } else {
285 log::error!("DEX not registered for flash subscriptions: {dex:?}");
286 }
287 }
288
289 pub fn unsubscribe_flashes(&mut self, dex: DexType, address: Address) {
291 if let Some(pool_set) = self.subscribed_pool_flashes.get_mut(&dex) {
292 pool_set.remove(&address);
293 } else {
294 log::error!("DEX not registered for flash subscriptions: {dex:?}");
295 }
296 }
297}
298
299#[cfg(test)]
300mod tests {
301 use alloy::primitives::address;
302 use rstest::{fixture, rstest};
303
304 use super::*;
305
306 #[fixture]
307 fn manager() -> DefiDataSubscriptionManager {
308 DefiDataSubscriptionManager::new()
309 }
310
311 #[fixture]
312 fn registered_manager() -> DefiDataSubscriptionManager {
313 let mut manager = DefiDataSubscriptionManager::new();
314 manager.register_dex_for_subscriptions(
315 DexType::UniswapV3,
316 "Swap(address,address,int256,int256,uint160,uint128,int24)",
317 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
318 "Burn(address,int24,int24,uint128,uint256,uint256)",
319 "Collect(address,address,int24,int24,uint128,uint128)",
320 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
321 );
322 manager
323 }
324
325 #[rstest]
326 fn test_new_creates_empty_manager(manager: DefiDataSubscriptionManager) {
327 assert_eq!(
328 manager
329 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
330 .len(),
331 0
332 );
333 assert_eq!(
334 manager
335 .get_subscribed_dex_event_signatures(&DexType::UniswapV3)
336 .len(),
337 0
338 );
339 assert!(
340 manager
341 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
342 .is_none()
343 );
344 assert!(
345 manager
346 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
347 .is_none()
348 );
349 assert!(
350 manager
351 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
352 .is_none()
353 );
354 }
355
356 #[rstest]
357 fn test_register_dex_for_subscriptions(registered_manager: DefiDataSubscriptionManager) {
358 let signatures =
360 registered_manager.get_subscribed_dex_event_signatures(&DexType::UniswapV3);
361 assert_eq!(signatures.len(), 5);
362
363 assert!(
365 registered_manager
366 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
367 .is_some()
368 );
369 assert!(
370 registered_manager
371 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
372 .is_some()
373 );
374 assert!(
375 registered_manager
376 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
377 .is_some()
378 );
379 }
380
381 #[rstest]
382 fn test_subscribe_and_get_addresses(mut registered_manager: DefiDataSubscriptionManager) {
383 let pool_address = address!("1234567890123456789012345678901234567890");
384
385 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
387
388 let addresses =
389 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
390 assert_eq!(addresses.len(), 1);
391 assert_eq!(addresses[0], pool_address);
392 }
393
394 #[rstest]
395 fn test_subscribe_to_unregistered_dex(mut manager: DefiDataSubscriptionManager) {
396 let pool_address = address!("1234567890123456789012345678901234567890");
397
398 manager.subscribe_swaps(DexType::UniswapV3, pool_address);
400 manager.subscribe_mints(DexType::UniswapV3, pool_address);
401 manager.subscribe_burns(DexType::UniswapV3, pool_address);
402
403 let addresses = manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
405 assert_eq!(addresses.len(), 0);
406 }
407
408 #[rstest]
409 fn test_unsubscribe_removes_address(mut registered_manager: DefiDataSubscriptionManager) {
410 let pool_address = address!("1234567890123456789012345678901234567890");
411
412 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
414
415 assert_eq!(
417 registered_manager
418 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
419 .len(),
420 1
421 );
422
423 registered_manager.unsubscribe_swaps(DexType::UniswapV3, pool_address);
425
426 assert_eq!(
428 registered_manager
429 .get_subscribed_dex_contract_addresses(&DexType::UniswapV3)
430 .len(),
431 0
432 );
433 }
434
435 #[rstest]
436 fn test_get_event_signatures(registered_manager: DefiDataSubscriptionManager) {
437 let swap_sig = registered_manager.get_dex_pool_swap_event_signature(&DexType::UniswapV3);
438 let mint_sig = registered_manager.get_dex_pool_mint_event_signature(&DexType::UniswapV3);
439 let burn_sig = registered_manager.get_dex_pool_burn_event_signature(&DexType::UniswapV3);
440
441 assert!(swap_sig.is_some() && swap_sig.unwrap().starts_with("0x"));
443 assert!(mint_sig.is_some() && mint_sig.unwrap().starts_with("0x"));
444 assert!(burn_sig.is_some() && burn_sig.unwrap().starts_with("0x"));
445 }
446
447 #[rstest]
448 fn test_multiple_subscriptions_same_pool(mut registered_manager: DefiDataSubscriptionManager) {
449 let pool_address = address!("1234567890123456789012345678901234567890");
450
451 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
453 registered_manager.subscribe_swaps(DexType::UniswapV3, pool_address);
454
455 let addresses =
457 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
458 assert_eq!(addresses.len(), 1);
459 }
460
461 #[rstest]
462 fn test_get_combined_addresses_from_all_events(
463 mut registered_manager: DefiDataSubscriptionManager,
464 ) {
465 let pool1 = address!("1111111111111111111111111111111111111111");
466 let pool2 = address!("2222222222222222222222222222222222222222");
467 let pool3 = address!("3333333333333333333333333333333333333333");
468
469 registered_manager.subscribe_swaps(DexType::UniswapV3, pool1);
471 registered_manager.subscribe_mints(DexType::UniswapV3, pool2);
472 registered_manager.subscribe_burns(DexType::UniswapV3, pool3);
473
474 let addresses =
476 registered_manager.get_subscribed_dex_contract_addresses(&DexType::UniswapV3);
477 assert_eq!(addresses.len(), 3);
478 assert!(addresses.contains(&pool1));
479 assert!(addresses.contains(&pool2));
480 assert!(addresses.contains(&pool3));
481 }
482
483 #[rstest]
484 fn test_event_signature_encoding(registered_manager: DefiDataSubscriptionManager) {
485 let swap_sig = registered_manager
488 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
489 .unwrap();
490
491 assert!(swap_sig.starts_with("0x"));
493 assert_eq!(swap_sig.len(), 66); let hex_part = &swap_sig[2..];
497 assert!(hex_part.chars().all(|c| c.is_ascii_hexdigit()));
498 }
499
500 #[rstest]
501 #[case(DexType::UniswapV3)]
502 #[case(DexType::UniswapV2)]
503 fn test_complete_subscription_workflow(#[case] dex_type: DexType) {
504 let mut manager = DefiDataSubscriptionManager::new();
505 let pool1 = address!("1111111111111111111111111111111111111111");
506 let pool2 = address!("2222222222222222222222222222222222222222");
507
508 manager.register_dex_for_subscriptions(
510 dex_type,
511 "Swap(address,uint256,uint256)",
512 "Mint(address,uint256)",
513 "Burn(address,uint256)",
514 "Collect(address,uint256,uint256)",
515 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
516 );
517
518 manager.subscribe_swaps(dex_type, pool1);
520 manager.subscribe_swaps(dex_type, pool2);
521 manager.subscribe_mints(dex_type, pool1);
522 manager.subscribe_burns(dex_type, pool2);
523
524 let addresses = manager.get_subscribed_dex_contract_addresses(&dex_type);
526 assert_eq!(addresses.len(), 2);
527 assert!(addresses.contains(&pool1));
528 assert!(addresses.contains(&pool2));
529
530 let signatures = manager.get_subscribed_dex_event_signatures(&dex_type);
532 assert_eq!(signatures.len(), 5);
533
534 manager.unsubscribe_swaps(dex_type, pool1);
536 manager.unsubscribe_burns(dex_type, pool2);
537
538 let remaining = manager.get_subscribed_dex_contract_addresses(&dex_type);
540 assert!(remaining.contains(&pool1)); assert!(remaining.contains(&pool2)); }
543
544 #[rstest]
545 fn test_register_with_raw_signatures() {
546 let mut manager = DefiDataSubscriptionManager::new();
547
548 manager.register_dex_for_subscriptions(
550 DexType::UniswapV3,
551 "Swap(address,address,int256,int256,uint160,uint128,int24)",
552 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
553 "Burn(address,int24,int24,uint128,uint256,uint256)",
554 "Collect(address,address,int24,int24,uint128,uint128)",
555 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
556 );
557
558 let swap_sig = manager
560 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
561 .unwrap();
562 let mint_sig = manager
563 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
564 .unwrap();
565 let burn_sig = manager
566 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
567 .unwrap();
568
569 assert_eq!(
571 swap_sig,
572 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
573 );
574 assert_eq!(
575 mint_sig,
576 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
577 );
578 assert_eq!(
579 burn_sig,
580 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
581 );
582 }
583
584 #[rstest]
585 fn test_register_with_pre_encoded_signatures() {
586 let mut manager = DefiDataSubscriptionManager::new();
587
588 manager.register_dex_for_subscriptions(
590 DexType::UniswapV3,
591 "Swap(address,address,int256,int256,uint160,uint128,int24)",
592 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
593 "Burn(address,int24,int24,uint128,uint256,uint256)",
594 "Collect(address,address,int24,int24,uint128,uint128)",
595 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
596 );
597
598 let swap_sig = manager
600 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
601 .unwrap();
602 let mint_sig = manager
603 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
604 .unwrap();
605 let burn_sig = manager
606 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
607 .unwrap();
608
609 assert_eq!(
610 swap_sig,
611 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
612 );
613 assert_eq!(
614 mint_sig,
615 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
616 );
617 assert_eq!(
618 burn_sig,
619 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
620 );
621 }
622
623 #[rstest]
624 fn test_register_with_pre_encoded_signatures_no_prefix() {
625 let mut manager = DefiDataSubscriptionManager::new();
626
627 manager.register_dex_for_subscriptions(
629 DexType::UniswapV3,
630 "Swap(address,address,int256,int256,uint160,uint128,int24)",
631 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
632 "Burn(address,int24,int24,uint128,uint256,uint256)",
633 "Collect(address,address,int24,int24,uint128,uint128)",
634 Some("Flash(address,address,uint256,uint256,uint256,uint256)"),
635 );
636
637 let swap_sig = manager
639 .get_dex_pool_swap_event_signature(&DexType::UniswapV3)
640 .unwrap();
641 let mint_sig = manager
642 .get_dex_pool_mint_event_signature(&DexType::UniswapV3)
643 .unwrap();
644 let burn_sig = manager
645 .get_dex_pool_burn_event_signature(&DexType::UniswapV3)
646 .unwrap();
647
648 assert_eq!(
649 swap_sig,
650 "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
651 );
652 assert_eq!(
653 mint_sig,
654 "0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde"
655 );
656 assert_eq!(
657 burn_sig,
658 "0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c"
659 );
660 }
661}