1use std::{cmp::max, sync::Arc};
17
18use futures_util::StreamExt;
19use nautilus_common::messages::DataEvent;
20use nautilus_core::{hex, string::formatting::Separable};
21use nautilus_model::defi::{
22 Block, Blockchain, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolProfiler, PoolSwap,
23 SharedChain, SharedDex, SharedPool,
24 data::{DefiData, DexPoolData, PoolFeeCollect, PoolFlash, block::BlockPosition},
25 pool_analysis::{compare::compare_pool_profiler, snapshot::PoolSnapshot},
26 reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
27};
28use nautilus_network::websocket::TransportBackend;
29
30use crate::{
31 cache::BlockchainCache,
32 config::BlockchainDataClientConfig,
33 contracts::{erc20::Erc20Contract, uniswap_v3_pool::UniswapV3PoolContract},
34 data::subscription::DefiDataSubscriptionManager,
35 events::{
36 burn::BurnEvent, collect::CollectEvent, flash::FlashEvent, mint::MintEvent, swap::SwapEvent,
37 },
38 exchanges::{extended::DexExtended, get_dex_extended},
39 hypersync::{
40 client::HyperSyncClient,
41 helpers::{extract_block_number, extract_event_signature_bytes},
42 },
43 rpc::{
44 BlockchainRpcClient, BlockchainRpcClientAny,
45 chains::{
46 arbitrum::ArbitrumRpcClient, base::BaseRpcClient, ethereum::EthereumRpcClient,
47 polygon::PolygonRpcClient,
48 },
49 http::BlockchainHttpRpcClient,
50 types::BlockchainMessage,
51 },
52 services::PoolDiscoveryService,
53};
54
/// Number of processed blocks between sync progress report updates.
const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;
56
/// Core client coordinating cache, RPC, and HyperSync access for blockchain
/// DEX pool data synchronization and live streaming.
#[derive(Debug)]
pub struct BlockchainDataClientCore {
    /// The blockchain chain this client operates on.
    pub chain: SharedChain,
    /// Client configuration (RPC URLs, DEX ids, sync options).
    pub config: BlockchainDataClientConfig,
    /// Cache of chain, pool, and event data (optionally database-backed).
    pub cache: BlockchainCache,
    /// ERC-20 contract helper used for token metadata lookups.
    tokens: Erc20Contract,
    /// Uniswap V3 pool contract helper used for on-chain snapshot queries.
    univ3_pool: UniswapV3PoolContract,
    /// HyperSync client for historical and live block/event streaming.
    pub hypersync_client: HyperSyncClient,
    /// Optional chain-specific WebSocket RPC client for live data.
    pub rpc_client: Option<BlockchainRpcClientAny>,
    /// Tracks DeFi data subscriptions per registered DEX.
    pub subscription_manager: DefiDataSubscriptionManager,
    /// Channel for emitting data events to the consumer, when configured.
    data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
    /// Token used to cancel long-running sync operations.
    cancellation_token: tokio_util::sync::CancellationToken,
}
84
85impl BlockchainDataClientCore {
86 #[must_use]
92 pub fn new(
93 config: BlockchainDataClientConfig,
94 hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
95 data_tx: Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>,
96 cancellation_token: tokio_util::sync::CancellationToken,
97 ) -> Self {
98 let chain = config.chain.clone();
99 let cache = BlockchainCache::new(chain.clone());
100
101 log::info!(
103 "Initializing blockchain data client for '{}' with HTTP RPC: {}",
104 chain.name,
105 config.http_rpc_url
106 );
107
108 let rpc_client = if !config.use_hypersync_for_live_data && config.wss_rpc_url.is_some() {
109 let wss_rpc_url = config.wss_rpc_url.clone().expect("wss_rpc_url is required");
110 log::info!("WebSocket RPC URL: {wss_rpc_url}");
111 Some(Self::initialize_rpc_client(
112 chain.name,
113 wss_rpc_url,
114 config.transport_backend,
115 config.proxy_url.clone(),
116 ))
117 } else {
118 log::info!("Using HyperSync for live data (no WebSocket RPC)");
119 None
120 };
121 let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
122 config.http_rpc_url.clone(),
123 config.rpc_requests_per_second,
124 config.proxy_url.clone(),
125 ));
126 let erc20_contract = Erc20Contract::new(
127 http_rpc_client.clone(),
128 config.pool_filters.remove_pools_with_empty_erc20fields,
129 );
130
131 let hypersync_client =
132 HyperSyncClient::new(chain.clone(), hypersync_tx, cancellation_token.clone());
133 Self {
134 chain,
135 config,
136 rpc_client,
137 tokens: erc20_contract,
138 univ3_pool: UniswapV3PoolContract::new(http_rpc_client),
139 cache,
140 hypersync_client,
141 subscription_manager: DefiDataSubscriptionManager::new(),
142 data_tx,
143 cancellation_token,
144 }
145 }
146
147 pub async fn initialize_cache_database(&mut self) {
149 if let Some(pg_connect_options) = &self.config.postgres_cache_database_config {
150 log::info!(
151 "Initializing blockchain cache on database '{}'",
152 pg_connect_options.database
153 );
154 self.cache
155 .initialize_database(pg_connect_options.clone().into())
156 .await;
157 }
158 }
159
160 fn initialize_rpc_client(
162 blockchain: Blockchain,
163 wss_rpc_url: String,
164 transport_backend: TransportBackend,
165 proxy_url: Option<String>,
166 ) -> BlockchainRpcClientAny {
167 let mut client = match blockchain {
168 Blockchain::Ethereum => {
169 BlockchainRpcClientAny::Ethereum(EthereumRpcClient::new(wss_rpc_url, proxy_url))
170 }
171 Blockchain::Polygon => {
172 BlockchainRpcClientAny::Polygon(PolygonRpcClient::new(wss_rpc_url, proxy_url))
173 }
174 Blockchain::Base => {
175 BlockchainRpcClientAny::Base(BaseRpcClient::new(wss_rpc_url, proxy_url))
176 }
177 Blockchain::Arbitrum => {
178 BlockchainRpcClientAny::Arbitrum(ArbitrumRpcClient::new(wss_rpc_url, proxy_url))
179 }
180 _ => panic!("Unsupported blockchain {blockchain} for RPC connection"),
181 };
182 client.set_transport_backend(transport_backend);
183 client
184 }
185
    /// Connects the data client: initializes the cache database, connects the
    /// optional WebSocket RPC client, connects the cache from the determined
    /// starting block, then registers and syncs pools for every configured DEX.
    ///
    /// # Errors
    ///
    /// Returns an error if the RPC connection, cache connection, or any DEX
    /// registration/pool sync fails.
    pub async fn connect(&mut self) -> anyhow::Result<()> {
        log::info!(
            "Connecting blockchain data client for '{}'",
            self.chain.name
        );
        self.initialize_cache_database().await;

        if let Some(ref mut rpc_client) = self.rpc_client {
            rpc_client.connect().await?;
        }

        let from_block = self.determine_from_block();

        log::info!(
            "Connecting to blockchain data source for '{}' from block {}",
            self.chain.name,
            from_block.separate_with_commas()
        );

        self.cache.initialize_chain().await;
        self.cache.connect(from_block).await?;

        // Register each configured DEX and bring its pool registry up to date.
        for dex in self.config.dex_ids.clone() {
            self.register_dex_exchange(dex).await?;
            self.sync_exchange_pools(&dex, from_block, None, false)
                .await?;
        }

        Ok(())
    }
