// nautilus_blockchain/services/pool_discovery.rs
use std::{cmp::max, collections::HashSet};
18use alloy::primitives::Address;
19use futures_util::StreamExt;
20use nautilus_core::string::formatting::Separable;
21use nautilus_model::defi::{
22 SharedDex,
23 amm::Pool,
24 chain::SharedChain,
25 reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
26 token::Token,
27};
28use tokio_util::sync::CancellationToken;
29
30use crate::{
31 cache::BlockchainCache,
32 config::BlockchainDataClientConfig,
33 contracts::erc20::Erc20Contract,
34 events::pool_created::PoolCreatedEvent,
35 exchanges::extended::DexExtended,
36 hypersync::{client::HyperSyncClient, helpers::extract_block_number},
37};
38
/// Block-count interval at which the sync reporter emits a progress log line.
const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;
/// Number of buffered pool-created events that triggers a database batch flush.
const POOL_DB_BATCH_SIZE: usize = 2000;
41
/// Removes characters that are unsafe to persist: NUL and all other control
/// characters below U+0020, except the whitespace controls tab, newline,
/// and carriage return. Everything from U+0020 upward is kept unchanged.
///
/// Used to clean token metadata and error messages fetched from on-chain
/// contracts before they are written to the cache/database.
fn sanitize_string(s: &str) -> String {
    // `c >= ' '` already rejects NUL (U+0000), so the original separate
    // `!= '\0'` check was dead code and has been dropped.
    s.chars()
        .filter(|&c| c >= ' ' || matches!(c, '\t' | '\n' | '\r'))
        .collect()
}
56
/// Discovers and syncs DEX liquidity pools by scanning pool-creation events
/// emitted by factory contracts, streamed via HyperSync.
#[derive(Debug)]
pub struct PoolDiscoveryService<'a> {
    // Chain being scanned; also stamped onto constructed tokens and pools.
    chain: SharedChain,
    // Cache of tokens/pools and per-DEX sync watermarks (optionally DB-backed).
    cache: &'a mut BlockchainCache,
    // ERC-20 helper used to batch-fetch token metadata over RPC.
    erc20_contract: &'a Erc20Contract,
    // HyperSync client used to stream historical contract events.
    hypersync_client: &'a HyperSyncClient,
    // Cooperative cancellation for long-running syncs.
    cancellation_token: CancellationToken,
    // Client configuration (e.g. multicall batch sizing).
    config: BlockchainDataClientConfig,
}
76
impl<'a> PoolDiscoveryService<'a> {
    /// Creates a new [`PoolDiscoveryService`] borrowing the shared cache,
    /// ERC-20 contract helper, and HyperSync client.
    #[must_use]
    pub const fn new(
        chain: SharedChain,
        cache: &'a mut BlockchainCache,
        erc20_contract: &'a Erc20Contract,
        hypersync_client: &'a HyperSyncClient,
        cancellation_token: CancellationToken,
        config: BlockchainDataClientConfig,
    ) -> Self {
        Self {
            chain,
            cache,
            erc20_contract,
            hypersync_client,
            cancellation_token,
            config,
        }
    }

