1use nautilus_common::{
17 clients::DataClient,
18 defi::RequestPoolSnapshot,
19 live::get_runtime,
20 messages::{
21 DataEvent,
22 defi::{
23 DefiDataCommand, DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand,
24 SubscribeBlocks, SubscribePool, SubscribePoolFeeCollects, SubscribePoolFlashEvents,
25 SubscribePoolLiquidityUpdates, SubscribePoolSwaps, UnsubscribeBlocks, UnsubscribePool,
26 UnsubscribePoolFeeCollects, UnsubscribePoolFlashEvents,
27 UnsubscribePoolLiquidityUpdates, UnsubscribePoolSwaps,
28 },
29 },
30};
31use nautilus_model::{
32 defi::{DefiData, PoolIdentifier, SharedChain, validation::validate_address},
33 identifiers::{ClientId, Venue},
34};
35use ustr::Ustr;
36
37use crate::{
38 config::BlockchainDataClientConfig,
39 data::core::BlockchainDataClientCore,
40 exchanges::get_dex_extended,
41 rpc::{BlockchainRpcClient, types::BlockchainMessage},
42};
43
/// Data client for a single blockchain.
///
/// The client itself is a thin front end: subscribe, unsubscribe and request calls are
/// turned into [`DefiDataCommand`] values and pushed onto an unbounded channel, and a
/// background task (spawned on connect) consumes that channel together with the
/// HyperSync and RPC message streams.
#[derive(Debug)]
pub struct BlockchainDataClient {
    /// Identifier returned by `client_id()`.
    pub client_id: ClientId,
    /// Chain this client serves; cloned from `config.chain` in `new`.
    pub chain: SharedChain,
    /// Configuration used to build the core client each time the process task is spawned.
    pub config: BlockchainDataClientConfig,
    /// Initialised to `None` in `new`.
    // NOTE(review): nothing in this file assigns this field; the process task owns its
    // own core client instance — confirm whether the field is still needed.
    pub core_client: Option<BlockchainDataClientCore>,
    /// Receiving half of the HyperSync message channel; taken by the process task.
    hypersync_rx: Option<tokio::sync::mpsc::UnboundedReceiver<BlockchainMessage>>,
    /// Sending half of the HyperSync message channel; handed to the core client.
    hypersync_tx: Option<tokio::sync::mpsc::UnboundedSender<BlockchainMessage>>,
    /// Sending half of the command channel used by the subscribe/unsubscribe/request methods.
    command_tx: tokio::sync::mpsc::UnboundedSender<DefiDataCommand>,
    /// Receiving half of the command channel; taken by the process task.
    command_rx: Option<tokio::sync::mpsc::UnboundedReceiver<DefiDataCommand>>,
    /// Join handle of the background process task, if one has been spawned.
    process_task: Option<tokio::task::JoinHandle<()>>,
    /// Token used to signal the process task (and the core client) to stop.
    cancellation_token: tokio_util::sync::CancellationToken,
}
77
78impl BlockchainDataClient {
79 #[must_use]
81 pub fn new(client_id: ClientId, config: BlockchainDataClientConfig) -> Self {
82 let chain = config.chain.clone();
83 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
84 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
85 Self {
86 client_id,
87 chain,
88 core_client: None,
89 config,
90 hypersync_rx: Some(hypersync_rx),
91 hypersync_tx: Some(hypersync_tx),
92 command_tx,
93 command_rx: Some(command_rx),
94 process_task: None,
95 cancellation_token: tokio_util::sync::CancellationToken::new(),
96 }
97 }
98
99 fn spawn_process_task(&mut self) {
107 let command_rx = if let Some(r) = self.command_rx.take() {
108 r
109 } else {
110 log::error!("Command receiver already taken, not spawning handler");
111 return;
112 };
113
114 let cancellation_token = self.cancellation_token.clone();
115
116 let data_tx = nautilus_common::live::runner::get_data_event_sender();
117
118 let mut hypersync_rx = self.hypersync_rx.take().unwrap();
119 let hypersync_tx = self.hypersync_tx.take();
120
121 let mut core_client = BlockchainDataClientCore::new(
122 self.config.clone(),
123 hypersync_tx,
124 Some(data_tx),
125 cancellation_token.clone(),
126 );
127
128 let handle = get_runtime().spawn(async move {
129 log::debug!("Started task 'process'");
130
131 if let Err(e) = core_client.connect().await {
132 if e.to_string().contains("cancelled") || e.to_string().contains("Sync cancelled") {
135 log::warn!("Blockchain core client connection interrupted: {e}");
136 } else {
137 log::error!("Failed to connect blockchain core client: {e}");
138 }
139 return;
140 }
141
142 let mut command_rx = command_rx;
143
144 loop {
145 tokio::select! {
146 () = cancellation_token.cancelled() => {
147 log::debug!("Received cancellation signal in Blockchain data client process task");
148 core_client.disconnect().await;
149 break;
150 }
151 command = command_rx.recv() => {
152 if let Some(cmd) = command {
153 match cmd {
154 DefiDataCommand::Subscribe(cmd) => {
155 let chain = cmd.blockchain();
156 if chain != core_client.chain.name {
157 log::error!("Incorrect blockchain for subscribe command: {chain}");
158 continue;
159 }
160
161 if let Err(e) = Self::handle_subscribe_command(cmd, &mut core_client).await{
162 log::error!("Error processing subscribe command: {e}");
163 }
164 }
165 DefiDataCommand::Unsubscribe(cmd) => {
166 let chain = cmd.blockchain();
167 if chain != core_client.chain.name {
168 log::error!("Incorrect blockchain for subscribe command: {chain}");
169 continue;
170 }
171
172 if let Err(e) = Self::handle_unsubscribe_command(cmd, &mut core_client).await{
173 log::error!("Error processing subscribe command: {e}");
174 }
175 }
176 DefiDataCommand::Request(cmd) => {
177 if let Err(e) = Self::handle_request_command(cmd, &mut core_client).await {
178 log::error!("Error processing request command: {e}");
179 }
180 }
181 }
182 } else {
183 log::debug!("Command channel closed");
184 break;
185 }
186 }
187 data = hypersync_rx.recv() => {
188 if let Some(msg) = data {
189 let data_event = match msg {
190 BlockchainMessage::Block(block) => {
191 for dex in core_client.cache.get_registered_dexes(){
193 let addresses = core_client.subscription_manager.get_subscribed_dex_contract_addresses(&dex);
194 if !addresses.is_empty() {
195 core_client.hypersync_client.process_block_dex_contract_events(
196 &dex,
197 block.number,
198 &addresses,
199 core_client.subscription_manager.get_dex_pool_swap_event_signature(&dex).unwrap(),
200 core_client.subscription_manager.get_dex_pool_mint_event_signature(&dex).unwrap(),
201 core_client.subscription_manager.get_dex_pool_burn_event_signature(&dex).unwrap(),
202 );
203 }
204 }
205
206 Some(DataEvent::DeFi(DefiData::Block(block)))
207 }
208 BlockchainMessage::SwapEvent(swap_event) => {
209 match core_client.get_pool(&swap_event.pool_identifier) {
210 Ok(pool) => {
211 match core_client.process_pool_swap_event(&swap_event, pool){
212 Ok(swap) => Some(DataEvent::DeFi(DefiData::PoolSwap(swap))),
213 Err(e) => {
214 log::error!("Error processing pool swap event: {e}");
215 None
216 }
217 }
218 }
219 Err(e) => {
220 log::error!("Failed to get pool {} with error {:?}", swap_event.pool_identifier, e);
221 None
222 }
223 }
224 }
225 BlockchainMessage::BurnEvent(burn_event) => {
226 match core_client.get_pool(&burn_event.pool_identifier) {
227 Ok(pool) => {
228 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
229 match core_client.process_pool_burn_event(
230 &burn_event,
231 pool,
232 dex_extended,
233 ){
234 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
235 Err(e) => {
236 log::error!("Error processing pool burn event: {e}");
237 None
238 }
239 }
240 }
241 Err(e) => {
242 log::error!("Failed to get pool {} with error {:?}", burn_event.pool_identifier, e);
243 None
244 }
245 }
246 }
247 BlockchainMessage::MintEvent(mint_event) => {
248 match core_client.get_pool(&mint_event.pool_identifier) {
249 Ok(pool) => {
250 let dex_extended = get_dex_extended(core_client.chain.name,&pool.dex.name).expect("Failed to get dex extended");
251 match core_client.process_pool_mint_event(
252 &mint_event,
253 pool,
254 dex_extended,
255 ){
256 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolLiquidityUpdate(update))),
257 Err(e) => {
258 log::error!("Error processing pool mint event: {e}");
259 None
260 }
261 }
262 }
263 Err(e) => {
264 log::error!("Failed to get pool {} with error {:?}", mint_event.pool_identifier, e);
265 None
266 }
267 }
268 }
269 BlockchainMessage::CollectEvent(collect_event) => {
270 match core_client.get_pool(&collect_event.pool_identifier) {
271 Ok(pool) => {
272 let dex_extended = get_dex_extended(core_client.chain.name, &pool.dex.name).expect("Failed to get dex extended");
273 match core_client.process_pool_collect_event(
274 &collect_event,
275 pool,
276 dex_extended,
277 ){
278 Ok(update) => Some(DataEvent::DeFi(DefiData::PoolFeeCollect(update))),
279 Err(e) => {
280 log::error!("Error processing pool collect event: {e}");
281 None
282 }
283 }
284 }
285 Err(e) => {
286 log::error!("Failed to get pool {} with error {:?}", collect_event.pool_identifier, e);
287 None
288 }
289 }
290 }
291 BlockchainMessage::FlashEvent(flash_event) => {
292 match core_client.get_pool(&flash_event.pool_identifier) {
293 Ok(pool) => {
294 match core_client.process_pool_flash_event(&flash_event,pool){
295 Ok(flash) => Some(DataEvent::DeFi(DefiData::PoolFlash(flash))),
296 Err(e) => {
297 log::error!("Error processing pool flash event: {e}");
298 None
299 }
300 }
301 }
302 Err(e) => {
303 log::error!("Failed to get pool {} with error {:?}", flash_event.pool_identifier, e);
304 None
305 }
306 }
307 }
308 };
309
310 if let Some(event) = data_event {
311 core_client.send_data(event);
312 }
313 } else {
314 log::debug!("HyperSync data channel closed");
315 break;
316 }
317 }
318 msg = async {
319 match core_client.rpc_client {
320 Some(ref mut rpc_client) => rpc_client.next_rpc_message().await,
321 None => std::future::pending().await, }
323 } => {
324 match msg {
326 Ok(BlockchainMessage::Block(block)) => {
327 let data = DataEvent::DeFi(DefiData::Block(block));
328 core_client.send_data(data);
329 },
330 Ok(BlockchainMessage::SwapEvent(_)) => {
331 log::warn!("RPC swap events are not yet supported");
332 }
333 Ok(BlockchainMessage::MintEvent(_)) => {
334 log::warn!("RPC mint events are not yet supported");
335 }
336 Ok(BlockchainMessage::BurnEvent(_)) => {
337 log::warn!("RPC burn events are not yet supported");
338 }
339 Ok(BlockchainMessage::CollectEvent(_)) => {
340 log::warn!("RPC collect events are not yet supported");
341 }
342 Ok(BlockchainMessage::FlashEvent(_)) => {
343 log::warn!("RPC flash events are not yet supported");
344 }
345 Err(e) => {
346 log::error!("Error processing RPC message: {e}");
347 }
348 }
349 }
350 }
351 }
352
353 log::debug!("Stopped task 'process'");
354 });
355
356 self.process_task = Some(handle);
357 }
358
359 async fn handle_subscribe_command(
361 command: DefiSubscribeCommand,
362 core_client: &mut BlockchainDataClientCore,
363 ) -> anyhow::Result<()> {
364 match command {
365 DefiSubscribeCommand::Blocks(_cmd) => {
366 log::info!("Processing subscribe blocks command");
367
368 if let Some(ref mut rpc) = core_client.rpc_client {
370 if let Err(e) = rpc.subscribe_blocks().await {
371 log::warn!(
372 "RPC blocks subscription failed: {e}, falling back to HyperSync"
373 );
374 core_client.hypersync_client.subscribe_blocks();
375 tokio::task::yield_now().await;
376 } else {
377 log::info!("Successfully subscribed to blocks via RPC");
378 }
379 } else {
380 log::info!("Subscribing to blocks via HyperSync");
381 core_client.hypersync_client.subscribe_blocks();
382 tokio::task::yield_now().await;
383 }
384
385 Ok(())
386 }
387 DefiSubscribeCommand::Pool(cmd) => {
388 log::info!(
389 "Processing subscribe pool command for {}",
390 cmd.instrument_id
391 );
392
393 if let Some(ref mut _rpc) = core_client.rpc_client {
394 log::warn!("RPC pool subscription not yet implemented, using HyperSync");
395 }
396
397 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
398 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
399 .map_err(|e| {
400 anyhow::anyhow!(
401 "Invalid pool address '{}' failed with error: {:?}",
402 cmd.instrument_id,
403 e
404 )
405 })?;
406
407 core_client
409 .subscription_manager
410 .subscribe_swaps(dex, pool_address);
411 core_client
412 .subscription_manager
413 .subscribe_burns(dex, pool_address);
414 core_client
415 .subscription_manager
416 .subscribe_mints(dex, pool_address);
417 core_client
418 .subscription_manager
419 .subscribe_collects(dex, pool_address);
420 core_client
421 .subscription_manager
422 .subscribe_flashes(dex, pool_address);
423
424 log::info!(
425 "Subscribed to all pool events for {} at address {}",
426 cmd.instrument_id,
427 pool_address
428 );
429 } else {
430 anyhow::bail!(
431 "Invalid venue {}, expected Blockchain DEX format",
432 cmd.instrument_id.venue
433 )
434 }
435
436 Ok(())
437 }
438 DefiSubscribeCommand::PoolSwaps(cmd) => {
439 log::info!(
440 "Processing subscribe pool swaps command for {}",
441 cmd.instrument_id
442 );
443
444 if let Some(ref mut _rpc) = core_client.rpc_client {
445 log::warn!("RPC pool swaps subscription not yet implemented, using HyperSync");
446 }
447
448 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
449 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
450 .map_err(|e| {
451 anyhow::anyhow!(
452 "Invalid pool swap address '{}' failed with error: {:?}",
453 cmd.instrument_id,
454 e
455 )
456 })?;
457 core_client
458 .subscription_manager
459 .subscribe_swaps(dex, pool_address);
460 } else {
461 anyhow::bail!(
462 "Invalid venue {}, expected Blockchain DEX format",
463 cmd.instrument_id.venue
464 )
465 }
466
467 Ok(())
468 }
469 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
470 log::info!(
471 "Processing subscribe pool liquidity updates command for address: {}",
472 cmd.instrument_id
473 );
474
475 if let Some(ref mut _rpc) = core_client.rpc_client {
476 log::warn!(
477 "RPC pool liquidity updates subscription not yet implemented, using HyperSync"
478 );
479 }
480
481 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
482 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
483 .map_err(|_| {
484 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
485 })?;
486 core_client
487 .subscription_manager
488 .subscribe_burns(dex, pool_address);
489 core_client
490 .subscription_manager
491 .subscribe_mints(dex, pool_address);
492 } else {
493 anyhow::bail!(
494 "Invalid venue {}, expected Blockchain DEX format",
495 cmd.instrument_id.venue
496 )
497 }
498
499 Ok(())
500 }
501 DefiSubscribeCommand::PoolFeeCollects(cmd) => {
502 log::info!(
503 "Processing subscribe pool fee collects command for address: {}",
504 cmd.instrument_id
505 );
506
507 if let Some(ref mut _rpc) = core_client.rpc_client {
508 log::warn!(
509 "RPC pool fee collects subscription not yet implemented, using HyperSync"
510 );
511 }
512
513 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
514 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
515 .map_err(|_| {
516 anyhow::anyhow!(
517 "Invalid pool fee collect address: {}",
518 cmd.instrument_id
519 )
520 })?;
521 core_client
522 .subscription_manager
523 .subscribe_collects(dex, pool_address);
524 } else {
525 anyhow::bail!(
526 "Invalid venue {}, expected Blockchain DEX format",
527 cmd.instrument_id.venue
528 )
529 }
530
531 Ok(())
532 }
533 DefiSubscribeCommand::PoolFlashEvents(cmd) => {
534 log::info!(
535 "Processing subscribe pool flash command for address: {}",
536 cmd.instrument_id
537 );
538
539 if let Some(ref mut _rpc) = core_client.rpc_client {
540 log::warn!(
541 "RPC pool fee collects subscription not yet implemented, using HyperSync"
542 );
543 }
544
545 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
546 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
547 .map_err(|_| {
548 anyhow::anyhow!(
549 "Invalid pool flash subscribe address: {}",
550 cmd.instrument_id
551 )
552 })?;
553 core_client
554 .subscription_manager
555 .subscribe_flashes(dex, pool_address);
556 } else {
557 anyhow::bail!(
558 "Invalid venue {}, expected Blockchain DEX format",
559 cmd.instrument_id.venue
560 )
561 }
562
563 Ok(())
564 }
565 }
566 }
567
568 async fn handle_unsubscribe_command(
570 command: DefiUnsubscribeCommand,
571 core_client: &mut BlockchainDataClientCore,
572 ) -> anyhow::Result<()> {
573 match command {
574 DefiUnsubscribeCommand::Blocks(_cmd) => {
575 log::info!("Processing unsubscribe blocks command");
576
577 if core_client.rpc_client.is_some() {
579 log::warn!("RPC blocks unsubscription not yet implemented");
580 }
581
582 core_client.hypersync_client.unsubscribe_blocks().await;
584 log::info!("Unsubscribed from blocks via HyperSync");
585
586 Ok(())
587 }
588 DefiUnsubscribeCommand::Pool(cmd) => {
589 log::info!(
590 "Processing unsubscribe pool command for {}",
591 cmd.instrument_id
592 );
593
594 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
595 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
596 .map_err(|_| {
597 anyhow::anyhow!("Invalid pool address: {}", cmd.instrument_id)
598 })?;
599
600 core_client
602 .subscription_manager
603 .unsubscribe_swaps(dex, pool_address);
604 core_client
605 .subscription_manager
606 .unsubscribe_burns(dex, pool_address);
607 core_client
608 .subscription_manager
609 .unsubscribe_mints(dex, pool_address);
610 core_client
611 .subscription_manager
612 .unsubscribe_collects(dex, pool_address);
613 core_client
614 .subscription_manager
615 .unsubscribe_flashes(dex, pool_address);
616
617 log::info!(
618 "Unsubscribed from all pool events for {} at address {}",
619 cmd.instrument_id,
620 pool_address
621 );
622 } else {
623 anyhow::bail!(
624 "Invalid venue {}, expected Blockchain DEX format",
625 cmd.instrument_id.venue
626 )
627 }
628
629 Ok(())
630 }
631 DefiUnsubscribeCommand::PoolSwaps(cmd) => {
632 log::info!("Processing unsubscribe pool swaps command");
633
634 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
635 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
636 .map_err(|_| {
637 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
638 })?;
639 core_client
640 .subscription_manager
641 .unsubscribe_swaps(dex, pool_address);
642 } else {
643 anyhow::bail!(
644 "Invalid venue {}, expected Blockchain DEX format",
645 cmd.instrument_id.venue
646 )
647 }
648
649 Ok(())
650 }
651 DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd) => {
652 log::info!(
653 "Processing unsubscribe pool liquidity updates command for {}",
654 cmd.instrument_id
655 );
656
657 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
658 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
659 .map_err(|_| {
660 anyhow::anyhow!("Invalid pool swap address: {}", cmd.instrument_id)
661 })?;
662 core_client
663 .subscription_manager
664 .unsubscribe_burns(dex, pool_address);
665 core_client
666 .subscription_manager
667 .unsubscribe_mints(dex, pool_address);
668 } else {
669 anyhow::bail!(
670 "Invalid venue {}, expected Blockchain DEX format",
671 cmd.instrument_id.venue
672 )
673 }
674
675 Ok(())
676 }
677 DefiUnsubscribeCommand::PoolFeeCollects(cmd) => {
678 log::info!(
679 "Processing unsubscribe pool fee collects command for {}",
680 cmd.instrument_id
681 );
682
683 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
684 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
685 .map_err(|_| {
686 anyhow::anyhow!(
687 "Invalid pool fee collect address: {}",
688 cmd.instrument_id
689 )
690 })?;
691 core_client
692 .subscription_manager
693 .unsubscribe_collects(dex, pool_address);
694 } else {
695 anyhow::bail!(
696 "Invalid venue {}, expected Blockchain DEX format",
697 cmd.instrument_id.venue
698 )
699 }
700
701 Ok(())
702 }
703 DefiUnsubscribeCommand::PoolFlashEvents(cmd) => {
704 log::info!(
705 "Processing unsubscribe pool flash command for {}",
706 cmd.instrument_id
707 );
708
709 if let Ok((_, dex)) = cmd.instrument_id.venue.parse_dex() {
710 let pool_address = validate_address(cmd.instrument_id.symbol.as_str())
711 .map_err(|_| {
712 anyhow::anyhow!("Invalid pool flash address: {}", cmd.instrument_id)
713 })?;
714 core_client
715 .subscription_manager
716 .unsubscribe_flashes(dex, pool_address);
717 } else {
718 anyhow::bail!(
719 "Invalid venue {}, expected Blockchain DEX format",
720 cmd.instrument_id.venue
721 )
722 }
723
724 Ok(())
725 }
726 }
727 }
728
729 async fn handle_request_command(
731 command: DefiRequestCommand,
732 core_client: &mut BlockchainDataClientCore,
733 ) -> anyhow::Result<()> {
734 match command {
735 DefiRequestCommand::PoolSnapshot(cmd) => {
736 log::info!("Processing pool snapshot request for {}", cmd.instrument_id);
737
738 let pool_address =
739 validate_address(cmd.instrument_id.symbol.as_str()).map_err(|e| {
740 anyhow::anyhow!(
741 "Invalid pool address '{}' failed with error: {:?}",
742 cmd.instrument_id,
743 e
744 )
745 })?;
746
747 let pool_identifier =
748 PoolIdentifier::Address(Ustr::from(&pool_address.to_string()));
749
750 match core_client.get_pool(&pool_identifier) {
751 Ok(pool) => {
752 let pool = pool.clone();
753 log::debug!("Found pool for snapshot request: {}", cmd.instrument_id);
754
755 let pool_data = DataEvent::DeFi(DefiData::Pool(pool.as_ref().clone()));
757 core_client.send_data(pool_data);
758
759 match core_client.bootstrap_latest_pool_profiler(&pool).await {
760 Ok((profiler, already_valid)) => {
761 let snapshot = profiler.extract_snapshot();
762
763 log::info!(
764 "Saving pool snapshot with {} positions and {} ticks to database...",
765 snapshot.positions.len(),
766 snapshot.ticks.len()
767 );
768 core_client
769 .cache
770 .add_pool_snapshot(
771 &pool.dex.name,
772 &pool.pool_identifier,
773 &snapshot,
774 )
775 .await?;
776
777 if core_client
779 .check_snapshot_validity(&profiler, already_valid)
780 .await?
781 {
782 let snapshot_data =
783 DataEvent::DeFi(DefiData::PoolSnapshot(snapshot));
784 core_client.send_data(snapshot_data);
785 }
786 }
787 Err(e) => log::error!(
788 "Failed to bootstrap pool profiler for {} and extract snapshot with error {e}",
789 cmd.instrument_id
790 ),
791 }
792 }
793 Err(e) => {
794 log::warn!("Pool {} not found in cache: {e}", cmd.instrument_id);
795 }
796 }
797
798 Ok(())
799 }
800 }
801 }
802
803 pub async fn await_process_task_close(&mut self) {
808 if let Some(handle) = self.process_task.take()
809 && let Err(e) = handle.await
810 {
811 log::error!("Process task join error: {e}");
812 }
813 }
814}
815
816#[async_trait::async_trait(?Send)]
817impl DataClient for BlockchainDataClient {
/// Returns the client ID this data client was constructed with.
fn client_id(&self) -> ClientId {
    self.client_id
}
821
/// Always returns `None`: this client does not report a venue.
fn venue(&self) -> Option<Venue> {
    None
}
827
828 fn start(&mut self) -> anyhow::Result<()> {
829 log::info!(
830 "Starting blockchain data client: chain_name={}, dex_ids={:?}, use_hypersync_for_live_data={}, proxy_url={:?}",
831 self.chain.name,
832 self.config.dex_ids,
833 self.config.use_hypersync_for_live_data,
834 self.config.proxy_url
835 );
836 Ok(())
837 }
838
839 fn stop(&mut self) -> anyhow::Result<()> {
840 log::info!(
841 "Stopping blockchain data client for '{chain_name}'",
842 chain_name = self.chain.name
843 );
844 self.cancellation_token.cancel();
845
846 self.cancellation_token = tokio_util::sync::CancellationToken::new();
848 Ok(())
849 }
850
851 fn reset(&mut self) -> anyhow::Result<()> {
852 log::info!(
853 "Resetting blockchain data client for '{chain_name}'",
854 chain_name = self.chain.name
855 );
856 self.cancellation_token = tokio_util::sync::CancellationToken::new();
857 Ok(())
858 }
859
860 fn dispose(&mut self) -> anyhow::Result<()> {
861 log::info!(
862 "Disposing blockchain data client for '{chain_name}'",
863 chain_name = self.chain.name
864 );
865 Ok(())
866 }
867
868 async fn connect(&mut self) -> anyhow::Result<()> {
869 log::info!(
870 "Connecting blockchain data client for '{}'",
871 self.chain.name
872 );
873
874 if self.process_task.is_none() {
875 self.spawn_process_task();
876 }
877
878 Ok(())
879 }
880
881 async fn disconnect(&mut self) -> anyhow::Result<()> {
882 log::info!(
883 "Disconnecting blockchain data client for '{}'",
884 self.chain.name
885 );
886
887 self.cancellation_token.cancel();
888 self.await_process_task_close().await;
889
890 self.cancellation_token = tokio_util::sync::CancellationToken::new();
892 let (hypersync_tx, hypersync_rx) = tokio::sync::mpsc::unbounded_channel();
893 self.hypersync_tx = Some(hypersync_tx);
894 self.hypersync_rx = Some(hypersync_rx);
895 let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
896 self.command_tx = command_tx;
897 self.command_rx = Some(command_rx);
898
899 Ok(())
900 }
901
/// Always returns `true`.
// NOTE(review): no task or connection state is inspected here, so this reports
// connected even before `connect` or after `disconnect` — confirm this is intended.
fn is_connected(&self) -> bool {
    true
}
907
/// Returns the negation of `is_connected()`.
fn is_disconnected(&self) -> bool {
    !self.is_connected()
}
911
912 fn subscribe_blocks(&mut self, cmd: SubscribeBlocks) -> anyhow::Result<()> {
913 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Blocks(cmd));
914 self.command_tx.send(command)?;
915 Ok(())
916 }
917
918 fn subscribe_pool(&mut self, cmd: SubscribePool) -> anyhow::Result<()> {
919 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::Pool(cmd));
920 self.command_tx.send(command)?;
921 Ok(())
922 }
923
924 fn subscribe_pool_swaps(&mut self, cmd: SubscribePoolSwaps) -> anyhow::Result<()> {
925 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolSwaps(cmd));
926 self.command_tx.send(command)?;
927 Ok(())
928 }
929
930 fn subscribe_pool_liquidity_updates(
931 &mut self,
932 cmd: SubscribePoolLiquidityUpdates,
933 ) -> anyhow::Result<()> {
934 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolLiquidityUpdates(cmd));
935 self.command_tx.send(command)?;
936 Ok(())
937 }
938
939 fn subscribe_pool_fee_collects(&mut self, cmd: SubscribePoolFeeCollects) -> anyhow::Result<()> {
940 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFeeCollects(cmd));
941 self.command_tx.send(command)?;
942 Ok(())
943 }
944
945 fn subscribe_pool_flash_events(&mut self, cmd: SubscribePoolFlashEvents) -> anyhow::Result<()> {
946 let command = DefiDataCommand::Subscribe(DefiSubscribeCommand::PoolFlashEvents(cmd));
947 self.command_tx.send(command)?;
948 Ok(())
949 }
950
951 fn unsubscribe_blocks(&mut self, cmd: &UnsubscribeBlocks) -> anyhow::Result<()> {
952 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Blocks(cmd.clone()));
953 self.command_tx.send(command)?;
954 Ok(())
955 }
956
957 fn unsubscribe_pool(&mut self, cmd: &UnsubscribePool) -> anyhow::Result<()> {
958 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::Pool(cmd.clone()));
959 self.command_tx.send(command)?;
960 Ok(())
961 }
962
963 fn unsubscribe_pool_swaps(&mut self, cmd: &UnsubscribePoolSwaps) -> anyhow::Result<()> {
964 let command = DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolSwaps(cmd.clone()));
965 self.command_tx.send(command)?;
966 Ok(())
967 }
968
969 fn unsubscribe_pool_liquidity_updates(
970 &mut self,
971 cmd: &UnsubscribePoolLiquidityUpdates,
972 ) -> anyhow::Result<()> {
973 let command =
974 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolLiquidityUpdates(cmd.clone()));
975 self.command_tx.send(command)?;
976 Ok(())
977 }
978
979 fn unsubscribe_pool_fee_collects(
980 &mut self,
981 cmd: &UnsubscribePoolFeeCollects,
982 ) -> anyhow::Result<()> {
983 let command =
984 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFeeCollects(cmd.clone()));
985 self.command_tx.send(command)?;
986 Ok(())
987 }
988
989 fn unsubscribe_pool_flash_events(
990 &mut self,
991 cmd: &UnsubscribePoolFlashEvents,
992 ) -> anyhow::Result<()> {
993 let command =
994 DefiDataCommand::Unsubscribe(DefiUnsubscribeCommand::PoolFlashEvents(cmd.clone()));
995 self.command_tx.send(command)?;
996 Ok(())
997 }
998
999 fn request_pool_snapshot(&self, cmd: RequestPoolSnapshot) -> anyhow::Result<()> {
1000 let command = DefiDataCommand::Request(DefiRequestCommand::PoolSnapshot(cmd));
1001 self.command_tx.send(command)?;
1002 Ok(())
1003 }
1004}