225
    /// Syncs blocks after first checking (and repairing) cache consistency.
    ///
    /// When the cached block range is gap-free, syncing continues from just
    /// past the cached maximum using the fast COPY path. When gaps exist, the
    /// missing range is backfilled first with duplicate-safe INSERTs before
    /// switching back to COPY for the remainder. With no consistency status
    /// available, a plain COPY sync from `from_block` is performed.
    ///
    /// # Errors
    ///
    /// Returns an error if any underlying block sync fails.
    pub async fn sync_blocks_checked(
        &mut self,
        from_block: u64,
        to_block: Option<u64>,
    ) -> anyhow::Result<()> {
        if let Some(blocks_status) = self.cache.get_cache_block_consistency_status().await {
            if blocks_status.is_consistent() {
                log::info!(
                    "Cache is consistent: no gaps detected (last continuous block: {})",
                    blocks_status.last_continuous_block
                );
                // Continue past whatever is cached, but never before from_block.
                let target_block = max(blocks_status.max_block + 1, from_block);
                log::info!(
                    "Starting fast sync with COPY from block {}",
                    target_block.separate_with_commas()
                );
                self.sync_blocks(target_block, to_block, true).await?;
            } else {
                let gap_size = blocks_status.max_block - blocks_status.last_continuous_block;
                log::info!(
                    "Cache inconsistency detected: {} blocks missing between {} and {}",
                    gap_size,
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );

                // Phase 1: INSERT is duplicate-safe over the gappy range.
                log::info!(
                    "Block syncing Phase 1: Filling gaps with INSERT (blocks {} to {})",
                    blocks_status.last_continuous_block + 1,
                    blocks_status.max_block
                );
                self.sync_blocks(
                    blocks_status.last_continuous_block + 1,
                    Some(blocks_status.max_block),
                    false,
                )
                .await?;

                // Phase 2: beyond max_block there are no existing rows, so the
                // fast COPY path is safe.
                log::info!(
                    "Block syncing Phase 2: Continuing with fast COPY from block {}",
                    (blocks_status.max_block + 1).separate_with_commas()
                );
                self.sync_blocks(blocks_status.max_block + 1, to_block, true)
                    .await?;
            }
        } else {
            self.sync_blocks(from_block, to_block, true).await?;
        }

        Ok(())
    }
