// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

16use std::{cmp::max, collections::HashSet};
17
18use alloy::primitives::Address;
19use futures_util::StreamExt;
20use nautilus_core::string::formatting::Separable;
21use nautilus_model::defi::{
22    SharedDex,
23    amm::Pool,
24    chain::SharedChain,
25    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
26    token::Token,
27};
28use tokio_util::sync::CancellationToken;
29
30use crate::{
31    cache::BlockchainCache,
32    config::BlockchainDataClientConfig,
33    contracts::erc20::Erc20Contract,
34    events::pool_created::PoolCreatedEvent,
35    exchanges::extended::DexExtended,
36    hypersync::{client::HyperSyncClient, helpers::extract_block_number},
37};
38
/// Block-count interval between progress-report log lines during a sync run
/// (passed to `BlockchainSyncReporter`).
const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;
/// Number of buffered pool-creation events that triggers a batched database flush.
const POOL_DB_BATCH_SIZE: usize = 2000;
41
/// Sanitizes a string by removing null bytes and other invalid characters for PostgreSQL UTF-8.
///
/// Strips null bytes (0x00) and all other control characters that are problematic in
/// PostgreSQL's UTF-8 text fields — common with malformed on-chain token metadata.
/// Preserves printable characters and common whitespace (space, tab, newline, carriage return).
fn sanitize_string(s: &str) -> String {
    s.chars()
        .filter(|c| {
            // Keep tab/newline/CR explicitly; drop every other control character.
            // Using `is_control` also filters DEL (U+007F) and the C1 range
            // (U+0080–U+009F), which the previous `*c >= ' '` comparison let through.
            matches!(c, '\t' | '\n' | '\r') || !c.is_control()
        })
        .collect()
}
56
/// Service responsible for discovering DEX liquidity pools from blockchain events.
///
/// This service handles the synchronization of pool creation events from various DEXes,
/// managing token metadata fetching, buffering strategies, and database persistence.
/// It borrows its collaborators for the lifetime `'a` of a sync run.
#[derive(Debug)]
pub struct PoolDiscoveryService<'a> {
    /// The blockchain network being synced.
    chain: SharedChain,
    /// Cache for tokens and pools (mutable: sync writes discovered tokens/pools into it).
    cache: &'a mut BlockchainCache,
    /// ERC20 contract interface for fetching token metadata in batches.
    erc20_contract: &'a Erc20Contract,
    /// HyperSync client for streaming pool-creation contract events.
    hypersync_client: &'a HyperSyncClient,
    /// Cancellation token for graceful shutdown of an in-flight sync.
    cancellation_token: CancellationToken,
    /// Configuration for sync operations (e.g. multicall batch sizing).
    config: BlockchainDataClientConfig,
}
76
77impl<'a> PoolDiscoveryService<'a> {
78    /// Creates a new [`PoolDiscoveryService`] instance.
79    #[must_use]
80    pub const fn new(
81        chain: SharedChain,
82        cache: &'a mut BlockchainCache,
83        erc20_contract: &'a Erc20Contract,
84        hypersync_client: &'a HyperSyncClient,
85        cancellation_token: CancellationToken,
86        config: BlockchainDataClientConfig,
87    ) -> Self {
88        Self {
89            chain,
90            cache,
91            erc20_contract,
92            hypersync_client,
93            cancellation_token,
94            config,
95        }
96    }
97
98    /// Synchronizes pools for a specific DEX within a given block range.
99    ///
100    /// # Errors
101    ///
102    /// Returns an error if:
103    /// - HyperSync streaming fails
104    /// - Token RPC calls fail
105    /// - Database operations fail
106    /// - Sync is cancelled
107    pub async fn sync_pools(
108        &mut self,
109        dex: &DexExtended,
110        from_block: u64,
111        to_block: Option<u64>,
112        reset: bool,
113    ) -> anyhow::Result<()> {
114        // Determine effective sync range
115        let (last_synced_block, effective_from_block) = if reset {
116            (None, from_block)
117        } else {
118            let last_synced_block = self.cache.get_dex_last_synced_block(&dex.dex.name).await?;
119            let effective_from_block = last_synced_block
120                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
121            (last_synced_block, effective_from_block)
122        };
123
124        let to_block = match to_block {
125            Some(block) => block,
126            None => self.hypersync_client.current_block().await,
127        };
128
129        // Skip sync if already up to date
130        if effective_from_block > to_block {
131            log::info!(
132                "DEX {} already synced to block {} (current: {}), skipping sync",
133                dex.dex.name,
134                last_synced_block.unwrap_or(0).separate_with_commas(),
135                to_block.separate_with_commas()
136            );
137            return Ok(());
138        }
139
140        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
141        log::info!(
142            "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
143            effective_from_block.separate_with_commas(),
144            to_block.separate_with_commas(),
145            total_blocks.separate_with_commas(),
146            if let Some(last_synced) = last_synced_block {
147                format!(
148                    " - resuming from last synced block {}",
149                    last_synced.separate_with_commas()
150                )
151            } else {
152                String::new()
153            },
154        );
155        log::info!(
156            "Syncing {} pool creation events from factory contract {} on chain {}",
157            dex.dex.name,
158            dex.factory,
159            self.chain.name
160        );
161
162        // Enable performance settings for sync operations
163        if let Err(e) = self.cache.toggle_performance_settings(true).await {
164            log::warn!("Failed to enable performance settings: {e}");
165        }
166
167        let mut metrics = BlockchainSyncReporter::new(
168            BlockchainSyncReportItems::PoolCreatedEvents,
169            effective_from_block,
170            total_blocks,
171            BLOCKS_PROCESS_IN_SYNC_REPORT,
172        );
173
174        let factory_address = &dex.factory;
175        let pair_created_event_signature = dex.pool_created_event.as_ref();
176        let pools_stream = self
177            .hypersync_client
178            .request_contract_events_stream(
179                effective_from_block,
180                Some(to_block),
181                factory_address,
182                vec![pair_created_event_signature],
183            )
184            .await;
185
186        tokio::pin!(pools_stream);
187
188        // LEVEL 1: RPC buffers (small, constrained by rate limits)
189        let token_rpc_batch_size = (self.config.multicall_calls_per_rpc_request / 3) as usize;
190        let mut token_rpc_buffer: HashSet<Address> = HashSet::new();
191
192        // LEVEL 2: DB buffers (large, optimize for throughput)
193        let mut token_db_buffer: Vec<Token> = Vec::new();
194        let mut pool_events_buffer: Vec<PoolCreatedEvent> = Vec::new();
195
196        let mut last_block_saved = effective_from_block;
197
198        // Tracking counters
199        let mut total_discovered = 0;
200        let mut total_skipped_exists = 0;
201        let mut total_skipped_invalid_tokens = 0;
202        let mut total_saved = 0;
203
204        let cancellation_token = self.cancellation_token.clone();
205        let sync_result = tokio::select! {
206            () = cancellation_token.cancelled() => {
207                log::info!("Exchange pool sync cancelled");
208                Err(anyhow::anyhow!("Sync cancelled"))
209            }
210
211            result = async {
212                while let Some(log) = pools_stream.next().await {
213                    let block_number = extract_block_number(&log)?;
214                    let blocks_progress = block_number - last_block_saved;
215                    last_block_saved = block_number;
216
217                    let pool = dex.parse_pool_created_event_hypersync(log)?;
218                    total_discovered += 1;
219
220                    if self.cache.get_pool(&pool.pool_identifier).is_some() {
221                        // Pool is already initialized and cached.
222                        total_skipped_exists += 1;
223                        continue;
224                    }
225
226                    if self.cache.is_invalid_token(&pool.token0)
227                        || self.cache.is_invalid_token(&pool.token1)
228                    {
229                        // Skip pools with invalid tokens as they cannot be properly processed or traded.
230                        total_skipped_invalid_tokens += 1;
231                        continue;
232                    }
233
234                    // Collect tokens needed for RPC fetch
235                    if self.cache.get_token(&pool.token0).is_none() {
236                        token_rpc_buffer.insert(pool.token0);
237                    }
238
239                    if self.cache.get_token(&pool.token1).is_none() {
240                        token_rpc_buffer.insert(pool.token1);
241                    }
242
243                    // Buffer the pool for later processing
244                    pool_events_buffer.push(pool);
245
246                    // ==== RPC FLUSHING (small batches) ====
247                    if token_rpc_buffer.len() >= token_rpc_batch_size {
248                        let fetched_tokens = self
249                            .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
250                            .await?;
251
252                        // Accumulate for later DB write
253                        token_db_buffer.extend(fetched_tokens);
254                    }
255
256                    // ==== DB FLUSHING (large batches) ====
257                    // Process pools when buffer is full
258                    if pool_events_buffer.len() >= POOL_DB_BATCH_SIZE {
259                        // 1. Fetch any remaining tokens in RPC buffer (needed for pool construction)
260                        if !token_rpc_buffer.is_empty() {
261                            let fetched_tokens = self
262                                .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
263                                .await?;
264                            token_db_buffer.extend(fetched_tokens);
265                        }
266
267                        // 2. Flush ALL tokens to DB (satisfy foreign key constraints)
268                        if !token_db_buffer.is_empty() {
269                            self.cache
270                                .add_tokens_batch(std::mem::take(&mut token_db_buffer))
271                                .await?;
272                        }
273
274                        // 3. Now safe to construct and flush pools
275                        let pools = self
276                            .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
277                            .await?;
278                        total_saved += pools.len();
279                        self.cache.add_pools_batch(pools).await?;
280                    }
281
282                    metrics.update(blocks_progress as usize);
283                    // Log progress if needed
284                    if metrics.should_log_progress(block_number, to_block) {
285                        metrics.log_progress(block_number);
286                    }
287                }
288
289                // ==== FINAL FLUSH (all remaining data) ====
290                // 1. Fetch any remaining tokens
291                if !token_rpc_buffer.is_empty() {
292                    let fetched_tokens = self
293                        .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
294                        .await?;
295                    token_db_buffer.extend(fetched_tokens);
296                }
297
298                // 2. Flush all tokens to DB
299                if !token_db_buffer.is_empty() {
300                    self.cache
301                        .add_tokens_batch(std::mem::take(&mut token_db_buffer))
302                        .await?;
303                }
304
305                // 3. Process and flush all pools
306                if !pool_events_buffer.is_empty() {
307                    let pools = self
308                        .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
309                        .await?;
310                    total_saved += pools.len();
311                    self.cache.add_pools_batch(pools).await?;
312                }
313
314                metrics.log_final_stats();
315
316                // Update the last synced block after successful completion.
317                self.cache
318                    .update_dex_last_synced_block(&dex.dex.name, to_block)
319                    .await?;
320
321                log::info!(
322                    "Successfully synced DEX {} pools up to block {} | Summary: discovered={}, saved={}, skipped_exists={}, skipped_invalid_tokens={}",
323                    dex.dex.name,
324                    to_block.separate_with_commas(),
325                    total_discovered,
326                    total_saved,
327                    total_skipped_exists,
328                    total_skipped_invalid_tokens
329                );
330
331                Ok(())
332            } => result
333        };
334
335        sync_result?;
336
337        // Restore default safe settings after sync completion
338        if let Err(e) = self.cache.toggle_performance_settings(false).await {
339            log::warn!("Failed to restore default settings: {e}");
340        }
341
342        Ok(())
343    }
344
345    /// Fetches token metadata via RPC and updates in-memory cache immediately.
346    ///
347    /// This method fetches token information using multicall, updates the in-memory cache right away
348    /// (so pool construction can proceed), and returns valid tokens for later batch DB writes.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if the RPC multicall fails or database operations fail.
353    async fn fetch_and_cache_tokens_in_memory(
354        &mut self,
355        token_buffer: &mut HashSet<Address>,
356    ) -> anyhow::Result<Vec<Token>> {
357        let batch_addresses: Vec<Address> = token_buffer.drain().collect();
358        let token_infos = self
359            .erc20_contract
360            .batch_fetch_token_info(&batch_addresses)
361            .await?;
362
363        let mut valid_tokens = Vec::new();
364
365        for (token_address, token_info) in token_infos {
366            match token_info {
367                Ok(token_info) => {
368                    // Sanitize token metadata to remove null bytes and invalid UTF-8 characters
369                    let sanitized_name = sanitize_string(&token_info.name);
370                    let sanitized_symbol = sanitize_string(&token_info.symbol);
371
372                    let token = Token::new(
373                        self.chain.clone(),
374                        token_address,
375                        sanitized_name,
376                        sanitized_symbol,
377                        token_info.decimals,
378                    );
379
380                    // Update in-memory cache IMMEDIATELY (so construct_pool can read it)
381                    self.cache.insert_token_in_memory(token.clone());
382
383                    // Collect for LATER DB write
384                    valid_tokens.push(token);
385                }
386                Err(token_info_error) => {
387                    self.cache.insert_invalid_token_in_memory(token_address);
388                    if let Some(database) = &self.cache.database {
389                        let sanitized_error = sanitize_string(&token_info_error.to_string());
390                        database
391                            .add_invalid_token(
392                                self.chain.chain_id,
393                                &token_address,
394                                &sanitized_error,
395                            )
396                            .await?;
397                    }
398                }
399            }
400        }
401
402        Ok(valid_tokens)
403    }
404
405    /// Constructs multiple pools from pool creation events.
406    ///
407    /// Assumes all required tokens are already in the in-memory cache.
408    ///
409    /// # Errors
410    ///
411    /// Logs errors for pools that cannot be constructed (missing tokens),
412    /// but does not fail the entire batch.
413    async fn construct_pools_batch(
414        &self,
415        pool_events: &mut Vec<PoolCreatedEvent>,
416        dex: &SharedDex,
417    ) -> anyhow::Result<Vec<Pool>> {
418        let mut pools = Vec::with_capacity(pool_events.len());
419
420        for pool_event in pool_events.drain(..) {
421            // Both tokens should be in cache now
422            let token0 = match self.cache.get_token(&pool_event.token0) {
423                Some(token) => token.clone(),
424                None => {
425                    if !self.cache.is_invalid_token(&pool_event.token0) {
426                        log::warn!(
427                            "Skipping pool {}: Token0 {} not in cache and not marked as invalid",
428                            pool_event.pool_address,
429                            pool_event.token0
430                        );
431                    }
432                    continue;
433                }
434            };
435
436            let token1 = match self.cache.get_token(&pool_event.token1) {
437                Some(token) => token.clone(),
438                None => {
439                    if !self.cache.is_invalid_token(&pool_event.token1) {
440                        log::warn!(
441                            "Skipping pool {}: Token1 {} not in cache and not marked as invalid",
442                            pool_event.pool_address,
443                            pool_event.token1
444                        );
445                    }
446                    continue;
447                }
448            };
449
450            let mut pool = Pool::new(
451                self.chain.clone(),
452                dex.clone(),
453                pool_event.pool_address,
454                pool_event.pool_identifier,
455                pool_event.block_number,
456                token0,
457                token1,
458                pool_event.fee,
459                pool_event.tick_spacing,
460                nautilus_core::UnixNanos::default(),
461            );
462
463            // Set hooks if available (UniswapV4)
464            if let Some(hooks) = pool_event.hooks {
465                pool.set_hooks(hooks);
466            }
467
468            // Initialize pool with sqrt_price_x96 and tick if available (UniswapV4)
469            if let (Some(sqrt_price_x96), Some(tick)) = (pool_event.sqrt_price_x96, pool_event.tick)
470            {
471                pool.initialize(sqrt_price_x96, tick);
472            }
473
474            pools.push(pool);
475        }
476
477        Ok(pools)
478    }
479}