// Source: nautilus_blockchain/cache/mod.rs
1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Caching layer for blockchain entities and domain objects.
17//!
18//! This module provides an in-memory cache with optional PostgreSQL persistence for storing
19//! and retrieving blockchain-related data such as blocks, tokens, pools, swaps, and other
20//! DeFi protocol events.
21
22use std::{
23    collections::{BTreeMap, HashMap, HashSet},
24    sync::Arc,
25};
26
27use alloy::primitives::Address;
28use nautilus_core::UnixNanos;
29use nautilus_model::defi::{
30    Block, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex,
31    SharedPool, Token,
32    data::{PoolFeeCollect, PoolFlash},
33    pool_analysis::{position::PoolPosition, snapshot::PoolSnapshot},
34    tick_map::tick::PoolTick,
35};
36use sqlx::postgres::PgConnectOptions;
37
38use crate::{
39    cache::{consistency::CachedBlocksConsistencyStatus, database::BlockchainCacheDatabase},
40    events::initialize::InitializeEvent,
41};
42
/// Consistency-status types for cached block ranges (see [`CachedBlocksConsistencyStatus`]).
pub mod consistency;
/// Bulk-copy persistence helpers. // NOTE(review): presumed COPY-command support — confirm
pub mod copy;
/// PostgreSQL-backed cache database (see [`BlockchainCacheDatabase`]).
pub mod database;
/// Database row representations for cached entities.
pub mod rows;
/// Cache-related type definitions.
pub mod types;
48
/// Provides caching functionality for various blockchain domain objects.
///
/// All lookups are served from memory; when `database` is set, mutating
/// operations are additionally persisted to PostgreSQL.
#[derive(Debug)]
pub struct BlockchainCache {
    /// The blockchain chain this cache is associated with.
    chain: SharedChain,
    /// Map of block numbers to their corresponding timestamps.
    block_timestamps: BTreeMap<u64, UnixNanos>,
    /// Map of DEX identifiers to their corresponding DEX objects.
    dexes: HashMap<DexType, SharedDex>,
    /// Map of token addresses to their corresponding `Token` objects.
    tokens: HashMap<Address, Token>,
    /// Cached set of invalid token addresses that failed validation or processing.
    invalid_tokens: HashSet<Address>,
    /// Map of pool identifiers to their corresponding `Pool` objects.
    pools: HashMap<PoolIdentifier, SharedPool>,
    /// Optional database connection for persistent storage; when `None`,
    /// persistence steps are skipped and the cache is purely in-memory.
    pub database: Option<BlockchainCacheDatabase>,
}
67
68impl BlockchainCache {
69    /// Creates a new in-memory blockchain cache for the specified chain.
70    #[must_use]
71    pub fn new(chain: SharedChain) -> Self {
72        Self {
73            chain,
74            dexes: HashMap::new(),
75            tokens: HashMap::new(),
76            invalid_tokens: HashSet::new(),
77            pools: HashMap::new(),
78            block_timestamps: BTreeMap::new(),
79            database: None,
80        }
81    }
82
83    /// Returns the highest continuous block number currently cached, if any.
84    pub async fn get_cache_block_consistency_status(
85        &self,
86    ) -> Option<CachedBlocksConsistencyStatus> {
87        let database = self.database.as_ref()?;
88        database
89            .get_block_consistency_status(&self.chain)
90            .await
91            .map_err(|e| log::error!("Error getting block consistency status: {e}"))
92            .ok()
93    }
94
95    /// Returns the earliest block number where any DEX in the cache was created on the blockchain.
96    #[must_use]
97    pub fn min_dex_creation_block(&self) -> Option<u64> {
98        self.dexes
99            .values()
100            .map(|dex| dex.factory_creation_block)
101            .min()
102    }
103
    /// Returns the timestamp for the specified block number if it exists in the cache.
    ///
    /// `None` simply means the block has not been cached (added or loaded) yet.
    #[must_use]
    pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
        self.block_timestamps.get(&block_number)
    }
