1use std::{
23 collections::{BTreeMap, HashMap, HashSet},
24 sync::Arc,
25};
26
27use alloy::primitives::Address;
28use nautilus_core::UnixNanos;
29use nautilus_model::defi::{
30 Block, DexType, Pool, PoolIdentifier, PoolLiquidityUpdate, PoolSwap, SharedChain, SharedDex,
31 SharedPool, Token,
32 data::{PoolFeeCollect, PoolFlash},
33 pool_analysis::{position::PoolPosition, snapshot::PoolSnapshot},
34 tick_map::tick::PoolTick,
35};
36use sqlx::postgres::PgConnectOptions;
37
38use crate::{
39 cache::{consistency::CachedBlocksConsistencyStatus, database::BlockchainCacheDatabase},
40 events::initialize::InitializeEvent,
41};
42
// Cache-layer submodules.
pub mod consistency; // Cached-blocks consistency checks (`CachedBlocksConsistencyStatus`).
pub mod copy;
pub mod database; // Postgres-backed persistence (`BlockchainCacheDatabase`).
pub mod rows;
pub mod types;
48
/// In-memory cache of blockchain domain objects (blocks, DEXes, tokens, pools)
/// with an optional Postgres persistence layer.
#[derive(Debug)]
pub struct BlockchainCache {
    /// The chain this cache serves.
    chain: SharedChain,
    /// Block number -> block timestamp, ordered for sequential/range access.
    block_timestamps: BTreeMap<u64, UnixNanos>,
    /// Registered DEXes keyed by DEX type.
    dexes: HashMap<DexType, SharedDex>,
    /// Valid tokens keyed by contract address.
    tokens: HashMap<Address, Token>,
    /// Addresses of tokens recorded as invalid (see `add_invalid_token`).
    invalid_tokens: HashSet<Address>,
    /// Cached pools keyed by pool identifier.
    pools: HashMap<PoolIdentifier, SharedPool>,
    /// Optional database backend; `None` means the cache is memory-only.
    pub database: Option<BlockchainCacheDatabase>,
}
67
68impl BlockchainCache {
69 #[must_use]
71 pub fn new(chain: SharedChain) -> Self {
72 Self {
73 chain,
74 dexes: HashMap::new(),
75 tokens: HashMap::new(),
76 invalid_tokens: HashSet::new(),
77 pools: HashMap::new(),
78 block_timestamps: BTreeMap::new(),
79 database: None,
80 }
81 }
82
83 pub async fn get_cache_block_consistency_status(
85 &self,
86 ) -> Option<CachedBlocksConsistencyStatus> {
87 let database = self.database.as_ref()?;
88 database
89 .get_block_consistency_status(&self.chain)
90 .await
91 .map_err(|e| log::error!("Error getting block consistency status: {e}"))
92 .ok()
93 }
94
95 #[must_use]
97 pub fn min_dex_creation_block(&self) -> Option<u64> {
98 self.dexes
99 .values()
100 .map(|dex| dex.factory_creation_block)
101 .min()
102 }
103
    /// Returns the cached timestamp for `block_number`, or `None` if the block
    /// has not been loaded into the cache.
    #[must_use]
    pub fn get_block_timestamp(&self, block_number: u64) -> Option<&UnixNanos> {
        self.block_timestamps.get(&block_number)
    }
109
110 pub async fn initialize_database(&mut self, pg_connect_options: PgConnectOptions) {
112 let database = BlockchainCacheDatabase::init(pg_connect_options).await;
113 self.database = Some(database);
114 }
115
116 pub async fn toggle_performance_settings(&self, enable: bool) -> anyhow::Result<()> {
122 if let Some(database) = &self.database {
123 database.toggle_perf_sync_settings(enable).await
124 } else {
125 log::warn!("Database not initialized, skipping performance settings toggle");
126 Ok(())
127 }
128 }
129
    /// Seeds the chain record and its partitions in the database, then loads
    /// tokens into memory.
    ///
    /// All failures are logged and degrade gracefully: a seed failure skips the
    /// partition setup entirely, and partition-creation failures are tolerated.
    pub async fn initialize_chain(&mut self) {
        if let Some(database) = &self.database {
            // If the chain row cannot be seeded, skip partition creation —
            // the cache continues to work in memory-only fashion.
            if let Err(e) = database.seed_chain(&self.chain).await {
                log::error!(
                    "Error seeding chain in database: {e}. Continuing without database cache functionality"
                );
                return;
            }
            log::info!("Chain seeded in the database");

            // Partition creation failures are non-fatal; log and continue.
            match database.create_block_partition(&self.chain).await {
                Ok(message) => log::info!("Executing block partition creation: {message}"),
                Err(e) => log::error!(
                    "Error creating block partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }

            match database.create_token_partition(&self.chain).await {
                Ok(message) => log::info!("Executing token partition creation: {message}"),
                Err(e) => log::error!(
                    "Error creating token partition for chain {}: {e}. Continuing without partition creation...",
                    self.chain.chain_id
                ),
            }
        }

        // NOTE: only reached when there is no database or seeding succeeded.
        if let Err(e) = self.load_tokens().await {
            log::error!("Error loading tokens from the database: {e}");
        }
    }
166
167 pub async fn connect(&mut self, from_block: u64) -> anyhow::Result<()> {
173 log::debug!("Connecting and loading from_block {from_block}");
174
175 if let Err(e) = self.load_tokens().await {
176 log::error!("Error loading tokens from the database: {e}");
177 }
178
179 Ok(())
185 }
186
187 async fn load_tokens(&mut self) -> anyhow::Result<()> {
189 if let Some(database) = &self.database {
190 let (tokens, invalid_tokens) = tokio::try_join!(
191 database.load_tokens(self.chain.clone()),
192 database.load_invalid_token_addresses(self.chain.chain_id)
193 )?;
194
195 log::info!(
196 "Loading {} valid tokens and {} invalid tokens from cache database",
197 tokens.len(),
198 invalid_tokens.len()
199 );
200
201 self.tokens
202 .extend(tokens.into_iter().map(|token| (token.address, token)));
203 self.invalid_tokens.extend(invalid_tokens);
204 }
205 Ok(())
206 }
207
    /// Loads all pools for `dex_id` from the cache database, resolving each
    /// pool's tokens from the in-memory token cache.
    ///
    /// Rows referencing tokens missing from the cache, or carrying an
    /// unparsable pool identifier, are logged and skipped rather than failing
    /// the whole load. Returns the pools that were successfully loaded (also
    /// inserted into the in-memory pool map); empty when no database is set.
    ///
    /// # Errors
    ///
    /// Returns an error if the DEX has not been registered or the database
    /// query fails.
    pub async fn load_pools(&mut self, dex_id: &DexType) -> anyhow::Result<Vec<Pool>> {
        let mut loaded_pools = Vec::new();

        if let Some(database) = &self.database {
            let dex = self
                .get_dex(dex_id)
                .ok_or_else(|| anyhow::anyhow!("DEX {dex_id:?} has not been registered"))?;
            let pool_rows = database
                .load_pools(self.chain.clone(), &dex_id.to_string())
                .await?;
            log::info!(
                "Loading {} pools for DEX {} from cache database",
                pool_rows.len(),
                dex_id,
            );

            for pool_row in pool_rows {
                // Both tokens must already be in the cache; otherwise skip the pool.
                let token0 = if let Some(token) = self.tokens.get(&pool_row.token0_address) {
                    token
                } else {
                    log::error!(
                        "Failed to load pool {} for DEX {}: Token0 with address {} not found in cache. \
                        This may indicate the token was not properly loaded from the database or the pool references an unknown token",
                        pool_row.address,
                        dex_id,
                        pool_row.token0_address
                    );
                    continue;
                };

                let token1 = if let Some(token) = self.tokens.get(&pool_row.token1_address) {
                    token
                } else {
                    log::error!(
                        "Failed to load pool {} for DEX {}: Token1 with address {} not found in cache. \
                        This may indicate the token was not properly loaded from the database or the pool references an unknown token",
                        pool_row.address,
                        dex_id,
                        pool_row.token1_address
                    );
                    continue;
                };

                // The identifier is stored as text; a parse failure skips the row.
                let Some(pool_identifier) = pool_row.pool_identifier.parse().ok() else {
                    log::error!(
                        "Invalid pool identifier '{}' in database for pool {}, skipping",
                        pool_row.pool_identifier,
                        pool_row.address
                    );
                    continue;
                };
                // creation_block/fee/tick_spacing are stored signed in the row;
                // cast back to unsigned here.
                let mut pool = Pool::new(
                    self.chain.clone(),
                    dex.clone(),
                    pool_row.address,
                    pool_identifier,
                    pool_row.creation_block as u64,
                    token0.clone(),
                    token1.clone(),
                    pool_row.fee.map(|fee| fee as u32),
                    pool_row
                        .tick_spacing
                        .map(|tick_spacing| tick_spacing as u32),
                    UnixNanos::default(),
                );

                // Optional hook address: applied only when it parses cleanly.
                if let Some(ref hook_address_str) = pool_row.hook_address
                    && let Ok(hooks) = hook_address_str.parse()
                {
                    pool.set_hooks(hooks);
                }

                // Optional initialization state: both price and tick must be present.
                if let Some(initial_sqrt_price_x96_str) = &pool_row.initial_sqrt_price_x96
                    && let Ok(initial_sqrt_price_x96) = initial_sqrt_price_x96_str.parse()
                    && let Some(initial_tick) = pool_row.initial_tick
                {
                    pool.initialize(initial_sqrt_price_x96, initial_tick);
                }

                loaded_pools.push(pool.clone());
                self.pools.insert(pool.pool_identifier, Arc::new(pool));
            }
        }
        Ok(loaded_pools)
    }
304
305 #[allow(dead_code)]
308 async fn load_blocks(&mut self, from_block: u64) -> anyhow::Result<()> {
309 if let Some(database) = &self.database {
310 let block_timestamps = database
311 .load_block_timestamps(self.chain.clone(), from_block)
312 .await?;
313
314 if !block_timestamps.is_empty() {
316 let first = block_timestamps.first().unwrap().number;
317 let last = block_timestamps.last().unwrap().number;
318 let expected_len = (last - first + 1) as usize;
319 if block_timestamps.len() != expected_len {
320 anyhow::bail!(
321 "Block timestamps are not consistent and sequential. Expected {expected_len} blocks but got {}",
322 block_timestamps.len()
323 );
324 }
325 }
326
327 if block_timestamps.is_empty() {
328 log::info!("No blocks found in database");
329 return Ok(());
330 }
331
332 log::info!(
333 "Loading {} blocks timestamps from the cache database with last block number {}",
334 block_timestamps.len(),
335 block_timestamps.last().unwrap().number,
336 );
337
338 for block in block_timestamps {
339 self.block_timestamps.insert(block.number, block.timestamp);
340 }
341 }
342 Ok(())
343 }
344
345 pub async fn add_block(&mut self, block: Block) -> anyhow::Result<()> {
351 if let Some(database) = &self.database {
352 database.add_block(self.chain.chain_id, &block).await?;
353 }
354 self.block_timestamps.insert(block.number, block.timestamp);
355 Ok(())
356 }
357
358 pub async fn add_blocks_batch(
364 &mut self,
365 blocks: Vec<Block>,
366 use_copy_command: bool,
367 ) -> anyhow::Result<()> {
368 if blocks.is_empty() {
369 return Ok(());
370 }
371
372 if let Some(database) = &self.database {
373 if use_copy_command {
374 database
375 .add_blocks_copy(self.chain.chain_id, &blocks)
376 .await?;
377 } else {
378 database
379 .add_blocks_batch(self.chain.chain_id, &blocks)
380 .await?;
381 }
382 }
383
384 for block in blocks {
386 self.block_timestamps.insert(block.number, block.timestamp);
387 }
388
389 Ok(())
390 }
391
392 pub async fn add_dex(&mut self, dex: SharedDex) -> anyhow::Result<()> {
398 log::info!("Adding dex {} to the cache", dex.name);
399
400 if let Some(database) = &self.database {
401 database.add_dex(dex.clone()).await?;
402 }
403
404 self.dexes.insert(dex.name, dex);
405 Ok(())
406 }
407
408 pub async fn add_pool(&mut self, pool: Pool) -> anyhow::Result<()> {
414 if let Some(database) = &self.database {
415 database.add_pool(&pool).await?;
416 }
417
418 self.pools.insert(pool.pool_identifier, Arc::new(pool));
419 Ok(())
420 }
421
422 pub async fn add_pools_batch(&mut self, pools: Vec<Pool>) -> anyhow::Result<()> {
428 if pools.is_empty() {
429 return Ok(());
430 }
431
432 if let Some(database) = &self.database {
433 database.add_pools_copy(self.chain.chain_id, &pools).await?;
434 }
435 self.pools.extend(
436 pools
437 .into_iter()
438 .map(|pool| (pool.pool_identifier, Arc::new(pool))),
439 );
440
441 Ok(())
442 }
443
444 pub async fn add_token(&mut self, token: Token) -> anyhow::Result<()> {
450 if let Some(database) = &self.database {
451 database.add_token(&token).await?;
452 }
453 self.tokens.insert(token.address, token);
454 Ok(())
455 }
456
457 pub async fn add_tokens_batch(&mut self, tokens: Vec<Token>) -> anyhow::Result<()> {
463 if tokens.is_empty() {
464 return Ok(());
465 }
466
467 if let Some(database) = &self.database {
468 database
469 .add_tokens_copy(self.chain.chain_id, &tokens)
470 .await?;
471 }
472
473 self.tokens
474 .extend(tokens.into_iter().map(|token| (token.address, token)));
475
476 Ok(())
477 }
478
    /// Inserts a token into the in-memory cache only, without persisting it.
    pub fn insert_token_in_memory(&mut self, token: Token) {
        self.tokens.insert(token.address, token);
    }
483
    /// Marks an address as an invalid token in memory only, without persisting.
    pub fn insert_invalid_token_in_memory(&mut self, address: Address) {
        self.invalid_tokens.insert(address);
    }
488
489 pub async fn add_invalid_token(
495 &mut self,
496 address: Address,
497 error_string: &str,
498 ) -> anyhow::Result<()> {
499 if let Some(database) = &self.database {
500 database
501 .add_invalid_token(self.chain.chain_id, &address, error_string)
502 .await?;
503 }
504 self.invalid_tokens.insert(address);
505 Ok(())
506 }
507
508 pub async fn add_pool_swap(&self, swap: &PoolSwap) -> anyhow::Result<()> {
514 if let Some(database) = &self.database {
515 database.add_swap(self.chain.chain_id, swap).await?;
516 }
517
518 Ok(())
519 }
520
521 pub async fn add_liquidity_update(
527 &self,
528 liquidity_update: &PoolLiquidityUpdate,
529 ) -> anyhow::Result<()> {
530 if let Some(database) = &self.database {
531 database
532 .add_pool_liquidity_update(self.chain.chain_id, liquidity_update)
533 .await?;
534 }
535
536 Ok(())
537 }
538
539 pub async fn add_pool_swaps_batch(
545 &self,
546 swaps: &[PoolSwap],
547 use_copy_command: bool,
548 ) -> anyhow::Result<()> {
549 if let Some(database) = &self.database {
550 if use_copy_command {
551 database
552 .add_pool_swaps_copy(self.chain.chain_id, swaps)
553 .await?;
554 } else {
555 database
556 .add_pool_swaps_batch(self.chain.chain_id, swaps)
557 .await?;
558 }
559 }
560
561 Ok(())
562 }
563
564 pub async fn add_pool_liquidity_updates_batch(
570 &self,
571 updates: &[PoolLiquidityUpdate],
572 use_copy_command: bool,
573 ) -> anyhow::Result<()> {
574 if let Some(database) = &self.database {
575 if use_copy_command {
576 database
577 .add_pool_liquidity_updates_copy(self.chain.chain_id, updates)
578 .await?;
579 } else {
580 database
581 .add_pool_liquidity_updates_batch(self.chain.chain_id, updates)
582 .await?;
583 }
584 }
585
586 Ok(())
587 }
588
589 pub async fn add_pool_fee_collects_batch(
595 &self,
596 collects: &[PoolFeeCollect],
597 use_copy_command: bool,
598 ) -> anyhow::Result<()> {
599 if let Some(database) = &self.database {
600 if use_copy_command {
601 database
602 .copy_pool_fee_collects_batch(self.chain.chain_id, collects)
603 .await?;
604 } else {
605 database
606 .add_pool_collects_batch(self.chain.chain_id, collects)
607 .await?;
608 }
609 }
610
611 Ok(())
612 }
613
614 pub async fn add_pool_flash_batch(&self, flash_events: &[PoolFlash]) -> anyhow::Result<()> {
620 if let Some(database) = &self.database {
621 database
622 .add_pool_flash_batch(self.chain.chain_id, flash_events)
623 .await?;
624 }
625
626 Ok(())
627 }
628
    /// Persists a pool snapshot together with its positions and ticks, all
    /// keyed to the snapshot's block position. A missing database makes this a
    /// no-op.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the database writes fail; later writes are
    /// skipped once one fails.
    pub async fn add_pool_snapshot(
        &self,
        dex: &DexType,
        pool_identifier: &PoolIdentifier,
        snapshot: &PoolSnapshot,
    ) -> anyhow::Result<()> {
        if let Some(database) = &self.database {
            // Write the snapshot row first; positions/ticks reference the same
            // (block, tx index, log index) coordinates.
            database
                .add_pool_snapshot(self.chain.chain_id, dex, pool_identifier, snapshot)
                .await?;

            // Positions are cloned and paired with the pool identifier for the
            // batch insert.
            let positions: Vec<(PoolIdentifier, PoolPosition)> = snapshot
                .positions
                .iter()
                .map(|pos| (*pool_identifier, pos.clone()))
                .collect();

            if !positions.is_empty() {
                database
                    .add_pool_positions_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &positions,
                    )
                    .await?;
            }

            // Ticks are borrowed (not cloned) for the batch insert.
            let ticks: Vec<(PoolIdentifier, &PoolTick)> = snapshot
                .ticks
                .iter()
                .map(|tick| (*pool_identifier, tick))
                .collect();

            if !ticks.is_empty() {
                database
                    .add_pool_ticks_batch(
                        self.chain.chain_id,
                        snapshot.block_position.number,
                        snapshot.block_position.transaction_index,
                        snapshot.block_position.log_index,
                        &ticks,
                    )
                    .await?;
            }
        }

        Ok(())
    }
690
691 pub async fn update_pool_initialize_price_tick(
697 &mut self,
698 initialize_event: &InitializeEvent,
699 ) -> anyhow::Result<()> {
700 if let Some(database) = &self.database {
701 database
702 .update_pool_initial_price_tick(self.chain.chain_id, initialize_event)
703 .await?;
704 }
705
706 let pool_identifier = initialize_event.pool_identifier;
708 if let Some(cached_pool) = self.pools.get(&pool_identifier) {
709 let mut updated_pool = (**cached_pool).clone();
710 updated_pool.initialize(initialize_event.sqrt_price_x96, initialize_event.tick);
711
712 self.pools.insert(pool_identifier, Arc::new(updated_pool));
713 }
714
715 Ok(())
716 }
717
718 #[must_use]
720 pub fn get_dex(&self, dex_id: &DexType) -> Option<SharedDex> {
721 self.dexes.get(dex_id).cloned()
722 }
723
724 #[must_use]
726 pub fn get_registered_dexes(&self) -> HashSet<DexType> {
727 self.dexes.keys().copied().collect()
728 }
729
    /// Returns the cached pool for `pool_identifier`, or `None` if absent.
    #[must_use]
    pub fn get_pool(&self, pool_identifier: &PoolIdentifier) -> Option<&SharedPool> {
        self.pools.get(pool_identifier)
    }
735
    /// Returns the cached token at `address`, or `None` if absent.
    #[must_use]
    pub fn get_token(&self, address: &Address) -> Option<&Token> {
        self.tokens.get(address)
    }
741
    /// Returns `true` if `address` has been recorded as an invalid token.
    #[must_use]
    pub fn is_invalid_token(&self, address: &Address) -> bool {
        self.invalid_tokens.contains(address)
    }
750
751 pub async fn update_dex_last_synced_block(
757 &self,
758 dex: &DexType,
759 block_number: u64,
760 ) -> anyhow::Result<()> {
761 if let Some(database) = &self.database {
762 database
763 .update_dex_last_synced_block(self.chain.chain_id, dex, block_number)
764 .await
765 } else {
766 Ok(())
767 }
768 }
769
770 pub async fn update_pool_last_synced_block(
776 &self,
777 dex: &DexType,
778 pool_identifier: &PoolIdentifier,
779 block_number: u64,
780 ) -> anyhow::Result<()> {
781 if let Some(database) = &self.database {
782 database
783 .update_pool_last_synced_block(
784 self.chain.chain_id,
785 dex,
786 pool_identifier,
787 block_number,
788 )
789 .await
790 } else {
791 Ok(())
792 }
793 }
794
795 pub async fn get_dex_last_synced_block(&self, dex: &DexType) -> anyhow::Result<Option<u64>> {
801 if let Some(database) = &self.database {
802 database
803 .get_dex_last_synced_block(self.chain.chain_id, dex)
804 .await
805 } else {
806 Ok(None)
807 }
808 }
809
810 pub async fn get_pool_last_synced_block(
816 &self,
817 dex: &DexType,
818 pool_identifier: &PoolIdentifier,
819 ) -> anyhow::Result<Option<u64>> {
820 if let Some(database) = &self.database {
821 database
822 .get_pool_last_synced_block(self.chain.chain_id, dex, pool_identifier)
823 .await
824 } else {
825 Ok(None)
826 }
827 }
828
829 pub async fn get_pool_event_tables_last_block(
835 &self,
836 pool_identifier: &PoolIdentifier,
837 ) -> anyhow::Result<Option<u64>> {
838 if let Some(database) = &self.database {
839 let (swaps_last_block, liquidity_last_block, collect_last_block) = tokio::try_join!(
840 database.get_table_last_block(
841 self.chain.chain_id,
842 "pool_swap_event",
843 pool_identifier
844 ),
845 database.get_table_last_block(
846 self.chain.chain_id,
847 "pool_liquidity_event",
848 pool_identifier
849 ),
850 database.get_table_last_block(
851 self.chain.chain_id,
852 "pool_collect_event",
853 pool_identifier
854 ),
855 )?;
856
857 let max_block = [swaps_last_block, liquidity_last_block, collect_last_block]
858 .into_iter()
859 .flatten()
860 .max();
861 Ok(max_block)
862 } else {
863 Ok(None)
864 }
865 }
866}