283
284 pub async fn sync_blocks(
290 &mut self,
291 from_block: u64,
292 to_block: Option<u64>,
293 use_copy_command: bool,
294 ) -> anyhow::Result<()> {
295 const BATCH_SIZE: usize = 1000;
296
297 let to_block = if let Some(block) = to_block {
298 block
299 } else {
300 self.hypersync_client.current_block().await
301 };
302 let total_blocks = to_block.saturating_sub(from_block) + 1;
303 log::info!(
304 "Syncing blocks from {} to {} (total: {} blocks)",
305 from_block.separate_with_commas(),
306 to_block.separate_with_commas(),
307 total_blocks.separate_with_commas()
308 );
309
310 if let Err(e) = self.cache.toggle_performance_settings(true).await {
312 log::warn!("Failed to enable performance settings: {e}");
313 }
314
315 let blocks_stream = self
316 .hypersync_client
317 .request_blocks_stream(from_block, Some(to_block))
318 .await;
319
320 tokio::pin!(blocks_stream);
321
322 let mut metrics = BlockchainSyncReporter::new(
323 BlockchainSyncReportItems::Blocks,
324 from_block,
325 total_blocks,
326 BLOCKS_PROCESS_IN_SYNC_REPORT,
327 );
328
329 let mut batch: Vec<Block> = Vec::with_capacity(BATCH_SIZE);
330
331 let cancellation_token = self.cancellation_token.clone();
332 let sync_result = tokio::select! {
333 () = cancellation_token.cancelled() => {
334 log::info!("Block sync cancelled");
335 Err(anyhow::anyhow!("Sync cancelled"))
336 }
337 result = async {
338 while let Some(block) = blocks_stream.next().await {
339 let block_number = block.number;
340 if self.cache.get_block_timestamp(block_number).is_some() {
341 continue;
342 }
343 batch.push(block);
344
345 if batch.len() >= BATCH_SIZE || block_number >= to_block {
347 let batch_size = batch.len();
348
349 self.cache.add_blocks_batch(batch, use_copy_command).await?;
350 metrics.update(batch_size);
351
352 batch = Vec::with_capacity(BATCH_SIZE);
354 }
355
356 if metrics.should_log_progress(block_number, to_block) {
358 metrics.log_progress(block_number);
359 }
360 }
361
362 if !batch.is_empty() {
364 let batch_size = batch.len();
365 self.cache.add_blocks_batch(batch, use_copy_command).await?;
366 metrics.update(batch_size);
367 }
368
369 metrics.log_final_stats();
370 Ok(())
371 } => result
372 };
373
374 sync_result?;
375
376 if let Err(e) = self.cache.toggle_performance_settings(false).await {
378 log::warn!("Failed to restore default settings: {e}");
379 }
380
381 Ok(())
382 }
383
384 pub async fn sync_pool_events(
390 &mut self,
391 dex: &DexType,
392 pool_identifier: PoolIdentifier,
393 from_block: Option<u64>,
394 to_block: Option<u64>,
395 reset: bool,
396 ) -> anyhow::Result<()> {
397 const EVENT_BATCH_SIZE: usize = 20000;
398
399 let pool: SharedPool = self.get_pool(&pool_identifier)?.clone();
400 let pool_display = pool.to_full_spec_string();
401 let from_block = from_block.unwrap_or(pool.creation_block);
402 let pool_address = &pool.address;
404
405 let (last_synced_block, effective_from_block) = if reset {
406 (None, from_block)
407 } else {
408 let last_synced_block = self
409 .cache
410 .get_pool_last_synced_block(dex, &pool_identifier)
411 .await?;
412 let effective_from_block = last_synced_block
413 .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
414 (last_synced_block, effective_from_block)
415 };
416
417 let to_block = match to_block {
418 Some(block) => block,
419 None => self.hypersync_client.current_block().await,
420 };
421
422 if effective_from_block > to_block {
424 log::info!(
425 "D {} already synced to block {} (current: {}), skipping sync",
426 dex,
427 last_synced_block.unwrap_or(0).separate_with_commas(),
428 to_block.separate_with_commas()
429 );
430 return Ok(());
431 }
432
433 let last_block_across_pool_events_table = self
435 .cache
436 .get_pool_event_tables_last_block(&pool_identifier)
437 .await?;
438
439 let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
440 log::info!(
441 "Syncing Pool: '{}' events from {} to {} (total: {} blocks){}",
442 pool_display,
443 effective_from_block.separate_with_commas(),
444 to_block.separate_with_commas(),
445 total_blocks.separate_with_commas(),
446 if let Some(last_synced) = last_synced_block {
447 format!(
448 " - resuming from last synced block {}",
449 last_synced.separate_with_commas()
450 )
451 } else {
452 String::new()
453 }
454 );
455
456 let mut metrics = BlockchainSyncReporter::new(
457 BlockchainSyncReportItems::PoolEvents,
458 effective_from_block,
459 total_blocks,
460 BLOCKS_PROCESS_IN_SYNC_REPORT,
461 );
462 let dex_extended = self.get_dex_extended(dex)?.clone();
463 let swap_event_signature = dex_extended.swap_created_event.as_ref();
464 let mint_event_signature = dex_extended.mint_created_event.as_ref();
465 let burn_event_signature = dex_extended.burn_created_event.as_ref();
466 let collect_event_signature = dex_extended.collect_created_event.as_ref();
467 let flash_event_signature = dex_extended.flash_created_event.as_ref();
468 let initialize_event_signature: Option<&str> =
469 dex_extended.initialize_event.as_ref().map(|s| s.as_ref());
470
471 let swap_sig_bytes = hex::decode(
473 swap_event_signature
474 .strip_prefix("0x")
475 .unwrap_or(swap_event_signature),
476 )?;
477 let mint_sig_bytes = hex::decode(
478 mint_event_signature
479 .strip_prefix("0x")
480 .unwrap_or(mint_event_signature),
481 )?;
482 let burn_sig_bytes = hex::decode(
483 burn_event_signature
484 .strip_prefix("0x")
485 .unwrap_or(burn_event_signature),
486 )?;
487 let collect_sig_bytes = hex::decode(
488 collect_event_signature
489 .strip_prefix("0x")
490 .unwrap_or(collect_event_signature),
491 )?;
492 let flash_sig_bytes = flash_event_signature
493 .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
494 let initialize_sig_bytes = initialize_event_signature
495 .map(|s| hex::decode(s.strip_prefix("0x").unwrap_or(s)).unwrap_or_default());
496
497 let mut event_signatures = vec![
498 swap_event_signature,
499 mint_event_signature,
500 burn_event_signature,
501 collect_event_signature,
502 ];
503
504 if let Some(event) = dex_extended.initialize_event.as_ref() {
505 event_signatures.push(event);
506 }
507
508 if let Some(event) = dex_extended.flash_created_event.as_ref() {
509 event_signatures.push(event);
510 }
511 let pool_events_stream = self
512 .hypersync_client
513 .request_contract_events_stream(
514 effective_from_block,
515 Some(to_block),
516 pool_address,
517 event_signatures,
518 )
519 .await;
520 tokio::pin!(pool_events_stream);
521
522 let mut last_block_saved = effective_from_block;
523 let mut blocks_processed = 0;
524
525 let mut swap_batch: Vec<PoolSwap> = Vec::with_capacity(EVENT_BATCH_SIZE);
526 let mut liquidity_batch: Vec<PoolLiquidityUpdate> = Vec::with_capacity(EVENT_BATCH_SIZE);
527 let mut collect_batch: Vec<PoolFeeCollect> = Vec::with_capacity(EVENT_BATCH_SIZE);
528 let mut flash_batch: Vec<PoolFlash> = Vec::with_capacity(EVENT_BATCH_SIZE);
529
530 let mut beyond_stale_data = last_block_across_pool_events_table
532 .is_none_or(|tables_max| effective_from_block > tables_max);
533
534 let cancellation_token = self.cancellation_token.clone();
535 let sync_result = tokio::select! {
536 () = cancellation_token.cancelled() => {
537 log::info!("Pool event sync cancelled");
538 Err(anyhow::anyhow!("Sync cancelled"))
539 }
540 result = async {
541 while let Some(log) = pool_events_stream.next().await {
542 let block_number = extract_block_number(&log)?;
543 blocks_processed += block_number - last_block_saved;
544 last_block_saved = block_number;
545
546 let event_sig_bytes = extract_event_signature_bytes(&log)?;
547 if event_sig_bytes == swap_sig_bytes.as_slice() {
548 let swap_event = dex_extended.parse_swap_event_hypersync(&log)?;
549 match self.process_pool_swap_event(&swap_event, &pool) {
550 Ok(swap) => swap_batch.push(swap),
551 Err(e) => log::error!("Failed to process swap event: {e}"),
552 }
553 } else if event_sig_bytes == mint_sig_bytes.as_slice() {
554 let mint_event = dex_extended.parse_mint_event_hypersync(&log)?;
555 match self.process_pool_mint_event(&mint_event, &pool, &dex_extended) {
556 Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
557 Err(e) => log::error!("Failed to process mint event: {e}"),
558 }
559 } else if event_sig_bytes == burn_sig_bytes.as_slice() {
560 let burn_event = dex_extended.parse_burn_event_hypersync(&log)?;
561 match self.process_pool_burn_event(&burn_event, &pool, &dex_extended) {
562 Ok(liquidity_update) => liquidity_batch.push(liquidity_update),
563 Err(e) => log::error!("Failed to process burn event: {e}"),
564 }
565 } else if event_sig_bytes == collect_sig_bytes.as_slice() {
566 let collect_event = dex_extended.parse_collect_event_hypersync(&log)?;
567 match self.process_pool_collect_event(&collect_event, &pool, &dex_extended) {
568 Ok(fee_collect) => collect_batch.push(fee_collect),
569 Err(e) => log::error!("Failed to process collect event: {e}"),
570 }
571 } else if initialize_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
572 let initialize_event = dex_extended.parse_initialize_event_hypersync(&log)?;
573 self.cache
574 .update_pool_initialize_price_tick(&initialize_event)
575 .await?;
576 } else if flash_sig_bytes.as_ref().is_some_and(|sig| sig.as_slice() == event_sig_bytes) {
577 if let Some(parse_fn) = dex_extended.parse_flash_event_hypersync_fn {
578 match parse_fn(dex_extended.dex.clone(), &log) {
579 Ok(flash_event) => {
580 match self.process_pool_flash_event(&flash_event, &pool) {
581 Ok(flash) => flash_batch.push(flash),
582 Err(e) => log::error!("Failed to process flash event: {e}"),
583 }
584 }
585 Err(e) => log::error!("Failed to parse flash event: {e}"),
586 }
587 }
588 } else {
589 let event_signature = hex::encode(event_sig_bytes);
590 log::error!(
591 "Unexpected event signature: {event_signature} for log {log:?}"
592 );
593 }
594
595 if !beyond_stale_data
597 && last_block_across_pool_events_table
598 .is_some_and(|table_max| block_number > table_max)
599 {
600 log::info!(
601 "Crossed beyond stale data at block {block_number} - flushing current batches with ON CONFLICT, then switching to COPY"
602 );
603
604 self.flush_event_batches(
606 EVENT_BATCH_SIZE,
607 &mut swap_batch,
608 &mut liquidity_batch,
609 &mut collect_batch,
610 &mut flash_batch,
611 false,
612 true,
613 )
614 .await?;
615
616 beyond_stale_data = true;
617 log::info!("Switched to COPY mode - future batches will use COPY command");
618 } else {
619 self.flush_event_batches(
621 EVENT_BATCH_SIZE,
622 &mut swap_batch,
623 &mut liquidity_batch,
624 &mut collect_batch,
625 &mut flash_batch,
626 false, false,
628 )
629 .await?;
630 }
631
632 metrics.update(blocks_processed as usize);
633 blocks_processed = 0;
634
635 if metrics.should_log_progress(block_number, to_block) {
637 metrics.log_progress(block_number);
638 self.cache
639 .update_pool_last_synced_block(dex, &pool_identifier, block_number)
640 .await?;
641 }
642 }
643
644 self.flush_event_batches(
645 EVENT_BATCH_SIZE,
646 &mut swap_batch,
647 &mut liquidity_batch,
648 &mut collect_batch,
649 &mut flash_batch,
650 false,
651 true,
652 )
653 .await?;
654
655 metrics.log_final_stats();
656 self.cache
657 .update_pool_last_synced_block(dex, &pool_identifier, to_block)
658 .await?;
659
660 log::info!(
661 "Successfully synced Dex '{}' Pool '{}' up to block {}",
662 dex,
663 pool_display,
664 to_block.separate_with_commas()
665 );
666 Ok(())
667 } => result
668 };
669
670 sync_result
671 }
672
673 #[expect(clippy::too_many_arguments)]
674 async fn flush_event_batches(
675 &self,
676 event_batch_size: usize,
677 swap_batch: &mut Vec<PoolSwap>,
678 liquidity_batch: &mut Vec<PoolLiquidityUpdate>,
679 collect_batch: &mut Vec<PoolFeeCollect>,
680 flash_batch: &mut Vec<PoolFlash>,
681 use_copy_command: bool,
682 force_flush_all: bool,
683 ) -> anyhow::Result<()> {
684 if (force_flush_all || swap_batch.len() >= event_batch_size) && !swap_batch.is_empty() {
685 self.cache
686 .add_pool_swaps_batch(swap_batch, use_copy_command)
687 .await?;
688 swap_batch.clear();
689 }
690
691 if (force_flush_all || liquidity_batch.len() >= event_batch_size)
692 && !liquidity_batch.is_empty()
693 {
694 self.cache
695 .add_pool_liquidity_updates_batch(liquidity_batch, use_copy_command)
696 .await?;
697 liquidity_batch.clear();
698 }
699
700 if (force_flush_all || collect_batch.len() >= event_batch_size) && !collect_batch.is_empty()
701 {
702 self.cache
703 .add_pool_fee_collects_batch(collect_batch, use_copy_command)
704 .await?;
705 collect_batch.clear();
706 }
707
708 if (force_flush_all || flash_batch.len() >= event_batch_size) && !flash_batch.is_empty() {
709 self.cache.add_pool_flash_batch(flash_batch).await?;
710 flash_batch.clear();
711 }
712 Ok(())
713 }
714
715 pub fn process_pool_swap_event(
721 &self,
722 swap_event: &SwapEvent,
723 pool: &SharedPool,
724 ) -> anyhow::Result<PoolSwap> {
725 let timestamp = self
726 .cache
727 .get_block_timestamp(swap_event.block_number)
728 .copied();
729 let mut swap = swap_event.to_pool_swap(
730 self.chain.clone(),
731 pool.instrument_id,
732 pool.pool_identifier,
733 timestamp,
734 );
735 swap.calculate_trade_info(&pool.token0, &pool.token1, None)?;
736
737 Ok(swap)
738 }
739
740 pub fn process_pool_mint_event(
746 &self,
747 mint_event: &MintEvent,
748 pool: &SharedPool,
749 dex_extended: &DexExtended,
750 ) -> anyhow::Result<PoolLiquidityUpdate> {
751 let timestamp = self
752 .cache
753 .get_block_timestamp(mint_event.block_number)
754 .copied();
755
756 let liquidity_update = mint_event.to_pool_liquidity_update(
757 self.chain.clone(),
758 dex_extended.dex.clone(),
759 pool.instrument_id,
760 timestamp,
761 );
762
763 Ok(liquidity_update)
766 }
767
768 pub fn process_pool_burn_event(
775 &self,
776 burn_event: &BurnEvent,
777 pool: &SharedPool,
778 dex_extended: &DexExtended,
779 ) -> anyhow::Result<PoolLiquidityUpdate> {
780 let timestamp = self
781 .cache
782 .get_block_timestamp(burn_event.block_number)
783 .copied();
784
785 let liquidity_update = burn_event.to_pool_liquidity_update(
786 self.chain.clone(),
787 dex_extended.dex.clone(),
788 pool.instrument_id,
789 pool.pool_identifier,
790 timestamp,
791 );
792
793 Ok(liquidity_update)
796 }
797
798 pub fn process_pool_collect_event(
804 &self,
805 collect_event: &CollectEvent,
806 pool: &SharedPool,
807 dex_extended: &DexExtended,
808 ) -> anyhow::Result<PoolFeeCollect> {
809 let timestamp = self
810 .cache
811 .get_block_timestamp(collect_event.block_number)
812 .copied();
813
814 let fee_collect = collect_event.to_pool_fee_collect(
815 self.chain.clone(),
816 dex_extended.dex.clone(),
817 pool.instrument_id,
818 timestamp,
819 );
820
821 Ok(fee_collect)
822 }
823
824 pub fn process_pool_flash_event(
830 &self,
831 flash_event: &FlashEvent,
832 pool: &SharedPool,
833 ) -> anyhow::Result<PoolFlash> {
834 let timestamp = self
835 .cache
836 .get_block_timestamp(flash_event.block_number)
837 .copied();
838
839 let flash = flash_event.to_pool_flash(self.chain.clone(), pool.instrument_id, timestamp);
840
841 Ok(flash)
842 }
843
844 pub async fn sync_exchange_pools(
855 &mut self,
856 dex: &DexType,
857 from_block: u64,
858 to_block: Option<u64>,
859 reset: bool,
860 ) -> anyhow::Result<()> {
861 let dex_extended = self.get_dex_extended(dex)?.clone();
862
863 let mut service = PoolDiscoveryService::new(
864 self.chain.clone(),
865 &mut self.cache,
866 &self.tokens,
867 &self.hypersync_client,
868 self.cancellation_token.clone(),
869 self.config.clone(),
870 );
871
872 service
873 .sync_pools(&dex_extended, from_block, to_block, reset)
874 .await?;
875
876 Ok(())
877 }
878
879 pub async fn register_dex_exchange(&mut self, dex_id: DexType) -> anyhow::Result<()> {
890 if let Some(dex_extended) = get_dex_extended(self.chain.name, &dex_id) {
891 log::info!("Registering DEX {dex_id} on chain {}", self.chain.name);
892
893 self.cache.add_dex(dex_extended.dex.clone()).await?;
894 let _ = self.cache.load_pools(&dex_id).await?;
895
896 self.subscription_manager.register_dex_for_subscriptions(
897 dex_id,
898 dex_extended.swap_created_event.as_ref(),
899 dex_extended.mint_created_event.as_ref(),
900 dex_extended.burn_created_event.as_ref(),
901 dex_extended.collect_created_event.as_ref(),
902 dex_extended.flash_created_event.as_deref(),
903 );
904 Ok(())
905 } else {
906 anyhow::bail!("Unknown DEX {dex_id} on chain {}", self.chain.name)
907 }
908 }
909
    /// Bootstraps a [`PoolProfiler`] to the latest state for `pool`.
    ///
    /// Restores from the newest database snapshot marked valid (when one
    /// exists), then replays cached pool events on top. If the pool has never
    /// been synced into the event tables, state is instead constructed from a
    /// HyperSync stream plus on-chain RPC queries.
    ///
    /// Returns the profiler and a flag indicating whether its state came from
    /// the RPC construction path (`true`) rather than cached events (`false`).
    ///
    /// # Errors
    ///
    /// Returns an error if the database is not initialized, the pool has no
    /// initial price when one is required, or event processing fails.
    pub async fn bootstrap_latest_pool_profiler(
        &mut self,
        pool: &SharedPool,
    ) -> anyhow::Result<(PoolProfiler, bool)> {
        log::info!(
            "Bootstrapping latest pool profiler for pool {}",
            pool.address
        );

        if self.cache.database.is_none() {
            anyhow::bail!(
                "Database is not initialized, so we cannot properly bootstrap the latest pool profiler"
            );
        }

        let mut profiler = PoolProfiler::new(pool.clone());

        // Try to restore from the most recent snapshot previously validated
        // against on-chain state; otherwise replay from the beginning.
        let from_position = match self
            .cache
            .database
            .as_ref()
            .unwrap()
            .load_latest_valid_pool_snapshot(pool.chain.chain_id, &pool.pool_identifier)
            .await
        {
            Ok(Some(snapshot)) => {
                log::info!(
                    "Loaded valid snapshot from block {} which contains {} positions and {} ticks",
                    snapshot.block_position.number.separate_with_commas(),
                    snapshot.positions.len(),
                    snapshot.ticks.len()
                );
                let block_position = snapshot.block_position.clone();
                profiler.restore_from_snapshot(snapshot)?;
                log::info!("Restored profiler from snapshot");
                Some(block_position)
            }
            _ => {
                log::info!("No valid snapshot found, processing from beginning");
                None
            }
        };

        // Pool never synced into the event tables: build state directly from
        // the HyperSync stream and on-chain RPC queries instead.
        if self
            .cache
            .database
            .as_ref()
            .unwrap()
            .get_pool_last_synced_block(self.chain.chain_id, &pool.dex.name, &pool.pool_identifier)
            .await?
            .is_none()
        {
            return self
                .construct_pool_profiler_from_hypersync_rpc(profiler, from_position)
                .await;
        }

        // Best-effort: continue with whatever events are already cached.
        if let Err(e) = self
            .sync_pool_events(&pool.dex.name, pool.pool_identifier, None, None, false)
            .await
        {
            log::error!("Failed to sync pool events for snapshot request: {e}");
        }

        // The profiler must be initialized with a price before replaying.
        if !profiler.is_initialized {
            if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
                profiler.initialize(initial_sqrt_price_x96);
            } else {
                anyhow::bail!(
                    "Pool is not initialized and it doesn't contain initial price, cannot bootstrap profiler"
                );
            }
        }

        let from_block = from_position
            .as_ref()
            .map_or(profiler.pool.creation_block, |block_position| {
                block_position.number
            });
        let to_block = self.hypersync_client.current_block().await;
        let total_blocks = to_block.saturating_sub(from_block) + 1;

        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);

        // Replay all cached events from the snapshot position (or creation).
        let mut stream = self.cache.database.as_ref().unwrap().stream_pool_events(
            pool.chain.clone(),
            pool.dex.clone(),
            pool.instrument_id,
            pool.pool_identifier,
            from_position.clone(),
        );

        while let Some(result) = stream.next().await {
            match result {
                Ok(event) => {
                    profiler.process(&event)?;
                }
                Err(e) => log::error!("Error processing event: {e}"),
            }
        }

        profiler.finalize_reporting();

        Ok((profiler, false))
    }