109
110    /// Initializes the database connection for persistent storage.
111    pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
112        let database = BlockchainCacheDatabase::init(pg_connect_options).await;
113        self.database = Some(database);
114    }
115
116    /// Toggles performance optimization settings in the database.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the database is not initialized or the operation fails.
121    pub async fn toggle_performance_settings(&self, enable: bool) -> anyhow::Result<()> {
122        if let Some(database) = &self.database {
123            database.toggle_perf_sync_settings(enable).await
124        } else {
125            log::warn!("Database not initialized, skipping performance settings toggle");
126            Ok(())
127        }
128    }
129
130    /// Initializes the chain by seeding it in the database and creating necessary partitions.
131    ///
132    /// This method sets up the blockchain chain in the database, creates block and token
133    /// partitions for optimal performance, and loads existing tokens into the cache.
134    pub async fn initialize_chain(&mut self) {
135        // Seed target adapter chain in database
136        if let Some(database) = &self.database {
137            if let Err(e) = database.seed_chain(&self.chain).await {
138                log::error!(
139                    "Error seeding chain in database: {e}. Continuing without database cache functionality"
140                );
141                return;
142            }
143            log::info!("Chain seeded in the database");
144
145            match database.create_block_partition(&self.chain).await {
146                Ok(message) => log::info!("Executing block partition creation: {message}"),
147                Err(e) => log::error!(
148                    "Error creating block partition for chain {}: {e}. Continuing without partition creation...",
149                    self.chain.chain_id
150                ),
151            }
152
153            match database.create_token_partition(&self.chain).await {
154                Ok(message) => log::info!("Executing token partition creation: {message}"),
155                Err(e) => log::error!(
156                    "Error creating token partition for chain {}: {e}. Continuing without partition creation...",
157                    self.chain.chain_id
158                ),
159            }
160        }
161
162        if let Err(e) = self.load_tokens().await {
163            log::error!("Error loading tokens from the database: {e}");
164        }
165    }
166
167    /// Connects to the database and loads initial data.
168    ///
169    /// # Errors
170    ///
171    /// Returns an error if database seeding, token loading, or block loading fails.
172    pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
173        log::debug!("Connecting and loading from_block {from_block}");
174
175        if let Err(e) = self.load_tokens().await {
176            log::error!("Error loading tokens from the database: {e}");
177        }
178
179        // TODO disable block syncing for now as we don't have timestamps yet configured
180        // if let Err(e) = self.load_blocks(from_block).await {
181        //     log::error!("Error loading blocks from database: {e}");
182        // }
183
184        Ok(())
185    }
186
187    /// Loads tokens from the database into the in-memory cache.
188    async fn load_tokens(&mut self) -> anyhow::Result<()> {
189        if let Some(database) = &self.database {
190            let (tokens, invalid_tokens) = tokio::try_join!(
191                database.load_tokens(self.chain.clone()),
192                database.load_invalid_token_addresses(self.chain.chain_id)
193            )?;
194
195            log::info!(
196                "Loading {} valid tokens and {} invalid tokens from cache database",
197                tokens.len(),
198                invalid_tokens.len()
199            );
200
201            self.tokens
202                .extend(tokens.into_iter().map(|token| (token.address, token)));
203            self.invalid_tokens.extend(invalid_tokens);
204        }
205        Ok(())
206    }
207
208    /// Loads DEX exchange pools from the database into the in-memory cache.
209    ///
210    /// Returns the loaded pools.
211    ///
212    /// # Errors
213    ///
214    /// Returns an error if the DEX has not been registered or if database operations fail.
215    pub async fn load_pools(&mut self, dex_id: &DexType) -> anyhow::Result<Vec<Pool>> {
216        let mut loaded_pools = Vec::new();
217
218        if let Some(database) = &self.database {
219            let dex = self
220                .get_dex(dex_id)
221                .ok_or_else(|| anyhow::anyhow!("DEX {dex_id:?} has not been registered"))?;
222            let pool_rows = database
223                .load_pools(self.chain.clone(), &dex_id.to_string())
224                .await?;
225            log::info!(
226                "Loading {} pools for DEX {} from cache database",
227                pool_rows.len(),
228                dex_id,
229            );
230
231            for pool_row in pool_rows {
232                let token0 = if let Some(token) = self.tokens.get(&pool_row.token0_address) {
233                    token
234                } else {
235                    log::error!(
236                        "Failed to load pool {} for DEX {}: Token0 with address {} not found in cache. \
237                             This may indicate the token was not properly loaded from the database or the pool references an unknown token",
238                        pool_row.address,
239                        dex_id,
240                        pool_row.token0_address
241                    );
242                    continue;
243                };
244
245                let token1 = if let Some(token) = self.tokens.get(&pool_row.token1_address) {
246                    token
247                } else {
248                    log::error!(
249                        "Failed to load pool {} for DEX {}: Token1 with address {} not found in cache. \
250                             This may indicate the token was not properly loaded from the database or the pool references an unknown token",
251                        pool_row.address,
252                        dex_id,
253                        pool_row.token1_address
254                    );
255                    continue;
256                };
257
258                // Construct pool from row data and cached tokens
259                let Some(pool_identifier) = pool_row.pool_identifier.parse().ok() else {
260                    log::error!(
261                        "Invalid pool identifier '{}' in database for pool {}, skipping",
262                        pool_row.pool_identifier,
263                        pool_row.address
264                    );
265                    continue;
266                };
267                let mut pool = Pool::new(
268                    self.chain.clone(),
269                    dex.clone(),
270                    pool_row.address,
271                    pool_identifier,
272                    pool_row.creation_block as u64,
273                    token0.clone(),
274                    token1.clone(),
275                    pool_row.fee.map(|fee| fee as u32),
276                    pool_row
277                        .tick_spacing
278                        .map(|tick_spacing| tick_spacing as u32),
279                    UnixNanos::default(), // TODO use default for now
280                );
281
282                // Set hooks if available
283                if let Some(ref hook_address_str) = pool_row.hook_address
284                    && let Ok(hooks) = hook_address_str.parse()
285                {
286                    pool.set_hooks(hooks);
287                }
288
289                // Initialize pool with initial values if available
290                if let Some(initial_sqrt_price_x96_str) = &pool_row.initial_sqrt_price_x96
291                    && let Ok(initial_sqrt_price_x96) = initial_sqrt_price_x96_str.parse()
292                    && let Some(initial_tick) = pool_row.initial_tick
293                {
294                    pool.initialize(initial_sqrt_price_x96, initial_tick);
295                }
296
297                // Add pool to cache and loaded pools list
298                loaded_pools.push(pool.clone());
299                self.pools.insert(pool.pool_identifier, Arc::new(pool));
300            }
301        }
302        Ok(loaded_pools)
303    }
304
305    /// Loads block timestamps from the database starting `from_block` number
306    /// into the in-memory cache.
307    #[allow(dead_code)]
308    async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
309        if let Some(database) = &self.database {
310            let block_timestamps = database
311                .load_block_timestamps(self.chain.clone(), from_block)
312                .await?;
313
314            // Verify block number sequence consistency
315            if !block_timestamps.is_empty() {
316                let first = block_timestamps.first().unwrap().number;
317                let last = block_timestamps.last().unwrap().number;
318                let expected_len = (last - first + 1) as usize;
319                if block_timestamps.len() != expected_len {
320                    anyhow::bail!(
321                        "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
322                        block_timestamps.len()
323                    );
324                }
325            }
326
327            if block_timestamps.is_empty() {
328                log::info!("No blocks found in database");
329                return Ok(());
330            }
331
332            log::info!(
333                "Loading {} blocks timestamps from the cache database with last block number {}",
334                block_timestamps.len(),
335                block_timestamps.last().unwrap().number,
336            );
337
338            for block in block_timestamps {
339                self.block_timestamps.insert(block.number, block.timestamp);
340            }
341        }
342        Ok(())
343    }
344
345    /// Adds a block to the cache and persists it to the database if available.
346    ///
347    /// # Errors
348    ///
349    /// Returns an error if adding the block to the database fails.
350    pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
351        if let Some(database) = &self.database {
352            database.add_block(self.chain.chain_id, &block).await?;
353        }
354        self.block_timestamps.insert(block.number, block.timestamp);
355        Ok(())
356    }
357
358    /// Adds multiple blocks to the cache and persists them to the database in batch if available.
359    ///
360    /// # Errors
361    ///
362    /// Returns an error if adding the blocks to the database fails.
363    pub async fn add_blocks_batch(
364        &mut self,
365        blocks: Vec<Block>,
366        use_copy_command: bool,
367    ) -> anyhow::Result<()> {
368        if blocks.is_empty() {
369            return Ok(());
370        }
371
372        if let Some(database) = &self.database {
373            if use_copy_command {
374                database
375                    .add_blocks_copy(self.chain.chain_id, &blocks)
376                    .await?;
377            } else {
378                database
379                    .add_blocks_batch(self.chain.chain_id, &blocks)
380                    .await?;
381            }
382        }
383
384        // Update in-memory cache
385        for block in blocks {
386            self.block_timestamps.insert(block.number, block.timestamp);
387        }
388
389        Ok(())
390    }
391
392    /// Adds a DEX to the cache with the specified identifier.
393    ///
394    /// # Errors
395    ///
396    /// Returns an error if adding the DEX to the database fails.
397    pub async fn add_dex(&mut self, dex: SharedDex) -> anyhow::Result<()> {
398        log::info!("Adding dex {} to the cache", dex.name);
399
400        if let Some(database) = &self.database {
401            database.add_dex(dex.clone()).await?;
402        }
403
404        self.dexes.insert(dex.name, dex);
405        Ok(())
406    }
407
408    /// Adds a liquidity pool/pair to the cache.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if adding the pool to the database fails.
413    pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
414        if let Some(database) = &self.database {
415            database.add_pool(&pool).await?;
416        }
417
418        self.pools.insert(pool.pool_identifier, Arc::new(pool));
419        Ok(())
420    }
421
422    /// Adds multiple pools to the cache and persists them to the database in batch if available.
423    ///
424    /// # Errors
425    ///
426    /// Returns an error if adding the pools to the database fails.
427    pub async fn add_pools_batch(&mut self, pools: Vec<Pool>) -> anyhow::Result<()> {
428        if pools.is_empty() {
429            return Ok(());
430        }
431
432        if let Some(database) = &self.database {
433            database.add_pools_copy(self.chain.chain_id, &pools).await?;
434        }
435        self.pools.extend(
436            pools
437                .into_iter()
438                .map(|pool| (pool.pool_identifier, Arc::new(pool))),
439        );
440
441        Ok(())
442    }
443
444    /// Adds a [`Token`] to the cache.
445    ///
446    /// # Errors
447    ///
448    /// Returns an error if adding the token to the database fails.
449    pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
450        if let Some(database) = &self.database {
451            database.add_token(&token).await?;
452        }
453        self.tokens.insert(token.address, token);
454        Ok(())
455    }
456
457    /// Adds multiple tokens to the cache and persists them to the database in batch if available.
458    ///
459    /// # Errors
460    ///
461    /// Returns an error if adding the tokens to the database fails.
462    pub async fn add_tokens_batch(&mut self, tokens: Vec<Token>) -> anyhow::Result<()> {
463        if tokens.is_empty() {
464            return Ok(());
465        }
466
467        if let Some(database) = &self.database {
468            database
469                .add_tokens_copy(self.chain.chain_id, &tokens)
470                .await?;
471        }
472
473        self.tokens
474            .extend(tokens.into_iter().map(|token| (token.address, token)));
475
476        Ok(())
477    }
478
    /// Updates the in-memory token cache without persisting to the database.
    ///
    /// Overwrites any existing entry for the same address; `add_token` is the
    /// persisting counterpart.
    pub fn insert_token_in_memory(&mut self, token: Token) {
        self.tokens.insert(token.address, token);
    }
