nautilus_blockchain/data/core.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{cmp::max, sync::Arc};

use futures_util::StreamExt;
use nautilus_common::messages::DataEvent;
use nautilus_core::{hex, string::formatting::Separable};
use nautilus_model::defi::{
    Block, Blockchain, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolProfiler, PoolSwap,
    SharedChain, SharedDex, SharedPool,
    data::{DefiData, DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
    pool_analysis::{compare::compare_pool_profiler, snapshot::PoolSnapshot},
    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
};
use nautilus_network::websocket::TransportBackend;

use crate::{
    cache::BlockchainCache,
    config::BlockchainDataClientConfig,
    contracts::{erc20::Erc20Contract, uniswap_v3_pool::UniswapV3PoolContract},
    data::subscription::DefiDataSubscriptionManager,
    events::{
        burn::BurnEvent, collect::CollectEvent, flash::FlashEvent, mint::MintEvent, swap::SwapEvent,
    },
    exchanges::{extended::DexExtended, get_dex_extended},
    hypersync::{
        client::HyperSyncClient,
        helpers::{extract_block_number, extract_event_signature_bytes},
    },
    rpc::{
        BlockchainRpcClient, BlockchainRpcClientAny,
        chains::{
            arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
            polygon::PolygonRpcClient,
        },
        http::BlockchainHttpRpcClient,
        types::BlockchainMessage,
    },
    services::PoolDiscoveryService,
};

const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;

/// Core blockchain data client responsible for fetching, processing, and caching blockchain data.
///
/// This struct encapsulates the core functionality for interacting with blockchain networks,
/// including syncing historical data, processing real-time events, and managing cached entities.
#[derive(Debug)]
pub struct BlockchainDataClientCore {
    /// The blockchain being targeted by this client instance.
    pub chain: SharedChain,
    /// The configuration for the data client.
    pub config: BlockchainDataClientConfig,
    /// Local cache for blockchain entities.
    pub cache: BlockchainCache,
    /// Interface for interacting with ERC20 token contracts.
    tokens: Erc20Contract,
    /// Interface for interacting with UniswapV3 pool contracts.
    univ3_pool: UniswapV3PoolContract,
    /// Client for the HyperSync data indexing service.
    pub hypersync_client: HyperSyncClient,
    /// Optional WebSocket RPC client for direct blockchain node communication.
    pub rpc_client: Option<BlockchainRpcClientAny>,
    /// Manages subscriptions for various DEX events (swaps, mints, burns).
    pub subscription_manager: DefiDataSubscriptionManager,
    /// Channel sender for data events.
    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
    /// Cancellation token for graceful shutdown of long-running operations.
    cancellation_token: tokio_util::sync::CancellationToken,
}

impl BlockchainDataClientCore {
    /// Creates a new instance of [`BlockchainDataClientCore`].
    ///
    /// # Panics
    ///
    /// Panics if `use_hypersync_for_live_data` is false but `wss_rpc_url` is None.
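    ///
    /// # Examples
    ///
    /// A minimal, hypothetical sketch of constructing the core client; the `config` value,
    /// channels, and runtime setup are assumed to be provided by the caller.
    ///
    /// ```ignore
    /// use tokio_util::sync::CancellationToken;
    ///
    /// // `config` is assumed to be a previously built `BlockchainDataClientConfig`.
    /// let (data_tx, _data_rx) = tokio::sync::mpsc::unbounded_channel();
    /// let core = BlockchainDataClientCore::new(
    ///     config,
    ///     None,          // no HyperSync message channel in this sketch
    ///     Some(data_tx), // `DataEvent`s will arrive on `_data_rx`
    ///     CancellationToken::new(),
    /// );
    /// ```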
    #[must_use]
    pub fn new(
        config: BlockchainDataClientConfig,
        hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
        data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Self {
        let chain = config.chain.clone();
        let cache = BlockchainCache::new(chain.clone());

        // Log RPC endpoints being used
        log::info!(
            "Initializing blockchain data client for '{}' with HTTP RPC: {}",
            chain.name,
            config.http_rpc_url
        );

        let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
            let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
            log::info!("WebSocket RPC URL: {wss_rpc_url}");
            Some(Self::initialize_rpc_client(
                chain.name,
                wss_rpc_url,
                config.transport_backend,
                config.proxy_url.clone(),
            ))
        } else {
            log::info!("Using HyperSync for live data (no WebSocket RPC)");
            None
        };
        let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
            config.http_rpc_url.clone(),
            config.rpc_requests_per_second,
            config.proxy_url.clone(),
        ));
        let erc20_contract = Erc20Contract::new(
            http_rpc_client.clone(),
            config.pool_filters.remove_pools_with_empty_erc20fields,
        );

        let hypersync_client =
            HyperSyncClient::new(chain.clone(), hypersync_tx, cancellation_token.clone());
        Self {
            chain,
            config,
            rpc_client,
            tokens: erc20_contract,
            univ3_pool: UniswapV3PoolContract::new(http_rpc_client),
            cache,
            hypersync_client,
            subscription_manager: DefiDataSubscriptionManager::new(),
            data_tx,
            cancellation_token,
        }
    }

    /// Initializes the database connection for the blockchain cache.
    pub async fn initialize_cache_database(&mut self) {
        if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
            log::info!(
                "Initializing blockchain cache on database '{}'",
                pg_connect_options.database
            );
            self.cache
                .initialize_database(pg_connect_options.clone().into())
                .await;
        }
    }

    /// Creates an appropriate blockchain RPC client for the specified blockchain.
    fn initialize_rpc_client(
        blockchain: Blockchain,
        wss_rpc_url: String,
        transport_backend: TransportBackend,
        proxy_url: Option<String>,
    ) -> BlockchainRpcClientAny {
        let mut client = match blockchain {
            Blockchain::Ethereum => {
                BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url, proxy_url))
            }
            Blockchain::Polygon => {
                BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url, proxy_url))
            }
            Blockchain::Base => {
                BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url, proxy_url))
            }
            Blockchain::Arbitrum => {
                BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url, proxy_url))
            }
            _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
        };
        client.set_transport_backend(transport_backend);
        client
    }

    /// Establishes connections to all configured data sources and initializes the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if cache initialization or connection setup fails.
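    ///
    /// # Examples
    ///
    /// A hedged sketch of the typical lifecycle, assuming `core` was built with
    /// [`BlockchainDataClientCore::new`] and an async runtime is provided by the caller.
    ///
    /// ```ignore
    /// core.connect().await?;   // initializes the cache, RPC, and registers configured DEXes
    /// // ... sync or subscribe as needed ...
    /// core.disconnect().await; // graceful shutdown of background connections
    /// ```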
    pub async fn connect(&mut self) -> anyhow::Result<()> {
        log::info!(
            "Connecting blockchain data client for '{}'",
            self.chain.name
        );
        self.initialize_cache_database().await;

        if let Some(ref mut rpc_client) = self.rpc_client {
            rpc_client.connect().await?;
        }

        let from_block = self.determine_from_block();

        log::info!(
            "Connecting to blockchain data source for '{}' from block {}",
            self.chain.name,
            from_block.separate_with_commas()
        );

        // Initialize the chain and register the Dex exchanges in the cache.
        self.cache.initialize_chain().await;
        // Import the cached blockchain data.
        self.cache.connect(from_block).await?;
        // TODO: Block syncing is disabled for now because block timestamps are not yet configured.
        // Sync the remaining blocks which are missing.
        // self.sync_blocks(Some(from_block), None).await?;
        for dex in self.config.dex_ids.clone() {
            self.register_dex_exchange(dex).await?;
            self.sync_exchange_pools(&dex, from_block, None, false)
                .await?;
        }

        Ok(())
    }

    /// Syncs blocks with consistency checks to ensure data integrity.
    ///
    /// # Errors
    ///
    /// Returns an error if block syncing fails or if consistency checks fail.
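    ///
    /// # Examples
    ///
    /// A hypothetical sketch of a checked sync to the current chain head; the starting block
    /// shown is an arbitrary placeholder.
    ///
    /// ```ignore
    /// // Fills any gap detected in the cache with INSERTs, then continues with fast COPY.
    /// core.sync_blocks_checked(18_000_000, None).await?;
    /// ```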
    pub async fn sync_blocks_checked(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
    ) -> anyhow::Result<()> {
        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
            // If blocks are consistent proceed with copy command.
            if blocks_status.is_consistent() {
                log::info!(
                    "Cache is consistent: no gaps detected (last continuous block: {})",
                    blocks_status.last_continuous_block
                );
                let target_block = max(blocks_status.max_block + 1, from_block);
                log::info!(
                    "Starting fast sync with COPY from block {}",
                    target_block.separate_with_commas()
                );
                self.sync_blocks(target_block, to_block, true).await?;
            } else {
                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
                log::info!(
                    "Cache inconsistency detected: {} blocks missing between {} and {}",
                    gap_size,
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );

                log::info!(
                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );
                self.sync_blocks(
                    blocks_status.last_continuous_block + 1,
                    Some(blocks_status.max_block),
                    false,
                )
                .await?;

                log::info!(
                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
                    (blocks_status.max_block + 1).separate_with_commas()
                );
                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
                    .await?;
            }
        } else {
            self.sync_blocks(from_block, to_block, true).await?;
        }

        Ok(())
    }

    /// Synchronizes blockchain data by fetching and caching all blocks from the starting block to the current chain head.
    ///
    /// # Errors
    ///
    /// Returns an error if block fetching, caching, or database operations fail.
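    ///
    /// # Examples
    ///
    /// A hedged sketch; the block numbers are placeholders and `use_copy_command` selects the
    /// bulk COPY path instead of batched INSERTs.
    ///
    /// ```ignore
    /// // Sync an explicit range using the fast COPY path.
    /// core.sync_blocks(18_000_000, Some(18_100_000), true).await?;
    ///
    /// // Or sync up to the current chain head reported by HyperSync.
    /// core.sync_blocks(18_000_000, None, true).await?;
    /// ```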
    pub async fn sync_blocks(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
        use_copy_command: bool,
    ) -> anyhow::Result<()> {
        const BATCH_SIZE: usize = 1000;

        let to_block = if let Some(block) = to_block {
            block
        } else {
            self.hypersync_client.current_block().await
        };
        let total_blocks = to_block.saturating_sub(from_block) + 1;
        log::info!(
            "Syncing blocks from {} to {} (total: {} blocks)",
            from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas()
        );

        // Enable performance settings for sync operations
        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            log::warn!("Failed to enable performance settings: {e}");
        }

        let blocks_stream = self
            .hypersync_client
            .request_blocks_stream(from_block, Some(to_block))
            .await;

        tokio::pin!(blocks_stream);

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::Blocks,
            from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);

        let cancellation_token = self.cancellation_token.clone();
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                log::info!("Block sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }
            result = async {
                while let Some(block) = blocks_stream.next().await {
                    let block_number = block.number;
                    if self.cache.get_block_timestamp(block_number).is_some() {
                        continue;
                    }
                    batch.push(block);

                    // Process batch when full or last block
                    if batch.len() >= BATCH_SIZE || block_number >= to_block {
                        let batch_size = batch.len();

                        self.cache.add_blocks_batch(batch, use_copy_command).await?;
                        metrics.update(batch_size);

                        // Re-initialize batch vector
                        batch = Vec::with_capacity(BATCH_SIZE);
                    }

                    // Log progress if needed
                    if metrics.should_log_progress(block_number, to_block) {
                        metrics.log_progress(block_number);
                    }
                }

                // Process any remaining blocks
                if !batch.is_empty() {
                    let batch_size = batch.len();
                    self.cache.add_blocks_batch(batch, use_copy_command).await?;
                    metrics.update(batch_size);
                }

                metrics.log_final_stats();
                Ok(())
            } => result
        };

        sync_result?;

        // Restore default safe settings after sync completion
        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            log::warn!("Failed to restore default settings: {e}");
        }

        Ok(())
    }

    /// Synchronizes all events for a specific pool within the given block range.
    ///
    /// # Errors
    ///
    /// Returns an error if event syncing, parsing, or database operations fail.
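    ///
    /// # Examples
    ///
    /// A hypothetical sketch; the DEX and `pool_identifier` are placeholders standing in for
    /// values already registered with the client.
    ///
    /// ```ignore
    /// // Resume syncing this pool's swap/mint/burn/collect/flash events from the last
    /// // synced block up to the current chain head.
    /// core.sync_pool_events(&DexType::UniswapV3, pool_identifier, None, None, false)
    ///     .await?;
    /// ```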
    pub async fn sync_pool_events(
        &mut self,
        dex: &DexType,
        pool_identifier: PoolIdentifier,
        from_block: Option<u64>,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        const EVENT_BATCH_SIZE: usize = 20000;

        let pool: SharedPool = self.get_pool(&pool_identifier)?.clone();
        let pool_display = pool.to_full_spec_string();
        let from_block = from_block.unwrap_or(pool.creation_block);
        // Extract address for blockchain queries
        let pool_address = &pool.address;

        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self
                .cache
                .get_pool_last_synced_block(dex, &pool_identifier)
                .await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        // Skip sync if we're already up to date
        if effective_from_block > to_block {
            log::info!(
                "Dex {} already synced to block {} (current: {}), skipping sync",
                dex,
                last_synced_block.unwrap_or(0).separate_with_commas(),
                to_block.separate_with_commas()
            );
            return Ok(());
        }

        // Query the max block across the pool event tables: use batched INSERTs up to that
        // block (to handle duplicates), then switch to the COPY command beyond it.
        let last_block_across_pool_events_table = self
            .cache
            .get_pool_event_tables_last_block(&pool_identifier)
            .await?;

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        log::info!(
            "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
            pool_display,
            effective_from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas(),
            if let Some(last_synced) = last_synced_block {
                format!(
                    " - resuming from last synced block {}",
                    last_synced.separate_with_commas()
                )
            } else {
                String::new()
            }
        );

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );
        let dex_extended = self.get_dex_extended(dex)?.clone();
        let swap_event_signature = dex_extended.swap_created_event.as_ref();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let collect_event_signature = dex_extended.collect_created_event.as_ref();
        let flash_event_signature = dex_extended.flash_created_event.as_ref();
        let initialize_event_signature: Option<&str> =
            dex_extended.initialize_event.as_ref().map(|s| s.as_ref());

        // Pre-decode event signatures to bytes for efficient comparison
        let swap_sig_bytes = hex::decode(
            swap_event_signature
                .strip_prefix("0x")
                .unwrap_or(swap_event_signature),
        )?;
        let mint_sig_bytes = hex::decode(
            mint_event_signature
                .strip_prefix("0x")
                .unwrap_or(mint_event_signature),
        )?;
        let burn_sig_bytes = hex::decode(
            burn_event_signature
                .strip_prefix("0x")
                .unwrap_or(burn_event_signature),
        )?;
        let collect_sig_bytes = hex::decode(
            collect_event_signature
                .strip_prefix("0x")
                .unwrap_or(collect_event_signature),
        )?;
        let flash_sig_bytes = flash_event_signature
            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
        let initialize_sig_bytes = initialize_event_signature
            .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());

        let mut event_signatures = vec![
            swap_event_signature,
            mint_event_signature,
            burn_event_signature,
            collect_event_signature,
        ];

        if let Some(event) = dex_extended.initialize_event.as_ref() {
            event_signatures.push(event);
        }

        if let Some(event) = dex_extended.flash_created_event.as_ref() {
            event_signatures.push(event);
        }
        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                pool_address,
                event_signatures,
            )
            .await;
        tokio::pin!(pool_events_stream);

        let mut last_block_saved = effective_from_block;
        let mut blocks_processed = 0;

        let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);
        let mut flash_batch: Vec<PoolFlash> = Vec::with_capacity(EVENT_BATCH_SIZE);

        // Track when we've moved beyond stale data and can use COPY
        let mut beyond_stale_data = last_block_across_pool_events_table
            .is_none_or(|tables_max| effective_from_block > tables_max);

        let cancellation_token = self.cancellation_token.clone();
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                log::info!("Pool event sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }
            result = async {
                while let Some(log) = pool_events_stream.next().await {
                    let block_number = extract_block_number(&log)?;
                    blocks_processed += block_number - last_block_saved;
                    last_block_saved = block_number;

                    let event_sig_bytes = extract_event_signature_bytes(&log)?;
            if event_sig_bytes == swap_sig_bytes.as_slice() {
                let swap_event = dex_extended.parse_swap_event_hypersync(&log)?;
                match self.process_pool_swap_event(&swap_event, &pool) {
                    Ok(swap) => swap_batch.push(swap),
                    Err(e) => log::error!("Failed to process swap event: {e}"),
                }
            } else if event_sig_bytes == mint_sig_bytes.as_slice() {
                let mint_event = dex_extended.parse_mint_event_hypersync(&log)?;
                match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                    Err(e) => log::error!("Failed to process mint event: {e}"),
                }
            } else if event_sig_bytes == burn_sig_bytes.as_slice() {
                let burn_event = dex_extended.parse_burn_event_hypersync(&log)?;
                match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
                    Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
                    Err(e) => log::error!("Failed to process burn event: {e}"),
                }
            } else if event_sig_bytes == collect_sig_bytes.as_slice() {
                let collect_event = dex_extended.parse_collect_event_hypersync(&log)?;
                match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
                    Ok(fee_collect) => collect_batch.push(fee_collect),
                    Err(e) => log::error!("Failed to process collect event: {e}"),
                }
            } else if initialize_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
                let initialize_event = dex_extended.parse_initialize_event_hypersync(&log)?;
                self.cache
                    .update_pool_initialize_price_tick(&initialize_event)
                    .await?;
            } else if flash_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
                if let Some(parse_fn) = dex_extended.parse_flash_event_hypersync_fn {
                    match parse_fn(dex_extended.dex.clone(), &log) {
                        Ok(flash_event) => {
                            match self.process_pool_flash_event(&flash_event, &pool) {
                                Ok(flash) => flash_batch.push(flash),
                                Err(e) => log::error!("Failed to process flash event: {e}"),
                            }
                        }
                        Err(e) => log::error!("Failed to parse flash event: {e}"),
                    }
                }
            } else {
                let event_signature = hex::encode(event_sig_bytes);
                log::error!(
                    "Unexpected event signature: {event_signature} for log {log:?}"
                );
            }

            // Check if we've moved beyond stale data (transition point for strategy change)
            if !beyond_stale_data
                && last_block_across_pool_events_table
                    .is_some_and(|table_max| block_number > table_max)
            {
                log::info!(
                    "Crossed beyond stale data at block {block_number} - flushing current batches with ON CONFLICT, then switching to COPY"
                );

                // Flush all batches with ON CONFLICT to handle any remaining duplicates
                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    &mut flash_batch,
                    false,
                    true,
                )
                .await?;

                beyond_stale_data = true;
                log::info!("Switched to COPY mode - future batches will use COPY command");
            } else {
                // Process batches when they reach batch size
                self.flush_event_batches(
                    EVENT_BATCH_SIZE,
                    &mut swap_batch,
                    &mut liquidity_batch,
                    &mut collect_batch,
                    &mut flash_batch,
                    false, // TODO: temporarily don't use the COPY command
                    false,
                )
                .await?;
            }

            metrics.update(blocks_processed as usize);
            blocks_processed = 0;

            // Log progress if needed
            if metrics.should_log_progress(block_number, to_block) {
                metrics.log_progress(block_number);
                self.cache
                    .update_pool_last_synced_block(dex, &pool_identifier, block_number)
                    .await?;
            }
        }

        self.flush_event_batches(
            EVENT_BATCH_SIZE,
            &mut swap_batch,
            &mut liquidity_batch,
            &mut collect_batch,
            &mut flash_batch,
            false,
            true,
        )
        .await?;

        metrics.log_final_stats();
        self.cache
            .update_pool_last_synced_block(dex, &pool_identifier, to_block)
            .await?;

        log::info!(
            "Successfully synced Dex '{}' Pool '{}' up to block {}",
            dex,
            pool_display,
            to_block.separate_with_commas()
        );
                Ok(())
            } => result
        };

        sync_result
    }

    #[expect(clippy::too_many_arguments)]
    async fn flush_event_batches(
        &self,
        event_batch_size: usize,
        swap_batch: &mut Vec<PoolSwap>,
        liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
        collect_batch: &mut Vec<PoolFeeCollect>,
        flash_batch: &mut Vec<PoolFlash>,
        use_copy_command: bool,
        force_flush_all: bool,
    ) -> anyhow::Result<()> {
        if (force_flush_all || swap_batch.len() >= event_batch_size) && !swap_batch.is_empty() {
            self.cache
                .add_pool_swaps_batch(swap_batch, use_copy_command)
                .await?;
            swap_batch.clear();
        }

        if (force_flush_all || liquidity_batch.len() >= event_batch_size)
            && !liquidity_batch.is_empty()
        {
            self.cache
                .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
                .await?;
            liquidity_batch.clear();
        }

        if (force_flush_all || collect_batch.len() >= event_batch_size) && !collect_batch.is_empty()
        {
            self.cache
                .add_pool_fee_collects_batch(collect_batch, use_copy_command)
                .await?;
            collect_batch.clear();
        }

        if (force_flush_all || flash_batch.len() >= event_batch_size) && !flash_batch.is_empty() {
            self.cache.add_pool_flash_batch(flash_batch).await?;
            flash_batch.clear();
        }
        Ok(())
    }

    /// Processes a swap event and converts it to a pool swap.
    ///
    /// # Errors
    ///
    /// Returns an error if swap event processing fails.
    pub fn process_pool_swap_event(
        &self,
        swap_event: &SwapEvent,
        pool: &SharedPool,
    ) -> anyhow::Result<PoolSwap> {
        let timestamp = self
            .cache
            .get_block_timestamp(swap_event.block_number)
            .copied();
        let mut swap = swap_event.to_pool_swap(
            self.chain.clone(),
            pool.instrument_id,
            pool.pool_identifier,
            timestamp,
        );
        swap.calculate_trade_info(&pool.token0, &pool.token1, None)?;

        Ok(swap)
    }

    /// Processes a mint event (liquidity addition) and converts it to a `PoolLiquidityUpdate`.
    ///
    /// # Errors
    ///
    /// Returns an error if mint event processing fails or if the liquidity update creation fails.
    pub fn process_pool_mint_event(
        &self,
        mint_event: &MintEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(mint_event.block_number)
            .copied();

        let liquidity_update = mint_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            timestamp,
        );

        // self.cache.add_liquidity_update(&liquidity_update).await?;

        Ok(liquidity_update)
    }

    /// Processes a burn event (liquidity removal) and converts it to a `PoolLiquidityUpdate`.
    ///
    /// # Errors
    ///
    /// Returns an error if the burn event processing fails or if the liquidity update creation fails.
    pub fn process_pool_burn_event(
        &self,
        burn_event: &BurnEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolLiquidityUpdate> {
        let timestamp = self
            .cache
            .get_block_timestamp(burn_event.block_number)
            .copied();

        let liquidity_update = burn_event.to_pool_liquidity_update(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            pool.pool_identifier,
            timestamp,
        );

        // self.cache.add_liquidity_update(&liquidity_update).await?;

        Ok(liquidity_update)
    }

    /// Processes a pool collect event and converts it to a fee collection.
    ///
    /// # Errors
    ///
    /// Returns an error if the collect event processing fails or if the fee collection creation fails.
    pub fn process_pool_collect_event(
        &self,
        collect_event: &CollectEvent,
        pool: &SharedPool,
        dex_extended: &DexExtended,
    ) -> anyhow::Result<PoolFeeCollect> {
        let timestamp = self
            .cache
            .get_block_timestamp(collect_event.block_number)
            .copied();

        let fee_collect = collect_event.to_pool_fee_collect(
            self.chain.clone(),
            dex_extended.dex.clone(),
            pool.instrument_id,
            timestamp,
        );

        Ok(fee_collect)
    }

    /// Processes a pool flash event and converts it to a flash loan.
    ///
    /// # Errors
    ///
    /// Returns an error if the flash event processing fails or if the flash loan creation fails.
    pub fn process_pool_flash_event(
        &self,
        flash_event: &FlashEvent,
        pool: &SharedPool,
    ) -> anyhow::Result<PoolFlash> {
        let timestamp = self
            .cache
            .get_block_timestamp(flash_event.block_number)
            .copied();

        let flash = flash_event.to_pool_flash(self.chain.clone(), pool.instrument_id, timestamp);

        Ok(flash)
    }

    /// Synchronizes all pools and their tokens for a specific DEX within the given block range.
    ///
    /// This method performs a full sync of:
    /// 1. Pool creation events from the DEX factory
    /// 2. Token metadata for all tokens in discovered pools
    /// 3. Pool entities with proper token associations
    ///
    /// # Errors
    ///
    /// Returns an error if syncing pools, tokens, or DEX operations fail.
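    ///
    /// # Examples
    ///
    /// A hedged sketch; the starting block is a placeholder and the DEX must already have an
    /// extended definition for the configured chain.
    ///
    /// ```ignore
    /// core.sync_exchange_pools(&DexType::UniswapV3, 12_000_000, None, false)
    ///     .await?;
    /// ```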
    pub async fn sync_exchange_pools(
        &mut self,
        dex: &DexType,
        from_block: u64,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        let dex_extended = self.get_dex_extended(dex)?.clone();

        let mut service = PoolDiscoveryService::new(
            self.chain.clone(),
            &mut self.cache,
            &self.tokens,
            &self.hypersync_client,
            self.cancellation_token.clone(),
            self.config.clone(),
        );

        service
            .sync_pools(&dex_extended, from_block, to_block, reset)
            .await?;

        Ok(())
    }

    /// Registers a decentralized exchange for data collection and event monitoring.
    ///
    /// Registration involves:
    /// 1. Adding the DEX to the cache
    /// 2. Loading existing pools for the DEX
    /// 3. Configuring event signatures for subscriptions
    ///
    /// # Errors
    ///
    /// Returns an error if DEX registration, cache operations, or pool loading fails.
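    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming the DEX has an extended definition on the client's chain.
    ///
    /// ```ignore
    /// core.register_dex_exchange(DexType::UniswapV3).await?;
    /// ```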
    pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
        if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
            log::info!("Registering DEX {dex_id} on chain {}", self.chain.name);

            self.cache.add_dex(dex_extended.dex.clone()).await?;
            let _ = self.cache.load_pools(&dex_id).await?;

            self.subscription_manager.register_dex_for_subscriptions(
                dex_id,
                dex_extended.swap_created_event.as_ref(),
                dex_extended.mint_created_event.as_ref(),
                dex_extended.burn_created_event.as_ref(),
                dex_extended.collect_created_event.as_ref(),
                dex_extended.flash_created_event.as_deref(),
            );
            Ok(())
        } else {
            anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
        }
    }

    /// Bootstraps a [`PoolProfiler`] with the latest state for a given pool.
    ///
    /// Uses two paths depending on whether the pool has been synced to the database:
    /// - **Never synced**: Streams events from HyperSync → restores from on-chain RPC → returns `(profiler, true)`
    /// - **Previously synced**: Syncs new events to DB → streams from DB → returns `(profiler, false)`
    ///
    /// Both paths restore from the latest valid snapshot first (if available), otherwise they initialize from the pool's initial price.
    ///
    /// # Returns
    ///
    /// - `PoolProfiler`: Hydrated profiler with current pool state
    /// - `bool`: `true` if constructed from RPC (already valid), `false` if from DB (needs validation)
    ///
    /// # Errors
    ///
    /// Returns an error if database is not initialized or event processing fails.
    ///
    /// # Panics
    ///
    /// Panics if the database reference is unavailable.
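    ///
    /// # Examples
    ///
    /// A hypothetical sketch; `pool` is assumed to be a `SharedPool` already loaded in the cache.
    ///
    /// ```ignore
    /// let (profiler, already_validated) = core.bootstrap_latest_pool_profiler(&pool).await?;
    /// let is_valid = core.check_snapshot_validity(&profiler, already_validated).await?;
    /// ```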
    pub async fn bootstrap_latest_pool_profiler(
        &mut self,
        pool: &SharedPool,
    ) -> anyhow::Result<(PoolProfiler, bool)> {
        log::info!(
            "Bootstrapping latest pool profiler for pool {}",
            pool.address
        );

        if self.cache.database.is_none() {
            anyhow::bail!(
                "Database is not initialized, so we cannot properly bootstrap the latest pool profiler"
            );
        }

        let mut profiler = PoolProfiler::new(pool.clone());

        // Calculate the latest valid block position after which we need to start profiling.
        let from_position = match self
            .cache
            .database
            .as_ref()
            .unwrap()
            .load_latest_valid_pool_snapshot(pool.chain.chain_id, &pool.pool_identifier)
            .await
        {
            Ok(Some(snapshot)) => {
                log::info!(
                    "Loaded valid snapshot from block {} which contains {} positions and {} ticks",
                    snapshot.block_position.number.separate_with_commas(),
                    snapshot.positions.len(),
                    snapshot.ticks.len()
                );
                let block_position = snapshot.block_position.clone();
                profiler.restore_from_snapshot(snapshot)?;
                log::info!("Restored profiler from snapshot");
                Some(block_position)
            }
            _ => {
                log::info!("No valid snapshot found, processing from beginning");
                None
            }
        };

        // If the pool events have never been synced, proceed with the faster construction of
        // the pool profiler from HyperSync and RPC, which avoids syncing pool events and
        // fetching them from the database.
        if self
            .cache
            .database
            .as_ref()
            .unwrap()
            .get_pool_last_synced_block(self.chain.chain_id, &pool.dex.name, &pool.pool_identifier)
            .await?
            .is_none()
        {
            return self
                .construct_pool_profiler_from_hypersync_rpc(profiler, from_position)
                .await;
        }

        // Sync the pool events before bootstrapping the pool profiler.
        if let Err(e) = self
            .sync_pool_events(&pool.dex.name, pool.pool_identifier, None, None, false)
            .await
        {
            log::error!("Failed to sync pool events for snapshot request: {e}");
        }

        if !profiler.is_initialized {
            if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
                profiler.initialize(initial_sqrt_price_x96);
            } else {
                anyhow::bail!(
                    "Pool is not initialized and it doesn't contain initial price, cannot bootstrap profiler"
                );
            }
        }

        let from_block = from_position
            .as_ref()
            .map_or(profiler.pool.creation_block, |block_position| {
                block_position.number
            });
        let to_block = self.hypersync_client.current_block().await;
        let total_blocks = to_block.saturating_sub(from_block) + 1;

        // Enable embedded profiler reporting
        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);

        let mut stream = self.cache.database.as_ref().unwrap().stream_pool_events(
            pool.chain.clone(),
            pool.dex.clone(),
            pool.instrument_id,
            pool.pool_identifier,
            from_position.clone(),
        );

        while let Some(result) = stream.next().await {
            match result {
                Ok(event) => {
                    profiler.process(&event)?;
                }
                Err(e) => log::error!("Error processing event: {e}"),
            }
        }

        profiler.finalize_reporting();

        Ok((profiler, false))
    }

    /// Constructs a pool profiler by streaming events from HyperSync and querying the final state via RPC.
    ///
    /// This method is used when the pool has never been synced to the database. It streams
    /// liquidity events (mints, burns) directly from HyperSync and processes them
    /// to build up the profiler's state in real-time. After processing all events, it
    /// restores the profiler from the current on-chain state with the provided ticks and positions.
    ///
    /// # Returns
    ///
    /// Returns a tuple of:
    /// - `PoolProfiler`: The hydrated profiler with state built from events
    /// - `bool`: Always `true`, indicating the profiler state is valid because it was constructed from RPC
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Event streaming from HyperSync fails
    /// - Event parsing or processing fails
    /// - DEX configuration is invalid
    async fn construct_pool_profiler_from_hypersync_rpc(
        &self,
        mut profiler: PoolProfiler,
        from_position: Option<BlockPosition>,
    ) -> anyhow::Result<(PoolProfiler, bool)> {
        log::info!("Constructing pool profiler from hypersync stream and RPC final state querying");
        let dex_extended = self.get_dex_extended(&profiler.pool.dex.name)?.clone();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let initialize_event_signature =
            if let Some(initialize_event) = &dex_extended.initialize_event {
                initialize_event.as_ref()
            } else {
                anyhow::bail!(
                    "DEX {} does not have initialize event set.",
                    &profiler.pool.dex.name
                );
            };
        let mint_sig_bytes = hex::decode(
            mint_event_signature
                .strip_prefix("0x")
                .unwrap_or(mint_event_signature),
        )?;
        let burn_sig_bytes = hex::decode(
            burn_event_signature
                .strip_prefix("0x")
                .unwrap_or(burn_event_signature),
        )?;
        let initialize_sig_bytes = hex::decode(
            initialize_event_signature
                .strip_prefix("0x")
                .unwrap_or(initialize_event_signature),
        )?;

        let from_block = from_position.map_or(profiler.pool.creation_block, |block_position| {
            block_position.number
        });
        let to_block = self.hypersync_client.current_block().await;
        let total_blocks = to_block.saturating_sub(from_block) + 1;

        log::info!(
            "Bootstrapping pool profiler for pool {} from block {} to {} (total: {} blocks)",
            profiler.pool.address,
            from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas()
        );

        // Enable embedded profiler reporting
        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);

        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                from_block,
                None,
                &profiler.pool.address,
                vec![
                    mint_event_signature,
                    burn_event_signature,
                    initialize_event_signature,
                ],
            )
            .await;
        tokio::pin!(pool_events_stream);

        while let Some(log) = pool_events_stream.next().await {
            let event_sig_bytes = extract_event_signature_bytes(&log)?;

            if event_sig_bytes == initialize_sig_bytes {
                let initialize_event = dex_extended.parse_initialize_event_hypersync(&log)?;
                profiler.initialize(initialize_event.sqrt_price_x96);
                self.cache
                    .database
                    .as_ref()
                    .unwrap()
                    .update_pool_initial_price_tick(self.chain.chain_id, &initialize_event)
                    .await?;
            } else if event_sig_bytes == mint_sig_bytes {
                let mint_event = dex_extended.parse_mint_event_hypersync(&log)?;
                match self.process_pool_mint_event(&mint_event, &profiler.pool, &dex_extended) {
                    Ok(liquidity_update) => {
                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
                    }
                    Err(e) => log::error!("Failed to process mint event: {e}"),
                }
            } else if event_sig_bytes == burn_sig_bytes {
                let burn_event = dex_extended.parse_burn_event_hypersync(&log)?;
                match self.process_pool_burn_event(&burn_event, &profiler.pool, &dex_extended) {
                    Ok(liquidity_update) => {
                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
                    }
                    Err(e) => log::error!("Failed to process burn event: {e}"),
                }
            } else {
                let event_signature = hex::encode(event_sig_bytes);
                log::error!(
                    "Unexpected event signature in bootstrap_latest_pool_profiler: {event_signature} for log {log:?}"
                );
            }
        }

        profiler.finalize_reporting();

        // Hydrate from the current RPC state
        match self.get_on_chain_snapshot(&profiler).await {
            Ok(on_chain_snapshot) => profiler.restore_from_snapshot(on_chain_snapshot)?,
            Err(e) => log::error!(
                "Failed to restore from on-chain snapshot: {e}, sending not hydrated state to client"
            ),
        }

        Ok((profiler, true))
    }

    /// Validates a pool profiler's state against on-chain data for accuracy verification.
    ///
    /// This method performs integrity checking by comparing the profiler's internal state
    /// (positions, ticks, liquidity) with the actual on-chain smart contract state. For UniswapV3
    /// pools, it fetches current on-chain data and verifies that the profiler's tracked state matches.
    /// If validation succeeds or is bypassed, the snapshot is marked as valid in the database.
    ///
    /// # Errors
    ///
    /// Returns an error if database operations fail when marking the snapshot as valid.
    ///
    /// # Panics
    ///
    /// Panics if the profiler does not have a `last_processed_event` when `already_validated` is true.
    pub async fn check_snapshot_validity(
        &self,
        profiler: &PoolProfiler,
        already_validated: bool,
    ) -> anyhow::Result<bool> {
        // Determine validity and get block position for marking
        let (is_valid, block_position) = if already_validated {
            // Skip RPC call - profiler was validated during construction from RPC
            log::info!("Snapshot already validated from RPC, skipping on-chain comparison");
            let last_event = profiler
                .last_processed_event
                .clone()
                .expect("Profiler should have last_processed_event");
            (true, last_event)
        } else {
            // Fetch on-chain state and compare
            match self.get_on_chain_snapshot(profiler).await {
                Ok(on_chain_snapshot) => {
                    log::info!("Comparing profiler state with on-chain state...");
                    let valid = compare_pool_profiler(profiler, &on_chain_snapshot);
                    if !valid {
                        log::error!(
                            "Pool profiler state does NOT match on-chain smart contract state"
                        );
                    }
                    (valid, on_chain_snapshot.block_position)
                }
                Err(e) => {
                    log::error!("Failed to check snapshot validity: {e}");
                    return Ok(false);
                }
            }
        };

        // Mark snapshot as valid in database if validation passed
        if is_valid && let Some(cache_database) = &self.cache.database {
            cache_database
                .mark_pool_snapshot_valid(
                    profiler.pool.chain.chain_id,
                    &profiler.pool.pool_identifier,
                    block_position.number,
                    block_position.transaction_index,
                    block_position.log_index,
                )
                .await?;
            log::info!("Marked pool profiler snapshot as valid");
        }

        Ok(is_valid)
    }

    /// Fetches current on-chain pool state at the last processed block.
    ///
    /// Queries the pool smart contract to retrieve active tick liquidity and position data,
    /// using the profiler's active positions and last processed block number.
    /// Used for profiler state restoration after bootstrapping and validation.
    async fn get_on_chain_snapshot(&self, profiler: &PoolProfiler) -> anyhow::Result<PoolSnapshot> {
        if profiler.pool.dex.name == DexType::UniswapV3 {
            let last_processed_event = profiler
                .last_processed_event
                .clone()
                .expect("We expect at least one processed event in the pool");
            let on_chain_snapshot = self
                .univ3_pool
                .fetch_snapshot(
                    &profiler.pool.address,
                    profiler.pool.instrument_id,
                    profiler.get_active_tick_values().as_slice(),
                    &profiler.get_all_position_keys(),
                    last_processed_event,
                )
                .await?;

            Ok(on_chain_snapshot)
        } else {
            anyhow::bail!(
                "Fetching on-chain snapshot for Dex protocol {} is not supported yet.",
                profiler.pool.dex.name
            )
        }
    }

    /// Replays historical events for a pool to hydrate its profiler state.
    ///
    /// Streams all historical swap, liquidity, and fee collect events from the database
    /// and sends them through the normal data event pipeline to build up pool profiler state.
    ///
    /// # Errors
    ///
    /// Returns an error if database streaming fails or event processing fails.
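    ///
    /// # Examples
    ///
    /// A hedged sketch; `pool` and `dex` are assumed to be entities already loaded from the cache.
    ///
    /// ```ignore
    /// // Streams every stored event for the pool back through the data channel.
    /// core.replay_pool_events(&pool, &dex).await?;
    /// ```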
    pub async fn replay_pool_events(&self, pool: &Pool, dex: &SharedDex) -> anyhow::Result<()> {
        if let Some(database) = &self.cache.database {
            log::info!(
                "Replaying historical events for pool {} to hydrate profiler",
                pool.instrument_id
            );

            let mut event_stream = database.stream_pool_events(
                self.chain.clone(),
                dex.clone(),
                pool.instrument_id,
                pool.pool_identifier,
                None,
            );
            let mut event_count = 0;

            while let Some(event_result) = event_stream.next().await {
                match event_result {
                    Ok(event) => {
                        let data_event = match event {
                            DexPoolData::Swap(swap) => DataEvent::DeFi(DefiData::PoolSwap(swap)),
                            DexPoolData::LiquidityUpdate(update) => {
                                DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))
                            }
                            DexPoolData::FeeCollect(collect) => {
                                DataEvent::DeFi(DefiData::PoolFeeCollect(collect))
                            }
                            DexPoolData::Flash(flash) => {
                                DataEvent::DeFi(DefiData::PoolFlash(flash))
                            }
                        };
                        self.send_data(data_event);
                        event_count += 1;
                    }
                    Err(e) => {
                        log::error!("Error streaming event for pool {}: {e}", pool.instrument_id);
                    }
                }
            }

            log::info!(
                "Replayed {event_count} historical events for pool {}",
                pool.instrument_id
            );
        } else {
            log::debug!(
                "No database available, skipping event replay for pool {}",
                pool.instrument_id
            );
        }

        Ok(())
    }

    /// Determines the starting block for syncing operations.
    fn determine_from_block(&self) -> u64 {
        self.config
            .from_block
            .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
    }

    /// Retrieves extended DEX information for a registered DEX.
    fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
        if !self.cache.get_registered_dexes().contains(dex_id) {
            anyhow::bail!("DEX {dex_id} is not registered in the data client");
        }

        match get_dex_extended(self.chain.name, dex_id) {
            Some(dex) => Ok(dex),
            None => anyhow::bail!("Dex {dex_id} doesn't exist for chain {}", self.chain.name),
        }
    }

    /// Retrieves a pool from the cache by its pool identifier.
    ///
    /// # Errors
    ///
    /// Returns an error if the pool is not registered in the cache.
    pub fn get_pool(&self, pool_identifier: &PoolIdentifier) -> anyhow::Result<&SharedPool> {
        match self.cache.get_pool(pool_identifier) {
            Some(pool) => Ok(pool),
            None => anyhow::bail!("Pool {pool_identifier} is not registered"),
        }
    }

    /// Sends a data event to all subscribers through the data channel.
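    ///
    /// # Examples
    ///
    /// A minimal sketch; the `swap` value is assumed to come from one of the processing
    /// helpers on this client.
    ///
    /// ```ignore
    /// core.send_data(DataEvent::DeFi(DefiData::PoolSwap(swap)));
    /// ```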
    pub fn send_data(&self, data: DataEvent) {
        if let Some(data_tx) = &self.data_tx {
            log::debug!("Sending {data}");

            if let Err(e) = data_tx.send(data) {
                log::error!("Failed to send data: {e}");
            }
        } else {
            log::error!("No data event channel for sending data");
        }
    }

    /// Disconnects all active connections and cleans up resources.
    ///
    /// This method should be called when shutting down the client to ensure
    /// proper cleanup of network connections and background tasks.
    pub async fn disconnect(&mut self) {
        self.hypersync_client.disconnect().await;
    }
}