Skip to main content

nautilus_blockchain/data/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_common::{
17    clients::DataClient,
18    defi::RequestPoolSnapshot,
19    live::get_runtime,
20    messages::{
21        DataEvent,
22        defi::{
23            DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
24            SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
25            SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
26            UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
27            UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
28        },
29    },
30};
31use nautilus_model::{
32    defi::{DefiData, PoolIdentifier, SharedChain, validation::validate_address},
33    identifiers::{ClientId, Venue},
34};
35use ustr::Ustr;
36
37use crate::{
38    config::BlockchainDataClientConfig,
39    data::core::BlockchainDataClientCore,
40    exchanges::get_dex_extended,
41    rpc::{BlockchainRpcClient, types::BlockchainMessage},
42};
43
44/// A client for interacting with blockchain data from multiple sources.
45///
46/// The `BlockchainDataClient` serves as a facade that coordinates between different blockchain
47/// data providers, caching mechanisms, and contract interactions. It provides a unified interface
48/// for retrieving and processing blockchain data, particularly focused on DeFi protocols.
49///
50/// This client supports two primary data sources:
51/// 1. Direct RPC connections to blockchain nodes (via WebSocket).
52/// 2. HyperSync API for efficient historical data queries.
53#[derive(Debug)]
54pub struct BlockchainDataClient {
55    /// The client ID used to identify this client with the data engine.
56    pub client_id: ClientId,
57    /// The blockchain being targeted by this client instance.
58    pub chain: SharedChain,
59    /// Configuration parameters for the blockchain data client.
60    pub config: BlockchainDataClientConfig,
61    /// The core client instance that handles blockchain operations.
62    /// Wrapped in Option to allow moving it into the background processing task.
63    pub core_client: Option<BlockchainDataClientCore>,
64    /// Channel receiver for messages from the HyperSync client.
65    hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
66    /// Channel sender for messages to the HyperSync client.
67    hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
68    /// Channel sender for commands to be processed asynchronously.
69    command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
70    /// Channel receiver for commands to be processed asynchronously.
71    command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
72    /// Background task for processing messages.
73    process_task: Option<tokio::task::JoinHandle<()>>,
74    /// Cancellation token for graceful shutdown of background tasks.
75    cancellation_token: tokio_util::sync::CancellationToken,
76}
77
78impl BlockchainDataClient {
79    /// Creates a new [`BlockchainDataClient`] instance for the specified configuration.
80    #[must_use]
81    pub fn new(client_id: ClientId, config: BlockchainDataClientConfig) -> Self {
82        let chain = config.chain.clone();
83        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
84        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
85        Self {
86            client_id,
87            chain,
88            core_client: None,
89            config,
90            hypersync_rx: Some(hypersync_rx),
91            hypersync_tx: Some(hypersync_tx),
92            command_tx,
93            command_rx: Some(command_rx),
94            process_task: None,
95            cancellation_token: tokio_util::sync::CancellationToken::new(),
96        }
97    }
98
99    /// Spawns the main processing task that handles commands and blockchain data.
100    ///
101    /// This method creates a background task that:
102    /// 1. Processes subscription/unsubscription commands from the command channel
103    /// 2. Handles incoming blockchain data from HyperSync
104    /// 3. Processes RPC messages if RPC client is configured
105    /// 4. Routes processed data to subscribers
106    fn spawn_process_task(&mut self) {
107        let command_rx = if let Some(r) = self.command_rx.take() {
108            r
109        } else {
110            log::error!("Command receiver already taken, not spawning handler");
111            return;
112        };
113
114        let cancellation_token = self.cancellation_token.clone();
115
116        let data_tx = nautilus_common::live::runner::get_data_event_sender();
117
118        let mut hypersync_rx = self.hypersync_rx.take().unwrap();
119        let hypersync_tx = self.hypersync_tx.take();
120
121        let mut core_client = BlockchainDataClientCore::new(
122            self.config.clone(),
123            hypersync_tx,
124            Some(data_tx),
125            cancellation_token.clone(),
126        );
127
128        let handle = get_runtime().spawn(async move {
129            log::debug!("Started task 'process'");
130
131            if let Err(e) = core_client.connect().await {
132                // TODO: connect() could return more granular error types to distinguish
133                // cancellation from actual failures without string matching
134                if e.to_string().contains("cancelled") || e.to_string().contains("Sync cancelled") {
135                    log::warn!("Blockchain core client connection interrupted: {e}");
136                } else {
137                    log::error!("Failed to connect blockchain core client: {e}");
138                }
139                return;
140            }
141
142            let mut command_rx = command_rx;
143
144            loop {
145                tokio::select! {
146                    () = cancellation_token.cancelled() => {
147                        log::debug!("Received cancellation signal in Blockchain data client process task");
148                        core_client.disconnect().await;
149                        break;
150                    }
151                    command = command_rx.recv() => {
152                        if let Some(cmd) = command {
153                            match cmd {
154                                DefiDataCommand::Subscribe(cmd) => {
155                                    let chain = cmd.blockchain();
156                                    if chain != core_client.chain.name {
157                                        log::error!("Incorrect blockchain for subscribe command: {chain}");
158                                        continue;
159                                    }
160
161                                      if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
162                                        log::error!("Error processing subscribe command: {e}");
163                                    }
164                                }
165                                DefiDataCommand::Unsubscribe(cmd) => {
166                                    let chain = cmd.blockchain();
167                                    if chain != core_client.chain.name {
168                                        log::error!("Incorrect blockchain for subscribe command: {chain}");
169                                        continue;
170                                    }
171
172                                    if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
173                                        log::error!("Error processing subscribe command: {e}");
174                                    }
175                                }
176                                DefiDataCommand::Request(cmd) => {
177                                    if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
178                                        log::error!("Error processing request command: {e}");
179                                    }
180                                }
181                            }
182                        } else {
183                            log::debug!("Command channel closed");
184                            break;
185                        }
186                    }
187                    data = hypersync_rx.recv() => {
188                        if let Some(msg) = data {
189                            let data_event = match msg {
190                                BlockchainMessage::Block(block) => {
191                                    // Fetch and process all subscribed events per DEX
192                                    for dex in core_client.cache.get_registered_dexes(){
193                                        let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
194                                        if !addresses.is_empty() {
195                                            core_client.hypersync_client.process_block_dex_contract_events(
196                                                &dex,
197                                                block.number,
198                                                &addresses,
199                                                core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
200                                                core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
201                                                core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
202                                            );
203                                        }
204                                    }
205
206                                    Some(DataEvent::DeFi(DefiData::Block(block)))
207                                }
208                                BlockchainMessage::SwapEvent(swap_event) => {
209                                    match core_client.get_pool(&swap_event.pool_identifier) {
210                                        Ok(pool) => {
211                                            match core_client.process_pool_swap_event(&swap_event, pool){
212                                                Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
213                                                Err(e) => {
214                                                    log::error!("Error processing pool swap event: {e}");
215                                                    None
216                                                }
217                                            }
218                                        }
219                                        Err(e) => {
220                                            log::error!("Failed to get pool {} with error {:?}", swap_event.pool_identifier, e);
221                                            None
222                                        }
223                                    }
224                                }
225                                BlockchainMessage::BurnEvent(burn_event) => {
226                                    match core_client.get_pool(&burn_event.pool_identifier) {
227                                        Ok(pool) => {
228                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
229                                            match core_client.process_pool_burn_event(
230                                                &burn_event,
231                                                pool,
232                                                dex_extended,
233                                            ){
234                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
235                                                Err(e) => {
236                                                    log::error!("Error processing pool burn event: {e}");
237                                                    None
238                                                }
239                                            }
240                                        }
241                                        Err(e) => {
242                                            log::error!("Failed to get pool {} with error {:?}", burn_event.pool_identifier, e);
243                                            None
244                                        }
245                                    }
246                                }
247                                BlockchainMessage::MintEvent(mint_event) => {
248                                    match core_client.get_pool(&mint_event.pool_identifier) {
249                                        Ok(pool) => {
250                                            let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
251                                            match core_client.process_pool_mint_event(
252                                                &mint_event,
253                                                pool,
254                                                dex_extended,
255                                            ){
256                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
257                                                Err(e) => {
258                                                    log::error!("Error processing pool mint event: {e}");
259                                                    None
260                                                }
261                                            }
262                                        }
263                                        Err(e) => {
264                                            log::error!("Failed to get pool {} with error {:?}", mint_event.pool_identifier, e);
265                                            None
266                                        }
267                                    }
268                                }
269                                BlockchainMessage::CollectEvent(collect_event) => {
270                                    match core_client.get_pool(&collect_event.pool_identifier) {
271                                        Ok(pool) => {
272                                            let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
273                                            match core_client.process_pool_collect_event(
274                                                &collect_event,
275                                                pool,
276                                                dex_extended,
277                                            ){
278                                                Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
279                                                Err(e) => {
280                                                    log::error!("Error processing pool collect event: {e}");
281                                                    None
282                                                }
283                                            }
284                                        }
285                                        Err(e) => {
286                                            log::error!("Failed to get pool {} with error {:?}", collect_event.pool_identifier, e);
287                                            None
288                                        }
289                                    }
290                                }
291                            BlockchainMessage::FlashEvent(flash_event) => {
292                                    match core_client.get_pool(&flash_event.pool_identifier) {
293                                        Ok(pool) => {
294                                            match core_client.process_pool_flash_event(&flash_event,pool){
295                                                Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
296                                                Err(e) => {
297                                                    log::error!("Error processing pool flash event: {e}");
298                                                    None
299                                                }
300                                            }
301                                        }
302                                        Err(e) => {
303                                            log::error!("Failed to get pool {} with error {:?}", flash_event.pool_identifier, e);
304                                            None
305                                        }
306                                    }
307                                }
308                            };
309
310                            if let Some(event) = data_event {
311                                core_client.send_data(event);
312                            }
313                        } else {
314                            log::debug!("HyperSync data channel closed");
315                            break;
316                        }
317                    }
318                    msg = async {
319                        match core_client.rpc_client {
320                            Some(ref mut rpc_client) => rpc_client.next_rpc_message().await,
321                            None => std::future::pending().await,  // Never resolves
322                        }
323                    } => {
324                        // This branch only fires when we actually receive a message
325                        match msg {
326                            Ok(BlockchainMessage::Block(block)) => {
327                                let data = DataEvent::DeFi(DefiData::Block(block));
328                                core_client.send_data(data);
329                            },
330                            Ok(BlockchainMessage::SwapEvent(_)) => {
331                                log::warn!("RPC swap events are not yet supported");
332                            }
333                            Ok(BlockchainMessage::MintEvent(_)) => {
334                                log::warn!("RPC mint events are not yet supported");
335                            }
336                            Ok(BlockchainMessage::BurnEvent(_)) => {
337                                log::warn!("RPC burn events are not yet supported");
338                            }
339                            Ok(BlockchainMessage::CollectEvent(_)) => {
340                                log::warn!("RPC collect events are not yet supported");
341                            }
342                            Ok(BlockchainMessage::FlashEvent(_)) => {
343                                log::warn!("RPC flash events are not yet supported");
344                            }
345                            Err(e) => {
346                                log::error!("Error processing RPC message: {e}");
347                            }
348                        }
349                    }
350                }
351            }
352
353            log::debug!("Stopped task 'process'");
354        });
355
356        self.process_task = Some(handle);
357    }
358
359    /// Processes DeFi subscription commands to start receiving specific blockchain data.
360    async fn handle_subscribe_command(
361        command: DefiSubscribeCommand,
362        core_client: &mut BlockchainDataClientCore,
363    ) -> anyhow::Result<()> {
364        match command {
365            DefiSubscribeCommand::Blocks(_cmd) => {
366                log::info!("Processing subscribe blocks command");
367
368                // Try RPC client first if available, otherwise use HyperSync
369                if let Some(ref mut rpc) = core_client.rpc_client {
370                    if let Err(e) = rpc.subscribe_blocks().await {
371                        log::warn!(
372                            "RPC blocks subscription failed: {e}, falling back to HyperSync"
373                        );
374                        core_client.hypersync_client.subscribe_blocks();
375                        tokio::task::yield_now().await;
376                    } else {
377                        log::info!("Successfully subscribed to blocks via RPC");
378                    }
379                } else {
380                    log::info!("Subscribing to blocks via HyperSync");
381                    core_client.hypersync_client.subscribe_blocks();
382                    tokio::task::yield_now().await;
383                }
384
385                Ok(())
386            }
387            DefiSubscribeCommand::Pool(cmd) => {
388                log::info!(
389                    "Processing subscribe pool command for {}",
390                    cmd.instrument_id
391                );
392
393                if let Some(ref mut _rpc) = core_client.rpc_client {
394                    log::warn!("RPC pool subscription not yet implemented, using HyperSync");
395                }
396
397                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
398                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
399                        .map_err(|e| {
400                            anyhow::anyhow!(
401                                "Invalid pool address '{}' failed with error: {:?}",
402                                cmd.instrument_id,
403                                e
404                            )
405                        })?;
406
407                    // Subscribe to all pool event types
408                    core_client
409                        .subscription_manager
410                        .subscribe_swaps(dex, pool_address);
411                    core_client
412                        .subscription_manager
413                        .subscribe_burns(dex, pool_address);
414                    core_client
415                        .subscription_manager
416                        .subscribe_mints(dex, pool_address);
417                    core_client
418                        .subscription_manager
419                        .subscribe_collects(dex, pool_address);
420                    core_client
421                        .subscription_manager
422                        .subscribe_flashes(dex, pool_address);
423
424                    log::info!(
425                        "Subscribed to all pool events for {} at address {}",
426                        cmd.instrument_id,
427                        pool_address
428                    );
429                } else {
430                    anyhow::bail!(
431                        "Invalid venue {}, expected Blockchain DEX format",
432                        cmd.instrument_id.venue
433                    )
434                }
435
436                Ok(())
437            }
438            DefiSubscribeCommand::PoolSwaps(cmd) => {
439                log::info!(
440                    "Processing subscribe pool swaps command for {}",
441                    cmd.instrument_id
442                );
443
444                if let Some(ref mut _rpc) = core_client.rpc_client {
445                    log::warn!("RPC pool swaps subscription not yet implemented, using HyperSync");
446                }
447
448                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
449                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
450                        .map_err(|e| {
451                            anyhow::anyhow!(
452                                "Invalid pool swap address '{}' failed with error: {:?}",
453                                cmd.instrument_id,
454                                e
455                            )
456                        })?;
457                    core_client
458                        .subscription_manager
459                        .subscribe_swaps(dex, pool_address);
460                } else {
461                    anyhow::bail!(
462                        "Invalid venue {}, expected Blockchain DEX format",
463                        cmd.instrument_id.venue
464                    )
465                }
466
467                Ok(())
468            }
469            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
470                log::info!(
471                    "Processing subscribe pool liquidity updates command for address: {}",
472                    cmd.instrument_id
473                );
474
475                if let Some(ref mut _rpc) = core_client.rpc_client {
476                    log::warn!(
477                        "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
478                    );
479                }
480
481                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
482                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
483                        .map_err(|_| {
484                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
485                        })?;
486                    core_client
487                        .subscription_manager
488                        .subscribe_burns(dex, pool_address);
489                    core_client
490                        .subscription_manager
491                        .subscribe_mints(dex, pool_address);
492                } else {
493                    anyhow::bail!(
494                        "Invalid venue {}, expected Blockchain DEX format",
495                        cmd.instrument_id.venue
496                    )
497                }
498
499                Ok(())
500            }
501            DefiSubscribeCommand::PoolFeeCollects(cmd) => {
502                log::info!(
503                    "Processing subscribe pool fee collects command for address: {}",
504                    cmd.instrument_id
505                );
506
507                if let Some(ref mut _rpc) = core_client.rpc_client {
508                    log::warn!(
509                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
510                    );
511                }
512
513                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
514                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
515                        .map_err(|_| {
516                            anyhow::anyhow!(
517                                "Invalid pool fee collect address: {}",
518                                cmd.instrument_id
519                            )
520                        })?;
521                    core_client
522                        .subscription_manager
523                        .subscribe_collects(dex, pool_address);
524                } else {
525                    anyhow::bail!(
526                        "Invalid venue {}, expected Blockchain DEX format",
527                        cmd.instrument_id.venue
528                    )
529                }
530
531                Ok(())
532            }
533            DefiSubscribeCommand::PoolFlashEvents(cmd) => {
534                log::info!(
535                    "Processing subscribe pool flash command for address: {}",
536                    cmd.instrument_id
537                );
538
539                if let Some(ref mut _rpc) = core_client.rpc_client {
540                    log::warn!(
541                        "RPC pool fee collects subscription not yet implemented, using HyperSync"
542                    );
543                }
544
545                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
546                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
547                        .map_err(|_| {
548                            anyhow::anyhow!(
549                                "Invalid pool flash subscribe address: {}",
550                                cmd.instrument_id
551                            )
552                        })?;
553                    core_client
554                        .subscription_manager
555                        .subscribe_flashes(dex, pool_address);
556                } else {
557                    anyhow::bail!(
558                        "Invalid venue {}, expected Blockchain DEX format",
559                        cmd.instrument_id.venue
560                    )
561                }
562
563                Ok(())
564            }
565        }
566    }
567
568    /// Processes DeFi unsubscription commands to stop receiving specific blockchain data.
569    async fn handle_unsubscribe_command(
570        command: DefiUnsubscribeCommand,
571        core_client: &mut BlockchainDataClientCore,
572    ) -> anyhow::Result<()> {
573        match command {
574            DefiUnsubscribeCommand::Blocks(_cmd) => {
575                log::info!("Processing unsubscribe blocks command");
576
577                // TODO: Implement RPC unsubscription when available
578                if core_client.rpc_client.is_some() {
579                    log::warn!("RPC blocks unsubscription not yet implemented");
580                }
581
582                // Use HyperSync client for unsubscription
583                core_client.hypersync_client.unsubscribe_blocks().await;
584                log::info!("Unsubscribed from blocks via HyperSync");
585
586                Ok(())
587            }
588            DefiUnsubscribeCommand::Pool(cmd) => {
589                log::info!(
590                    "Processing unsubscribe pool command for {}",
591                    cmd.instrument_id
592                );
593
594                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
595                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
596                        .map_err(|_| {
597                            anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
598                        })?;
599
600                    // Unsubscribe from all pool event types
601                    core_client
602                        .subscription_manager
603                        .unsubscribe_swaps(dex, pool_address);
604                    core_client
605                        .subscription_manager
606                        .unsubscribe_burns(dex, pool_address);
607                    core_client
608                        .subscription_manager
609                        .unsubscribe_mints(dex, pool_address);
610                    core_client
611                        .subscription_manager
612                        .unsubscribe_collects(dex, pool_address);
613                    core_client
614                        .subscription_manager
615                        .unsubscribe_flashes(dex, pool_address);
616
617                    log::info!(
618                        "Unsubscribed from all pool events for {} at address {}",
619                        cmd.instrument_id,
620                        pool_address
621                    );
622                } else {
623                    anyhow::bail!(
624                        "Invalid venue {}, expected Blockchain DEX format",
625                        cmd.instrument_id.venue
626                    )
627                }
628
629                Ok(())
630            }
631            DefiUnsubscribeCommand::PoolSwaps(cmd) => {
632                log::info!("Processing unsubscribe pool swaps command");
633
634                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
635                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
636                        .map_err(|_| {
637                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
638                        })?;
639                    core_client
640                        .subscription_manager
641                        .unsubscribe_swaps(dex, pool_address);
642                } else {
643                    anyhow::bail!(
644                        "Invalid venue {}, expected Blockchain DEX format",
645                        cmd.instrument_id.venue
646                    )
647                }
648
649                Ok(())
650            }
651            DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
652                log::info!(
653                    "Processing unsubscribe pool liquidity updates command for {}",
654                    cmd.instrument_id
655                );
656
657                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
658                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
659                        .map_err(|_| {
660                            anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
661                        })?;
662                    core_client
663                        .subscription_manager
664                        .unsubscribe_burns(dex, pool_address);
665                    core_client
666                        .subscription_manager
667                        .unsubscribe_mints(dex, pool_address);
668                } else {
669                    anyhow::bail!(
670                        "Invalid venue {}, expected Blockchain DEX format",
671                        cmd.instrument_id.venue
672                    )
673                }
674
675                Ok(())
676            }
677            DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
678                log::info!(
679                    "Processing unsubscribe pool fee collects command for {}",
680                    cmd.instrument_id
681                );
682
683                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
684                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
685                        .map_err(|_| {
686                            anyhow::anyhow!(
687                                "Invalid pool fee collect address: {}",
688                                cmd.instrument_id
689                            )
690                        })?;
691                    core_client
692                        .subscription_manager
693                        .unsubscribe_collects(dex, pool_address);
694                } else {
695                    anyhow::bail!(
696                        "Invalid venue {}, expected Blockchain DEX format",
697                        cmd.instrument_id.venue
698                    )
699                }
700
701                Ok(())
702            }
703            DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
704                log::info!(
705                    "Processing unsubscribe pool flash command for {}",
706                    cmd.instrument_id
707                );
708
709                if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
710                    let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
711                        .map_err(|_| {
712                            anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
713                        })?;
714                    core_client
715                        .subscription_manager
716                        .unsubscribe_flashes(dex, pool_address);
717                } else {
718                    anyhow::bail!(
719                        "Invalid venue {}, expected Blockchain DEX format",
720                        cmd.instrument_id.venue
721                    )
722                }
723
724                Ok(())
725            }
726        }
727    }
728
729    /// Processes DeFi request commands to fetch specific blockchain data.
730    async fn handle_request_command(
731        command: DefiRequestCommand,
732        core_client: &mut BlockchainDataClientCore,
733    ) -> anyhow::Result<()> {
734        match command {
735            DefiRequestCommand::PoolSnapshot(cmd) => {
736                log::info!("Processing pool snapshot request for {}", cmd.instrument_id);
737
738                let pool_address =
739                    validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
740                        anyhow::anyhow!(
741                            "Invalid pool address '{}' failed with error: {:?}",
742                            cmd.instrument_id,
743                            e
744                        )
745                    })?;
746
747                let pool_identifier =
748                    PoolIdentifier::Address(Ustr::from(&pool_address.to_string()));
749
750                match core_client.get_pool(&pool_identifier) {
751                    Ok(pool) => {
752                        let pool = pool.clone();
753                        log::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
754
755                        // Send the pool definition
756                        let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
757                        core_client.send_data(pool_data);
758
759                        match core_client.bootstrap_latest_pool_profiler(&pool).await {
760                            Ok((profiler, already_valid)) => {
761                                let snapshot = profiler.extract_snapshot();
762
763                                log::info!(
764                                    "Saving pool snapshot with {} positions and {} ticks to database...",
765                                    snapshot.positions.len(),
766                                    snapshot.ticks.len()
767                                );
768                                core_client
769                                    .cache
770                                    .add_pool_snapshot(
771                                        &pool.dex.name,
772                                        &pool.pool_identifier,
773                                        &snapshot,
774                                    )
775                                    .await?;
776
777                                // If snapshot is valid, send it back to the data engine.
778                                if core_client
779                                    .check_snapshot_validity(&profiler, already_valid)
780                                    .await?
781                                {
782                                    let snapshot_data =
783                                        DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
784                                    core_client.send_data(snapshot_data);
785                                }
786                            }
787                            Err(e) => log::error!(
788                                "Failed to bootstrap pool profiler for {} and extract snapshot with error {e}",
789                                cmd.instrument_id
790                            ),
791                        }
792                    }
793                    Err(e) => {
794                        log::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
795                    }
796                }
797
798                Ok(())
799            }
800        }
801    }
802
803    /// Waits for the background processing task to complete.
804    ///
805    /// This method blocks until the spawned process task finishes execution,
806    /// which typically happens after a shutdown signal is sent.
807    pub async fn await_process_task_close(&mut self) {
808        if let Some(handle) = self.process_task.take()
809            && let Err(e) = handle.await
810        {
811            log::error!("Process task join error: {e}");
812        }
813    }
814}
815
816#[async_trait::async_trait(?Send)]
817impl DataClient for BlockchainDataClient {
818    fn client_id(&self) -> ClientId {
819        self.client_id
820    }
821
822    fn venue(&self) -> Option<Venue> {
823        // Blockchain data clients don't map to a single venue since they can provide
824        // data for multiple DEXs across the blockchain
825        None
826    }
827
828    fn start(&mut self) -> anyhow::Result<()> {
829        log::info!(
830            "Starting blockchain data client: chain_name={}, dex_ids={:?}, use_hypersync_for_live_data={}, proxy_url={:?}",
831            self.chain.name,
832            self.config.dex_ids,
833            self.config.use_hypersync_for_live_data,
834            self.config.proxy_url
835        );
836        Ok(())
837    }
838
839    fn stop(&mut self) -> anyhow::Result<()> {
840        log::info!(
841            "Stopping blockchain data client for '{chain_name}'",
842            chain_name = self.chain.name
843        );
844        self.cancellation_token.cancel();
845
846        // Create fresh token for next start cycle
847        self.cancellation_token = tokio_util::sync::CancellationToken::new();
848        Ok(())
849    }
850
851    fn reset(&mut self) -> anyhow::Result<()> {
852        log::info!(
853            "Resetting blockchain data client for '{chain_name}'",
854            chain_name = self.chain.name
855        );
856        self.cancellation_token = tokio_util::sync::CancellationToken::new();
857        Ok(())
858    }
859
860    fn dispose(&mut self) -> anyhow::Result<()> {
861        log::info!(
862            "Disposing blockchain data client for '{chain_name}'",
863            chain_name = self.chain.name
864        );
865        Ok(())
866    }
867
868    async fn connect(&mut self) -> anyhow::Result<()> {
869        log::info!(
870            "Connecting blockchain data client for '{}'",
871            self.chain.name
872        );
873
874        if self.process_task.is_none() {
875            self.spawn_process_task();
876        }
877
878        Ok(())
879    }
880
881    async fn disconnect(&mut self) -> anyhow::Result<()> {
882        log::info!(
883            "Disconnecting blockchain data client for '{}'",
884            self.chain.name
885        );
886
887        self.cancellation_token.cancel();
888        self.await_process_task_close().await;
889
890        // Create fresh token and channels for next connect cycle
891        self.cancellation_token = tokio_util::sync::CancellationToken::new();
892        let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
893        self.hypersync_tx = Some(hypersync_tx);
894        self.hypersync_rx = Some(hypersync_rx);
895        let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
896        self.command_tx = command_tx;
897        self.command_rx = Some(command_rx);
898
899        Ok(())
900    }
901
902    fn is_connected(&self) -> bool {
903        // TODO: Improve connection detection
904        // For now, we'll assume connected if we have either RPC or HyperSync configured
905        true
906    }
907
908    fn is_disconnected(&self) -> bool {
909        !self.is_connected()
910    }
911
912    fn subscribe_blocks(&mut self, cmd: SubscribeBlocks) -> anyhow::Result<()> {
913        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd));
914        self.command_tx.send(command)?;
915        Ok(())
916    }
917
918    fn subscribe_pool(&mut self, cmd: SubscribePool) -> anyhow::Result<()> {
919        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd));
920        self.command_tx.send(command)?;
921        Ok(())
922    }
923
924    fn subscribe_pool_swaps(&mut self, cmd: SubscribePoolSwaps) -> anyhow::Result<()> {
925        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd));
926        self.command_tx.send(command)?;
927        Ok(())
928    }
929
930    fn subscribe_pool_liquidity_updates(
931        &mut self,
932        cmd: SubscribePoolLiquidityUpdates,
933    ) -> anyhow::Result<()> {
934        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd));
935        self.command_tx.send(command)?;
936        Ok(())
937    }
938
939    fn subscribe_pool_fee_collects(&mut self, cmd: SubscribePoolFeeCollects) -> anyhow::Result<()> {
940        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd));
941        self.command_tx.send(command)?;
942        Ok(())
943    }
944
945    fn subscribe_pool_flash_events(&mut self, cmd: SubscribePoolFlashEvents) -> anyhow::Result<()> {
946        let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd));
947        self.command_tx.send(command)?;
948        Ok(())
949    }
950
951    fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
952        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
953        self.command_tx.send(command)?;
954        Ok(())
955    }
956
957    fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
958        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
959        self.command_tx.send(command)?;
960        Ok(())
961    }
962
963    fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
964        let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
965        self.command_tx.send(command)?;
966        Ok(())
967    }
968
969    fn unsubscribe_pool_liquidity_updates(
970        &mut self,
971        cmd: &UnsubscribePoolLiquidityUpdates,
972    ) -> anyhow::Result<()> {
973        let command =
974            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
975        self.command_tx.send(command)?;
976        Ok(())
977    }
978
979    fn unsubscribe_pool_fee_collects(
980        &mut self,
981        cmd: &UnsubscribePoolFeeCollects,
982    ) -> anyhow::Result<()> {
983        let command =
984            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
985        self.command_tx.send(command)?;
986        Ok(())
987    }
988
989    fn unsubscribe_pool_flash_events(
990        &mut self,
991        cmd: &UnsubscribePoolFlashEvents,
992    ) -> anyhow::Result<()> {
993        let command =
994            DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
995        self.command_tx.send(command)?;
996        Ok(())
997    }
998
999    fn request_pool_snapshot(&self, cmd: RequestPoolSnapshot) -> anyhow::Result<()> {
1000        let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd));
1001        self.command_tx.send(command)?;
1002        Ok(())
1003    }
1004}