    /// Streams pool-creation events for `dex` from its factory contract and
    /// persists the discovered pools (and their tokens) in batches.
    ///
    /// When `reset` is `false`, the sync resumes from one block past the
    /// DEX's recorded watermark (never earlier than `from_block`); when
    /// `true`, it starts from `from_block` unconditionally. If `to_block` is
    /// `None`, the current block reported by HyperSync is used as the target.
    ///
    /// # Errors
    ///
    /// Returns an error if the sync is cancelled via the cancellation token,
    /// or if any cache/database or RPC operation fails.
    pub async fn sync_pools(
        &mut self,
        dex: &DexExtended,
        from_block: u64,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        // Determine the starting point: a reset ignores any stored watermark.
        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self.cache.get_dex_last_synced_block(&dex.dex.name).await?;
            // Resume one past the watermark, but never before `from_block`.
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        // Already synced past the target: nothing to do.
        if effective_from_block > to_block {
            log::info!(
                "DEX {} already synced to block {} (current: {}), skipping sync",
                dex.dex.name,
                last_synced_block.unwrap_or(0).separate_with_commas(),
                to_block.separate_with_commas()
            );
            return Ok(());
        }

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        log::info!(
            "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
            effective_from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas(),
            if let Some(last_synced) = last_synced_block {
                format!(
                    " - resuming from last synced block {}",
                    last_synced.separate_with_commas()
                )
            } else {
                String::new()
            },
        );
        log::info!(
            "Syncing {} pool creation events from factory contract {} on chain {}",
            dex.dex.name,
            dex.factory,
            self.chain.name
        );

        // Best-effort: bulk-load settings failing is logged, not fatal.
        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            log::warn!("Failed to enable performance settings: {e}");
        }

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolCreatedEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        let factory_address = &dex.factory;
        let pair_created_event_signature = dex.pool_created_event.as_ref();
        let pools_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                factory_address,
                vec![pair_created_event_signature],
            )
            .await;

        // Pin the stream on the stack so it can be polled in the loop below.
        tokio::pin!(pools_stream);

        // Each token fetch presumably issues 3 multicall entries
        // (name/symbol/decimals), hence the division — TODO confirm.
        let token_rpc_batch_size = (self.config.multicall_calls_per_rpc_request / 3) as usize;
        // Addresses awaiting an RPC metadata fetch (deduplicated).
        let mut token_rpc_buffer: HashSet<Address> = HashSet::new();

        // Fetched tokens awaiting a database batch write.
        let mut token_db_buffer: Vec<Token> = Vec::new();
        // Parsed pool events awaiting pool construction + batch write.
        let mut pool_events_buffer: Vec<PoolCreatedEvent> = Vec::new();

        let mut last_block_saved = effective_from_block;

        // Summary counters for the final log line.
        let mut total_discovered = 0;
        let mut total_skipped_exists = 0;
        let mut total_skipped_invalid_tokens = 0;
        let mut total_saved = 0;

        // Clone so the select arm doesn't borrow `self`.
        let cancellation_token = self.cancellation_token.clone();
        // Race the processing loop against cancellation.
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                log::info!("Exchange pool sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }

            result = async {
                while let Some(log) = pools_stream.next().await {
                    let block_number = extract_block_number(&log)?;
                    // NOTE(review): assumes the stream yields non-decreasing
                    // block numbers; this subtraction would underflow otherwise.
                    let blocks_progress = block_number - last_block_saved;
                    last_block_saved = block_number;

                    let pool = dex.parse_pool_created_event_hypersync(log)?;
                    total_discovered += 1;

                    // Skip pools already present in the cache.
                    if self.cache.get_pool(&pool.pool_identifier).is_some() {
                        total_skipped_exists += 1;
                        continue;
                    }

                    // Skip pools referencing tokens previously marked invalid.
                    if self.cache.is_invalid_token(&pool.token0)
                        || self.cache.is_invalid_token(&pool.token1)
                    {
                        total_skipped_invalid_tokens += 1;
                        continue;
                    }

                    // Queue unknown token addresses for a metadata fetch.
                    if self.cache.get_token(&pool.token0).is_none() {
                        token_rpc_buffer.insert(pool.token0);
                    }

                    if self.cache.get_token(&pool.token1).is_none() {
                        token_rpc_buffer.insert(pool.token1);
                    }

                    pool_events_buffer.push(pool);

                    // Flush the RPC buffer once a full multicall batch is queued.
                    if token_rpc_buffer.len() >= token_rpc_batch_size {
                        let fetched_tokens = self
                            .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                            .await?;

                        token_db_buffer.extend(fetched_tokens);
                    }

                    // Flush pools in batches; tokens must be resolved and
                    // persisted first so pool construction can find them.
                    if pool_events_buffer.len() >= POOL_DB_BATCH_SIZE {
                        if !token_rpc_buffer.is_empty() {
                            let fetched_tokens = self
                                .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                                .await?;
                            token_db_buffer.extend(fetched_tokens);
                        }

                        if !token_db_buffer.is_empty() {
                            self.cache
                                .add_tokens_batch(std::mem::take(&mut token_db_buffer))
                                .await?;
                        }

                        let pools = self
                            .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
                            .await?;
                        total_saved += pools.len();
                        self.cache.add_pools_batch(pools).await?;
                    }

                    metrics.update(blocks_progress as usize);
                    if metrics.should_log_progress(block_number, to_block) {
                        metrics.log_progress(block_number);
                    }
                }

                // Stream exhausted: flush any remaining tokens, then pools.
                if !token_rpc_buffer.is_empty() {
                    let fetched_tokens = self
                        .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                        .await?;
                    token_db_buffer.extend(fetched_tokens);
                }

                if !token_db_buffer.is_empty() {
                    self.cache
                        .add_tokens_batch(std::mem::take(&mut token_db_buffer))
                        .await?;
                }

                if !pool_events_buffer.is_empty() {
                    let pools = self
                        .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
                        .await?;
                    total_saved += pools.len();
                    self.cache.add_pools_batch(pools).await?;
                }

                metrics.log_final_stats();

                // Persist the watermark only after a fully successful pass.
                self.cache
                    .update_dex_last_synced_block(&dex.dex.name, to_block)
                    .await?;

                log::info!(
                    "Successfully synced DEX {} pools up to block {} | Summary: discovered={}, saved={}, skipped_exists={}, skipped_invalid_tokens={}",
                    dex.dex.name,
                    to_block.separate_with_commas(),
                    total_discovered,
                    total_saved,
                    total_skipped_exists,
                    total_skipped_invalid_tokens
                );

                Ok(())
            } => result
        };

        // Propagate any sync error, but only after the select completes so the
        // settings restore below still runs on cancellation? NOTE(review): it
        // does NOT run on error — `?` returns before the restore; confirm this
        // is intended.
        sync_result?;

        // Best-effort restore of default cache settings.
        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            log::warn!("Failed to restore default settings: {e}");
        }

        Ok(())
    }

    /// Drains `token_buffer`, fetches ERC-20 metadata for every address in a
    /// single batch, and caches the results in memory.
    ///
    /// Tokens whose metadata fetch fails are marked invalid in memory and,
    /// when a database is configured, persisted with the sanitized error
    /// string. Returns the successfully constructed tokens so the caller can
    /// batch them to the database.
    ///
    /// # Errors
    ///
    /// Returns an error if the batch RPC fetch or the invalid-token database
    /// write fails.
    async fn fetch_and_cache_tokens_in_memory(
        &mut self,
        token_buffer: &mut HashSet<Address>,
    ) -> anyhow::Result<Vec<Token>> {
        // Drain leaves the buffer empty for the caller to refill.
        let batch_addresses: Vec<Address> = token_buffer.drain().collect();
        let token_infos = self
            .erc20_contract
            .batch_fetch_token_info(&batch_addresses)
            .await?;

        let mut valid_tokens = Vec::new();

        for (token_address, token_info) in token_infos {
            match token_info {
                Ok(token_info) => {
                    // Strip control characters before persisting metadata.
                    let sanitized_name = sanitize_string(&token_info.name);
                    let sanitized_symbol = sanitize_string(&token_info.symbol);

                    let token = Token::new(
                        self.chain.clone(),
                        token_address,
                        sanitized_name,
                        sanitized_symbol,
                        token_info.decimals,
                    );

                    self.cache.insert_token_in_memory(token.clone());

                    valid_tokens.push(token);
                }
                Err(token_info_error) => {
                    // Remember the failure so future pools referencing this
                    // token are skipped without re-fetching.
                    self.cache.insert_invalid_token_in_memory(token_address);
                    if let Some(database) = &self.cache.database {
                        let sanitized_error = sanitize_string(&token_info_error.to_string());
                        database
                            .add_invalid_token(
                                self.chain.chain_id,
                                &token_address,
                                &sanitized_error,
                            )
                            .await?;
                    }
                }
            }
        }

        Ok(valid_tokens)
    }

    /// Drains `pool_events` and constructs a [`Pool`] for every event whose
    /// two tokens are both present in the in-memory cache.
    ///
    /// Events referencing a token that is neither cached nor marked invalid
    /// are skipped with a warning; tokens already marked invalid are skipped
    /// silently.
    ///
    /// # Errors
    ///
    /// Currently never fails (no fallible operations in the body); returns
    /// `anyhow::Result` for symmetry with the other batch helpers.
    async fn construct_pools_batch(
        &self,
        pool_events: &mut Vec<PoolCreatedEvent>,
        dex: &SharedDex,
    ) -> anyhow::Result<Vec<Pool>> {
        let mut pools = Vec::with_capacity(pool_events.len());

        for pool_event in pool_events.drain(..) {
            let token0 = match self.cache.get_token(&pool_event.token0) {
                Some(token) => token.clone(),
                None => {
                    // Missing but not invalid means the metadata fetch was
                    // never attempted or its result was lost — worth a warning.
                    if !self.cache.is_invalid_token(&pool_event.token0) {
                        log::warn!(
                            "Skipping pool {}: Token0 {} not in cache and not marked as invalid",
                            pool_event.pool_address,
                            pool_event.token0
                        );
                    }
                    continue;
                }
            };

            let token1 = match self.cache.get_token(&pool_event.token1) {
                Some(token) => token.clone(),
                None => {
                    if !self.cache.is_invalid_token(&pool_event.token1) {
                        log::warn!(
                            "Skipping pool {}: Token1 {} not in cache and not marked as invalid",
                            pool_event.pool_address,
                            pool_event.token1
                        );
                    }
                    continue;
                }
            };

            let mut pool = Pool::new(
                self.chain.clone(),
                dex.clone(),
                pool_event.pool_address,
                pool_event.pool_identifier,
                pool_event.block_number,
                token0,
                token1,
                pool_event.fee,
                pool_event.tick_spacing,
                nautilus_core::UnixNanos::default(),
            );

            // Hooks are optional (e.g. only some DEX versions emit them).
            if let Some(hooks) = pool_event.hooks {
                pool.set_hooks(hooks);
            }

            // Initialize price state only when both values are present.
            if let (Some(sqrt_price_x96), Some(tick)) = (pool_event.sqrt_price_x96, pool_event.tick)
            {
                pool.initialize(sqrt_price_x96, tick);
            }

            pools.push(pool);
        }

        Ok(pools)
    }
}