1041
    /// Builds profiler state from a HyperSync event stream (mint/burn and
    /// initialize events), then overlays the final state queried on-chain.
    ///
    /// Returns the profiler and `true` to signal the RPC construction path
    /// was used (even when the final on-chain overlay failed — see the error
    /// log in that case).
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX lacks an initialize event signature,
    /// signature decoding fails, or event processing fails.
    async fn construct_pool_profiler_from_hypersync_rpc(
        &self,
        mut profiler: PoolProfiler,
        from_position: Option<BlockPosition>,
    ) -> anyhow::Result<(PoolProfiler, bool)> {
        log::info!("Constructing pool profiler from hypersync stream and RPC final state querying");
        let dex_extended = self.get_dex_extended(&profiler.pool.dex.name)?.clone();
        let mint_event_signature = dex_extended.mint_created_event.as_ref();
        let burn_event_signature = dex_extended.burn_created_event.as_ref();
        let initialize_event_signature =
            if let Some(initialize_event) = &dex_extended.initialize_event {
                initialize_event.as_ref()
            } else {
                anyhow::bail!(
                    "DEX {} does not have initialize event set.",
                    &profiler.pool.dex.name
                );
            };
        // Signatures as raw bytes, with any "0x" prefix stripped.
        let mint_sig_bytes = hex::decode(
            mint_event_signature
                .strip_prefix("0x")
                .unwrap_or(mint_event_signature),
        )?;
        let burn_sig_bytes = hex::decode(
            burn_event_signature
                .strip_prefix("0x")
                .unwrap_or(burn_event_signature),
        )?;
        let initialize_sig_bytes = hex::decode(
            initialize_event_signature
                .strip_prefix("0x")
                .unwrap_or(initialize_event_signature),
        )?;

        let from_block = from_position.map_or(profiler.pool.creation_block, |block_position| {
            block_position.number
        });
        let to_block = self.hypersync_client.current_block().await;
        let total_blocks = to_block.saturating_sub(from_block) + 1;

        log::info!(
            "Bootstrapping pool profiler for pool {} from block {} to {} (total: {} blocks)",
            profiler.pool.address,
            from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas()
        );

        profiler.enable_reporting(from_block, total_blocks, BLOCKS_PROCESS_IN_SYNC_REPORT);

        // Only mint/burn and initialize events are streamed here; the final
        // on-chain snapshot below supplies the remaining state.
        let pool_events_stream = self
            .hypersync_client
            .request_contract_events_stream(
                from_block,
                None,
                &profiler.pool.address,
                vec![
                    mint_event_signature,
                    burn_event_signature,
                    initialize_event_signature,
                ],
            )
            .await;
        tokio::pin!(pool_events_stream);

        while let Some(log) = pool_events_stream.next().await {
            let event_sig_bytes = extract_event_signature_bytes(&log)?;

            if event_sig_bytes == initialize_sig_bytes {
                let initialize_event = dex_extended.parse_initialize_event_hypersync(&log)?;
                profiler.initialize(initialize_event.sqrt_price_x96);
                self.cache
                    .database
                    .as_ref()
                    .unwrap()
                    .update_pool_initial_price_tick(self.chain.chain_id, &initialize_event)
                    .await?;
            } else if event_sig_bytes == mint_sig_bytes {
                let mint_event = dex_extended.parse_mint_event_hypersync(&log)?;
                match self.process_pool_mint_event(&mint_event, &profiler.pool, &dex_extended) {
                    Ok(liquidity_update) => {
                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
                    }
                    Err(e) => log::error!("Failed to process mint event: {e}"),
                }
            } else if event_sig_bytes == burn_sig_bytes {
                let burn_event = dex_extended.parse_burn_event_hypersync(&log)?;
                match self.process_pool_burn_event(&burn_event, &profiler.pool, &dex_extended) {
                    Ok(liquidity_update) => {
                        profiler.process(&DexPoolData::LiquidityUpdate(liquidity_update))?;
                    }
                    Err(e) => log::error!("Failed to process burn event: {e}"),
                }
            } else {
                let event_signature = hex::encode(event_sig_bytes);
                log::error!(
                    "Unexpected event signature in bootstrap_latest_pool_profiler: {event_signature} for log {log:?}"
                );
            }
        }

        profiler.finalize_reporting();

        // Overlay the on-chain state; on failure the partially hydrated
        // profiler is still returned (error logged only).
        match self.get_on_chain_snapshot(&profiler).await {
            Ok(on_chain_snapshot) => profiler.restore_from_snapshot(on_chain_snapshot)?,
            Err(e) => log::error!(
                "Failed to restore from on-chain snapshot: {e}, sending not hydrated state to client"
            ),
        }

        Ok((profiler, true))
    }
