nautilus_blockchain/hypersync/
client.rs1use std::sync::Arc;
17
18use ahash::AHashMap;
19use alloy::primitives::Address;
20use futures_util::Stream;
21use hypersync_client::{
22 StreamConfig,
23 net_types::{BlockField, BlockSelection, FieldSelection, Query},
24 simple_types::Log,
25};
26use nautilus_common::live::get_runtime;
27use nautilus_core::hex;
28use nautilus_model::{
29 defi::{Block, DexType, SharedChain},
30 identifiers::InstrumentId,
31};
32use nautilus_network::http::Url;
33
34use crate::{
35 exchanges::get_dex_extended, hypersync::transform::transform_hypersync_block,
36 rpc::types::BlockchainMessage,
37};
38
39const BLOCK_POLLING_INTERVAL_MS: u64 = 50;
42
43const HYPERSYNC_REQUEST_TIMEOUT_SECS: u64 = 30;
45
46const DISCONNECT_TIMEOUT_SECS: u64 = 5;
49
50#[derive(Debug)]
52pub struct HyperSyncClient {
53 chain: SharedChain,
55 client: Arc<hypersync_client::Client>,
57 blocks_task: Option<tokio::task::JoinHandle<()>>,
59 blocks_cancellation_token: Option<tokio_util::sync::CancellationToken>,
61 tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
63 pool_addresses: AHashMap<InstrumentId, Address>,
65 cancellation_token: tokio_util::sync::CancellationToken,
67}
68
69impl HyperSyncClient {
70 #[must_use]
79 pub fn new(
80 chain: SharedChain,
81 tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
82 cancellation_token: tokio_util::sync::CancellationToken,
83 ) -> Self {
84 let mut config = hypersync_client::ClientConfig::default();
85 let hypersync_url =
86 Url::parse(chain.hypersync_url.as_str()).expect("Invalid HyperSync URL");
87 config.url = hypersync_url.to_string();
88 config.api_token = std::env::var("ENVIO_API_TOKEN")
89 .expect("ENVIO_API_TOKEN environment variable must be set");
90 let client = hypersync_client::Client::new(config)
91 .expect("Failed to create HyperSync client - check ENVIO_API_TOKEN is a valid UUID");
92
93 Self {
94 chain,
95 client: Arc::new(client),
96 blocks_task: None,
97 blocks_cancellation_token: None,
98 tx,
99 pool_addresses: AHashMap::new(),
100 cancellation_token,
101 }
102 }
103
104 #[must_use]
105 pub fn get_pool_address(&self, instrument_id: InstrumentId) -> Option<&Address> {
106 self.pool_addresses.get(&instrument_id)
107 }
108
109 pub fn process_block_dex_contract_events(
115 &mut self,
116 dex: &DexType,
117 block: u64,
118 contract_addresses: &[Address],
119 swap_event_encoded_signature: String,
120 mint_event_encoded_signature: String,
121 burn_event_encoded_signature: String,
122 ) {
123 let topics = vec![
124 swap_event_encoded_signature.as_str(),
125 &mint_event_encoded_signature.as_str(),
126 &burn_event_encoded_signature.as_str(),
127 ];
128 let query = Self::construct_contract_events_query(
129 block,
130 Some(block + 1),
131 contract_addresses,
132 &topics,
133 );
134 let tx = if let Some(tx) = &self.tx {
135 tx.clone()
136 } else {
137 log::error!("Hypersync client channel should have been initialized");
138 return;
139 };
140 let client = self.client.clone();
141 let dex_extended =
142 get_dex_extended(self.chain.name, dex).expect("Failed to get dex extended");
143 let cancellation_token = self.cancellation_token.clone();
144
145 let _task = get_runtime().spawn(async move {
146 let mut rx = match client.stream(query, StreamConfig::default()).await {
147 Ok(rx) => rx,
148 Err(e) => {
149 log::error!("Failed to create DEX event stream: {e}");
150 return;
151 }
152 };
153
154 loop {
155 tokio::select! {
156 () = cancellation_token.cancelled() => {
157 log::debug!("DEX event processing task received cancellation signal");
158 break;
159 }
160 response = rx.recv() => {
161 let Some(response) = response else {
162 break;
163 };
164
165 let response = match response {
166 Ok(resp) => resp,
167 Err(e) => {
168 log::error!("Failed to receive DEX event stream response: {e}");
169 break;
170 }
171 };
172
173 for batch in response.data.logs {
174 for log in batch {
175 let event_signature = match log.topics.first().and_then(|t| t.as_ref()) {
176 Some(log_argument) => {
177 hex::encode_prefixed(log_argument.as_ref())
178 }
179 None => continue,
180 };
181
182 if event_signature == swap_event_encoded_signature {
183 match dex_extended.parse_swap_event_hypersync(&log) {
184 Ok(swap_event) => {
185 if let Err(e) =
186 tx.send(BlockchainMessage::SwapEvent(swap_event))
187 {
188 log::error!("Failed to send swap event: {e}");
189 }
190 }
191 Err(e) => {
192 log::error!(
193 "Failed to parse swap with error '{e:?}' for event: {log:?}",
194 );
195 }
196 }
197 } else if event_signature == mint_event_encoded_signature {
198 match dex_extended.parse_mint_event_hypersync(&log) {
199 Ok(swap_event) => {
200 if let Err(e) =
201 tx.send(BlockchainMessage::MintEvent(swap_event))
202 {
203 log::error!("Failed to send mint event: {e}");
204 }
205 }
206 Err(e) => {
207 log::error!(
208 "Failed to parse mint with error '{e:?}' for event: {log:?}",
209 );
210 }
211 }
212 } else if event_signature == burn_event_encoded_signature {
213 match dex_extended.parse_burn_event_hypersync(&log) {
214 Ok(swap_event) => {
215 if let Err(e) =
216 tx.send(BlockchainMessage::BurnEvent(swap_event))
217 {
218 log::error!("Failed to send burn event: {e}");
219 }
220 }
221 Err(e) => {
222 log::error!(
223 "Failed to parse burn with error '{e:?}' for event: {log:?}",
224 );
225 }
226 }
227 } else {
228 log::error!("Unknown event signature: {event_signature}");
229 }
230 }
231 }
232 }
233 }
234 }
235 });
236
237 }
240
241 pub async fn request_contract_events_stream(
247 &self,
248 from_block: u64,
249 to_block: Option<u64>,
250 contract_address: &Address,
251 topics: Vec<&str>,
252 ) -> impl Stream<Item = Log> + use<> {
253 let query = Self::construct_contract_events_query(
254 from_block,
255 to_block,
256 &[*contract_address],
257 &topics,
258 );
259
260 let mut rx = self
261 .client
262 .clone()
263 .stream(query, StreamConfig::default())
264 .await
265 .expect("Failed to create stream");
266
267 async_stream::stream! {
268 while let Some(response) = rx.recv().await {
269 let response = response.unwrap();
270
271 for batch in response.data.logs {
272 for log in batch {
273 yield log
274 }
275 }
276 }
277 }
278 }
279
280 pub async fn disconnect(&mut self) {
282 log::debug!("Disconnecting HyperSync client");
283 self.cancellation_token.cancel();
284
285 if let Some(mut task) = self.blocks_task.take() {
287 match tokio::time::timeout(
288 std::time::Duration::from_secs(DISCONNECT_TIMEOUT_SECS),
289 &mut task,
290 )
291 .await
292 {
293 Ok(Ok(())) => {
294 log::debug!("Blocks task completed gracefully");
295 }
296 Ok(Err(e)) => {
297 log::error!("Error awaiting blocks task: {e}");
298 }
299 Err(_) => {
300 log::warn!(
301 "Blocks task did not complete within {DISCONNECT_TIMEOUT_SECS}s timeout, \
302 aborting task (this is expected if Hypersync long-poll was in progress)"
303 );
304 task.abort();
305 let _ = task.await;
306 }
307 }
308 }
309
310 log::debug!("HyperSync client disconnected");
313 }
314
315 pub async fn current_block(&self) -> u64 {
321 self.client.get_height().await.unwrap()
322 }
323
324 pub async fn request_blocks_stream(
330 &self,
331 from_block: u64,
332 to_block: Option<u64>,
333 ) -> impl Stream<Item = Block> {
334 let query = Self::construct_block_query(from_block, to_block);
335 let mut rx = self
336 .client
337 .clone()
338 .stream(query, StreamConfig::default())
339 .await
340 .unwrap();
341
342 let chain = self.chain.name;
343
344 async_stream::stream! {
345 while let Some(response) = rx.recv().await {
346 let response = response.unwrap();
347 for batch in response.data.blocks {
348 for received_block in batch {
349 let block = transform_hypersync_block(chain, received_block).unwrap();
350 yield block
351 }
352 }
353 }
354 }
355 }
356
357 pub fn subscribe_blocks(&mut self) {
363 if self.blocks_task.is_some() {
364 return;
365 }
366
367 let chain = self.chain.name;
368 let client = self.client.clone();
369 let tx = if let Some(tx) = &self.tx {
370 tx.clone()
371 } else {
372 log::error!("Hypersync client channel should have been initialized");
373 return;
374 };
375
376 let blocks_token = self.cancellation_token.child_token();
378 let cancellation_token = blocks_token.clone();
379 self.blocks_cancellation_token = Some(blocks_token);
380
381 let task = get_runtime().spawn(async move {
382 log::debug!("Starting task 'blocks_feed");
383
384 let current_block_height = client.get_height().await.unwrap();
385 let mut query = Self::construct_block_query(current_block_height, None);
386
387 loop {
388 tokio::select! {
389 () = cancellation_token.cancelled() => {
390 log::debug!("Blocks subscription task received cancellation signal");
391 break;
392 }
393 result = tokio::time::timeout(
394 std::time::Duration::from_secs(HYPERSYNC_REQUEST_TIMEOUT_SECS),
395 client.get(&query)
396 ) => {
397 let response = match result {
398 Ok(Ok(resp)) => resp,
399 Ok(Err(e)) => {
400 log::error!("Hypersync request failed: {e}");
401 break;
402 }
403 Err(_) => {
404 log::warn!("Hypersync request timed out after {HYPERSYNC_REQUEST_TIMEOUT_SECS}s, retrying...");
405 continue;
406 }
407 };
408
409 for batch in response.data.blocks {
410 for received_block in batch {
411 let block = transform_hypersync_block(chain, received_block).unwrap();
412 let msg = BlockchainMessage::Block(block);
413 if let Err(e) = tx.send(msg) {
414 log::error!("Error sending message: {e}");
415 }
416 }
417 }
418
419 if let Some(archive_block_height) = response.archive_height
420 && archive_block_height < response.next_block
421 {
422 while client.get_height().await.unwrap() < response.next_block {
423 tokio::select! {
424 () = cancellation_token.cancelled() => {
425 log::debug!("Blocks subscription task received cancellation signal during polling");
426 return;
427 }
428 () = tokio::time::sleep(std::time::Duration::from_millis(
429 BLOCK_POLLING_INTERVAL_MS,
430 )) => {}
431 }
432 }
433 }
434
435 query.from_block = response.next_block;
436 }
437 }
438 }
439 });
440
441 self.blocks_task = Some(task);
442 }
443
444 fn construct_block_query(from_block: u64, to_block: Option<u64>) -> Query {
446 Query {
447 from_block,
448 to_block,
449 blocks: vec![BlockSelection::default()],
450 field_selection: FieldSelection {
451 block: BlockField::all(),
452 ..Default::default()
453 },
454 ..Default::default()
455 }
456 }
457
458 fn construct_contract_events_query(
459 from_block: u64,
460 to_block: Option<u64>,
461 contract_addresses: &[Address],
462 topics: &[&str],
463 ) -> Query {
464 let mut query_value = serde_json::json!({
465 "from_block": from_block,
466 "logs": [{
467 "topics": [topics],
468 "address": contract_addresses
469 }],
470 "field_selection": {
471 "log": [
472 "block_number",
473 "transaction_hash",
474 "transaction_index",
475 "log_index",
476 "address",
477 "data",
478 "topic0",
479 "topic1",
480 "topic2",
481 "topic3",
482 ]
483 }
484 });
485
486 if let Some(to_block) = to_block
487 && let Some(obj) = query_value.as_object_mut()
488 {
489 obj.insert("to_block".to_string(), serde_json::json!(to_block));
490 }
491
492 serde_json::from_value(query_value).unwrap()
493 }
494
495 pub async fn unsubscribe_blocks(&mut self) {
497 if let Some(task) = self.blocks_task.take() {
498 if let Some(token) = self.blocks_cancellation_token.take() {
500 token.cancel();
501 }
502
503 if let Err(e) = task.await {
504 log::error!("Error awaiting blocks task during unsubscribe: {e}");
505 }
506 log::debug!("Unsubscribed from blocks");
507 }
508 }
509}