483
    /// Marks a token address as invalid in the in-memory cache without persisting to the database.
    ///
    /// `add_invalid_token` is the persisting counterpart.
    pub fn insert_invalid_token_in_memory(&mut self, address: Address) {
        self.invalid_tokens.insert(address);
    }
488
489    /// Adds an invalid token address with associated error information to the cache.
490    ///
491    /// # Errors
492    ///
493    /// Returns an error if adding the invalid token to the database fails.
494    pub async fn add_invalid_token(
495        &mut self,
496        address: Address,
497        error_string: &str,
498    ) -> anyhow::Result<()> {
499        if let Some(database) = &self.database {
500            database
501                .add_invalid_token(self.chain.chain_id, &address, error_string)
502                .await?;
503        }
504        self.invalid_tokens.insert(address);
505        Ok(())
506    }
507
508    /// Adds a [`PoolSwap`] to the cache database if available.
509    ///
510    /// # Errors
511    ///
512    /// Returns an error if adding the swap to the database fails.
513    pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
514        if let Some(database) = &self.database {
515            database.add_swap(self.chain.chain_id, swap).await?;
516        }
517
518        Ok(())
519    }
520
521    /// Adds a [`PoolLiquidityUpdate`] to the cache database if available.
522    ///
523    /// # Errors
524    ///
525    /// Returns an error if adding the liquidity update to the database fails.
526    pub async fn add_liquidity_update(
527        &self,
528        liquidity_update: &PoolLiquidityUpdate,
529    ) -> anyhow::Result<()> {
530        if let Some(database) = &self.database {
531            database
532                .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
533                .await?;
534        }
535
536        Ok(())
537    }
538
539    /// Adds multiple [`PoolSwap`]s to the cache database in a single batch operation if available.
540    ///
541    /// # Errors
542    ///
543    /// Returns an error if adding the swaps to the database fails.
544    pub async fn add_pool_swaps_batch(
545        &self,
546        swaps: &[PoolSwap],
547        use_copy_command: bool,
548    ) -> anyhow::Result<()> {
549        if let Some(database) = &self.database {
550            if use_copy_command {
551                database
552                    .add_pool_swaps_copy(self.chain.chain_id, swaps)
553                    .await?;
554            } else {
555                database
556                    .add_pool_swaps_batch(self.chain.chain_id, swaps)
557                    .await?;
558            }
559        }
560
561        Ok(())
562    }
563
564    /// Adds multiple [`PoolLiquidityUpdate`]s to the cache database in a single batch operation if available.
565    ///
566    /// # Errors
567    ///
568    /// Returns an error if adding the liquidity updates to the database fails.
569    pub async fn add_pool_liquidity_updates_batch(
570        &self,
571        updates: &[PoolLiquidityUpdate],
572        use_copy_command: bool,
573    ) -> anyhow::Result<()> {
574        if let Some(database) = &self.database {
575            if use_copy_command {
576                database
577                    .add_pool_liquidity_updates_copy(self.chain.chain_id, updates)
578                    .await?;
579            } else {
580                database
581                    .add_pool_liquidity_updates_batch(self.chain.chain_id, updates)
582                    .await?;
583            }
584        }
585
586        Ok(())
587    }
588
589    /// Adds a batch of pool fee collect events to the cache.
590    ///
591    /// # Errors
592    ///
593    /// Returns an error if adding the fee collects to the database fails.
594    pub async fn add_pool_fee_collects_batch(
595        &self,
596        collects: &[PoolFeeCollect],
597        use_copy_command: bool,
598    ) -> anyhow::Result<()> {
599        if let Some(database) = &self.database {
600            if use_copy_command {
601                database
602                    .copy_pool_fee_collects_batch(self.chain.chain_id, collects)
603                    .await?;
604            } else {
605                database
606                    .add_pool_collects_batch(self.chain.chain_id, collects)
607                    .await?;
608            }
609        }
610
611        Ok(())
612    }
613
614    /// Adds a batch of pool flash events to the cache.
615    ///
616    /// # Errors
617    ///
618    /// Returns an error if adding the flash events to the database fails.
619    pub async fn add_pool_flash_batch(&self, flash_events: &[PoolFlash]) -> anyhow::Result<()> {
620        if let Some(database) = &self.database {
621            database
622                .add_pool_flash_batch(self.chain.chain_id, flash_events)
623                .await?;
624        }
625
626        Ok(())
627    }
628
629    /// Adds a pool snapshot to the cache database.
630    ///
631    /// This method saves the complete snapshot including:
632    /// - Pool state and analytics (pool_snapshot table)
633    /// - All positions at this snapshot (pool_position table)
634    /// - All ticks at this snapshot (pool_tick table)
635    ///
636    /// # Errors
637    ///
638    /// Returns an error if adding the snapshot to the database fails.
639    pub async fn add_pool_snapshot(
640        &self,
641        dex: &DexType,
642        pool_identifier: &PoolIdentifier,
643        snapshot: &PoolSnapshot,
644    ) -> anyhow::Result<()> {
645        if let Some(database) = &self.database {
646            // Save snapshot first (required for foreign key constraints)
647            database
648                .add_pool_snapshot(self.chain.chain_id, dex, pool_identifier, snapshot)
649                .await?;
650
651            let positions: Vec<(PoolIdentifier, PoolPosition)> = snapshot
652                .positions
653                .iter()
654                .map(|pos| (*pool_identifier, pos.clone()))
655                .collect();
656
657            if !positions.is_empty() {
658                database
659                    .add_pool_positions_batch(
660                        self.chain.chain_id,
661                        snapshot.block_position.number,
662                        snapshot.block_position.transaction_index,
663                        snapshot.block_position.log_index,
664                        &positions,
665                    )
666                    .await?;
667            }
668
669            let ticks: Vec<(PoolIdentifier, &PoolTick)> = snapshot
670                .ticks
671                .iter()
672                .map(|tick| (*pool_identifier, tick))
673                .collect();
674
675            if !ticks.is_empty() {
676                database
677                    .add_pool_ticks_batch(
678                        self.chain.chain_id,
679                        snapshot.block_position.number,
680                        snapshot.block_position.transaction_index,
681                        snapshot.block_position.log_index,
682                        &ticks,
683                    )
684                    .await?;
685            }
686        }
687
688        Ok(())
689    }
690
691    /// Updates the initial price and tick for a pool.
692    ///
693    /// # Errors
694    ///
695    /// Returns an error if the database update fails.
696    pub async fn update_pool_initialize_price_tick(
697        &mut self,
698        initialize_event: &InitializeEvent,
699    ) -> anyhow::Result<()> {
700        if let Some(database) = &self.database {
701            database
702                .update_pool_initial_price_tick(self.chain.chain_id, initialize_event)
703                .await?;
704        }
705
706        // Update the cached pool if it exists
707        let pool_identifier = initialize_event.pool_identifier;
708        if let Some(cached_pool) = self.pools.get(&pool_identifier) {
709            let mut updated_pool = (**cached_pool).clone();
710            updated_pool.initialize(initialize_event.sqrt_price_x96, initialize_event.tick);
711
712            self.pools.insert(pool_identifier, Arc::new(updated_pool));
713        }
714
715        Ok(())
716    }
717
    /// Returns a clone of the shared DEX handle registered under the given identifier,
    /// or `None` if the DEX has not been registered.
    #[must_use]
    pub fn get_dex(&self, dex_id: &DexType) -> Option<SharedDex> {
        self.dexes.get(dex_id).cloned()
    }