1175
    /// Validates the profiler's state and, when valid, marks the matching pool
    /// snapshot as valid in the cache database.
    ///
    /// When `already_validated` is true the on-chain comparison is skipped and
    /// the snapshot is marked valid at the profiler's last processed event
    /// position. Returns `Ok(false)` when validation fails or the on-chain
    /// snapshot cannot be fetched.
    ///
    /// # Errors
    ///
    /// Returns an error if marking the snapshot valid in the database fails.
    ///
    /// # Panics
    ///
    /// Panics if `already_validated` is true but the profiler has no
    /// `last_processed_event`.
    pub async fn check_snapshot_validity(
        &self,
        profiler: &PoolProfiler,
        already_validated: bool,
    ) -> anyhow::Result<bool> {
        let (is_valid, block_position) = if already_validated {
            log::info!("Snapshot already validated from RPC, skipping on-chain comparison");
            let last_event = profiler
                .last_processed_event
                .clone()
                .expect("Profiler should have last_processed_event");
            (true, last_event)
        } else {
            match self.get_on_chain_snapshot(profiler).await {
                Ok(on_chain_snapshot) => {
                    log::info!("Comparing profiler state with on-chain state...");
                    let valid = compare_pool_profiler(profiler, &on_chain_snapshot);
                    if !valid {
                        log::error!(
                            "Pool profiler state does NOT match on-chain smart contract state"
                        );
                    }
                    (valid, on_chain_snapshot.block_position)
                }
                Err(e) => {
                    // Fetch failure is reported as "not valid" rather than an error.
                    log::error!("Failed to check snapshot validity: {e}");
                    return Ok(false);
                }
            }
        };

        // Persist the validity marker only when validation succeeded and a
        // cache database is available.
        if is_valid && let Some(cache_database) = &self.cache.database {
            cache_database
                .mark_pool_snapshot_valid(
                    profiler.pool.chain.chain_id,
                    &profiler.pool.pool_identifier,
                    block_position.number,
                    block_position.transaction_index,
                    block_position.log_index,
                )
                .await?;
            log::info!("Marked pool profiler snapshot as valid");
        }

        Ok(is_valid)
    }
