Skip to main content

nautilus_blockchain/hypersync/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use ahash::AHashMap;
19use alloy::primitives::Address;
20use futures_util::Stream;
21use hypersync_client::{
22    StreamConfig,
23    net_types::{BlockField, BlockSelection, FieldSelection, Query},
24    simple_types::Log,
25};
26use nautilus_common::live::get_runtime;
27use nautilus_core::hex;
28use nautilus_model::{
29    defi::{Block, DexType, SharedChain},
30    identifiers::InstrumentId,
31};
32use nautilus_network::http::Url;
33
34use crate::{
35    exchanges::get_dex_extended, hypersync::transform::transform_hypersync_block,
36    rpc::types::BlockchainMessage,
37};
38
/// Delay, in milliseconds, between successive height checks while the block subscription
/// waits for HyperSync to index the next block.
const BLOCK_POLLING_INTERVAL_MS: u64 = 50;

/// Upper bound, in seconds, on a single HyperSync request issued by the block subscription
/// before it is abandoned and retried.
const HYPERSYNC_REQUEST_TIMEOUT_SECS: u64 = 30;

/// Grace period, in seconds, granted to the blocks task to finish during disconnect.
/// A task still running after this period is forcefully aborted.
const DISCONNECT_TIMEOUT_SECS: u64 = 5;
49
50/// A client for interacting with a HyperSync API to retrieve blockchain data.
51#[derive(Debug)]
52pub struct HyperSyncClient {
53    /// The target blockchain identifier (e.g. Ethereum, Arbitrum).
54    chain: SharedChain,
55    /// The underlying HyperSync Rust client for making API requests.
56    client: Arc<hypersync_client::Client>,
57    /// Background task handle for the block subscription task.
58    blocks_task: Option<tokio::task::JoinHandle<()>>,
59    /// Cancellation token for the blocks subscription task.
60    blocks_cancellation_token: Option<tokio_util::sync::CancellationToken>,
61    /// Channel for sending blockchain messages to the adapter data client.
62    tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
63    /// Index of pool addressed keyed by instrument ID.
64    pool_addresses: AHashMap<InstrumentId, Address>,
65    /// Cancellation token for graceful shutdown of background tasks.
66    cancellation_token: tokio_util::sync::CancellationToken,
67}
68
69impl HyperSyncClient {
70    /// Creates a new [`HyperSyncClient`] instance for the given chain and message sender.
71    ///
72    /// # Panics
73    ///
74    /// Panics if:
75    /// - The chain's `hypersync_url` is invalid.
76    /// - The `ENVIO_API_TOKEN` environment variable is not set or invalid.
77    /// - The underlying client cannot be initialized.
78    #[must_use]
79    pub fn new(
80        chain: SharedChain,
81        tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
82        cancellation_token: tokio_util::sync::CancellationToken,
83    ) -> Self {
84        let mut config = hypersync_client::ClientConfig::default();
85        let hypersync_url =
86            Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
87        config.url = hypersync_url.to_string();
88        config.api_token = std::env::var("ENVIO_API_TOKEN")
89            .expect("ENVIO_API_TOKEN environment variable must be set");
90        let client = hypersync_client::Client::new(config)
91            .expect("Failed to create HyperSync client - check ENVIO_API_TOKEN is a valid UUID");
92
93        Self {
94            chain,
95            client: Arc::new(client),
96            blocks_task: None,
97            blocks_cancellation_token: None,
98            tx,
99            pool_addresses: AHashMap::new(),
100            cancellation_token,
101        }
102    }
103
104    #[must_use]
105    pub fn get_pool_address(&self, instrument_id: InstrumentId) -> Option<&Address> {
106        self.pool_addresses.get(&instrument_id)
107    }
108
109    /// Processes DEX contract events for a specific block.
110    ///
111    /// # Panics
112    ///
113    /// Panics if the DEX extended configuration cannot be retrieved or if stream creation fails.
114    pub fn process_block_dex_contract_events(
115        &mut self,
116        dex: &DexType,
117        block: u64,
118        contract_addresses: &[Address],
119        swap_event_encoded_signature: String,
120        mint_event_encoded_signature: String,
121        burn_event_encoded_signature: String,
122    ) {
123        let topics = vec![
124            swap_event_encoded_signature.as_str(),
125            &mint_event_encoded_signature.as_str(),
126            &burn_event_encoded_signature.as_str(),
127        ];
128        let query = Self::construct_contract_events_query(
129            block,
130            Some(block + 1),
131            contract_addresses,
132            &topics,
133        );
134        let tx = if let Some(tx) = &self.tx {
135            tx.clone()
136        } else {
137            log::error!("Hypersync client channel should have been initialized");
138            return;
139        };
140        let client = self.client.clone();
141        let dex_extended =
142            get_dex_extended(self.chain.name, dex).expect("Failed to get dex extended");
143        let cancellation_token = self.cancellation_token.clone();
144
145        let _task = get_runtime().spawn(async move {
146            let mut rx = match client.stream(query, StreamConfig::default()).await {
147                Ok(rx) => rx,
148                Err(e) => {
149                    log::error!("Failed to create DEX event stream: {e}");
150                    return;
151                }
152            };
153
154            loop {
155                tokio::select! {
156                    () = cancellation_token.cancelled() => {
157                        log::debug!("DEX event processing task received cancellation signal");
158                        break;
159                    }
160                    response = rx.recv() => {
161                        let Some(response) = response else {
162                            break;
163                        };
164
165                        let response = match response {
166                            Ok(resp) => resp,
167                            Err(e) => {
168                                log::error!("Failed to receive DEX event stream response: {e}");
169                                break;
170                            }
171                        };
172
173                        for batch in response.data.logs {
174                            for log in batch {
175                                let event_signature = match log.topics.first().and_then(|t| t.as_ref()) {
176                                    Some(log_argument) => {
177                                        hex::encode_prefixed(log_argument.as_ref())
178                                    }
179                                    None => continue,
180                                };
181
182                                if event_signature == swap_event_encoded_signature {
183                                    match dex_extended.parse_swap_event_hypersync(&log) {
184                                        Ok(swap_event) => {
185                                            if let Err(e) =
186                                                tx.send(BlockchainMessage::SwapEvent(swap_event))
187                                            {
188                                                log::error!("Failed to send swap event: {e}");
189                                            }
190                                        }
191                                        Err(e) => {
192                                            log::error!(
193                                                "Failed to parse swap with error '{e:?}' for event: {log:?}",
194                                            );
195                                        }
196                                    }
197                                } else if event_signature == mint_event_encoded_signature {
198                                    match dex_extended.parse_mint_event_hypersync(&log) {
199                                        Ok(swap_event) => {
200                                            if let Err(e) =
201                                                tx.send(BlockchainMessage::MintEvent(swap_event))
202                                            {
203                                                log::error!("Failed to send mint event: {e}");
204                                            }
205                                        }
206                                        Err(e) => {
207                                            log::error!(
208                                                "Failed to parse mint with error '{e:?}' for event: {log:?}",
209                                            );
210                                        }
211                                    }
212                                } else if event_signature == burn_event_encoded_signature {
213                                    match dex_extended.parse_burn_event_hypersync(&log) {
214                                        Ok(swap_event) => {
215                                            if let Err(e) =
216                                                tx.send(BlockchainMessage::BurnEvent(swap_event))
217                                            {
218                                                log::error!("Failed to send burn event: {e}");
219                                            }
220                                        }
221                                        Err(e) => {
222                                            log::error!(
223                                                "Failed to parse burn with error '{e:?}' for event: {log:?}",
224                                            );
225                                        }
226                                    }
227                                } else {
228                                    log::error!("Unknown event signature: {event_signature}");
229                                }
230                            }
231                        }
232                    }
233                }
234            }
235        });
236
237        // Fire-and-forget: task is short-lived (processes one block), errors are logged,
238        // and it responds to cancellation_token for graceful shutdown
239    }
240
241    /// Creates a stream of contract event logs matching the specified criteria.
242    ///
243    /// # Panics
244    ///
245    /// Panics if the contract address cannot be parsed as a valid Ethereum address.
246    pub async fn request_contract_events_stream(
247        &self,
248        from_block: u64,
249        to_block: Option<u64>,
250        contract_address: &Address,
251        topics: Vec<&str>,
252    ) -> impl Stream<Item = Log> + use<> {
253        let query = Self::construct_contract_events_query(
254            from_block,
255            to_block,
256            &[*contract_address],
257            &topics,
258        );
259
260        let mut rx = self
261            .client
262            .clone()
263            .stream(query, StreamConfig::default())
264            .await
265            .expect("Failed to create stream");
266
267        async_stream::stream! {
268              while let Some(response) = rx.recv().await {
269                let response = response.unwrap();
270
271                for batch in response.data.logs {
272                    for log in batch {
273                        yield log
274                    }
275                }
276            }
277        }
278    }
279
280    /// Disconnects from the HyperSync service and stops all background tasks.
281    pub async fn disconnect(&mut self) {
282        log::debug!("Disconnecting HyperSync client");
283        self.cancellation_token.cancel();
284
285        // Await blocks task with timeout, abort if it takes too long
286        if let Some(mut task) = self.blocks_task.take() {
287            match tokio::time::timeout(
288                std::time::Duration::from_secs(DISCONNECT_TIMEOUT_SECS),
289                &mut task,
290            )
291            .await
292            {
293                Ok(Ok(())) => {
294                    log::debug!("Blocks task completed gracefully");
295                }
296                Ok(Err(e)) => {
297                    log::error!("Error awaiting blocks task: {e}");
298                }
299                Err(_) => {
300                    log::warn!(
301                        "Blocks task did not complete within {DISCONNECT_TIMEOUT_SECS}s timeout, \
302                         aborting task (this is expected if Hypersync long-poll was in progress)"
303                    );
304                    task.abort();
305                    let _ = task.await;
306                }
307            }
308        }
309
310        // DEX event tasks are short-lived and self-clean via cancellation_token
311
312        log::debug!("HyperSync client disconnected");
313    }
314
315    /// Returns the current block
316    ///
317    /// # Panics
318    ///
319    /// Panics if the client height request fails.
320    pub async fn current_block(&self) -> u64 {
321        self.client.get_height().await.unwrap()
322    }
323
324    /// Creates a stream that yields blockchain blocks within the specified range.
325    ///
326    /// # Panics
327    ///
328    /// Panics if the stream creation or block transformation fails.
329    pub async fn request_blocks_stream(
330        &self,
331        from_block: u64,
332        to_block: Option<u64>,
333    ) -> impl Stream<Item = Block> {
334        let query = Self::construct_block_query(from_block, to_block);
335        let mut rx = self
336            .client
337            .clone()
338            .stream(query, StreamConfig::default())
339            .await
340            .unwrap();
341
342        let chain = self.chain.name;
343
344        async_stream::stream! {
345            while let Some(response) = rx.recv().await {
346                let response = response.unwrap();
347                for batch in response.data.blocks {
348                        for received_block in batch {
349                            let block = transform_hypersync_block(chain, received_block).unwrap();
350                            yield block
351                        }
352                    }
353            }
354        }
355    }
356
357    /// Starts a background task that continuously polls for new blockchain blocks.
358    ///
359    /// # Panics
360    ///
361    /// Panics if client height requests or block transformations fail.
362    pub fn subscribe_blocks(&mut self) {
363        if self.blocks_task.is_some() {
364            return;
365        }
366
367        let chain = self.chain.name;
368        let client = self.client.clone();
369        let tx = if let Some(tx) = &self.tx {
370            tx.clone()
371        } else {
372            log::error!("Hypersync client channel should have been initialized");
373            return;
374        };
375
376        // Create a child token that can be cancelled independently
377        let blocks_token = self.cancellation_token.child_token();
378        let cancellation_token = blocks_token.clone();
379        self.blocks_cancellation_token = Some(blocks_token);
380
381        let task = get_runtime().spawn(async move {
382            log::debug!("Starting task 'blocks_feed");
383
384            let current_block_height = client.get_height().await.unwrap();
385            let mut query = Self::construct_block_query(current_block_height, None);
386
387            loop {
388                tokio::select! {
389                    () = cancellation_token.cancelled() => {
390                        log::debug!("Blocks subscription task received cancellation signal");
391                        break;
392                    }
393                    result = tokio::time::timeout(
394                        std::time::Duration::from_secs(HYPERSYNC_REQUEST_TIMEOUT_SECS),
395                        client.get(&query)
396                    ) => {
397                        let response = match result {
398                            Ok(Ok(resp)) => resp,
399                            Ok(Err(e)) => {
400                                log::error!("Hypersync request failed: {e}");
401                                break;
402                            }
403                            Err(_) => {
404                                log::warn!("Hypersync request timed out after {HYPERSYNC_REQUEST_TIMEOUT_SECS}s, retrying...");
405                                continue;
406                            }
407                        };
408
409                        for batch in response.data.blocks {
410                            for received_block in batch {
411                                let block = transform_hypersync_block(chain, received_block).unwrap();
412                                let msg = BlockchainMessage::Block(block);
413                                if let Err(e) = tx.send(msg) {
414                                    log::error!("Error sending message: {e}");
415                                }
416                            }
417                        }
418
419                        if let Some(archive_block_height) = response.archive_height
420                            && archive_block_height < response.next_block
421                        {
422                            while client.get_height().await.unwrap() < response.next_block {
423                                tokio::select! {
424                                    () = cancellation_token.cancelled() => {
425                                        log::debug!("Blocks subscription task received cancellation signal during polling");
426                                        return;
427                                    }
428                                    () = tokio::time::sleep(std::time::Duration::from_millis(
429                                        BLOCK_POLLING_INTERVAL_MS,
430                                    )) => {}
431                                }
432                            }
433                        }
434
435                        query.from_block = response.next_block;
436                    }
437                }
438            }
439        });
440
441        self.blocks_task = Some(task);
442    }
443
444    /// Constructs a HyperSync query for fetching blocks with all available fields within the specified range.
445    fn construct_block_query(from_block: u64, to_block: Option<u64>) -> Query {
446        Query {
447            from_block,
448            to_block,
449            blocks: vec![BlockSelection::default()],
450            field_selection: FieldSelection {
451                block: BlockField::all(),
452                ..Default::default()
453            },
454            ..Default::default()
455        }
456    }
457
458    fn construct_contract_events_query(
459        from_block: u64,
460        to_block: Option<u64>,
461        contract_addresses: &[Address],
462        topics: &[&str],
463    ) -> Query {
464        let mut query_value = serde_json::json!({
465            "from_block": from_block,
466            "logs": [{
467                "topics": [topics],
468                "address": contract_addresses
469            }],
470            "field_selection": {
471                "log": [
472                    "block_number",
473                    "transaction_hash",
474                    "transaction_index",
475                    "log_index",
476                    "address",
477                    "data",
478                    "topic0",
479                    "topic1",
480                    "topic2",
481                    "topic3",
482                ]
483            }
484        });
485
486        if let Some(to_block) = to_block
487            && let Some(obj) = query_value.as_object_mut()
488        {
489            obj.insert("to_block".to_string(), serde_json::json!(to_block));
490        }
491
492        serde_json::from_value(query_value).unwrap()
493    }
494
495    /// Unsubscribes from new blocks by stopping the background watch task.
496    pub async fn unsubscribe_blocks(&mut self) {
497        if let Some(task) = self.blocks_task.take() {
498            // Cancel only the blocks child token, not the main cancellation token
499            if let Some(token) = self.blocks_cancellation_token.take() {
500                token.cancel();
501            }
502
503            if let Err(e) = task.await {
504                log::error!("Error awaiting blocks task during unsubscribe: {e}");
505            }
506            log::debug!("Unsubscribed from blocks");
507        }
508    }
509}