Skip to main content

nautilus_data/defi/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! DeFi-specific data engine functionality.
17//!
18//! This module provides DeFi processing methods for the `DataEngine`.
19//! All code in this module requires the `defi` feature flag.
20
21use std::{rc::Rc, sync::Arc};
22
23use nautilus_common::{
24    defi,
25    messages::defi::{
26        DefiRequestCommand, DefiSubscribeCommand, DefiUnsubscribeCommand, RequestPoolSnapshot,
27    },
28    msgbus::{self, TypedHandler},
29};
30use nautilus_core::UUID4;
31use nautilus_model::{
32    defi::{
33        Blockchain, DefiData, PoolProfiler,
34        data::{DexPoolData, block::BlockPosition},
35    },
36    identifiers::{ClientId, InstrumentId},
37};
38
39use crate::engine::{
40    DataEngine,
41    pool::{
42        PoolCollectHandler, PoolFlashHandler, PoolLiquidityHandler, PoolSwapHandler, PoolUpdater,
43    },
44};
45
46/// Extracts the block position tuple from a DexPoolData event.
47fn get_event_block_position(event: &DexPoolData) -> (u64, u32, u32) {
48    match event {
49        DexPoolData::Swap(s) => (s.block, s.transaction_index, s.log_index),
50        DexPoolData::LiquidityUpdate(u) => (u.block, u.transaction_index, u.log_index),
51        DexPoolData::FeeCollect(c) => (c.block, c.transaction_index, c.log_index),
52        DexPoolData::Flash(f) => (f.block, f.transaction_index, f.log_index),
53    }
54}
55
56/// Converts buffered DefiData events to DexPoolData and sorts by block position.
57fn convert_and_sort_buffered_events(buffered_events: Vec<DefiData>) -> Vec<DexPoolData> {
58    let mut events: Vec<DexPoolData> = buffered_events
59        .into_iter()
60        .filter_map(|event| match event {
61            DefiData::PoolSwap(swap) => Some(DexPoolData::Swap(swap)),
62            DefiData::PoolLiquidityUpdate(update) => Some(DexPoolData::LiquidityUpdate(update)),
63            DefiData::PoolFeeCollect(collect) => Some(DexPoolData::FeeCollect(collect)),
64            DefiData::PoolFlash(flash) => Some(DexPoolData::Flash(flash)),
65            _ => None,
66        })
67        .collect();
68
69    events.sort_by(|a, b| {
70        let pos_a = get_event_block_position(a);
71        let pos_b = get_event_block_position(b);
72        pos_a.cmp(&pos_b)
73    });
74
75    events
76}
77
78impl DataEngine {
79    /// Returns all blockchains for which blocks subscriptions exist.
80    #[must_use]
81    pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
82        self.collect_subscriptions(|client| &client.subscriptions_blocks)
83    }
84
85    /// Returns all instrument IDs for which pool subscriptions exist.
86    #[must_use]
87    pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
88        self.collect_subscriptions(|client| &client.subscriptions_pools)
89    }
90
91    /// Returns all instrument IDs for which swap subscriptions exist.
92    #[must_use]
93    pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
94        self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
95    }
96
97    /// Returns all instrument IDs for which liquidity update subscriptions exist.
98    #[must_use]
99    pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
100        self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
101    }
102
103    /// Returns all instrument IDs for which fee collect subscriptions exist.
104    #[must_use]
105    pub fn subscribed_pool_fee_collects(&self) -> Vec<InstrumentId> {
106        self.collect_subscriptions(|client| &client.subscriptions_pool_fee_collects)
107    }
108
109    /// Returns all instrument IDs for which flash loan subscriptions exist.
110    #[must_use]
111    pub fn subscribed_pool_flash(&self) -> Vec<InstrumentId> {
112        self.collect_subscriptions(|client| &client.subscriptions_pool_flash)
113    }
114
115    /// Handles a subscribe command, updating internal state and forwarding to the client.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
120    /// or if the underlying client operation fails.
121    pub fn execute_defi_subscribe(&mut self, cmd: DefiSubscribeCommand) -> anyhow::Result<()> {
122        if let Some(client_id) = cmd.client_id()
123            && self.external_clients.contains(client_id)
124        {
125            if self.config.debug {
126                log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}",);
127            }
128            return Ok(());
129        }
130
131        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
132            log::info!("Forwarding subscription to client {}", client.client_id);
133            client.execute_defi_subscribe(cmd.clone());
134        } else {
135            log::error!(
136                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
137                cmd.client_id(),
138                cmd.venue(),
139            );
140        }
141
142        match cmd {
143            DefiSubscribeCommand::Pool(cmd) => {
144                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
145            }
146            DefiSubscribeCommand::PoolSwaps(cmd) => {
147                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
148            }
149            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
150                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
151            }
152            DefiSubscribeCommand::PoolFeeCollects(cmd) => {
153                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
154            }
155            DefiSubscribeCommand::PoolFlashEvents(cmd) => {
156                self.setup_pool_updater(&cmd.instrument_id, cmd.client_id.as_ref());
157            }
158            DefiSubscribeCommand::Blocks(_) => {} // No pool setup needed for blocks
159        }
160
161        Ok(())
162    }
163
164    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if the underlying client operation fails.
169    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
170        if let Some(client_id) = cmd.client_id()
171            && self.external_clients.contains(client_id)
172        {
173            if self.config.debug {
174                log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}",);
175            }
176            return Ok(());
177        }
178
179        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
180            client.execute_defi_unsubscribe(cmd);
181        } else {
182            log::error!(
183                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
184                cmd.client_id(),
185                cmd.venue(),
186            );
187        }
188
189        Ok(())
190    }
191
192    /// Sends a [`DefiRequestCommand`] to a suitable data client implementation.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if no client is found for the given client ID or venue,
197    /// or if the client fails to process the request.
198    pub fn execute_defi_request(&mut self, req: DefiRequestCommand) -> anyhow::Result<()> {
199        // Skip requests for external clients
200        if let Some(cid) = req.client_id()
201            && self.external_clients.contains(cid)
202        {
203            if self.config.debug {
204                log::debug!("Skipping defi data request for external client {cid}: {req:?}");
205            }
206            return Ok(());
207        }
208
209        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
210            client.execute_defi_request(req)
211        } else {
212            anyhow::bail!(
213                "Cannot handle request: no client found for {:?} {:?}",
214                req.client_id(),
215                req.venue()
216            );
217        }
218    }
219
220    /// Processes DeFi-specific data events.
221    pub fn process_defi_data(&mut self, data: DefiData) {
222        self.increment_data_count();
223
224        match data {
225            DefiData::Block(block) => {
226                let topic = defi::switchboard::get_defi_blocks_topic(block.chain());
227                msgbus::publish_defi_block(topic, &block);
228            }
229            DefiData::Pool(pool) => {
230                if let Err(e) = self.cache.borrow_mut().add_pool(pool.clone()) {
231                    log::error!("Failed to add Pool to cache: {e}");
232                }
233
234                // Check if pool profiler creation was deferred
235                if self.pool_updaters_pending.remove(&pool.instrument_id) {
236                    log::info!(
237                        "Pool {} now loaded, creating deferred pool profiler",
238                        pool.instrument_id
239                    );
240                    self.setup_pool_updater(&pool.instrument_id, None);
241                }
242
243                let topic = defi::switchboard::get_defi_pool_topic(pool.instrument_id);
244                msgbus::publish_defi_pool(topic, &pool);
245            }
246            DefiData::PoolSnapshot(snapshot) => {
247                let instrument_id = snapshot.instrument_id;
248                log::info!(
249                    "Received pool snapshot for {instrument_id} at block {} with {} positions and {} ticks",
250                    snapshot.block_position.number,
251                    snapshot.positions.len(),
252                    snapshot.ticks.len()
253                );
254
255                // Validate we're expecting this snapshot
256                if !self.pool_snapshot_pending.contains(&instrument_id) {
257                    log::warn!(
258                        "Received unexpected pool snapshot for {instrument_id} (not in pending set)"
259                    );
260                    return;
261                }
262
263                // Get pool from cache
264                let pool = match self.cache.borrow().pool(&instrument_id) {
265                    Some(pool) => Arc::new(pool.clone()),
266                    None => {
267                        log::error!(
268                            "Pool {instrument_id} not found in cache when processing snapshot"
269                        );
270                        return;
271                    }
272                };
273
274                // Create profiler and restore from snapshot
275                let mut profiler = PoolProfiler::new(pool);
276                if let Err(e) = profiler.restore_from_snapshot(snapshot.clone()) {
277                    log::error!(
278                        "Failed to restore profiler from snapshot for {instrument_id}: {e}"
279                    );
280                    return;
281                }
282                log::debug!("Restored pool profiler for {instrument_id} from snapshot");
283
284                // Process buffered events
285                let buffered_events = self
286                    .pool_event_buffers
287                    .remove(&instrument_id)
288                    .unwrap_or_default();
289
290                if !buffered_events.is_empty() {
291                    log::info!(
292                        "Processing {} buffered events for {instrument_id}",
293                        buffered_events.len()
294                    );
295
296                    let events_to_apply = convert_and_sort_buffered_events(buffered_events);
297                    let applied_count = Self::apply_buffered_events_to_profiler(
298                        &mut profiler,
299                        events_to_apply,
300                        &snapshot.block_position,
301                        instrument_id,
302                    );
303
304                    log::info!(
305                        "Applied {applied_count} buffered events to profiler for {instrument_id}"
306                    );
307                }
308
309                // Add profiler to cache
310                if let Err(e) = self.cache.borrow_mut().add_pool_profiler(profiler) {
311                    log::error!("Failed to add pool profiler to cache for {instrument_id}: {e}");
312                    return;
313                }
314
315                // Create updater and subscribe to topics
316                self.pool_snapshot_pending.remove(&instrument_id);
317                let updater = Rc::new(PoolUpdater::new(&instrument_id, self.cache.clone()));
318
319                self.subscribe_pool_updater_topics(instrument_id, updater.clone());
320                self.pool_updaters.insert(instrument_id, updater);
321
322                log::info!(
323                    "Pool profiler setup completed for {instrument_id}, now processing live events"
324                );
325            }
326            DefiData::PoolSwap(swap) => {
327                let instrument_id = swap.instrument_id;
328                // Buffer if waiting for snapshot, otherwise publish
329                if self.pool_snapshot_pending.contains(&instrument_id) {
330                    log::debug!("Buffering swap event for {instrument_id} (waiting for snapshot)");
331                    self.pool_event_buffers
332                        .entry(instrument_id)
333                        .or_default()
334                        .push(DefiData::PoolSwap(swap));
335                } else {
336                    let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
337                    msgbus::publish_defi_swap(topic, &swap);
338                }
339            }
340            DefiData::PoolLiquidityUpdate(update) => {
341                let instrument_id = update.instrument_id;
342                // Buffer if waiting for snapshot, otherwise publish
343                if self.pool_snapshot_pending.contains(&instrument_id) {
344                    log::debug!(
345                        "Buffering liquidity update event for {instrument_id} (waiting for snapshot)"
346                    );
347                    self.pool_event_buffers
348                        .entry(instrument_id)
349                        .or_default()
350                        .push(DefiData::PoolLiquidityUpdate(update));
351                } else {
352                    let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
353                    msgbus::publish_defi_liquidity(topic, &update);
354                }
355            }
356            DefiData::PoolFeeCollect(collect) => {
357                let instrument_id = collect.instrument_id;
358                // Buffer if waiting for snapshot, otherwise publish
359                if self.pool_snapshot_pending.contains(&instrument_id) {
360                    log::debug!(
361                        "Buffering fee collect event for {instrument_id} (waiting for snapshot)"
362                    );
363                    self.pool_event_buffers
364                        .entry(instrument_id)
365                        .or_default()
366                        .push(DefiData::PoolFeeCollect(collect));
367                } else {
368                    let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
369                    msgbus::publish_defi_collect(topic, &collect);
370                }
371            }
372            DefiData::PoolFlash(flash) => {
373                let instrument_id = flash.instrument_id;
374                // Buffer if waiting for snapshot, otherwise publish
375                if self.pool_snapshot_pending.contains(&instrument_id) {
376                    log::debug!("Buffering flash event for {instrument_id} (waiting for snapshot)");
377                    self.pool_event_buffers
378                        .entry(instrument_id)
379                        .or_default()
380                        .push(DefiData::PoolFlash(flash));
381                } else {
382                    let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
383                    msgbus::publish_defi_flash(topic, &flash);
384                }
385            }
386        }
387    }
388
389    /// Subscribes a pool updater to all relevant pool data topics using typed handlers.
390    fn subscribe_pool_updater_topics(&self, instrument_id: InstrumentId, updater: Rc<PoolUpdater>) {
391        let priority = Some(self.msgbus_priority);
392
393        // Subscribe swap handler
394        let swap_topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
395        let swap_handler = TypedHandler(Rc::new(PoolSwapHandler::new(updater.clone())));
396        msgbus::subscribe_defi_swaps(swap_topic.into(), swap_handler, priority);
397
398        // Subscribe liquidity handler
399        let liq_topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
400        let liq_handler = TypedHandler(Rc::new(PoolLiquidityHandler::new(updater.clone())));
401        msgbus::subscribe_defi_liquidity(liq_topic.into(), liq_handler, priority);
402
403        // Subscribe collect handler
404        let collect_topic = defi::switchboard::get_defi_collect_topic(instrument_id);
405        let collect_handler = TypedHandler(Rc::new(PoolCollectHandler::new(updater.clone())));
406        msgbus::subscribe_defi_collects(collect_topic.into(), collect_handler, priority);
407
408        // Subscribe flash handler
409        let flash_topic = defi::switchboard::get_defi_flash_topic(instrument_id);
410        let flash_handler = TypedHandler(Rc::new(PoolFlashHandler::new(updater)));
411        msgbus::subscribe_defi_flash(flash_topic.into(), flash_handler, priority);
412    }
413
414    /// Applies buffered events to a pool profiler, filtering to events after the snapshot.
415    ///
416    /// Returns the count of successfully applied events.
417    fn apply_buffered_events_to_profiler(
418        profiler: &mut PoolProfiler,
419        events: Vec<DexPoolData>,
420        snapshot_block: &BlockPosition,
421        instrument_id: InstrumentId,
422    ) -> usize {
423        let mut applied_count = 0;
424
425        for event in events {
426            let event_block = get_event_block_position(&event);
427
428            // Only apply events that occurred after the snapshot
429            let is_after_snapshot = event_block.0 > snapshot_block.number
430                || (event_block.0 == snapshot_block.number
431                    && event_block.1 > snapshot_block.transaction_index)
432                || (event_block.0 == snapshot_block.number
433                    && event_block.1 == snapshot_block.transaction_index
434                    && event_block.2 > snapshot_block.log_index);
435
436            if is_after_snapshot {
437                if let Err(e) = profiler.process(&event) {
438                    log::error!(
439                        "Failed to apply buffered event to profiler for {instrument_id}: {e}"
440                    );
441                } else {
442                    applied_count += 1;
443                }
444            }
445        }
446
447        applied_count
448    }
449
450    fn setup_pool_updater(&mut self, instrument_id: &InstrumentId, client_id: Option<&ClientId>) {
451        // Early return if updater already exists or we are in the middle of setting it up.
452        if self.pool_updaters.contains_key(instrument_id)
453            || self.pool_updaters_pending.contains(instrument_id)
454        {
455            log::debug!("Pool updater for {instrument_id} already exists");
456            return;
457        }
458
459        log::info!("Setting up pool updater for {instrument_id}");
460
461        // Check cache state and ensure profiler exists
462        {
463            let mut cache = self.cache.borrow_mut();
464
465            if cache.pool_profiler(instrument_id).is_some() {
466                // Profiler already exists, proceed to create updater
467                log::debug!("Pool profiler already exists for {instrument_id}");
468            } else if let Some(pool) = cache.pool(instrument_id) {
469                // Pool exists but no profiler, create profiler from pool
470                let pool = Arc::new(pool.clone());
471                let mut pool_profiler = PoolProfiler::new(pool.clone());
472
473                if let Some(initial_sqrt_price_x96) = pool.initial_sqrt_price_x96 {
474                    pool_profiler.initialize(initial_sqrt_price_x96);
475                    log::debug!(
476                        "Initialized pool profiler for {instrument_id} with sqrt_price {initial_sqrt_price_x96}"
477                    );
478                } else {
479                    log::debug!("Created pool profiler for {instrument_id}");
480                }
481
482                if let Err(e) = cache.add_pool_profiler(pool_profiler) {
483                    log::error!("Failed to add pool profiler for {instrument_id}: {e}");
484                    drop(cache);
485                    return;
486                }
487                drop(cache);
488            } else {
489                // Neither profiler nor pool exists, request snapshot
490                drop(cache);
491
492                let request_id = UUID4::new();
493                let ts_init = self.clock.borrow().timestamp_ns();
494                let request = RequestPoolSnapshot::new(
495                    *instrument_id,
496                    client_id.copied(),
497                    request_id,
498                    ts_init,
499                    None,
500                );
501
502                if let Err(e) = self.execute_defi_request(DefiRequestCommand::PoolSnapshot(request))
503                {
504                    log::warn!("Failed to request pool snapshot for {instrument_id}: {e}");
505                } else {
506                    log::debug!("Requested pool snapshot for {instrument_id}");
507                    self.pool_snapshot_pending.insert(*instrument_id);
508                    self.pool_updaters_pending.insert(*instrument_id);
509                    self.pool_event_buffers.entry(*instrument_id).or_default();
510                }
511                return;
512            }
513        }
514
515        // Profiler exists, create updater and subscribe to topics
516        let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
517
518        self.subscribe_pool_updater_topics(*instrument_id, updater.clone());
519        self.pool_updaters.insert(*instrument_id, updater);
520
521        log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
522    }
523}
524
525#[cfg(test)]
526mod tests {
527    use std::sync::Arc;
528
529    use alloy_primitives::{Address, I256, U160, U256};
530    use nautilus_model::{
531        defi::{
532            Chain, DefiData, PoolFeeCollect, PoolFlash, PoolIdentifier, PoolLiquidityUpdate,
533            PoolLiquidityUpdateType, PoolSwap,
534            chain::chains,
535            data::DexPoolData,
536            dex::{AmmType, Dex, DexType},
537        },
538        identifiers::{InstrumentId, Symbol, Venue},
539    };
540    use rstest::*;
541
542    use super::*;
543
544    #[fixture]
545    fn test_instrument_id() -> InstrumentId {
546        InstrumentId::new(Symbol::from("ETH/USDC"), Venue::from("UNISWAPV3"))
547    }
548
549    #[fixture]
550    fn test_chain() -> Arc<Chain> {
551        Arc::new(chains::ETHEREUM.clone())
552    }
553
554    #[fixture]
555    fn test_dex(test_chain: Arc<Chain>) -> Arc<Dex> {
556        Arc::new(Dex::new(
557            (*test_chain).clone(),
558            DexType::UniswapV3,
559            "0x1F98431c8aD98523631AE4a59f267346ea31F984",
560            12369621,
561            AmmType::CLAMM,
562            "PoolCreated(address,address,uint24,int24,address)",
563            "Swap(address,address,int256,int256,uint160,uint128,int24)",
564            "Mint(address,address,int24,int24,uint128,uint256,uint256)",
565            "Burn(address,int24,int24,uint128,uint256,uint256)",
566            "Collect(address,address,int24,int24,uint128,uint128)",
567        ))
568    }
569
570    fn create_test_swap(
571        test_instrument_id: InstrumentId,
572        test_chain: Arc<Chain>,
573        test_dex: Arc<Dex>,
574        block: u64,
575        tx_index: u32,
576        log_index: u32,
577    ) -> PoolSwap {
578        PoolSwap::new(
579            test_chain,
580            test_dex,
581            test_instrument_id,
582            PoolIdentifier::from_address(Address::ZERO),
583            block,
584            format!("0x{block:064x}"),
585            tx_index,
586            log_index,
587            None,
588            Address::ZERO,
589            Address::ZERO,
590            I256::ZERO,
591            I256::ZERO,
592            U160::ZERO,
593            0,
594            0,
595        )
596    }
597
598    fn create_test_liquidity_update(
599        test_instrument_id: InstrumentId,
600        test_chain: Arc<Chain>,
601        test_dex: Arc<Dex>,
602        block: u64,
603        tx_index: u32,
604        log_index: u32,
605    ) -> PoolLiquidityUpdate {
606        PoolLiquidityUpdate::new(
607            test_chain,
608            test_dex,
609            test_instrument_id,
610            PoolIdentifier::from_address(Address::ZERO),
611            PoolLiquidityUpdateType::Mint,
612            block,
613            format!("0x{block:064x}"),
614            tx_index,
615            log_index,
616            None,
617            Address::ZERO,
618            0,
619            U256::ZERO,
620            U256::ZERO,
621            0,
622            0,
623            None,
624        )
625    }
626
627    fn create_test_fee_collect(
628        test_instrument_id: InstrumentId,
629        test_chain: Arc<Chain>,
630        test_dex: Arc<Dex>,
631        block: u64,
632        tx_index: u32,
633        log_index: u32,
634    ) -> PoolFeeCollect {
635        PoolFeeCollect::new(
636            test_chain,
637            test_dex,
638            test_instrument_id,
639            PoolIdentifier::from_address(Address::ZERO),
640            block,
641            format!("0x{block:064x}"),
642            tx_index,
643            log_index,
644            Address::ZERO,
645            0,
646            0,
647            0,
648            0,
649            None,
650        )
651    }
652
653    fn create_test_flash(
654        test_instrument_id: InstrumentId,
655        test_chain: Arc<Chain>,
656        test_dex: Arc<Dex>,
657        block: u64,
658        tx_index: u32,
659        log_index: u32,
660    ) -> PoolFlash {
661        PoolFlash::new(
662            test_chain,
663            test_dex,
664            test_instrument_id,
665            PoolIdentifier::from_address(Address::ZERO),
666            block,
667            format!("0x{block:064x}"),
668            tx_index,
669            log_index,
670            None,
671            Address::ZERO,
672            Address::ZERO,
673            U256::ZERO,
674            U256::ZERO,
675            U256::ZERO,
676            U256::ZERO,
677        )
678    }
679
680    #[rstest]
681    fn test_get_event_block_position_swap(
682        test_instrument_id: InstrumentId,
683        test_chain: Arc<Chain>,
684        test_dex: Arc<Dex>,
685    ) {
686        let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
687        let pos = get_event_block_position(&DexPoolData::Swap(swap));
688        assert_eq!(pos, (100, 5, 3));
689    }
690
691    #[rstest]
692    fn test_get_event_block_position_liquidity_update(
693        test_instrument_id: InstrumentId,
694        test_chain: Arc<Chain>,
695        test_dex: Arc<Dex>,
696    ) {
697        let update =
698            create_test_liquidity_update(test_instrument_id, test_chain, test_dex, 200, 10, 7);
699        let pos = get_event_block_position(&DexPoolData::LiquidityUpdate(update));
700        assert_eq!(pos, (200, 10, 7));
701    }
702
703    #[rstest]
704    fn test_get_event_block_position_fee_collect(
705        test_instrument_id: InstrumentId,
706        test_chain: Arc<Chain>,
707        test_dex: Arc<Dex>,
708    ) {
709        let collect = create_test_fee_collect(test_instrument_id, test_chain, test_dex, 300, 15, 2);
710        let pos = get_event_block_position(&DexPoolData::FeeCollect(collect));
711        assert_eq!(pos, (300, 15, 2));
712    }
713
714    #[rstest]
715    fn test_get_event_block_position_flash(
716        test_instrument_id: InstrumentId,
717        test_chain: Arc<Chain>,
718        test_dex: Arc<Dex>,
719    ) {
720        let flash = create_test_flash(test_instrument_id, test_chain, test_dex, 400, 20, 8);
721        let pos = get_event_block_position(&DexPoolData::Flash(flash));
722        assert_eq!(pos, (400, 20, 8));
723    }
724
725    #[rstest]
726    fn test_convert_and_sort_empty_events() {
727        let events = convert_and_sort_buffered_events(vec![]);
728        assert!(events.is_empty());
729    }
730
731    #[rstest]
732    fn test_convert_and_sort_filters_non_pool_events(
733        test_instrument_id: InstrumentId,
734        test_chain: Arc<Chain>,
735        test_dex: Arc<Dex>,
736    ) {
737        let events = vec![
738            DefiData::PoolSwap(create_test_swap(
739                test_instrument_id,
740                test_chain,
741                test_dex,
742                100,
743                0,
744                0,
745            )),
746            // Block events would be filtered out
747        ];
748        let sorted = convert_and_sort_buffered_events(events);
749        assert_eq!(sorted.len(), 1);
750    }
751
752    #[rstest]
753    fn test_convert_and_sort_single_event(
754        test_instrument_id: InstrumentId,
755        test_chain: Arc<Chain>,
756        test_dex: Arc<Dex>,
757    ) {
758        let swap = create_test_swap(test_instrument_id, test_chain, test_dex, 100, 5, 3);
759        let events = vec![DefiData::PoolSwap(swap)];
760        let sorted = convert_and_sort_buffered_events(events);
761        assert_eq!(sorted.len(), 1);
762        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 3));
763    }
764
765    #[rstest]
766    fn test_convert_and_sort_already_sorted(
767        test_instrument_id: InstrumentId,
768        test_chain: Arc<Chain>,
769        test_dex: Arc<Dex>,
770    ) {
771        let events = vec![
772            DefiData::PoolSwap(create_test_swap(
773                test_instrument_id,
774                test_chain.clone(),
775                test_dex.clone(),
776                100,
777                0,
778                0,
779            )),
780            DefiData::PoolSwap(create_test_swap(
781                test_instrument_id,
782                test_chain.clone(),
783                test_dex.clone(),
784                100,
785                0,
786                1,
787            )),
788            DefiData::PoolSwap(create_test_swap(
789                test_instrument_id,
790                test_chain,
791                test_dex,
792                100,
793                1,
794                0,
795            )),
796        ];
797        let sorted = convert_and_sort_buffered_events(events);
798        assert_eq!(sorted.len(), 3);
799        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
800        assert_eq!(get_event_block_position(&sorted[1]), (100, 0, 1));
801        assert_eq!(get_event_block_position(&sorted[2]), (100, 1, 0));
802    }
803
804    #[rstest]
805    fn test_convert_and_sort_reverse_order(
806        test_instrument_id: InstrumentId,
807        test_chain: Arc<Chain>,
808        test_dex: Arc<Dex>,
809    ) {
810        let events = vec![
811            DefiData::PoolSwap(create_test_swap(
812                test_instrument_id,
813                test_chain.clone(),
814                test_dex.clone(),
815                100,
816                2,
817                5,
818            )),
819            DefiData::PoolSwap(create_test_swap(
820                test_instrument_id,
821                test_chain.clone(),
822                test_dex.clone(),
823                100,
824                1,
825                3,
826            )),
827            DefiData::PoolSwap(create_test_swap(
828                test_instrument_id,
829                test_chain,
830                test_dex,
831                100,
832                0,
833                1,
834            )),
835        ];
836        let sorted = convert_and_sort_buffered_events(events);
837        assert_eq!(sorted.len(), 3);
838        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 1));
839        assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 3));
840        assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 5));
841    }
842
843    #[rstest]
844    fn test_convert_and_sort_mixed_blocks(
845        test_instrument_id: InstrumentId,
846        test_chain: Arc<Chain>,
847        test_dex: Arc<Dex>,
848    ) {
849        let events = vec![
850            DefiData::PoolSwap(create_test_swap(
851                test_instrument_id,
852                test_chain.clone(),
853                test_dex.clone(),
854                102,
855                0,
856                0,
857            )),
858            DefiData::PoolSwap(create_test_swap(
859                test_instrument_id,
860                test_chain.clone(),
861                test_dex.clone(),
862                100,
863                5,
864                2,
865            )),
866            DefiData::PoolSwap(create_test_swap(
867                test_instrument_id,
868                test_chain,
869                test_dex,
870                101,
871                3,
872                1,
873            )),
874        ];
875        let sorted = convert_and_sort_buffered_events(events);
876        assert_eq!(sorted.len(), 3);
877        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 2));
878        assert_eq!(get_event_block_position(&sorted[1]), (101, 3, 1));
879        assert_eq!(get_event_block_position(&sorted[2]), (102, 0, 0));
880    }
881
882    #[rstest]
883    fn test_convert_and_sort_mixed_event_types(
884        test_instrument_id: InstrumentId,
885        test_chain: Arc<Chain>,
886        test_dex: Arc<Dex>,
887    ) {
888        let events = vec![
889            DefiData::PoolSwap(create_test_swap(
890                test_instrument_id,
891                test_chain.clone(),
892                test_dex.clone(),
893                100,
894                2,
895                0,
896            )),
897            DefiData::PoolLiquidityUpdate(create_test_liquidity_update(
898                test_instrument_id,
899                test_chain.clone(),
900                test_dex.clone(),
901                100,
902                0,
903                0,
904            )),
905            DefiData::PoolFeeCollect(create_test_fee_collect(
906                test_instrument_id,
907                test_chain.clone(),
908                test_dex.clone(),
909                100,
910                1,
911                0,
912            )),
913            DefiData::PoolFlash(create_test_flash(
914                test_instrument_id,
915                test_chain,
916                test_dex,
917                100,
918                3,
919                0,
920            )),
921        ];
922        let sorted = convert_and_sort_buffered_events(events);
923        assert_eq!(sorted.len(), 4);
924        assert_eq!(get_event_block_position(&sorted[0]), (100, 0, 0));
925        assert_eq!(get_event_block_position(&sorted[1]), (100, 1, 0));
926        assert_eq!(get_event_block_position(&sorted[2]), (100, 2, 0));
927        assert_eq!(get_event_block_position(&sorted[3]), (100, 3, 0));
928    }
929
930    #[rstest]
931    fn test_convert_and_sort_same_block_and_tx_different_log_index(
932        test_instrument_id: InstrumentId,
933        test_chain: Arc<Chain>,
934        test_dex: Arc<Dex>,
935    ) {
936        let events = vec![
937            DefiData::PoolSwap(create_test_swap(
938                test_instrument_id,
939                test_chain.clone(),
940                test_dex.clone(),
941                100,
942                5,
943                10,
944            )),
945            DefiData::PoolSwap(create_test_swap(
946                test_instrument_id,
947                test_chain.clone(),
948                test_dex.clone(),
949                100,
950                5,
951                5,
952            )),
953            DefiData::PoolSwap(create_test_swap(
954                test_instrument_id,
955                test_chain,
956                test_dex,
957                100,
958                5,
959                1,
960            )),
961        ];
962        let sorted = convert_and_sort_buffered_events(events);
963        assert_eq!(sorted.len(), 3);
964        assert_eq!(get_event_block_position(&sorted[0]), (100, 5, 1));
965        assert_eq!(get_event_block_position(&sorted[1]), (100, 5, 5));
966        assert_eq!(get_event_block_position(&sorted[2]), (100, 5, 10));
967    }
968}