1240
1241 async fn get_on_chain_snapshot(&self, profiler: &PoolProfiler) -> anyhow::Result<PoolSnapshot> {
1247 if profiler.pool.dex.name == DexType::UniswapV3 {
1248 let last_processed_event = profiler
1249 .last_processed_event
1250 .clone()
1251 .expect("We expect at least one processed event in the pool");
1252 let on_chain_snapshot = self
1253 .univ3_pool
1254 .fetch_snapshot(
1255 &profiler.pool.address,
1256 profiler.pool.instrument_id,
1257 profiler.get_active_tick_values().as_slice(),
1258 &profiler.get_all_position_keys(),
1259 last_processed_event,
1260 )
1261 .await?;
1262
1263 Ok(on_chain_snapshot)
1264 } else {
1265 anyhow::bail!(
1266 "Fetching on-chain snapshot for Dex protocol {} is not supported yet.",
1267 profiler.pool.dex.name
1268 )
1269 }
1270 }
1271
1272 pub async fn replay_pool_events(&self, pool: &Pool, dex: &SharedDex) -> anyhow::Result<()> {
1281 if let Some(database) = &self.cache.database {
1282 log::info!(
1283 "Replaying historical events for pool {} to hydrate profiler",
1284 pool.instrument_id
1285 );
1286
1287 let mut event_stream = database.stream_pool_events(
1288 self.chain.clone(),
1289 dex.clone(),
1290 pool.instrument_id,
1291 pool.pool_identifier,
1292 None,
1293 );
1294 let mut event_count = 0;
1295
1296 while let Some(event_result) = event_stream.next().await {
1297 match event_result {
1298 Ok(event) => {
1299 let data_event = match event {
1300 DexPoolData::Swap(swap) => DataEvent::DeFi(DefiData::PoolSwap(swap)),
1301 DexPoolData::LiquidityUpdate(update) => {
1302 DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))
1303 }
1304 DexPoolData::FeeCollect(collect) => {
1305 DataEvent::DeFi(DefiData::PoolFeeCollect(collect))
1306 }
1307 DexPoolData::Flash(flash) => {
1308 DataEvent::DeFi(DefiData::PoolFlash(flash))
1309 }
1310 };
1311 self.send_data(data_event);
1312 event_count += 1;
1313 }
1314 Err(e) => {
1315 log::error!("Error streaming event for pool {}: {e}", pool.instrument_id);
1316 }
1317 }
1318 }
1319
1320 log::info!(
1321 "Replayed {event_count} historical events for pool {}",
1322 pool.instrument_id
1323 );
1324 } else {
1325 log::debug!(
1326 "No database available, skipping event replay for pool {}",
1327 pool.instrument_id
1328 );
1329 }
1330
1331 Ok(())
1332 }
1333
1334 fn determine_from_block(&self) -> u64 {
1336 self.config
1337 .from_block
1338 .unwrap_or_else(|| self.cache.min_dex_creation_block().unwrap_or(0))
1339 }
1340
1341 fn get_dex_extended(&self, dex_id: &DexType) -> anyhow::Result<&DexExtended> {
1343 if !self.cache.get_registered_dexes().contains(dex_id) {
1344 anyhow::bail!("DEX {dex_id} is not registered in the data client");
1345 }
1346
1347 match get_dex_extended(self.chain.name, dex_id) {
1348 Some(dex) => Ok(dex),
1349 None => anyhow::bail!("Dex {dex_id} doesn't exist for chain {}", self.chain.name),
1350 }
1351 }
1352
1353 pub fn get_pool(&self, pool_identifier: &PoolIdentifier) -> anyhow::Result<&SharedPool> {
1359 match self.cache.get_pool(pool_identifier) {
1360 Some(pool) => Ok(pool),
1361 None => anyhow::bail!("Pool {pool_identifier} is not registered"),
1362 }
1363 }
1364
1365 pub fn send_data(&self, data: DataEvent) {
1367 if let Some(data_tx) = &self.data_tx {
1368 log::debug!("Sending {data}");
1369
1370 if let Err(e) = data_tx.send(data) {
1371 log::error!("Failed to send data: {e}");
1372 }
1373 } else {
1374 log::error!("No data event channel for sending data");
1375 }
1376 }
1377
    /// Disconnects the underlying HyperSync client.
    pub async fn disconnect(&mut self) {
        self.hypersync_client.disconnect().await;
    }
1385}