1use std::{rc::Rc, sync::Arc};
22
23use nautilus_common::{
24 defi,
25 messages::defi::{
26 DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
27 },
28 msgbus::{self, TypedHandler},
29};
30use nautilus_core::UUID4;
31use nautilus_model::{
32 defi::{
33 Blockchain, DefiData, PoolProfiler,
34 data::{DexPoolData, block::BlockPosition},
35 },
36 identifiers::{ClientId, InstrumentId},
37};
38
39use crate::engine::{
40 DataEngine,
41 pool::{
42 PoolCollectHandler, PoolFlashHandler, PoolLiquidityHandler, PoolSwapHandler, PoolUpdater,
43 },
44};
45
46fn get_event_block_position(event: &DexPoolData) -> (u64, u32, u32) {
48 match event {
49 DexPoolData::Swap(s) => (s.block, s.transaction_index, s.log_index),
50 DexPoolData::LiquidityUpdate(u) => (u.block, u.transaction_index, u.log_index),
51 DexPoolData::FeeCollect(c) => (c.block, c.transaction_index, c.log_index),
52 DexPoolData::Flash(f) => (f.block, f.transaction_index, f.log_index),
53 }
54}
55
56fn convert_and_sort_buffered_events(buffered_events: Vec<DefiData>) -> Vec<DexPoolData> {
58 let mut events: Vec<DexPoolData> = buffered_events
59 .into_iter()
60 .filter_map(|event| match event {
61 DefiData::PoolSwap(swap) => Some(DexPoolData::Swap(swap)),
62 DefiData::PoolLiquidityUpdate(update) => Some(DexPoolData::LiquidityUpdate(update)),
63 DefiData::PoolFeeCollect(collect) => Some(DexPoolData::FeeCollect(collect)),
64 DefiData::PoolFlash(flash) => Some(DexPoolData::Flash(flash)),
65 _ => None,
66 })
67 .collect();
68
69 events.sort_by(|a, b| {
70 let pos_a = get_event_block_position(a);
71 let pos_b = get_event_block_position(b);
72 pos_a.cmp(&pos_b)
73 });
74
75 events
76}
77
78impl DataEngine {
79 #[must_use]
81 pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
82 self.collect_subscriptions(|client| &client.subscriptions_blocks)
83 }
84
85 #[must_use]
87 pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
88 self.collect_subscriptions(|client| &client.subscriptions_pools)
89 }
90
91 #[must_use]
93 pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
94 self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
95 }
96
97 #[must_use]
99 pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
100 self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
101 }
102
103 #[must_use]
105 pub fn subscribed_pool_fee_collects(&self) -> Vec<InstrumentId> {
106 self.collect_subscriptions(|client| &client.subscriptions_pool_fee_collects)
107 }
108
109 #[must_use]
111 pub fn subscribed_pool_flash(&self) -> Vec<InstrumentId> {
112 self.collect_subscriptions(|client| &client.subscriptions_pool_flash)
113 }
114
115 pub fn execute_defi_subscribe(&mut self, cmd: DefiSubscribeCommand) -> anyhow::Result<()> {
122 if let Some(client_id) = cmd.client_id()
123 && self.external_clients.contains(client_id)
124 {
125 if self.config.debug {
126 log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}",);
127 }
128 return Ok(());
129 }
130
131 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
132 log::info!("Forwarding subscription to client {}", client.client_id);
133 client.execute_defi_subscribe(cmd.clone());
134 } else {
135 log::error!(
136 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
137 cmd.client_id(),
138 cmd.venue(),
139 );
140 }
141
142 match cmd {
143 DefiSubscribeCommand::Pool(cmd) => {
144 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
145 }
146 DefiSubscribeCommand::PoolSwaps(cmd) => {
147 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
148 }
149 DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
150 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
151 }
152 DefiSubscribeCommand::PoolFeeCollects(cmd) => {
153 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
154 }
155 DefiSubscribeCommand::PoolFlashEvents(cmd) => {
156 self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
157 }
158 DefiSubscribeCommand::Blocks(_) => {} }
160
161 Ok(())
162 }
163
164 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
170 if let Some(client_id) = cmd.client_id()
171 && self.external_clients.contains(client_id)
172 {
173 if self.config.debug {
174 log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}",);
175 }
176 return Ok(());
177 }
178
179 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
180 client.execute_defi_unsubscribe(cmd);
181 } else {
182 log::error!(
183 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
184 cmd.client_id(),
185 cmd.venue(),
186 );
187 }
188
189 Ok(())
190 }
191
192 pub fn execute_defi_request(&mut self, req: DefiRequestCommand) -> anyhow::Result<()> {
199 if let Some(cid) = req.client_id()
201 && self.external_clients.contains(cid)
202 {
203 if self.config.debug {
204 log::debug!("Skipping defi data request for external client {cid}: {req:?}");
205 }
206 return Ok(());
207 }
208
209 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
210 client.execute_defi_request(req)
211 } else {
212 anyhow::bail!(
213 "Cannot handle request: no client found for {:?} {:?}",
214 req.client_id(),
215 req.venue()
216 );
217 }
218 }
219
220 pub fn process_defi_data(&mut self, data: DefiData) {
222 self.increment_data_count();
223
224 match data {
225 DefiData::Block(block) => {
226 let topic = defi::switchboard::get_defi_blocks_topic(block.chain());
227 msgbus::publish_defi_block(topic, &block);
228 }
229 DefiData::Pool(pool) => {
230 if let Err(e) = self.cache.borrow_mut().add_pool(pool.clone()) {
231 log::error!("Failed to add Pool to cache: {e}");
232 }
233
234 if self.pool_updaters_pending.remove(&pool.instrument_id) {
236 log::info!(
237 "Pool {} now loaded, creating deferred pool profiler",
238 pool.instrument_id
239 );
240 self.setup_pool_updater(&pool.instrument_id, None);
241 }
242
243 let topic = defi::switchboard::get_defi_pool_topic(pool.instrument_id);
244 msgbus::publish_defi_pool(topic, &pool);
245 }
246 DefiData::PoolSnapshot(snapshot) => {
247 let instrument_id = snapshot.instrument_id;
248 log::info!(
249 "Received pool snapshot for {instrument_id} at block {} with {} positions and {} ticks",
250 snapshot.block_position.number,
251 snapshot.positions.len(),
252 snapshot.ticks.len()
253 );
254
255 if !self.pool_snapshot_pending.contains(&instrument_id) {
257 log::warn!(
258 "Received unexpected pool snapshot for {instrument_id} (not in pending set)"
259 );
260 return;
261 }
262
263 let pool = match self.cache.borrow().pool(&instrument_id) {
265 Some(pool) => Arc::new(pool.clone()),
266 None => {
267 log::error!(
268 "Pool {instrument_id} not found in cache when processing snapshot"
269 );
270 return;
271 }
272 };
273
274 let mut profiler = PoolProfiler::new(pool);
276 if let Err(e) = profiler.restore_from_snapshot(snapshot.clone()) {
277 log::error!(
278 "Failed to restore profiler from snapshot for {instrument_id}: {e}"
279 );
280 return;
281 }
282 log::debug!("Restored pool profiler for {instrument_id} from snapshot");
283
284 let buffered_events = self
286 .pool_event_buffers
287 .remove(&instrument_id)
288 .unwrap_or_default();
289
290 if !buffered_events.is_empty() {
291 log::info!(
292 "Processing {} buffered events for {instrument_id}",
293 buffered_events.len()
294 );
295
296 let events_to_apply = convert_and_sort_buffered_events(buffered_events);
297 let applied_count = Self::apply_buffered_events_to_profiler(
298 &mut profiler,
299 events_to_apply,
300 &snapshot.block_position,
301 instrument_id,
302 );
303
304 log::info!(
305 "Applied {applied_count} buffered events to profiler for {instrument_id}"
306 );
307 }
308
309 if let Err(e) = self.cache.borrow_mut().add_pool_profiler(profiler) {
311 log::error!("Failed to add pool profiler to cache for {instrument_id}: {e}");
312 return;
313 }
314
315 self.pool_snapshot_pending.remove(&instrument_id);
317 let updater = Rc::new(PoolUpdater::new(&instrument_id, self.cache.clone()));
318
319 self.subscribe_pool_updater_topics(instrument_id, updater.clone());
320 self.pool_updaters.insert(instrument_id, updater);
321
322 log::info!(
323 "Pool profiler setup completed for {instrument_id}, now processing live events"
324 );
325 }
326 DefiData::PoolSwap(swap) => {
327 let instrument_id = swap.instrument_id;
328 if self.pool_snapshot_pending.contains(&instrument_id) {
330 log::debug!("Buffering swap event for {instrument_id} (waiting for snapshot)");
331 self.pool_event_buffers
332 .entry(instrument_id)
333 .or_default()
334 .push(DefiData::PoolSwap(swap));
335 } else {
336 let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
337 msgbus::publish_defi_swap(topic, &swap);
338 }
339 }
340 DefiData::PoolLiquidityUpdate(update) => {
341 let instrument_id = update.instrument_id;
342 if self.pool_snapshot_pending.contains(&instrument_id) {
344 log::debug!(
345 "Buffering liquidity update event for {instrument_id} (waiting for snapshot)"
346 );
347 self.pool_event_buffers
348 .entry(instrument_id)
349 .or_default()
350 .push(DefiData::PoolLiquidityUpdate(update));
351 } else {
352 let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
353 msgbus::publish_defi_liquidity(topic, &update);
354 }
355 }
356 DefiData::PoolFeeCollect(collect) => {
357 let instrument_id = collect.instrument_id;
358 if self.pool_snapshot_pending.contains(&instrument_id) {
360 log::debug!(
361 "Buffering fee collect event for {instrument_id} (waiting for snapshot)"
362 );
363 self.pool_event_buffers
364 .entry(instrument_id)
365 .or_default()
366 .push(DefiData::PoolFeeCollect(collect));
367 } else {
368 let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
369 msgbus::publish_defi_collect(topic, &collect);
370 }
371 }
372 DefiData::PoolFlash(flash) => {
373 let instrument_id = flash.instrument_id;
374 if self.pool_snapshot_pending.contains(&instrument_id) {
376 log::debug!("Buffering flash event for {instrument_id} (waiting for snapshot)");
377 self.pool_event_buffers
378 .entry(instrument_id)
379 .or_default()
380 .push(DefiData::PoolFlash(flash));
381 } else {
382 let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
383 msgbus::publish_defi_flash(topic, &flash);
384 }
385 }
386 }
387 }
388
389 fn subscribe_pool_updater_topics(&self, instrument_id: InstrumentId, updater: Rc<PoolUpdater>) {
391 let priority = Some(self.msgbus_priority);
392
393 let swap_topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
395 let swap_handler = TypedHandler(Rc::new(PoolSwapHandler::new(updater.clone())));
396 msgbus::subscribe_defi_swaps(swap_topic.into(), swap_handler, priority);
397
398 let liq_topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
400 let liq_handler = TypedHandler(Rc::new(PoolLiquidityHandler::new(updater.clone())));
401 msgbus::subscribe_defi_liquidity(liq_topic.into(), liq_handler, priority);
402
403 let collect_topic = defi::switchboard::get_defi_collect_topic(instrument_id);
405 let collect_handler = TypedHandler(Rc::new(PoolCollectHandler::new(updater.clone())));
406 msgbus::subscribe_defi_collects(collect_topic.into(), collect_handler, priority);
407
408 let flash_topic = defi::switchboard::get_defi_flash_topic(instrument_id);
410 let flash_handler = TypedHandler(Rc::new(PoolFlashHandler::new(updater)));
411 msgbus::subscribe_defi_flash(flash_topic.into(), flash_handler, priority);
412 }
413
414 fn apply_buffered_events_to_profiler(
418 profiler: &mut PoolProfiler,
419 events: Vec<DexPoolData>,
420 snapshot_block: &BlockPosition,
421 instrument_id: InstrumentId,
422 ) -> usize {
423 let mut applied_count = 0;
424
425 for event in events {
426 let event_block = get_event_block_position(&event);
427
428 let is_after_snapshot = event_block.0 > snapshot_block.number
430 || (event_block.0 == snapshot_block.number
431 && event_block.1 > snapshot_block.transaction_index)
432 || (event_block.0 == snapshot_block.number
433 && event_block.1 == snapshot_block.transaction_index
434 && event_block.2 > snapshot_block.log_index);
435
436 if is_after_snapshot {
437 if let Err(e) = profiler.process(&event) {
438 log::error!(
439 "Failed to apply buffered event to profiler for {instrument_id}: {e}"
440 );
441 } else {
442 applied_count += 1;
443 }
444 }
445 }
446
447 applied_count
448 }
449
450 fn setup_pool_updater(&mut self, instrument_id: &InstrumentId, client_id: Option<&ClientId>) {
451 if self.pool_updaters.contains_key(instrument_id)
453 || self.pool_updaters_pending.contains(instrument_id)
454 {
455 log::debug!("Pool updater for {instrument_id} already exists");
456 return;
457 }
458
459 log::info!("Setting up pool updater for {instrument_id}");
460
461 {
463 let mut cache = self.cache.borrow_mut();
464
465 if cache.pool_profiler(instrument_id).is_some() {
466 log::debug!("Pool profiler already exists for {instrument_id}");
468 } else if let Some(pool) = cache.pool(instrument_id) {
469 let pool = Arc::new(pool.clone());
471 let mut pool_profiler = PoolProfiler::new(pool.clone());
472
473 if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
474 pool_profiler.initialize(initial_sqrt_price_x96);
475 log::debug!(
476 "Initialized pool profiler for {instrument_id} with sqrt_price {initial_sqrt_price_x96}"
477 );
478 } else {
479 log::debug!("Created pool profiler for {instrument_id}");
480 }
481
482 if let Err(e) = cache.add_pool_profiler(pool_profiler) {
483 log::error!("Failed to add pool profiler for {instrument_id}: {e}");
484 drop(cache);
485 return;
486 }
487 drop(cache);
488 } else {
489 drop(cache);
491
492 let request_id = UUID4::new();
493 let ts_init = self.clock.borrow().timestamp_ns();
494 let request = RequestPoolSnapshot::new(
495 *instrument_id,
496 client_id.copied(),
497 request_id,
498 ts_init,
499 None,
500 );
501
502 if let Err(e) = self.execute_defi_request(DefiRequestCommand::PoolSnapshot(request))
503 {
504 log::warn!("Failed to request pool snapshot for {instrument_id}: {e}");
505 } else {
506 log::debug!("Requested pool snapshot for {instrument_id}");
507 self.pool_snapshot_pending.insert(*instrument_id);
508 self.pool_updaters_pending.insert(*instrument_id);
509 self.pool_event_buffers.entry(*instrument_id).or_default();
510 }
511 return;
512 }
513 }
514
515 let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
517
518 self.subscribe_pool_updater_topics(*instrument_id, updater.clone());
519 self.pool_updaters.insert(*instrument_id, updater);
520
521 log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
522 }
523}
524
525#[cfg(test)]
526mod tests {
527 use std::sync::Arc;
528
529 use alloy_primitives::{Address, I256, U160, U256};
530 use nautilus_model::{
531 defi::{
532 Chain, DefiData, PoolFeeCollect, PoolFlash, PoolIdentifier, PoolLiquidityUpdate,
533 PoolLiquidityUpdateType, PoolSwap,
534 chain::chains,
535 data::DexPoolData,
536 dex::{AmmType, Dex, DexType},
537 },
538 identifiers::{InstrumentId, Symbol, Venue},
539 };
540 use rstest::*;
541
542 use super::*;
543
544 #[fixture]
545 fn test_instrument_id() -> InstrumentId {
546 InstrumentId::new(Symbol::from("ETH/USDC"), Venue::from("UNISWAPV3"))
547 }
548
549 #[fixture]
550 fn test_chain() -> Arc<Chain> {
551 Arc::new(chains::ETHEREUM.clone())
552 }
553
554 #[fixture]
555 fn test_dex(test_chain: Arc<Chain>) -> Arc<Dex> {
556 Arc::new(Dex::new(
557 (*test_chain).clone(),
558 DexType::UniswapV3,
559 "0x1F98431c8aD98523631AE4a59f267346ea31F984",
560 12369621,
561 AmmType::CLAMM,
562 "PoolCreated(address,address,uint24,int24,address)",
563 "Swap(address,address,int256,int256,uint160,uint128,int24)",
564 "Mint(address,address,int24,int24,uint128,uint256,uint256)",
565 "Burn(address,int24,int24,uint128,uint256,uint256)",
566 "Collect(address,address,int24,int24,uint128,uint128)",
567 ))
568 }
569
570 fn create_test_swap(
571 test_instrument_id: InstrumentId,
572 test_chain: Arc<Chain>,
573 test_dex: Arc<Dex>,
574 block: u64,
575 tx_index: u32,
576 log_index: u32,
577 ) -> PoolSwap {
578 PoolSwap::new(
579 test_chain,
580 test_dex,
581 test_instrument_id,
582 PoolIdentifier::from_address(Address::ZERO),
583 block,
584 format!("0x{block:064x}"),
585 tx_index,
586 log_index,
587 None,
588 Address::ZERO,
589 Address::ZERO,
590 I256::ZERO,
591 I256::ZERO,
592 U160::ZERO,
593 0,
594 0,
595 )
596 }
597
598 fn create_test_liquidity_update(
599 test_instrument_id: InstrumentId,
600 test_chain: Arc<Chain>,
601 test_dex: Arc<Dex>,
602 block: u64,
603 tx_index: u32,
604 log_index: u32,
605 ) -> PoolLiquidityUpdate {
606 PoolLiquidityUpdate::new(
607 test_chain,
608 test_dex,
609 test_instrument_id,
610 PoolIdentifier::from_address(Address::ZERO),
611 PoolLiquidityUpdateType::Mint,
612 block,
613 format!("0x{block:064x}"),
614 tx_index,
615 log_index,
616 None,
617 Address::ZERO,
618 0,
619 U256::ZERO,
620 U256::ZERO,
621 0,
622 0,
623 None,
624 )
625 }
626
627 fn create_test_fee_collect(
628 test_instrument_id: InstrumentId,
629 test_chain: Arc<Chain>,
630 test_dex: Arc<Dex>,
631 block: u64,
632 tx_index: u32,
633 log_index: u32,
634 ) -> PoolFeeCollect {
635 PoolFeeCollect::new(
636 test_chain,
637 test_dex,
638 test_instrument_id,
639 PoolIdentifier::from_address(Address::ZERO),
640 block,
641 format!("0x{block:064x}"),
642 tx_index,
643 log_index,
644 Address::ZERO,
645 0,
646 0,
647 0,
648 0,
649 None,
650 )
651 }
652
653 fn create_test_flash(
654 test_instrument_id: InstrumentId,
655 test_chain: Arc<Chain>,
656 test_dex: Arc<Dex>,
657 block: u64,
658 tx_index: u32,
659 log_index: u32,
660 ) -> PoolFlash {
661 PoolFlash::new(
662 test_chain,
663 test_dex,
664 test_instrument_id,
665 PoolIdentifier::from_address(Address::ZERO),
666 block,
667 format!("0x{block:064x}"),
668 tx_index,
669 log_index,
670 None,
671 Address::ZERO,
672 Address::ZERO,
673 U256::ZERO,
674 U256::ZERO,
675 U256::ZERO,
676 U256::ZERO,
677 )
678 }
679
680 #[rstest]
681 fn test_get_event_block_position_swap(
682 test_instrument_id: InstrumentId,
683 test_chain: Arc<Chain>,
684 test_dex: Arc<Dex>,
685 ) {
686 let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
687 let pos = get_event_block_position(&DexPoolData::Swap(swap));
688 assert_eq!(pos, (100, 5, 3));
689 }
690
691 #[rstest]
692 fn test_get_event_block_position_liquidity_update(
693 test_instrument_id: InstrumentId,
694 test_chain: Arc<Chain>,
695 test_dex: Arc<Dex>,
696 ) {
697 let update =
698 create_test_liquidity_update(test_instrument_id, test_chain, test_dex, 200, 10, 7);
699 let pos = get_event_block_position(&DexPoolData::LiquidityUpdate(update));
700 assert_eq!(pos, (200, 10, 7));
701 }
702
703 #[rstest]
704 fn test_get_event_block_position_fee_collect(
705 test_instrument_id: InstrumentId,
706 test_chain: Arc<Chain>,
707 test_dex: Arc<Dex>,
708 ) {
709 let collect = create_test_fee_collect(test_instrument_id, test_chain, test_dex, 300, 15, 2);
710 let pos = get_event_block_position(&DexPoolData::FeeCollect(collect));
711 assert_eq!(pos, (300, 15, 2));
712 }
713
714 #[rstest]
715 fn test_get_event_block_position_flash(
716 test_instrument_id: InstrumentId,
717 test_chain: Arc<Chain>,
718 test_dex: Arc<Dex>,
719 ) {
720 let flash = create_test_flash(test_instrument_id, test_chain, test_dex, 400, 20, 8);
721 let pos = get_event_block_position(&DexPoolData::Flash(flash));
722 assert_eq!(pos, (400, 20, 8));
723 }
724
725 #[rstest]
726 fn test_convert_and_sort_empty_events() {
727 let events = convert_and_sort_buffered_events(vec![]);
728 assert!(events.is_empty());
729 }
730
731 #[rstest]
732 fn test_convert_and_sort_filters_non_pool_events(
733 test_instrument_id: InstrumentId,
734 test_chain: Arc<Chain>,
735 test_dex: Arc<Dex>,
736 ) {
737 let events = vec![
738 DefiData::PoolSwap(create_test_swap(
739 test_instrument_id,
740 test_chain,
741 test_dex,
742 100,
743 0,
744 0,
745 )),
746 ];
748 let sorted = convert_and_sort_buffered_events(events);
749 assert_eq!(sorted.len(), 1);
750 }
751
752 #[rstest]
753 fn test_convert_and_sort_single_event(
754 test_instrument_id: InstrumentId,
755 test_chain: Arc<Chain>,
756 test_dex: Arc<Dex>,
757 ) {
758 let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
759 let events = vec![DefiData::PoolSwap(swap)];
760 let sorted = convert_and_sort_buffered_events(events);
761 assert_eq!(sorted.len(), 1);
762 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 3));
763 }
764
765 #[rstest]
766 fn test_convert_and_sort_already_sorted(
767 test_instrument_id: InstrumentId,
768 test_chain: Arc<Chain>,
769 test_dex: Arc<Dex>,
770 ) {
771 let events = vec![
772 DefiData::PoolSwap(create_test_swap(
773 test_instrument_id,
774 test_chain.clone(),
775 test_dex.clone(),
776 100,
777 0,
778 0,
779 )),
780 DefiData::PoolSwap(create_test_swap(
781 test_instrument_id,
782 test_chain.clone(),
783 test_dex.clone(),
784 100,
785 0,
786 1,
787 )),
788 DefiData::PoolSwap(create_test_swap(
789 test_instrument_id,
790 test_chain,
791 test_dex,
792 100,
793 1,
794 0,
795 )),
796 ];
797 let sorted = convert_and_sort_buffered_events(events);
798 assert_eq!(sorted.len(), 3);
799 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
800 assert_eq!(get_event_block_position(&sorted[1]), (100, 0, 1));
801 assert_eq!(get_event_block_position(&sorted[2]), (100, 1, 0));
802 }
803
804 #[rstest]
805 fn test_convert_and_sort_reverse_order(
806 test_instrument_id: InstrumentId,
807 test_chain: Arc<Chain>,
808 test_dex: Arc<Dex>,
809 ) {
810 let events = vec![
811 DefiData::PoolSwap(create_test_swap(
812 test_instrument_id,
813 test_chain.clone(),
814 test_dex.clone(),
815 100,
816 2,
817 5,
818 )),
819 DefiData::PoolSwap(create_test_swap(
820 test_instrument_id,
821 test_chain.clone(),
822 test_dex.clone(),
823 100,
824 1,
825 3,
826 )),
827 DefiData::PoolSwap(create_test_swap(
828 test_instrument_id,
829 test_chain,
830 test_dex,
831 100,
832 0,
833 1,
834 )),
835 ];
836 let sorted = convert_and_sort_buffered_events(events);
837 assert_eq!(sorted.len(), 3);
838 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 1));
839 assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 3));
840 assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 5));
841 }
842
843 #[rstest]
844 fn test_convert_and_sort_mixed_blocks(
845 test_instrument_id: InstrumentId,
846 test_chain: Arc<Chain>,
847 test_dex: Arc<Dex>,
848 ) {
849 let events = vec![
850 DefiData::PoolSwap(create_test_swap(
851 test_instrument_id,
852 test_chain.clone(),
853 test_dex.clone(),
854 102,
855 0,
856 0,
857 )),
858 DefiData::PoolSwap(create_test_swap(
859 test_instrument_id,
860 test_chain.clone(),
861 test_dex.clone(),
862 100,
863 5,
864 2,
865 )),
866 DefiData::PoolSwap(create_test_swap(
867 test_instrument_id,
868 test_chain,
869 test_dex,
870 101,
871 3,
872 1,
873 )),
874 ];
875 let sorted = convert_and_sort_buffered_events(events);
876 assert_eq!(sorted.len(), 3);
877 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 2));
878 assert_eq!(get_event_block_position(&sorted[1]), (101, 3, 1));
879 assert_eq!(get_event_block_position(&sorted[2]), (102, 0, 0));
880 }
881
882 #[rstest]
883 fn test_convert_and_sort_mixed_event_types(
884 test_instrument_id: InstrumentId,
885 test_chain: Arc<Chain>,
886 test_dex: Arc<Dex>,
887 ) {
888 let events = vec![
889 DefiData::PoolSwap(create_test_swap(
890 test_instrument_id,
891 test_chain.clone(),
892 test_dex.clone(),
893 100,
894 2,
895 0,
896 )),
897 DefiData::PoolLiquidityUpdate(create_test_liquidity_update(
898 test_instrument_id,
899 test_chain.clone(),
900 test_dex.clone(),
901 100,
902 0,
903 0,
904 )),
905 DefiData::PoolFeeCollect(create_test_fee_collect(
906 test_instrument_id,
907 test_chain.clone(),
908 test_dex.clone(),
909 100,
910 1,
911 0,
912 )),
913 DefiData::PoolFlash(create_test_flash(
914 test_instrument_id,
915 test_chain,
916 test_dex,
917 100,
918 3,
919 0,
920 )),
921 ];
922 let sorted = convert_and_sort_buffered_events(events);
923 assert_eq!(sorted.len(), 4);
924 assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
925 assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 0));
926 assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 0));
927 assert_eq!(get_event_block_position(&sorted[3]), (100, 3, 0));
928 }
929
930 #[rstest]
931 fn test_convert_and_sort_same_block_and_tx_different_log_index(
932 test_instrument_id: InstrumentId,
933 test_chain: Arc<Chain>,
934 test_dex: Arc<Dex>,
935 ) {
936 let events = vec![
937 DefiData::PoolSwap(create_test_swap(
938 test_instrument_id,
939 test_chain.clone(),
940 test_dex.clone(),
941 100,
942 5,
943 10,
944 )),
945 DefiData::PoolSwap(create_test_swap(
946 test_instrument_id,
947 test_chain.clone(),
948 test_dex.clone(),
949 100,
950 5,
951 5,
952 )),
953 DefiData::PoolSwap(create_test_swap(
954 test_instrument_id,
955 test_chain,
956 test_dex,
957 100,
958 5,
959 1,
960 )),
961 ];
962 let sorted = convert_and_sort_buffered_events(events);
963 assert_eq!(sorted.len(), 3);
964 assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 1));
965 assert_eq!(get_event_block_position(&sorted[1]), (100, 5, 5));
966 assert_eq!(get_event_block_position(&sorted[2]), (100, 5, 10));
967 }
968}