723
724    /// Returns a list of registered `DexType` in the cache.
725    #[must_use]
726    pub fn get_registered_dexes(&self) -> HashSet<DexType> {
727        self.dexes.keys().copied().collect()
728    }
729
    /// Returns a reference to the pool associated with the given pool identifier.
    #[must_use]
    pub fn get_pool(&self, pool_identifier: &PoolIdentifier) -> Option<&SharedPool> {
        self.pools.get(pool_identifier)
    }
735
    /// Returns a reference to the `Token` associated with the given address.
    ///
    /// Addresses recorded as invalid are tracked separately; see `is_invalid_token`.
    #[must_use]
    pub fn get_token(&self, address: &Address) -> Option<&Token> {
        self.tokens.get(address)
    }
741
    /// Checks if a token address is marked as invalid in the cache.
    ///
    /// Returns `true` if the address was previously recorded as invalid due to
    /// validation or processing failures (via `add_invalid_token`,
    /// `insert_invalid_token_in_memory`, or loaded from the database).
    #[must_use]
    pub fn is_invalid_token(&self, address: &Address) -> bool {
        self.invalid_tokens.contains(address)
    }
750
751    /// Saves the checkpoint block number indicating the last completed pool synchronization for a specific DEX.
752    ///
753    /// # Errors
754    ///
755    /// Returns an error if the database operation fails.
756    pub async fn update_dex_last_synced_block(
757        &self,
758        dex: &DexType,
759        block_number: u64,
760    ) -> anyhow::Result<()> {
761        if let Some(database) = &self.database {
762            database
763                .update_dex_last_synced_block(self.chain.chain_id, dex, block_number)
764                .await
765        } else {
766            Ok(())
767        }
768    }
769
770    /// Updates the last synced block number for a pool.
771    ///
772    /// # Errors
773    ///
774    /// Returns an error if the database update fails.
775    pub async fn update_pool_last_synced_block(
776        &self,
777        dex: &DexType,
778        pool_identifier: &PoolIdentifier,
779        block_number: u64,
780    ) -> anyhow::Result<()> {
781        if let Some(database) = &self.database {
782            database
783                .update_pool_last_synced_block(
784                    self.chain.chain_id,
785                    dex,
786                    pool_identifier,
787                    block_number,
788                )
789                .await
790        } else {
791            Ok(())
792        }
793    }
794
795    /// Retrieves the saved checkpoint block number from the last completed pool synchronization for a specific DEX.
796    ///
797    /// # Errors
798    ///
799    /// Returns an error if the database query fails.
800    pub async fn get_dex_last_synced_block(&self, dex: &DexType) -> anyhow::Result<Option<u64>> {
801        if let Some(database) = &self.database {
802            database
803                .get_dex_last_synced_block(self.chain.chain_id, dex)
804                .await
805        } else {
806            Ok(None)
807        }
808    }
809
810    /// Retrieves the last synced block number for a pool.
811    ///
812    /// # Errors
813    ///
814    /// Returns an error if the database query fails.
815    pub async fn get_pool_last_synced_block(
816        &self,
817        dex: &DexType,
818        pool_identifier: &PoolIdentifier,
819    ) -> anyhow::Result<Option<u64>> {
820        if let Some(database) = &self.database {
821            database
822                .get_pool_last_synced_block(self.chain.chain_id, dex, pool_identifier)
823                .await
824        } else {
825            Ok(None)
826        }
827    }
828
829    /// Retrieves the maximum block number across all pool event tables for a given pool.
830    ///
831    /// # Errors
832    ///
833    /// Returns an error if any of the database queries fail.
834    pub async fn get_pool_event_tables_last_block(
835        &self,
836        pool_identifier: &PoolIdentifier,
837    ) -> anyhow::Result<Option<u64>> {
838        if let Some(database) = &self.database {
839            let (swaps_last_block, liquidity_last_block, collect_last_block) = tokio::try_join!(
840                database.get_table_last_block(
841                    self.chain.chain_id,
842                    "pool_swap_event",
843                    pool_identifier
844                ),
845                database.get_table_last_block(
846                    self.chain.chain_id,
847                    "pool_liquidity_event",
848                    pool_identifier
849                ),
850                database.get_table_last_block(
851                    self.chain.chain_id,
852                    "pool_collect_event",
853                    pool_identifier
854                ),
855            )?;
856
857            let max_block = [swaps_last_block, liquidity_last_block, collect_last_block]
858                .into_iter()
859                .flatten()
860                .max();
861            Ok(max_block)
862        } else {
863            Ok(None)
864        }
865    }
866}