Skip to main content

nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17//!
18//! Defines the `BarAggregator` trait and core aggregation types (tick, volume, value, time),
19//! along with the `BarBuilder` and `BarAggregatorCore` helpers for constructing bars.
20
21use std::{
22    any::Any,
23    cell::RefCell,
24    fmt::Debug,
25    ops::Add,
26    rc::{Rc, Weak},
27};
28
29use ahash::AHashMap;
30use chrono::{Duration, TimeDelta};
31use nautilus_common::{
32    clock::{Clock, TestClock},
33    timer::{TimeEvent, TimeEventCallback},
34};
35use nautilus_core::{
36    UnixNanos,
37    correctness::{self, FAILED},
38    datetime::{
39        add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos, subtract_n_months_nanos,
40        subtract_n_years_nanos,
41    },
42};
43use nautilus_model::{
44    data::{
45        QuoteTick, TradeTick,
46        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
47    },
48    enums::{AggregationSource, AggressorSide, BarAggregation, BarIntervalType},
49    identifiers::InstrumentId,
50    instruments::{FixedTickScheme, TickSchemeRule},
51    types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
52};
53
54/// Boxed, mutable callback invoked with each completed [`Bar`]; aliased to reduce type complexity.
55type BarHandler = Box<dyn FnMut(Bar)>;
56
57/// Trait for aggregating incoming price and trade events into time-, tick-, volume-, or value-based bars.
58///
59/// Implementors receive updates and produce completed bars via handlers.
60pub trait BarAggregator: Any + Debug {
61    /// The [`BarType`] to be aggregated.
62    fn bar_type(&self) -> BarType;
63    /// If the aggregator is running and will receive data from the message bus.
64    fn is_running(&self) -> bool;
65    /// Sets the running state of the aggregator (receiving updates when `true`).
66    fn set_is_running(&mut self, value: bool);
67    /// Updates the aggregator  with the given price and size.
68    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
69    /// Updates the aggregator with the given quote.
70    fn handle_quote(&mut self, quote: QuoteTick) {
71        let spec = self.bar_type().spec();
72        self.update(
73            quote.extract_price(spec.price_type),
74            quote.extract_size(spec.price_type),
75            quote.ts_init,
76        );
77    }
78    /// Updates the aggregator with the given trade.
79    fn handle_trade(&mut self, trade: TradeTick) {
80        self.update(trade.price, trade.size, trade.ts_init);
81    }
82    /// Updates the aggregator with the given bar.
83    fn handle_bar(&mut self, bar: Bar) {
84        self.update_bar(bar, bar.volume, bar.ts_init);
85    }
86    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
87    /// Stop the aggregator, e.g., cancel timers. Default is no-op.
88    fn stop(&mut self) {}
89    /// Sets historical mode (default implementation does nothing, TimeBarAggregator overrides)
90    fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
91    /// Sets historical events (default implementation does nothing, TimeBarAggregator overrides)
92    fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
93    /// Sets clock for time bar aggregators (default implementation does nothing, TimeBarAggregator overrides)
94    fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
95    /// Builds a bar from a time event (default implementation does nothing, TimeBarAggregator overrides)
96    fn build_bar(&mut self, _event: &TimeEvent) {}
97    /// Starts the timer for time bar aggregators.
98    /// Default implementation does nothing, TimeBarAggregator overrides.
99    /// Takes an optional Rc to create weak reference internally.
100    fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
101    /// Sets the weak reference to the aggregator wrapper (for historical mode).
102    /// Default implementation does nothing, TimeBarAggregator overrides.
103    fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
104}
105
106impl dyn BarAggregator {
107    /// Returns a reference to this aggregator as `Any` for downcasting.
108    pub fn as_any(&self) -> &dyn Any {
109        self
110    }
111    /// Returns a mutable reference to this aggregator as `Any` for downcasting.
112    pub fn as_any_mut(&mut self) -> &mut dyn Any {
113        self
114    }
115}
116
117/// Provides a generic bar builder for aggregation.
118#[derive(Debug)]
119pub struct BarBuilder {
120    bar_type: BarType,
121    price_precision: u8,
122    size_precision: u8,
123    initialized: bool,
124    ts_last: UnixNanos,
125    count: usize,
126    last_close: Option<Price>,
127    open: Option<Price>,
128    high: Option<Price>,
129    low: Option<Price>,
130    close: Option<Price>,
131    volume: Quantity,
132}
133
134impl BarBuilder {
135    /// Creates a new [`BarBuilder`] instance.
136    ///
137    /// # Panics
138    ///
139    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
140    #[must_use]
141    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
142        correctness::check_equal(
143            &bar_type.aggregation_source(),
144            &AggregationSource::Internal,
145            "bar_type.aggregation_source",
146            "AggregationSource::Internal",
147        )
148        .expect(FAILED);
149
150        Self {
151            bar_type,
152            price_precision,
153            size_precision,
154            initialized: false,
155            ts_last: UnixNanos::default(),
156            count: 0,
157            last_close: None,
158            open: None,
159            high: None,
160            low: None,
161            close: None,
162            volume: Quantity::zero(size_precision),
163        }
164    }
165
166    /// Updates the builder state with the given price, size, and init timestamp.
167    ///
168    /// # Panics
169    ///
170    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
171    pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
172        if ts_init < self.ts_last {
173            return; // Not applicable
174        }
175
176        if self.open.is_none() {
177            self.open = Some(price);
178            self.high = Some(price);
179            self.low = Some(price);
180            self.initialized = true;
181        } else {
182            if price > self.high.unwrap() {
183                self.high = Some(price);
184            }
185
186            if price < self.low.unwrap() {
187                self.low = Some(price);
188            }
189        }
190
191        self.close = Some(price);
192        self.volume = self.volume.add(size);
193        self.count += 1;
194        self.ts_last = ts_init;
195    }
196
197    /// Updates the builder state with a completed bar, its volume, and the bar init timestamp.
198    ///
199    /// # Panics
200    ///
201    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
202    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
203        if ts_init < self.ts_last {
204            return; // Not applicable
205        }
206
207        if self.open.is_none() {
208            self.open = Some(bar.open);
209            self.high = Some(bar.high);
210            self.low = Some(bar.low);
211            self.initialized = true;
212        } else {
213            if bar.high > self.high.unwrap() {
214                self.high = Some(bar.high);
215            }
216
217            if bar.low < self.low.unwrap() {
218                self.low = Some(bar.low);
219            }
220        }
221
222        self.close = Some(bar.close);
223        self.volume = self.volume.add(volume);
224        self.count += 1;
225        self.ts_last = ts_init;
226    }
227
228    /// Reset the bar builder.
229    ///
230    /// All stateful fields are reset to their initial value.
231    pub fn reset(&mut self) {
232        self.open = None;
233        self.high = None;
234        self.low = None;
235        self.volume = Quantity::zero(self.size_precision);
236        self.count = 0;
237    }
238
239    /// Return the aggregated bar and reset.
240    pub fn build_now(&mut self) -> Bar {
241        self.build(self.ts_last, self.ts_last)
242    }
243
244    /// Returns the aggregated bar for the given timestamps, then resets the builder.
245    ///
246    /// # Panics
247    ///
248    /// Panics if `open`, `high`, `low`, or `close` values are `None` when building the bar.
249    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
250        if self.open.is_none() {
251            self.open = self.last_close;
252            self.high = self.last_close;
253            self.low = self.last_close;
254            self.close = self.last_close;
255        }
256
257        if let (Some(close), Some(low)) = (self.close, self.low)
258            && close < low
259        {
260            self.low = Some(close);
261        }
262
263        if let (Some(close), Some(high)) = (self.close, self.high)
264            && close > high
265        {
266            self.high = Some(close);
267        }
268
269        // The open was checked, so we can assume all prices are Some
270        let bar = Bar::new(
271            self.bar_type,
272            self.open.unwrap(),
273            self.high.unwrap(),
274            self.low.unwrap(),
275            self.close.unwrap(),
276            self.volume,
277            ts_event,
278            ts_init,
279        );
280
281        self.last_close = self.close;
282        self.reset();
283        bar
284    }
285}
286
287/// Provides a means of aggregating specified bar types and sending to a registered handler.
288pub struct BarAggregatorCore {
289    bar_type: BarType,
290    builder: BarBuilder,
291    handler: BarHandler,
292    is_running: bool,
293}
294
295impl Debug for BarAggregatorCore {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        f.debug_struct(stringify!(BarAggregatorCore))
298            .field("bar_type", &self.bar_type)
299            .field("builder", &self.builder)
300            .field("is_running", &self.is_running)
301            .finish()
302    }
303}
304
305impl BarAggregatorCore {
306    /// Creates a new [`BarAggregatorCore`] instance.
307    ///
308    /// # Panics
309    ///
310    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
311    pub fn new<H: FnMut(Bar) + 'static>(
312        bar_type: BarType,
313        price_precision: u8,
314        size_precision: u8,
315        handler: H,
316    ) -> Self {
317        Self {
318            bar_type,
319            builder: BarBuilder::new(bar_type, price_precision, size_precision),
320            handler: Box::new(handler),
321            is_running: false,
322        }
323    }
324
325    /// Sets the running state of the aggregator (receives updates when `true`).
326    pub const fn set_is_running(&mut self, value: bool) {
327        self.is_running = value;
328    }
329    fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
330        self.builder.update(price, size, ts_init);
331    }
332
333    fn build_now_and_send(&mut self) {
334        let bar = self.builder.build_now();
335        (self.handler)(bar);
336    }
337
338    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
339        let bar = self.builder.build(ts_event, ts_init);
340        (self.handler)(bar);
341    }
342}
343
344/// Provides a means of building tick bars aggregated from quote and trades.
345///
346/// When received tick count reaches the step threshold of the bar
347/// specification, then a bar is created and sent to the handler.
348pub struct TickBarAggregator {
349    core: BarAggregatorCore,
350}
351
352impl Debug for TickBarAggregator {
353    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
354        f.debug_struct(stringify!(TickBarAggregator))
355            .field("core", &self.core)
356            .finish()
357    }
358}
359
360impl TickBarAggregator {
361    /// Creates a new [`TickBarAggregator`] instance.
362    ///
363    /// # Panics
364    ///
365    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
366    pub fn new<H: FnMut(Bar) + 'static>(
367        bar_type: BarType,
368        price_precision: u8,
369        size_precision: u8,
370        handler: H,
371    ) -> Self {
372        Self {
373            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
374        }
375    }
376}
377
378impl BarAggregator for TickBarAggregator {
379    fn bar_type(&self) -> BarType {
380        self.core.bar_type
381    }
382
383    fn is_running(&self) -> bool {
384        self.core.is_running
385    }
386
387    fn set_is_running(&mut self, value: bool) {
388        self.core.set_is_running(value);
389    }
390
391    /// Apply the given update to the aggregator.
392    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
393        self.core.apply_update(price, size, ts_init);
394        let spec = self.core.bar_type.spec();
395
396        if self.core.builder.count >= spec.step.get() {
397            self.core.build_now_and_send();
398        }
399    }
400
401    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
402        self.core.builder.update_bar(bar, volume, ts_init);
403        let spec = self.core.bar_type.spec();
404
405        if self.core.builder.count >= spec.step.get() {
406            self.core.build_now_and_send();
407        }
408    }
409}
410
411/// Aggregates bars based on tick buy/sell imbalance.
412///
413/// Increments imbalance by +1 for buyer-aggressed trades and -1 for seller-aggressed trades.
414/// Emits a bar when the absolute imbalance reaches the step threshold.
415pub struct TickImbalanceBarAggregator {
416    core: BarAggregatorCore,
417    imbalance: isize,
418}
419
420impl Debug for TickImbalanceBarAggregator {
421    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
422        f.debug_struct(stringify!(TickImbalanceBarAggregator))
423            .field("core", &self.core)
424            .field("imbalance", &self.imbalance)
425            .finish()
426    }
427}
428
429impl TickImbalanceBarAggregator {
430    /// Creates a new [`TickImbalanceBarAggregator`] instance.
431    ///
432    /// # Panics
433    ///
434    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
435    pub fn new<H: FnMut(Bar) + 'static>(
436        bar_type: BarType,
437        price_precision: u8,
438        size_precision: u8,
439        handler: H,
440    ) -> Self {
441        Self {
442            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
443            imbalance: 0,
444        }
445    }
446}
447
448impl BarAggregator for TickImbalanceBarAggregator {
449    fn bar_type(&self) -> BarType {
450        self.core.bar_type
451    }
452
453    fn is_running(&self) -> bool {
454        self.core.is_running
455    }
456
457    fn set_is_running(&mut self, value: bool) {
458        self.core.set_is_running(value);
459    }
460
461    /// Apply the given update to the aggregator.
462    ///
463    /// Note: side-aware logic lives in `handle_trade`. This method is used for
464    /// quote/bar updates where no aggressor side is available.
465    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
466        self.core.apply_update(price, size, ts_init);
467    }
468
469    fn handle_trade(&mut self, trade: TradeTick) {
470        self.core
471            .apply_update(trade.price, trade.size, trade.ts_init);
472
473        let delta = match trade.aggressor_side {
474            AggressorSide::Buyer => 1,
475            AggressorSide::Seller => -1,
476            AggressorSide::NoAggressor => 0,
477        };
478
479        if delta == 0 {
480            return;
481        }
482
483        self.imbalance += delta;
484        let threshold = self.core.bar_type.spec().step.get();
485        if self.imbalance.unsigned_abs() >= threshold {
486            self.core.build_now_and_send();
487            self.imbalance = 0;
488        }
489    }
490
491    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
492        self.core.builder.update_bar(bar, volume, ts_init);
493    }
494}
495
496/// Aggregates bars based on consecutive buy/sell tick runs.
497pub struct TickRunsBarAggregator {
498    core: BarAggregatorCore,
499    current_run_side: Option<AggressorSide>,
500    run_count: usize,
501}
502
503impl Debug for TickRunsBarAggregator {
504    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505        f.debug_struct(stringify!(TickRunsBarAggregator))
506            .field("core", &self.core)
507            .field("current_run_side", &self.current_run_side)
508            .field("run_count", &self.run_count)
509            .finish()
510    }
511}
512
513impl TickRunsBarAggregator {
514    /// Creates a new [`TickRunsBarAggregator`] instance.
515    ///
516    /// # Panics
517    ///
518    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
519    pub fn new<H: FnMut(Bar) + 'static>(
520        bar_type: BarType,
521        price_precision: u8,
522        size_precision: u8,
523        handler: H,
524    ) -> Self {
525        Self {
526            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
527            current_run_side: None,
528            run_count: 0,
529        }
530    }
531}
532
533impl BarAggregator for TickRunsBarAggregator {
534    fn bar_type(&self) -> BarType {
535        self.core.bar_type
536    }
537
538    fn is_running(&self) -> bool {
539        self.core.is_running
540    }
541
542    fn set_is_running(&mut self, value: bool) {
543        self.core.set_is_running(value);
544    }
545
546    /// Apply the given update to the aggregator.
547    ///
548    /// Note: side-aware logic lives in `handle_trade`. This method is used for
549    /// quote/bar updates where no aggressor side is available.
550    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
551        self.core.apply_update(price, size, ts_init);
552    }
553
554    fn handle_trade(&mut self, trade: TradeTick) {
555        let side = match trade.aggressor_side {
556            AggressorSide::Buyer => Some(AggressorSide::Buyer),
557            AggressorSide::Seller => Some(AggressorSide::Seller),
558            AggressorSide::NoAggressor => None,
559        };
560
561        if let Some(side) = side {
562            if self.current_run_side != Some(side) {
563                self.current_run_side = Some(side);
564                self.run_count = 0;
565                self.core.builder.reset();
566            }
567
568            self.core
569                .apply_update(trade.price, trade.size, trade.ts_init);
570            self.run_count += 1;
571
572            let threshold = self.core.bar_type.spec().step.get();
573            if self.run_count >= threshold {
574                self.core.build_now_and_send();
575                self.run_count = 0;
576                self.current_run_side = None;
577            }
578        } else {
579            self.core
580                .apply_update(trade.price, trade.size, trade.ts_init);
581        }
582    }
583
584    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
585        self.core.builder.update_bar(bar, volume, ts_init);
586    }
587}
588
589/// Provides a means of building volume bars aggregated from quote and trades.
590pub struct VolumeBarAggregator {
591    core: BarAggregatorCore,
592}
593
594impl Debug for VolumeBarAggregator {
595    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
596        f.debug_struct(stringify!(VolumeBarAggregator))
597            .field("core", &self.core)
598            .finish()
599    }
600}
601
602impl VolumeBarAggregator {
603    /// Creates a new [`VolumeBarAggregator`] instance.
604    ///
605    /// # Panics
606    ///
607    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
608    pub fn new<H: FnMut(Bar) + 'static>(
609        bar_type: BarType,
610        price_precision: u8,
611        size_precision: u8,
612        handler: H,
613    ) -> Self {
614        Self {
615            core: BarAggregatorCore::new(
616                bar_type.standard(),
617                price_precision,
618                size_precision,
619                handler,
620            ),
621        }
622    }
623}
624
625impl BarAggregator for VolumeBarAggregator {
626    fn bar_type(&self) -> BarType {
627        self.core.bar_type
628    }
629
630    fn is_running(&self) -> bool {
631        self.core.is_running
632    }
633
634    fn set_is_running(&mut self, value: bool) {
635        self.core.set_is_running(value);
636    }
637
638    /// Apply the given update to the aggregator.
639    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
640        let mut raw_size_update = size.raw;
641        let spec = self.core.bar_type.spec();
642        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
643
644        while raw_size_update > 0 {
645            if self.core.builder.volume.raw + raw_size_update < raw_step {
646                self.core.apply_update(
647                    price,
648                    Quantity::from_raw(raw_size_update, size.precision),
649                    ts_init,
650                );
651                break;
652            }
653
654            let raw_size_diff = raw_step - self.core.builder.volume.raw;
655            self.core.apply_update(
656                price,
657                Quantity::from_raw(raw_size_diff, size.precision),
658                ts_init,
659            );
660
661            self.core.build_now_and_send();
662            raw_size_update -= raw_size_diff;
663        }
664    }
665
666    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
667        let mut raw_volume_update = volume.raw;
668        let spec = self.core.bar_type.spec();
669        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
670
671        while raw_volume_update > 0 {
672            if self.core.builder.volume.raw + raw_volume_update < raw_step {
673                self.core.builder.update_bar(
674                    bar,
675                    Quantity::from_raw(raw_volume_update, volume.precision),
676                    ts_init,
677                );
678                break;
679            }
680
681            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
682            self.core.builder.update_bar(
683                bar,
684                Quantity::from_raw(raw_volume_diff, volume.precision),
685                ts_init,
686            );
687
688            self.core.build_now_and_send();
689            raw_volume_update -= raw_volume_diff;
690        }
691    }
692}
693
694/// Aggregates bars based on buy/sell volume imbalance.
695pub struct VolumeImbalanceBarAggregator {
696    core: BarAggregatorCore,
697    imbalance_raw: i128,
698    raw_step: i128,
699}
700
701impl Debug for VolumeImbalanceBarAggregator {
702    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
703        f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
704            .field("core", &self.core)
705            .field("imbalance_raw", &self.imbalance_raw)
706            .field("raw_step", &self.raw_step)
707            .finish()
708    }
709}
710
711impl VolumeImbalanceBarAggregator {
712    /// Creates a new [`VolumeImbalanceBarAggregator`] instance.
713    ///
714    /// # Panics
715    ///
716    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
717    pub fn new<H: FnMut(Bar) + 'static>(
718        bar_type: BarType,
719        price_precision: u8,
720        size_precision: u8,
721        handler: H,
722    ) -> Self {
723        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
724        Self {
725            core: BarAggregatorCore::new(
726                bar_type.standard(),
727                price_precision,
728                size_precision,
729                handler,
730            ),
731            imbalance_raw: 0,
732            raw_step,
733        }
734    }
735}
736
737impl BarAggregator for VolumeImbalanceBarAggregator {
738    fn bar_type(&self) -> BarType {
739        self.core.bar_type
740    }
741
742    fn is_running(&self) -> bool {
743        self.core.is_running
744    }
745
746    fn set_is_running(&mut self, value: bool) {
747        self.core.set_is_running(value);
748    }
749
750    /// Apply the given update to the aggregator.
751    ///
752    /// Note: side-aware logic lives in `handle_trade`. This method is used for
753    /// quote/bar updates where no aggressor side is available.
754    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
755        self.core.apply_update(price, size, ts_init);
756    }
757
758    fn handle_trade(&mut self, trade: TradeTick) {
759        let side = match trade.aggressor_side {
760            AggressorSide::Buyer => 1,
761            AggressorSide::Seller => -1,
762            AggressorSide::NoAggressor => {
763                self.core
764                    .apply_update(trade.price, trade.size, trade.ts_init);
765                return;
766            }
767        };
768
769        let mut raw_remaining = trade.size.raw as i128;
770        while raw_remaining > 0 {
771            let imbalance_abs = self.imbalance_raw.abs();
772            let needed = (self.raw_step - imbalance_abs).max(1);
773            let raw_chunk = raw_remaining.min(needed);
774            let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
775
776            self.core
777                .apply_update(trade.price, qty_chunk, trade.ts_init);
778
779            self.imbalance_raw += side * raw_chunk;
780            raw_remaining -= raw_chunk;
781
782            if self.imbalance_raw.abs() >= self.raw_step {
783                self.core.build_now_and_send();
784                self.imbalance_raw = 0;
785            }
786        }
787    }
788
789    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
790        self.core.builder.update_bar(bar, volume, ts_init);
791    }
792}
793
794/// Aggregates bars based on consecutive buy/sell volume runs.
795pub struct VolumeRunsBarAggregator {
796    core: BarAggregatorCore,
797    current_run_side: Option<AggressorSide>,
798    run_volume_raw: QuantityRaw,
799    raw_step: QuantityRaw,
800}
801
802impl Debug for VolumeRunsBarAggregator {
803    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
804        f.debug_struct(stringify!(VolumeRunsBarAggregator))
805            .field("core", &self.core)
806            .field("current_run_side", &self.current_run_side)
807            .field("run_volume_raw", &self.run_volume_raw)
808            .field("raw_step", &self.raw_step)
809            .finish()
810    }
811}
812
813impl VolumeRunsBarAggregator {
814    /// Creates a new [`VolumeRunsBarAggregator`] instance.
815    ///
816    /// # Panics
817    ///
818    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
819    pub fn new<H: FnMut(Bar) + 'static>(
820        bar_type: BarType,
821        price_precision: u8,
822        size_precision: u8,
823        handler: H,
824    ) -> Self {
825        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
826        Self {
827            core: BarAggregatorCore::new(
828                bar_type.standard(),
829                price_precision,
830                size_precision,
831                handler,
832            ),
833            current_run_side: None,
834            run_volume_raw: 0,
835            raw_step,
836        }
837    }
838}
839
840impl BarAggregator for VolumeRunsBarAggregator {
841    fn bar_type(&self) -> BarType {
842        self.core.bar_type
843    }
844
845    fn is_running(&self) -> bool {
846        self.core.is_running
847    }
848
849    fn set_is_running(&mut self, value: bool) {
850        self.core.set_is_running(value);
851    }
852
853    /// Apply the given update to the aggregator.
854    ///
855    /// Note: side-aware logic lives in `handle_trade`. This method is used for
856    /// quote/bar updates where no aggressor side is available.
857    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
858        self.core.apply_update(price, size, ts_init);
859    }
860
861    fn handle_trade(&mut self, trade: TradeTick) {
862        let side = match trade.aggressor_side {
863            AggressorSide::Buyer => Some(AggressorSide::Buyer),
864            AggressorSide::Seller => Some(AggressorSide::Seller),
865            AggressorSide::NoAggressor => None,
866        };
867
868        let Some(side) = side else {
869            self.core
870                .apply_update(trade.price, trade.size, trade.ts_init);
871            return;
872        };
873
874        if self.current_run_side != Some(side) {
875            self.current_run_side = Some(side);
876            self.run_volume_raw = 0;
877            self.core.builder.reset();
878        }
879
880        let mut raw_remaining = trade.size.raw;
881        while raw_remaining > 0 {
882            let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
883            let raw_chunk = raw_remaining.min(needed);
884
885            self.core.apply_update(
886                trade.price,
887                Quantity::from_raw(raw_chunk, trade.size.precision),
888                trade.ts_init,
889            );
890
891            self.run_volume_raw += raw_chunk;
892            raw_remaining -= raw_chunk;
893
894            if self.run_volume_raw >= self.raw_step {
895                self.core.build_now_and_send();
896                self.run_volume_raw = 0;
897                self.current_run_side = None;
898            }
899        }
900    }
901
902    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
903        self.core.builder.update_bar(bar, volume, ts_init);
904    }
905}
906
907/// Provides a means of building value bars aggregated from quote and trades.
908///
909/// When received value reaches the step threshold of the bar
910/// specification, then a bar is created and sent to the handler.
911pub struct ValueBarAggregator {
912    core: BarAggregatorCore,
913    cum_value: f64,
914}
915
916impl Debug for ValueBarAggregator {
917    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
918        f.debug_struct(stringify!(ValueBarAggregator))
919            .field("core", &self.core)
920            .field("cum_value", &self.cum_value)
921            .finish()
922    }
923}
924
925impl ValueBarAggregator {
926    /// Creates a new [`ValueBarAggregator`] instance.
927    ///
928    /// # Panics
929    ///
930    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
931    pub fn new<H: FnMut(Bar) + 'static>(
932        bar_type: BarType,
933        price_precision: u8,
934        size_precision: u8,
935        handler: H,
936    ) -> Self {
937        Self {
938            core: BarAggregatorCore::new(
939                bar_type.standard(),
940                price_precision,
941                size_precision,
942                handler,
943            ),
944            cum_value: 0.0,
945        }
946    }
947
948    #[must_use]
949    /// Returns the cumulative value for the aggregator.
950    pub const fn get_cumulative_value(&self) -> f64 {
951        self.cum_value
952    }
953}
954
955impl BarAggregator for ValueBarAggregator {
956    fn bar_type(&self) -> BarType {
957        self.core.bar_type
958    }
959
960    fn is_running(&self) -> bool {
961        self.core.is_running
962    }
963
964    fn set_is_running(&mut self, value: bool) {
965        self.core.set_is_running(value);
966    }
967
968    /// Apply the given update to the aggregator.
969    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
970        let mut size_update = size.as_f64();
971        let spec = self.core.bar_type.spec();
972
973        while size_update > 0.0 {
974            let value_update = price.as_f64() * size_update;
975            if value_update == 0.0 {
976                // Prevent division by zero - apply remaining size without triggering bar
977                self.core
978                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
979                break;
980            }
981
982            if self.cum_value + value_update < spec.step.get() as f64 {
983                self.cum_value += value_update;
984                self.core
985                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
986                break;
987            }
988
989            let value_diff = spec.step.get() as f64 - self.cum_value;
990            let mut size_diff = size_update * (value_diff / value_update);
991
992            // Clamp to minimum representable size to avoid zero-volume bars
993            if is_below_min_size(size_diff, size.precision) {
994                if is_below_min_size(size_update, size.precision) {
995                    break;
996                }
997                size_diff = min_size_f64(size.precision);
998            }
999
1000            self.core
1001                .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
1002
1003            self.core.build_now_and_send();
1004            self.cum_value = 0.0;
1005            size_update -= size_diff;
1006        }
1007    }
1008
1009    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1010        let mut volume_update = volume;
1011        let average_price = Price::new(
1012            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1013            self.core.builder.price_precision,
1014        );
1015
1016        while volume_update.as_f64() > 0.0 {
1017            let value_update = average_price.as_f64() * volume_update.as_f64();
1018            if value_update == 0.0 {
1019                // Prevent division by zero - apply remaining volume without triggering bar
1020                self.core.builder.update_bar(bar, volume_update, ts_init);
1021                break;
1022            }
1023
1024            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1025                self.cum_value += value_update;
1026                self.core.builder.update_bar(bar, volume_update, ts_init);
1027                break;
1028            }
1029
1030            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1031            let mut volume_diff = volume_update.as_f64() * (value_diff / value_update);
1032
1033            // Clamp to minimum representable size to avoid zero-volume bars
1034            if is_below_min_size(volume_diff, volume_update.precision) {
1035                if is_below_min_size(volume_update.as_f64(), volume_update.precision) {
1036                    break;
1037                }
1038                volume_diff = min_size_f64(volume_update.precision);
1039            }
1040
1041            self.core.builder.update_bar(
1042                bar,
1043                Quantity::new(volume_diff, volume_update.precision),
1044                ts_init,
1045            );
1046
1047            self.core.build_now_and_send();
1048            self.cum_value = 0.0;
1049            volume_update = Quantity::new(
1050                volume_update.as_f64() - volume_diff,
1051                volume_update.precision,
1052            );
1053        }
1054    }
1055}
1056
1057/// Aggregates bars based on buy/sell notional imbalance.
1058pub struct ValueImbalanceBarAggregator {
1059    core: BarAggregatorCore,
1060    imbalance_value: f64,
1061    step_value: f64,
1062}
1063
1064impl Debug for ValueImbalanceBarAggregator {
1065    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1066        f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1067            .field("core", &self.core)
1068            .field("imbalance_value", &self.imbalance_value)
1069            .field("step_value", &self.step_value)
1070            .finish()
1071    }
1072}
1073
1074impl ValueImbalanceBarAggregator {
1075    /// Creates a new [`ValueImbalanceBarAggregator`] instance.
1076    ///
1077    /// # Panics
1078    ///
1079    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1080    pub fn new<H: FnMut(Bar) + 'static>(
1081        bar_type: BarType,
1082        price_precision: u8,
1083        size_precision: u8,
1084        handler: H,
1085    ) -> Self {
1086        Self {
1087            core: BarAggregatorCore::new(
1088                bar_type.standard(),
1089                price_precision,
1090                size_precision,
1091                handler,
1092            ),
1093            imbalance_value: 0.0,
1094            step_value: bar_type.spec().step.get() as f64,
1095        }
1096    }
1097}
1098
1099impl BarAggregator for ValueImbalanceBarAggregator {
1100    fn bar_type(&self) -> BarType {
1101        self.core.bar_type
1102    }
1103
1104    fn is_running(&self) -> bool {
1105        self.core.is_running
1106    }
1107
1108    fn set_is_running(&mut self, value: bool) {
1109        self.core.set_is_running(value);
1110    }
1111
1112    /// Apply the given update to the aggregator.
1113    ///
1114    /// Note: side-aware logic lives in `handle_trade`. This method is used for
1115    /// quote/bar updates where no aggressor side is available.
1116    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1117        self.core.apply_update(price, size, ts_init);
1118    }
1119
1120    fn handle_trade(&mut self, trade: TradeTick) {
1121        let price_f64 = trade.price.as_f64();
1122        if price_f64 == 0.0 {
1123            self.core
1124                .apply_update(trade.price, trade.size, trade.ts_init);
1125            return;
1126        }
1127
1128        let side_sign = match trade.aggressor_side {
1129            AggressorSide::Buyer => 1.0,
1130            AggressorSide::Seller => -1.0,
1131            AggressorSide::NoAggressor => {
1132                self.core
1133                    .apply_update(trade.price, trade.size, trade.ts_init);
1134                return;
1135            }
1136        };
1137
1138        let mut size_remaining = trade.size.as_f64();
1139        while size_remaining > 0.0 {
1140            let value_remaining = price_f64 * size_remaining;
1141
1142            if self.imbalance_value == 0.0 || self.imbalance_value.signum() == side_sign {
1143                let needed = self.step_value - self.imbalance_value.abs();
1144                if value_remaining <= needed {
1145                    self.imbalance_value += side_sign * value_remaining;
1146                    self.core.apply_update(
1147                        trade.price,
1148                        Quantity::new(size_remaining, trade.size.precision),
1149                        trade.ts_init,
1150                    );
1151
1152                    if self.imbalance_value.abs() >= self.step_value {
1153                        self.core.build_now_and_send();
1154                        self.imbalance_value = 0.0;
1155                    }
1156                    break;
1157                }
1158
1159                let mut value_chunk = needed;
1160                let mut size_chunk = value_chunk / price_f64;
1161
1162                // Clamp to minimum representable size to avoid zero-volume bars
1163                if is_below_min_size(size_chunk, trade.size.precision) {
1164                    if is_below_min_size(size_remaining, trade.size.precision) {
1165                        break;
1166                    }
1167                    size_chunk = min_size_f64(trade.size.precision);
1168                    value_chunk = price_f64 * size_chunk;
1169                }
1170
1171                self.core.apply_update(
1172                    trade.price,
1173                    Quantity::new(size_chunk, trade.size.precision),
1174                    trade.ts_init,
1175                );
1176                self.imbalance_value += side_sign * value_chunk;
1177                size_remaining -= size_chunk;
1178
1179                if self.imbalance_value.abs() >= self.step_value {
1180                    self.core.build_now_and_send();
1181                    self.imbalance_value = 0.0;
1182                }
1183            } else {
1184                // Opposing side: first neutralize existing imbalance
1185                let mut value_to_flatten = self.imbalance_value.abs().min(value_remaining);
1186                let mut size_chunk = value_to_flatten / price_f64;
1187
1188                // Clamp to minimum representable size to avoid zero-volume bars
1189                if is_below_min_size(size_chunk, trade.size.precision) {
1190                    if is_below_min_size(size_remaining, trade.size.precision) {
1191                        break;
1192                    }
1193                    size_chunk = min_size_f64(trade.size.precision);
1194                    value_to_flatten = price_f64 * size_chunk;
1195                }
1196
1197                self.core.apply_update(
1198                    trade.price,
1199                    Quantity::new(size_chunk, trade.size.precision),
1200                    trade.ts_init,
1201                );
1202                self.imbalance_value += side_sign * value_to_flatten;
1203
1204                // Min-size clamp can overshoot past threshold
1205                if self.imbalance_value.abs() >= self.step_value {
1206                    self.core.build_now_and_send();
1207                    self.imbalance_value = 0.0;
1208                }
1209                size_remaining -= size_chunk;
1210            }
1211        }
1212    }
1213
1214    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1215        self.core.builder.update_bar(bar, volume, ts_init);
1216    }
1217}
1218
1219/// Aggregates bars based on consecutive buy/sell notional runs.
1220pub struct ValueRunsBarAggregator {
1221    core: BarAggregatorCore,
1222    current_run_side: Option<AggressorSide>,
1223    run_value: f64,
1224    step_value: f64,
1225}
1226
1227impl Debug for ValueRunsBarAggregator {
1228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1229        f.debug_struct(stringify!(ValueRunsBarAggregator))
1230            .field("core", &self.core)
1231            .field("current_run_side", &self.current_run_side)
1232            .field("run_value", &self.run_value)
1233            .field("step_value", &self.step_value)
1234            .finish()
1235    }
1236}
1237
1238impl ValueRunsBarAggregator {
1239    /// Creates a new [`ValueRunsBarAggregator`] instance.
1240    ///
1241    /// # Panics
1242    ///
1243    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1244    pub fn new<H: FnMut(Bar) + 'static>(
1245        bar_type: BarType,
1246        price_precision: u8,
1247        size_precision: u8,
1248        handler: H,
1249    ) -> Self {
1250        Self {
1251            core: BarAggregatorCore::new(
1252                bar_type.standard(),
1253                price_precision,
1254                size_precision,
1255                handler,
1256            ),
1257            current_run_side: None,
1258            run_value: 0.0,
1259            step_value: bar_type.spec().step.get() as f64,
1260        }
1261    }
1262}
1263
1264impl BarAggregator for ValueRunsBarAggregator {
1265    fn bar_type(&self) -> BarType {
1266        self.core.bar_type
1267    }
1268
1269    fn is_running(&self) -> bool {
1270        self.core.is_running
1271    }
1272
1273    fn set_is_running(&mut self, value: bool) {
1274        self.core.set_is_running(value);
1275    }
1276
1277    /// Apply the given update to the aggregator.
1278    ///
1279    /// Note: side-aware logic lives in `handle_trade`. This method is used for
1280    /// quote/bar updates where no aggressor side is available.
1281    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1282        self.core.apply_update(price, size, ts_init);
1283    }
1284
1285    fn handle_trade(&mut self, trade: TradeTick) {
1286        let price_f64 = trade.price.as_f64();
1287        if price_f64 == 0.0 {
1288            self.core
1289                .apply_update(trade.price, trade.size, trade.ts_init);
1290            return;
1291        }
1292
1293        let side = match trade.aggressor_side {
1294            AggressorSide::Buyer => Some(AggressorSide::Buyer),
1295            AggressorSide::Seller => Some(AggressorSide::Seller),
1296            AggressorSide::NoAggressor => None,
1297        };
1298
1299        let Some(side) = side else {
1300            self.core
1301                .apply_update(trade.price, trade.size, trade.ts_init);
1302            return;
1303        };
1304
1305        if self.current_run_side != Some(side) {
1306            self.current_run_side = Some(side);
1307            self.run_value = 0.0;
1308            self.core.builder.reset();
1309        }
1310
1311        let mut size_remaining = trade.size.as_f64();
1312        while size_remaining > 0.0 {
1313            let value_update = price_f64 * size_remaining;
1314            if self.run_value + value_update < self.step_value {
1315                self.run_value += value_update;
1316                self.core.apply_update(
1317                    trade.price,
1318                    Quantity::new(size_remaining, trade.size.precision),
1319                    trade.ts_init,
1320                );
1321                break;
1322            }
1323
1324            let value_needed = self.step_value - self.run_value;
1325            let mut size_chunk = value_needed / price_f64;
1326
1327            // Clamp to minimum representable size to avoid zero-volume bars
1328            if is_below_min_size(size_chunk, trade.size.precision) {
1329                if is_below_min_size(size_remaining, trade.size.precision) {
1330                    break;
1331                }
1332                size_chunk = min_size_f64(trade.size.precision);
1333            }
1334
1335            self.core.apply_update(
1336                trade.price,
1337                Quantity::new(size_chunk, trade.size.precision),
1338                trade.ts_init,
1339            );
1340
1341            self.core.build_now_and_send();
1342            self.run_value = 0.0;
1343            self.current_run_side = None;
1344            size_remaining -= size_chunk;
1345        }
1346    }
1347
1348    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1349        self.core.builder.update_bar(bar, volume, ts_init);
1350    }
1351}
1352
1353/// Provides a means of building Renko bars aggregated from quote and trades.
1354///
1355/// Renko bars are created when the price moves by a fixed amount (brick size)
1356/// regardless of time or volume. Each bar represents a price movement equal
1357/// to the step size in the bar specification.
1358pub struct RenkoBarAggregator {
1359    core: BarAggregatorCore,
1360    pub brick_size: PriceRaw,
1361    last_close: Option<Price>,
1362}
1363
1364impl Debug for RenkoBarAggregator {
1365    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1366        f.debug_struct(stringify!(RenkoBarAggregator))
1367            .field("core", &self.core)
1368            .field("brick_size", &self.brick_size)
1369            .field("last_close", &self.last_close)
1370            .finish()
1371    }
1372}
1373
1374impl RenkoBarAggregator {
1375    /// Creates a new [`RenkoBarAggregator`] instance.
1376    ///
1377    /// # Panics
1378    ///
1379    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1380    pub fn new<H: FnMut(Bar) + 'static>(
1381        bar_type: BarType,
1382        price_precision: u8,
1383        size_precision: u8,
1384        price_increment: Price,
1385        handler: H,
1386    ) -> Self {
1387        // Calculate brick size in raw price units (step * price_increment.raw)
1388        let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1389
1390        Self {
1391            core: BarAggregatorCore::new(
1392                bar_type.standard(),
1393                price_precision,
1394                size_precision,
1395                handler,
1396            ),
1397            brick_size,
1398            last_close: None,
1399        }
1400    }
1401}
1402
1403impl BarAggregator for RenkoBarAggregator {
1404    fn bar_type(&self) -> BarType {
1405        self.core.bar_type
1406    }
1407
1408    fn is_running(&self) -> bool {
1409        self.core.is_running
1410    }
1411
1412    fn set_is_running(&mut self, value: bool) {
1413        self.core.set_is_running(value);
1414    }
1415
1416    /// Apply the given update to the aggregator.
1417    ///
1418    /// For Renko bars, we check if the price movement from the last close
1419    /// is greater than or equal to the brick size. If so, we create new bars.
1420    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1421        // Always update the builder with the current tick
1422        self.core.apply_update(price, size, ts_init);
1423
1424        // Initialize last_close if this is the first update
1425        if self.last_close.is_none() {
1426            self.last_close = Some(price);
1427            return;
1428        }
1429
1430        let last_close = self.last_close.unwrap();
1431
1432        // Convert prices to raw units (integers) to avoid floating point precision issues
1433        let current_raw = price.raw;
1434        let last_close_raw = last_close.raw;
1435        let price_diff_raw = current_raw - last_close_raw;
1436        let abs_price_diff_raw = price_diff_raw.abs();
1437
1438        // Check if we need to create one or more Renko bars
1439        if abs_price_diff_raw >= self.brick_size {
1440            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1441            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1442            let mut current_close = last_close;
1443
1444            // Store the current builder volume to distribute across bricks
1445            let total_volume = self.core.builder.volume;
1446
1447            for _i in 0..num_bricks {
1448                // Calculate the close price for this brick using raw price units
1449                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1450                let brick_close = Price::from_raw(brick_close_raw, price.precision);
1451
1452                // For Renko bars: open = previous close, high/low depend on direction
1453                let (brick_high, brick_low) = if direction > 0.0 {
1454                    (brick_close, current_close)
1455                } else {
1456                    (current_close, brick_close)
1457                };
1458
1459                // Reset builder for this brick
1460                self.core.builder.reset();
1461                self.core.builder.open = Some(current_close);
1462                self.core.builder.high = Some(brick_high);
1463                self.core.builder.low = Some(brick_low);
1464                self.core.builder.close = Some(brick_close);
1465                self.core.builder.volume = total_volume; // Each brick gets the full volume
1466                self.core.builder.count = 1;
1467                self.core.builder.ts_last = ts_init;
1468                self.core.builder.initialized = true;
1469
1470                // Build and send the bar
1471                self.core.build_and_send(ts_init, ts_init);
1472
1473                // Update for the next brick
1474                current_close = brick_close;
1475                self.last_close = Some(brick_close);
1476            }
1477        }
1478    }
1479
1480    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1481        // Always update the builder with the current bar
1482        self.core.builder.update_bar(bar, volume, ts_init);
1483
1484        // Initialize last_close if this is the first update
1485        if self.last_close.is_none() {
1486            self.last_close = Some(bar.close);
1487            return;
1488        }
1489
1490        let last_close = self.last_close.unwrap();
1491
1492        // Convert prices to raw units (integers) to avoid floating point precision issues
1493        let current_raw = bar.close.raw;
1494        let last_close_raw = last_close.raw;
1495        let price_diff_raw = current_raw - last_close_raw;
1496        let abs_price_diff_raw = price_diff_raw.abs();
1497
1498        // Check if we need to create one or more Renko bars
1499        if abs_price_diff_raw >= self.brick_size {
1500            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1501            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1502            let mut current_close = last_close;
1503
1504            // Store the current builder volume to distribute across bricks
1505            let total_volume = self.core.builder.volume;
1506
1507            for _i in 0..num_bricks {
1508                // Calculate the close price for this brick using raw price units
1509                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1510                let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1511
1512                // For Renko bars: open = previous close, high/low depend on direction
1513                let (brick_high, brick_low) = if direction > 0.0 {
1514                    (brick_close, current_close)
1515                } else {
1516                    (current_close, brick_close)
1517                };
1518
1519                // Reset builder for this brick
1520                self.core.builder.reset();
1521                self.core.builder.open = Some(current_close);
1522                self.core.builder.high = Some(brick_high);
1523                self.core.builder.low = Some(brick_low);
1524                self.core.builder.close = Some(brick_close);
1525                self.core.builder.volume = total_volume; // Each brick gets the full volume
1526                self.core.builder.count = 1;
1527                self.core.builder.ts_last = ts_init;
1528                self.core.builder.initialized = true;
1529
1530                // Build and send the bar
1531                self.core.build_and_send(ts_init, ts_init);
1532
1533                // Update for the next brick
1534                current_close = brick_close;
1535                self.last_close = Some(brick_close);
1536            }
1537        }
1538    }
1539}
1540
1541/// Provides a means of building time bars aggregated from quote and trades.
1542///
1543/// At each aggregation time interval, a bar is created and sent to the handler.
1544pub struct TimeBarAggregator {
1545    core: BarAggregatorCore,
1546    clock: Rc<RefCell<dyn Clock>>,
1547    build_with_no_updates: bool,
1548    timestamp_on_close: bool,
1549    is_left_open: bool,
1550    stored_open_ns: UnixNanos,
1551    timer_name: String,
1552    interval_ns: UnixNanos,
1553    next_close_ns: UnixNanos,
1554    first_close_ns: UnixNanos,
1555    bar_build_delay: u64,
1556    time_bars_origin_offset: Option<TimeDelta>,
1557    skip_first_non_full_bar: bool,
1558    pub historical_mode: bool,
1559    historical_events: Vec<TimeEvent>,
1560    historical_event_at_ts_init: Option<TimeEvent>,
1561    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
1562}
1563
1564impl Debug for TimeBarAggregator {
1565    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1566        f.debug_struct(stringify!(TimeBarAggregator))
1567            .field("core", &self.core)
1568            .field("build_with_no_updates", &self.build_with_no_updates)
1569            .field("timestamp_on_close", &self.timestamp_on_close)
1570            .field("is_left_open", &self.is_left_open)
1571            .field("timer_name", &self.timer_name)
1572            .field("interval_ns", &self.interval_ns)
1573            .field("bar_build_delay", &self.bar_build_delay)
1574            .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1575            .finish()
1576    }
1577}
1578
1579impl TimeBarAggregator {
1580    /// Creates a new [`TimeBarAggregator`] instance.
1581    ///
1582    /// # Panics
1583    ///
1584    /// Panics if `bar_type.aggregation_source` is not `AggregationSource::Internal`.
1585    #[expect(clippy::too_many_arguments)]
1586    pub fn new<H: FnMut(Bar) + 'static>(
1587        bar_type: BarType,
1588        price_precision: u8,
1589        size_precision: u8,
1590        clock: Rc<RefCell<dyn Clock>>,
1591        handler: H,
1592        build_with_no_updates: bool,
1593        timestamp_on_close: bool,
1594        interval_type: BarIntervalType,
1595        time_bars_origin_offset: Option<TimeDelta>,
1596        bar_build_delay: u64,
1597        skip_first_non_full_bar: bool,
1598    ) -> Self {
1599        let is_left_open = match interval_type {
1600            BarIntervalType::LeftOpen => true,
1601            BarIntervalType::RightOpen => false,
1602        };
1603
1604        let core = BarAggregatorCore::new(
1605            bar_type.standard(),
1606            price_precision,
1607            size_precision,
1608            handler,
1609        );
1610
1611        Self {
1612            core,
1613            clock,
1614            build_with_no_updates,
1615            timestamp_on_close,
1616            is_left_open,
1617            stored_open_ns: UnixNanos::default(),
1618            timer_name: format!("TIME_BAR_{bar_type}"),
1619            interval_ns: get_bar_interval_ns(&bar_type),
1620            next_close_ns: UnixNanos::default(),
1621            first_close_ns: UnixNanos::default(),
1622            bar_build_delay,
1623            time_bars_origin_offset,
1624            skip_first_non_full_bar,
1625            historical_mode: false,
1626            historical_events: Vec::new(),
1627            historical_event_at_ts_init: None,
1628            aggregator_weak: None,
1629        }
1630    }
1631
1632    /// Sets the clock for the aggregator (internal method).
1633    pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1634        self.clock = clock;
1635    }
1636
1637    /// Starts the time bar aggregator, scheduling periodic bar builds on the clock.
1638    ///
1639    /// This matches the Cython `start_timer()` method exactly.
1640    /// Creates a callback to `build_bar` using a weak reference to the aggregator.
1641    ///
1642    /// # Panics
1643    ///
1644    /// Panics if aggregator_rc is None and aggregator_weak hasn't been set, or if timer registration fails.
1645    pub fn start_timer_internal(
1646        &mut self,
1647        aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1648    ) {
1649        // Create callback that calls build_bar through the weak reference
1650        let aggregator_weak = if let Some(rc) = aggregator_rc {
1651            // Store weak reference for future use (e.g., in build_bar for month/year)
1652            let weak = Rc::downgrade(&rc);
1653            self.aggregator_weak = Some(weak.clone());
1654            weak
1655        } else {
1656            // Use existing weak reference (for historical mode where it was set earlier)
1657            self.aggregator_weak
1658                .as_ref()
1659                .expect("Aggregator weak reference must be set before calling start_timer()")
1660                .clone()
1661        };
1662
1663        let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
1664            if let Some(agg) = aggregator_weak.upgrade() {
1665                agg.borrow_mut().build_bar(&event);
1666            }
1667        }));
1668
1669        // Computing start_time
1670        let now = self.clock.borrow().utc_now();
1671        let mut start_time =
1672            get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1673        start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1674
1675        // Closing a partial bar at the transition from historical to backtest data
1676        let fire_immediately = start_time == now;
1677
1678        let spec = &self.bar_type().spec();
1679        let start_time_ns = UnixNanos::from(start_time);
1680        let step = spec.step.get() as u32;
1681
1682        if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1683            self.clock
1684                .borrow_mut()
1685                .set_timer_ns(
1686                    &self.timer_name,
1687                    self.interval_ns.as_u64(),
1688                    Some(start_time_ns),
1689                    None,
1690                    Some(callback),
1691                    Some(true), // allow_past
1692                    Some(fire_immediately),
1693                )
1694                .expect(FAILED);
1695
1696            if fire_immediately {
1697                self.next_close_ns = start_time_ns;
1698            } else {
1699                let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1700                self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1701            }
1702
1703            self.stored_open_ns = self.next_close_ns.saturating_sub_ns(self.interval_ns);
1704        } else {
1705            // The monthly/yearly alert time is defined iteratively at each alert time as there is no regular interval
1706            let alert_time = if fire_immediately {
1707                start_time
1708            } else if spec.aggregation == BarAggregation::Month {
1709                add_n_months(start_time, step).expect(FAILED)
1710            } else {
1711                add_n_years(start_time, step).expect(FAILED)
1712            };
1713
1714            self.clock
1715                .borrow_mut()
1716                .set_time_alert_ns(
1717                    &self.timer_name,
1718                    UnixNanos::from(alert_time),
1719                    Some(callback),
1720                    Some(true), // allow_past
1721                )
1722                .expect(FAILED);
1723
1724            self.next_close_ns = UnixNanos::from(alert_time);
1725            // Mirror Cython: stored_open = close_time - step, so when fire_immediately the
1726            // current (partial) bar started `step` periods before start_time.
1727            self.stored_open_ns = if fire_immediately {
1728                if spec.aggregation == BarAggregation::Month {
1729                    subtract_n_months_nanos(start_time_ns, step).expect(FAILED)
1730                } else {
1731                    subtract_n_years_nanos(start_time_ns, step).expect(FAILED)
1732                }
1733            } else {
1734                start_time_ns
1735            };
1736        }
1737
1738        if self.skip_first_non_full_bar {
1739            self.first_close_ns = self.next_close_ns;
1740        }
1741
1742        log::debug!(
1743            "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1744            self.timer_name,
1745            start_time,
1746            self.historical_mode,
1747            fire_immediately,
1748            now,
1749            self.bar_build_delay
1750        );
1751    }
1752
1753    /// Stops the time bar aggregator.
1754    pub fn stop(&mut self) {
1755        self.clock.borrow_mut().cancel_timer(&self.timer_name);
1756    }
1757
1758    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1759        if self.skip_first_non_full_bar && ts_init <= self.first_close_ns {
1760            self.core.builder.reset();
1761        } else {
1762            // Clear for the transition from historical to live data; subsequent
1763            // bars always emit regardless of timestamp.
1764            self.skip_first_non_full_bar = false;
1765            self.core.build_and_send(ts_event, ts_init);
1766        }
1767    }
1768
1769    fn build_bar(&mut self, event: &TimeEvent) {
1770        if !self.core.builder.initialized {
1771            return;
1772        }
1773
1774        if !self.build_with_no_updates && self.core.builder.count == 0 {
1775            return; // Do not build bar when no update
1776        }
1777
1778        let ts_init = event.ts_event;
1779        let ts_event = if self.is_left_open {
1780            if self.timestamp_on_close {
1781                event.ts_event
1782            } else {
1783                self.stored_open_ns
1784            }
1785        } else {
1786            self.stored_open_ns
1787        };
1788
1789        self.build_and_send(ts_event, ts_init);
1790
1791        // Close time becomes the next open time
1792        self.stored_open_ns = event.ts_event;
1793
1794        if self.bar_type().spec().aggregation == BarAggregation::Month {
1795            let step = self.bar_type().spec().step.get() as u32;
1796            let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1797
1798            self.clock
1799                .borrow_mut()
1800                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1801                .expect(FAILED);
1802
1803            self.next_close_ns = alert_time_ns;
1804        } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1805            let step = self.bar_type().spec().step.get() as u32;
1806            let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1807
1808            self.clock
1809                .borrow_mut()
1810                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1811                .expect(FAILED);
1812
1813            self.next_close_ns = alert_time_ns;
1814        } else {
1815            // On receiving this event, timer should now have a new `next_time_ns`
1816            self.next_close_ns = self
1817                .clock
1818                .borrow()
1819                .next_time_ns(&self.timer_name)
1820                .unwrap_or_default();
1821        }
1822    }
1823
1824    fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1825        if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1826            // In historical mode, clock is always a TestClock (set by data engine)
1827            {
1828                let mut clock_borrow = self.clock.borrow_mut();
1829                let test_clock = clock_borrow
1830                    .as_any_mut()
1831                    .downcast_mut::<TestClock>()
1832                    .expect("Expected TestClock in historical mode");
1833                test_clock.set_time(ts_init);
1834            }
1835            // In historical mode, weak reference should already be set
1836            self.start_timer_internal(None);
1837        }
1838
1839        // Advance this aggregator's independent clock and collect timer events.
1840        let events = {
1841            let mut clock_borrow = self.clock.borrow_mut();
1842            let test_clock = clock_borrow
1843                .as_any_mut()
1844                .downcast_mut::<TestClock>()
1845                .expect("Expected TestClock in historical mode");
1846            test_clock.advance_time(ts_init, true)
1847        };
1848
1849        for event in events {
1850            if event.ts_event == ts_init {
1851                self.historical_event_at_ts_init = Some(event);
1852            } else {
1853                self.build_bar(&event);
1854            }
1855        }
1856    }
1857
1858    fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1859        if let Some(ref event) = self.historical_event_at_ts_init.take() {
1860            self.build_bar(event);
1861        }
1862    }
1863
    /// Sets historical events (called by data engine after advancing clock).
    ///
    /// Replaces any previously stored events with `events`.
    pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
        self.historical_events = events;
    }
1868}
1869
/// `BarAggregator` implementation for time-driven bars.
///
/// Most methods delegate to the inherent `TimeBarAggregator` methods or to the shared core.
impl BarAggregator for TimeBarAggregator {
    /// Returns the bar type held by the aggregation core.
    fn bar_type(&self) -> BarType {
        self.core.bar_type
    }

    /// Returns the running flag held by the aggregation core.
    fn is_running(&self) -> bool {
        self.core.is_running
    }

    /// Sets the running flag on the aggregation core.
    fn set_is_running(&mut self, value: bool) {
        self.core.set_is_running(value);
    }

    /// Stop time-based aggregator by canceling its timer.
    fn stop(&mut self) {
        Self::stop(self);
    }

    /// Applies a price/size update.
    ///
    /// In historical mode the aggregator's clock is advanced to `ts_init` before the update,
    /// and any timer event deferred at exactly `ts_init` is processed after it.
    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
        if self.historical_mode {
            self.preprocess_historical_events(ts_init);
        }

        self.core.apply_update(price, size, ts_init);

        if self.historical_mode {
            self.postprocess_historical_events(ts_init);
        }
    }

    /// Applies an update from an existing bar, with the same historical pre/post-processing
    /// as [`Self::update`].
    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
        if self.historical_mode {
            self.preprocess_historical_events(ts_init);
        }

        self.core.builder.update_bar(bar, volume, ts_init);

        if self.historical_mode {
            self.postprocess_historical_events(ts_init);
        }
    }

    /// Switches historical mode on or off and replaces the bar handler.
    fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
        self.historical_mode = historical_mode;
        self.core.handler = handler;
    }

    /// Stores the given historical timer events.
    fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
        self.set_historical_events_internal(events);
    }

    /// Replaces the clock used by this aggregator.
    fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
        self.set_clock_internal(clock);
    }

    /// Builds a bar for the given timer event.
    fn build_bar(&mut self, event: &TimeEvent) {
        // Delegate to the implementation method
        // We use the struct name here to disambiguate from the trait method
        {
            #[expect(clippy::use_self)]
            TimeBarAggregator::build_bar(self, event);
        }
    }

    /// Stores the weak self-reference for later use.
    fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
        self.aggregator_weak = Some(weak);
    }

    /// Starts the bar timer, optionally supplying the `Rc` that owns this aggregator.
    fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
        self.start_timer_internal(aggregator_rc);
    }
}
1942
1943fn is_below_min_size(size: f64, precision: u8) -> bool {
1944    Quantity::new(size, precision).raw == 0
1945}
1946
/// Returns the smallest positive increment at `precision` decimal places, i.e. `10^-precision`.
fn min_size_f64(precision: u8) -> f64 {
    let decimals = i32::from(precision);
    10_f64.powi(-decimals)
}
1950
/// Provider for vega per leg (option spreads). Returns `None` when greeks are unavailable.
///
/// The spread quote aggregator queries it for each leg when building a non-futures spread quote.
pub trait VegaProvider {
    /// Returns vega for the given leg instrument, or `None` if not available.
    fn vega_for_leg(&self, instrument_id: InstrumentId) -> Option<f64>;
}
1956
/// Rounder for spread bid/ask (e.g. tick scheme). When absent, raw prices are used with instrument precision.
pub trait SpreadPriceRounder {
    /// Rounds raw bid/ask to valid prices (handles negative prices with mirroring when using tick scheme).
    ///
    /// Returns the rounded prices as a `(bid, ask)` pair.
    fn round_prices(&self, raw_bid: f64, raw_ask: f64, precision: u8) -> (Price, Price);
}
1962
/// Vega provider that returns leg vegas from a map (e.g. populated from greeks cache).
#[derive(Debug, Default)]
pub struct MapVegaProvider {
    // Vega per leg instrument; instruments without an entry yield `None`.
    vegas: AHashMap<InstrumentId, f64>,
}
1968
1969impl MapVegaProvider {
1970    pub fn new() -> Self {
1971        Self {
1972            vegas: AHashMap::new(),
1973        }
1974    }
1975
1976    pub fn insert(&mut self, instrument_id: InstrumentId, vega: f64) {
1977        self.vegas.insert(instrument_id, vega);
1978    }
1979
1980    pub fn get(&self, instrument_id: &InstrumentId) -> Option<f64> {
1981        self.vegas.get(instrument_id).copied()
1982    }
1983}
1984
1985impl VegaProvider for MapVegaProvider {
1986    fn vega_for_leg(&self, instrument_id: InstrumentId) -> Option<f64> {
1987        self.vegas.get(&instrument_id).copied()
1988    }
1989}
1990
/// Rounder that uses a fixed tick size; mirrors negative prices for tick alignment (Cython parity).
#[derive(Debug)]
pub struct FixedTickSchemeRounder {
    // Tick scheme used to find the next valid bid/ask price.
    scheme: FixedTickScheme,
}
1996
1997impl FixedTickSchemeRounder {
1998    /// Creates a rounder with the given tick size.
1999    ///
2000    /// # Errors
2001    ///
2002    /// Returns an error if `tick` is not positive.
2003    pub fn new(tick: f64) -> anyhow::Result<Self> {
2004        Ok(Self {
2005            scheme: FixedTickScheme::new(tick)?,
2006        })
2007    }
2008
2009    fn round_one(&self, raw: f64, precision: u8, use_bid_rounding: bool) -> Price {
2010        if raw >= 0.0 {
2011            let p = if use_bid_rounding {
2012                self.scheme.next_bid_price(raw, 0, precision)
2013            } else {
2014                self.scheme.next_ask_price(raw, 0, precision)
2015            };
2016            p.unwrap_or_else(|| price_from_f64(raw, precision))
2017        } else {
2018            let p = if use_bid_rounding {
2019                self.scheme.next_ask_price(-raw, 0, precision)
2020            } else {
2021                self.scheme.next_bid_price(-raw, 0, precision)
2022            };
2023            p.map_or_else(
2024                || price_from_f64(raw, precision),
2025                |q| price_from_f64(-q.as_f64(), precision),
2026            )
2027        }
2028    }
2029}
2030
2031impl SpreadPriceRounder for FixedTickSchemeRounder {
2032    fn round_prices(&self, raw_bid: f64, raw_ask: f64, precision: u8) -> (Price, Price) {
2033        let bid = self.round_one(raw_bid, precision, true);
2034        let ask = self.round_one(raw_ask, precision, false);
2035        (bid, ask)
2036    }
2037}
2038
/// Spread quote aggregator: builds synthetic quotes from leg quotes (Cython parity).
///
/// Quote-driven mode (`update_interval_seconds == None`): emits when all legs have quotes.
/// Timer-driven mode: emits on timer fire when `has_update` is true.
/// Historical mode: defers timer event at `ts_init` until after the update.
pub struct SpreadQuoteAggregator {
    // Instrument ID stamped on every emitted spread quote.
    spread_instrument_id: InstrumentId,
    // Leg instrument IDs and their ratios, index-aligned; both come from the `legs` slice.
    leg_ids: Vec<InstrumentId>,
    ratios: Vec<i64>,
    // Number of legs (`legs.len()` at construction).
    n_legs: usize,
    // Selects futures-spread pricing instead of vega-weighted option-spread pricing.
    is_futures_spread: bool,
    // Precisions applied to the emitted prices and sizes.
    price_precision: u8,
    size_precision: u8,
    // Most recent quote received per instrument.
    last_quotes: AHashMap<InstrumentId, QuoteTick>,
    // Per-leg working buffers (length `n_legs`), refreshed each time a quote is built.
    // `vegas` keeps its previous value for a leg when the provider returns `None`.
    mid_prices: Vec<f64>,
    bid_prices: Vec<f64>,
    ask_prices: Vec<f64>,
    vegas: Vec<f64>,
    bid_ask_spreads: Vec<f64>,
    bid_sizes: Vec<f64>,
    ask_sizes: Vec<f64>,
    // Receives every spread quote that is built.
    handler: Box<dyn FnMut(QuoteTick)>,
    // Clock providing timestamps and timers; must be a `TestClock` when historical events
    // are processed.
    clock: Rc<RefCell<dyn Clock>>,
    historical_mode: bool,
    // `None` selects quote-driven mode; `Some(secs)` selects timer-driven mode.
    update_interval_seconds: Option<u64>,
    // Offset added to the timer start time, in microseconds.
    quote_build_delay: u64,
    // Set when a leg quote arrives, cleared after a spread quote is emitted.
    has_update: bool,
    // Name of the clock timer: `SPREAD_QUOTE_{spread_instrument_id}`.
    timer_name: String,
    // Timer event that fired exactly at the `ts_init` of the quote being processed (historical mode).
    historical_event_at_ts_init: Option<TimeEvent>,
    // Optional source of per-leg vegas for option spreads.
    vega_provider: Option<Box<dyn VegaProvider>>,
    // Optional rounder for the raw bid/ask; when `None`, raw prices are converted at `price_precision`.
    price_rounder: Option<Box<dyn SpreadPriceRounder>>,
    is_running: bool,
    // Weak self-reference captured by the timer callback.
    aggregator_weak: Option<Weak<RefCell<Self>>>,
}
2073
2074impl Debug for SpreadQuoteAggregator {
2075    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2076        f.debug_struct(stringify!(SpreadQuoteAggregator))
2077            .field("spread_instrument_id", &self.spread_instrument_id)
2078            .field("n_legs", &self.n_legs)
2079            .field("is_futures_spread", &self.is_futures_spread)
2080            .field("update_interval_seconds", &self.update_interval_seconds)
2081            .finish()
2082    }
2083}
2084
2085impl SpreadQuoteAggregator {
2086    /// Creates a new [`SpreadQuoteAggregator`].
2087    ///
2088    /// # Panics
2089    ///
2090    /// Panics if `legs` has fewer than 2 entries or any ratio is zero.
2091    #[expect(clippy::too_many_arguments)]
2092    pub fn new(
2093        spread_instrument_id: InstrumentId,
2094        legs: &[(InstrumentId, i64)],
2095        is_futures_spread: bool,
2096        price_precision: u8,
2097        size_precision: u8,
2098        handler: Box<dyn FnMut(QuoteTick)>,
2099        clock: Rc<RefCell<dyn Clock>>,
2100        historical_mode: bool,
2101        update_interval_seconds: Option<u64>,
2102        quote_build_delay: u64,
2103        vega_provider: Option<Box<dyn VegaProvider>>,
2104        price_rounder: Option<Box<dyn SpreadPriceRounder>>,
2105    ) -> Self {
2106        assert!(legs.len() >= 2, "Spread must have more than one leg");
2107        let n_legs = legs.len();
2108        let leg_ids: Vec<InstrumentId> = legs.iter().map(|(id, _)| *id).collect();
2109        let ratios: Vec<i64> = legs.iter().map(|(_, r)| *r).collect();
2110        for &r in &ratios {
2111            assert!(r != 0, "Ratio cannot be zero");
2112        }
2113        let timer_name = format!("SPREAD_QUOTE_{spread_instrument_id}");
2114        Self {
2115            spread_instrument_id,
2116            leg_ids,
2117            ratios,
2118            n_legs,
2119            is_futures_spread,
2120            price_precision,
2121            size_precision,
2122            last_quotes: AHashMap::new(),
2123            mid_prices: vec![0.0; n_legs],
2124            bid_prices: vec![0.0; n_legs],
2125            ask_prices: vec![0.0; n_legs],
2126            vegas: vec![0.0; n_legs],
2127            bid_ask_spreads: vec![0.0; n_legs],
2128            bid_sizes: vec![0.0; n_legs],
2129            ask_sizes: vec![0.0; n_legs],
2130            handler,
2131            clock,
2132            historical_mode,
2133            update_interval_seconds,
2134            quote_build_delay,
2135            has_update: false,
2136            timer_name,
2137            historical_event_at_ts_init: None,
2138            vega_provider,
2139            price_rounder,
2140            is_running: false,
2141            aggregator_weak: None,
2142        }
2143    }
2144
    /// Sets the weak reference to this aggregator (used when starting the timer so the callback can call back).
    /// Prefer [`Self::prepare_for_timer_mode`] so the owner passes the owning `Rc` in one step.
    ///
    /// Any previously stored weak reference is replaced.
    pub fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Self>>) {
        self.aggregator_weak = Some(weak);
    }
2150
2151    /// One-step setup for timer-driven mode (live or historical). Call this with the `Rc` that owns
2152    /// this aggregator before feeding any quotes when `update_interval_seconds` is set. The timer
2153    /// callback will use the stored weak reference to call back into this aggregator; without this,
2154    /// [`Self::start_timer`] will panic in historical mode or when called with `None`.
2155    pub fn prepare_for_timer_mode(&mut self, self_rc: &Rc<RefCell<Self>>) {
2156        self.aggregator_weak = Some(Rc::downgrade(self_rc));
2157    }
2158
2159    /// Sets historical mode and handler (and optionally greeks provider when switching).
2160    pub fn set_historical_mode(
2161        &mut self,
2162        historical_mode: bool,
2163        handler: Box<dyn FnMut(QuoteTick)>,
2164        vega_provider: Option<Box<dyn VegaProvider>>,
2165    ) {
2166        self.historical_mode = historical_mode;
2167        self.handler = handler;
2168
2169        if let Some(vp) = vega_provider {
2170            self.vega_provider = Some(vp);
2171        }
2172    }
2173
    /// Sets the running flag of this aggregator.
    pub fn set_running(&mut self, is_running: bool) {
        self.is_running = is_running;
    }
2177
    /// Replaces the clock used for timestamps and timers.
    ///
    /// NOTE(review): a timer already registered on the previous clock is not touched here;
    /// confirm callers stop it first where that matters.
    pub fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
        self.clock = clock;
    }
2181
2182    /// Starts the timer when `update_interval_seconds` is set (timer-driven mode).
2183    /// In live mode pass `Some(rc)` so the weak is set and the timer can call back.
2184    /// In historical mode the owner must have called [`Self::prepare_for_timer_mode`] with the
2185    /// owning `Rc` before any quote is processed, then call with `None` here.
2186    ///
2187    /// # Panics
2188    ///
2189    /// Panics if called with `None` in timer mode without a prior [`Self::prepare_for_timer_mode`] call.
2190    pub fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Self>>>) {
2191        let Some(interval_secs) = self.update_interval_seconds else {
2192            return;
2193        };
2194        let aggregator_weak = if let Some(rc) = aggregator_rc {
2195            let weak = Rc::downgrade(&rc);
2196            self.aggregator_weak = Some(weak.clone());
2197            weak
2198        } else {
2199            self.aggregator_weak.as_ref().cloned().expect(
2200                "SpreadQuoteAggregator: timer mode requires prepare_for_timer_mode(rc) to be \
2201                 called first with the Rc that wraps this aggregator (before feeding quotes in \
2202                 historical mode or before start_timer(None)).",
2203            )
2204        };
2205
2206        let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
2207            if let Some(agg) = aggregator_weak.upgrade() {
2208                agg.borrow_mut().on_timer_fire(event.ts_event);
2209            }
2210        }));
2211
2212        let now_ns = self.clock.borrow().timestamp_ns();
2213        let interval_ns = interval_secs * 1_000_000_000;
2214        let start_ns = (now_ns.as_u64() / interval_ns) * interval_ns;
2215        let start_ns = start_ns + self.quote_build_delay * 1_000; // quote_build_delay in microseconds
2216        let start_time = UnixNanos::from(start_ns);
2217        let fire_immediately = now_ns == start_time;
2218        self.clock
2219            .borrow_mut()
2220            .set_timer_ns(
2221                &self.timer_name,
2222                interval_ns,
2223                Some(start_time),
2224                None,
2225                Some(callback),
2226                Some(true),
2227                Some(fire_immediately),
2228            )
2229            .expect("Failed to set spread quote timer");
2230    }
2231
2232    /// Called when the timer fires (live mode). Builds and sends a spread quote using the timer event timestamp.
2233    pub fn on_timer_fire(&mut self, ts_event: UnixNanos) {
2234        if self.last_quotes.len() == self.n_legs {
2235            self.build_and_send_quote(ts_event);
2236        }
2237    }
2238
2239    /// Stops the timer when in timer-driven mode.
2240    pub fn stop_timer(&mut self) {
2241        if self.update_interval_seconds.is_none() {
2242            return;
2243        }
2244
2245        if self
2246            .clock
2247            .borrow()
2248            .timer_names()
2249            .contains(&self.timer_name.as_str())
2250        {
2251            self.clock.borrow_mut().cancel_timer(&self.timer_name);
2252        }
2253    }
2254
2255    /// Handles an incoming leg quote (Cython `handle_quote_tick`).
2256    pub fn handle_quote_tick(&mut self, tick: QuoteTick) {
2257        let ts_init = tick.ts_init;
2258
2259        if self.update_interval_seconds.is_some() && self.historical_mode {
2260            self.process_historical_events(ts_init);
2261        }
2262        self.last_quotes.insert(tick.instrument_id, tick);
2263        self.has_update = true;
2264
2265        if self.update_interval_seconds.is_none() && self.last_quotes.len() == self.n_legs {
2266            self.build_and_send_quote(ts_init);
2267        }
2268    }
2269
2270    /// Flushes the deferred historical timer event, if any.
2271    ///
2272    /// This is intended for historical request finalization, where we know no more historical
2273    /// quotes will arrive for the requested range and should not require a later live tick just
2274    /// to release the final same-timestamp spread quote.
2275    pub fn flush_pending_historical_quote(&mut self) {
2276        if self.update_interval_seconds.is_none() || !self.historical_mode {
2277            return;
2278        }
2279
2280        let Some(event) = self.historical_event_at_ts_init.take() else {
2281            return;
2282        };
2283
2284        if self.last_quotes.len() == self.n_legs {
2285            self.build_and_send_quote(event.ts_event);
2286        }
2287    }
2288
2289    /// Advances the historical clock and collects timer events. Events at `ts_init` are
2290    /// deferred until the next call when time advances. The deferred event is only flushed
2291    /// when all legs have quotes and time has moved past the deferred timestamp. This
2292    /// prevents building a spread quote with stale leg data when multiple legs update at
2293    /// the same timestamp (Cython parity).
2294    fn process_historical_events(&mut self, ts_init: UnixNanos) {
2295        if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
2296            let mut clock_borrow = self.clock.borrow_mut();
2297            let test_clock = clock_borrow
2298                .as_any_mut()
2299                .downcast_mut::<TestClock>()
2300                .expect("Expected TestClock in historical mode");
2301            test_clock.set_time(ts_init);
2302            drop(clock_borrow);
2303            self.start_timer(None);
2304        }
2305
2306        if self.last_quotes.len() == self.n_legs
2307            && let Some(ref event) = self.historical_event_at_ts_init
2308            && event.ts_event < ts_init
2309        {
2310            // Guarded by `let Some(ref event)` above
2311            let event = self.historical_event_at_ts_init.take().unwrap();
2312            self.build_and_send_quote(event.ts_event);
2313        }
2314
2315        let events = {
2316            let mut clock_borrow = self.clock.borrow_mut();
2317            let test_clock = clock_borrow
2318                .as_any_mut()
2319                .downcast_mut::<TestClock>()
2320                .expect("Expected TestClock in historical mode");
2321            test_clock.advance_time(ts_init, true)
2322        };
2323
2324        for event in events {
2325            if event.ts_event == ts_init {
2326                self.historical_event_at_ts_init = Some(event);
2327            } else if self.last_quotes.len() == self.n_legs {
2328                self.build_and_send_quote(event.ts_event);
2329            }
2330        }
2331    }
2332
2333    /// Builds and sends one spread quote (Cython `_build_and_send_quote`).
2334    fn build_and_send_quote(&mut self, ts_event: UnixNanos) {
2335        if !self.has_update {
2336            return;
2337        }
2338
2339        for (idx, &leg_id) in self.leg_ids.iter().enumerate() {
2340            let Some(tick) = self.last_quotes.get(&leg_id) else {
2341                log::error!(
2342                    "SpreadQuoteAggregator[{}]: Missing quote for leg {}",
2343                    self.spread_instrument_id,
2344                    leg_id
2345                );
2346                return;
2347            };
2348            let ask_price = tick.ask_price.as_f64();
2349            let bid_price = tick.bid_price.as_f64();
2350            self.bid_prices[idx] = bid_price;
2351            self.ask_prices[idx] = ask_price;
2352            self.bid_sizes[idx] = tick.bid_size.as_f64();
2353            self.ask_sizes[idx] = tick.ask_size.as_f64();
2354
2355            if !self.is_futures_spread {
2356                self.mid_prices[idx] = (ask_price + bid_price) * 0.5;
2357                self.bid_ask_spreads[idx] = ask_price - bid_price;
2358
2359                if let Some(ref vp) = self.vega_provider
2360                    && let Some(vega) = vp.vega_for_leg(leg_id)
2361                {
2362                    self.vegas[idx] = vega;
2363                }
2364            }
2365        }
2366        let (raw_bid, raw_ask) = if self.is_futures_spread {
2367            self.create_futures_spread_prices()
2368        } else {
2369            self.create_option_spread_prices()
2370        };
2371        let spread_quote = self.create_quote_tick_from_raw_prices(raw_bid, raw_ask, ts_event);
2372        self.has_update = false;
2373        (self.handler)(spread_quote);
2374    }
2375
2376    fn create_option_spread_prices(&self) -> (f64, f64) {
2377        let vega_multipliers: Vec<f64> = (0..self.n_legs)
2378            .map(|i| {
2379                if self.vegas[i] == 0.0 {
2380                    0.0
2381                } else {
2382                    self.bid_ask_spreads[i] / self.vegas[i]
2383                }
2384            })
2385            .collect();
2386        let non_zero: Vec<f64> = vega_multipliers
2387            .iter()
2388            .copied()
2389            .filter(|&x| x != 0.0)
2390            .collect();
2391
2392        if non_zero.is_empty() {
2393            log::warn!(
2394                "No vega information available for the components of {}. Will generate spread quote using component quotes only",
2395                self.spread_instrument_id
2396            );
2397            return self.create_futures_spread_prices();
2398        }
2399        let vega_multiplier = non_zero.iter().map(|x| x.abs()).sum::<f64>() / non_zero.len() as f64;
2400        let spread_vega = self
2401            .vegas
2402            .iter()
2403            .zip(self.ratios.iter())
2404            .map(|(v, r)| v * (*r as f64))
2405            .sum::<f64>()
2406            .abs();
2407        let bid_ask_spread = spread_vega * vega_multiplier;
2408        let spread_mid_price: f64 = self
2409            .mid_prices
2410            .iter()
2411            .zip(self.ratios.iter())
2412            .map(|(m, r)| m * (*r as f64))
2413            .sum();
2414        let raw_bid = spread_mid_price - bid_ask_spread * 0.5;
2415        let raw_ask = spread_mid_price + bid_ask_spread * 0.5;
2416        (raw_bid, raw_ask)
2417    }
2418
2419    fn create_futures_spread_prices(&self) -> (f64, f64) {
2420        let mut raw_ask = 0.0_f64;
2421        let mut raw_bid = 0.0_f64;
2422
2423        for i in 0..self.n_legs {
2424            let r = self.ratios[i] as f64;
2425            if self.ratios[i] >= 0 {
2426                raw_ask += r * self.ask_prices[i];
2427                raw_bid += r * self.bid_prices[i];
2428            } else {
2429                raw_ask += r * self.bid_prices[i];
2430                raw_bid += r * self.ask_prices[i];
2431            }
2432        }
2433        (raw_bid, raw_ask)
2434    }
2435
2436    fn create_quote_tick_from_raw_prices(
2437        &self,
2438        raw_bid_price: f64,
2439        raw_ask_price: f64,
2440        ts_event: UnixNanos,
2441    ) -> QuoteTick {
2442        let (bid_price, ask_price) = if let Some(ref rounder) = self.price_rounder {
2443            rounder.round_prices(raw_bid_price, raw_ask_price, self.price_precision)
2444        } else {
2445            let bid = price_from_f64(raw_bid_price, self.price_precision);
2446            let ask = price_from_f64(raw_ask_price, self.price_precision);
2447            (bid, ask)
2448        };
2449        let mut min_bid_size = f64::INFINITY;
2450        let mut min_ask_size = f64::INFINITY;
2451        for i in 0..self.n_legs {
2452            let abs_ratio = self.ratios[i].unsigned_abs() as f64;
2453            if self.ratios[i] >= 0 {
2454                let b = self.bid_sizes[i] / abs_ratio;
2455                if b < min_bid_size {
2456                    min_bid_size = b;
2457                }
2458                let a = self.ask_sizes[i] / abs_ratio;
2459                if a < min_ask_size {
2460                    min_ask_size = a;
2461                }
2462            } else {
2463                let b = self.ask_sizes[i] / abs_ratio;
2464                if b < min_bid_size {
2465                    min_bid_size = b;
2466                }
2467                let a = self.bid_sizes[i] / abs_ratio;
2468                if a < min_ask_size {
2469                    min_ask_size = a;
2470                }
2471            }
2472        }
2473        let bid_size = Quantity::new(min_bid_size, self.size_precision);
2474        let ask_size = Quantity::new(min_ask_size, self.size_precision);
2475        QuoteTick::new(
2476            self.spread_instrument_id,
2477            bid_price,
2478            ask_price,
2479            bid_size,
2480            ask_size,
2481            ts_event,
2482            ts_event,
2483        )
2484    }
2485}
2486
/// Converts a raw `f64` into a [`Price`] at the given `precision`.
fn price_from_f64(v: f64, precision: u8) -> Price {
    Price::new(v, precision)
}
2490
2491#[cfg(test)]
2492mod tests {
2493    use std::sync::{Arc, Mutex};
2494
2495    use nautilus_common::{clock::TestClock, timer::TimeEvent};
2496    use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
2497    use nautilus_model::{
2498        data::{BarSpecification, BarType, QuoteTick},
2499        enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
2500        identifiers::InstrumentId,
2501        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
2502        types::{Price, Quantity},
2503    };
2504    use rstest::rstest;
2505    use ustr::Ustr;
2506
2507    use super::*;
2508
2509    #[rstest]
2510    fn test_bar_builder_initialization(equity_aapl: Equity) {
2511        let instrument = InstrumentAny::Equity(equity_aapl);
2512        let bar_type = BarType::new(
2513            instrument.id(),
2514            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2515            AggregationSource::Internal,
2516        );
2517        let builder = BarBuilder::new(
2518            bar_type,
2519            instrument.price_precision(),
2520            instrument.size_precision(),
2521        );
2522
2523        assert!(!builder.initialized);
2524        assert_eq!(builder.ts_last, 0);
2525        assert_eq!(builder.count, 0);
2526    }
2527
2528    #[rstest]
2529    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
2530        let instrument = InstrumentAny::Equity(equity_aapl);
2531        let bar_type = BarType::new(
2532            instrument.id(),
2533            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2534            AggregationSource::Internal,
2535        );
2536        let mut builder = BarBuilder::new(
2537            bar_type,
2538            instrument.price_precision(),
2539            instrument.size_precision(),
2540        );
2541
2542        builder.update(
2543            Price::from("100.00"),
2544            Quantity::from(1),
2545            UnixNanos::from(1000),
2546        );
2547        builder.update(
2548            Price::from("95.00"),
2549            Quantity::from(1),
2550            UnixNanos::from(2000),
2551        );
2552        builder.update(
2553            Price::from("105.00"),
2554            Quantity::from(1),
2555            UnixNanos::from(3000),
2556        );
2557
2558        let bar = builder.build_now();
2559        assert!(bar.high > bar.low);
2560        assert_eq!(bar.open, Price::from("100.00"));
2561        assert_eq!(bar.high, Price::from("105.00"));
2562        assert_eq!(bar.low, Price::from("95.00"));
2563        assert_eq!(bar.close, Price::from("105.00"));
2564    }
2565
2566    #[rstest]
2567    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
2568        let instrument = InstrumentAny::Equity(equity_aapl);
2569        let bar_type = BarType::new(
2570            instrument.id(),
2571            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
2572            AggregationSource::Internal,
2573        );
2574        let mut builder = BarBuilder::new(
2575            bar_type,
2576            instrument.price_precision(),
2577            instrument.size_precision(),
2578        );
2579
2580        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
2581        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
2582
2583        assert_eq!(builder.ts_last, 1_000);
2584        assert_eq!(builder.count, 1);
2585    }
2586
2587    #[rstest]
2588    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
2589        let instrument = InstrumentAny::Equity(equity_aapl);
2590        let bar_type = BarType::new(
2591            instrument.id(),
2592            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2593            AggregationSource::Internal,
2594        );
2595        let mut builder = BarBuilder::new(
2596            bar_type,
2597            instrument.price_precision(),
2598            instrument.size_precision(),
2599        );
2600
2601        builder.update(
2602            Price::from("1.00000"),
2603            Quantity::from(1),
2604            UnixNanos::default(),
2605        );
2606
2607        assert!(builder.initialized);
2608        assert_eq!(builder.ts_last, 0);
2609        assert_eq!(builder.count, 1);
2610    }
2611
2612    #[rstest]
2613    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2614        equity_aapl: Equity,
2615    ) {
2616        let instrument = InstrumentAny::Equity(equity_aapl);
2617        let bar_type = BarType::new(
2618            instrument.id(),
2619            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2620            AggregationSource::Internal,
2621        );
2622        let mut builder = BarBuilder::new(bar_type, 2, 0);
2623
2624        builder.update(
2625            Price::from("1.00000"),
2626            Quantity::from(1),
2627            UnixNanos::from(1_000),
2628        );
2629        builder.update(
2630            Price::from("1.00001"),
2631            Quantity::from(1),
2632            UnixNanos::from(500),
2633        );
2634
2635        assert!(builder.initialized);
2636        assert_eq!(builder.ts_last, 1_000);
2637        assert_eq!(builder.count, 1);
2638    }
2639
2640    #[rstest]
2641    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2642        let instrument = InstrumentAny::Equity(equity_aapl);
2643        let bar_type = BarType::new(
2644            instrument.id(),
2645            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2646            AggregationSource::Internal,
2647        );
2648        let mut builder = BarBuilder::new(
2649            bar_type,
2650            instrument.price_precision(),
2651            instrument.size_precision(),
2652        );
2653
2654        for _ in 0..5 {
2655            builder.update(
2656                Price::from("1.00000"),
2657                Quantity::from(1),
2658                UnixNanos::from(1_000),
2659            );
2660        }
2661
2662        assert_eq!(builder.count, 5);
2663    }
2664
2665    #[rstest]
2666    #[should_panic]
2667    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2668        let instrument = InstrumentAny::Equity(equity_aapl);
2669        let bar_type = BarType::new(
2670            instrument.id(),
2671            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2672            AggregationSource::Internal,
2673        );
2674        let mut builder = BarBuilder::new(
2675            bar_type,
2676            instrument.price_precision(),
2677            instrument.size_precision(),
2678        );
2679        let _ = builder.build_now();
2680    }
2681
2682    #[rstest]
2683    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2684        let instrument = InstrumentAny::Equity(equity_aapl);
2685        let bar_type = BarType::new(
2686            instrument.id(),
2687            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2688            AggregationSource::Internal,
2689        );
2690        let mut builder = BarBuilder::new(
2691            bar_type,
2692            instrument.price_precision(),
2693            instrument.size_precision(),
2694        );
2695
2696        builder.update(
2697            Price::from("1.00001"),
2698            Quantity::from(2),
2699            UnixNanos::default(),
2700        );
2701        builder.update(
2702            Price::from("1.00002"),
2703            Quantity::from(2),
2704            UnixNanos::default(),
2705        );
2706        builder.update(
2707            Price::from("1.00000"),
2708            Quantity::from(1),
2709            UnixNanos::from(1_000_000_000),
2710        );
2711
2712        let bar = builder.build_now();
2713
2714        assert_eq!(bar.open, Price::from("1.00001"));
2715        assert_eq!(bar.high, Price::from("1.00002"));
2716        assert_eq!(bar.low, Price::from("1.00000"));
2717        assert_eq!(bar.close, Price::from("1.00000"));
2718        assert_eq!(bar.volume, Quantity::from(5));
2719        assert_eq!(bar.ts_init, 1_000_000_000);
2720        assert_eq!(builder.ts_last, 1_000_000_000);
2721        assert_eq!(builder.count, 0);
2722    }
2723
2724    #[rstest]
2725    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2726        let instrument = InstrumentAny::Equity(equity_aapl);
2727        let bar_type = BarType::new(
2728            instrument.id(),
2729            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2730            AggregationSource::Internal,
2731        );
2732        let mut builder = BarBuilder::new(bar_type, 2, 0);
2733
2734        builder.update(
2735            Price::from("1.00001"),
2736            Quantity::from(1),
2737            UnixNanos::default(),
2738        );
2739        builder.build_now();
2740
2741        builder.update(
2742            Price::from("1.00000"),
2743            Quantity::from(1),
2744            UnixNanos::default(),
2745        );
2746        builder.update(
2747            Price::from("1.00003"),
2748            Quantity::from(1),
2749            UnixNanos::default(),
2750        );
2751        builder.update(
2752            Price::from("1.00002"),
2753            Quantity::from(1),
2754            UnixNanos::default(),
2755        );
2756
2757        let bar = builder.build_now();
2758
2759        assert_eq!(bar.open, Price::from("1.00000"));
2760        assert_eq!(bar.high, Price::from("1.00003"));
2761        assert_eq!(bar.low, Price::from("1.00000"));
2762        assert_eq!(bar.close, Price::from("1.00002"));
2763        assert_eq!(bar.volume, Quantity::from(3));
2764    }
2765
2766    #[rstest]
2767    fn test_bar_builder_update_bar_initializes_then_accumulates(equity_aapl: Equity) {
2768        let instrument = InstrumentAny::Equity(equity_aapl);
2769        let bar_type = BarType::new(
2770            instrument.id(),
2771            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2772            AggregationSource::Internal,
2773        );
2774        let mut builder = BarBuilder::new(
2775            bar_type,
2776            instrument.price_precision(),
2777            instrument.size_precision(),
2778        );
2779
2780        let bar_one = Bar::new(
2781            bar_type,
2782            Price::from("100.00"),
2783            Price::from("102.00"),
2784            Price::from("99.00"),
2785            Price::from("101.00"),
2786            Quantity::from(10),
2787            UnixNanos::from(1_000),
2788            UnixNanos::from(1_000),
2789        );
2790        let bar_two = Bar::new(
2791            bar_type,
2792            Price::from("101.00"),
2793            Price::from("103.00"),
2794            Price::from("98.00"),
2795            Price::from("102.00"),
2796            Quantity::from(5),
2797            UnixNanos::from(2_000),
2798            UnixNanos::from(2_000),
2799        );
2800
2801        builder.update_bar(bar_one, bar_one.volume, bar_one.ts_init);
2802        builder.update_bar(bar_two, bar_two.volume, bar_two.ts_init);
2803        let bar = builder.build_now();
2804
2805        assert_eq!(bar.open, Price::from("100.00"));
2806        assert_eq!(bar.high, Price::from("103.00"));
2807        assert_eq!(bar.low, Price::from("98.00"));
2808        assert_eq!(bar.close, Price::from("102.00"));
2809        assert_eq!(bar.volume, Quantity::from(15));
2810        assert_eq!(builder.count, 0);
2811    }
2812
2813    #[rstest]
2814    fn test_bar_builder_update_bar_ignores_earlier_timestamp(equity_aapl: Equity) {
2815        let instrument = InstrumentAny::Equity(equity_aapl);
2816        let bar_type = BarType::new(
2817            instrument.id(),
2818            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2819            AggregationSource::Internal,
2820        );
2821        let mut builder = BarBuilder::new(
2822            bar_type,
2823            instrument.price_precision(),
2824            instrument.size_precision(),
2825        );
2826
2827        let bar_later = Bar::new(
2828            bar_type,
2829            Price::from("100.00"),
2830            Price::from("101.00"),
2831            Price::from("99.00"),
2832            Price::from("100.50"),
2833            Quantity::from(10),
2834            UnixNanos::from(2_000),
2835            UnixNanos::from(2_000),
2836        );
2837        let bar_earlier = Bar::new(
2838            bar_type,
2839            Price::from("200.00"),
2840            Price::from("210.00"),
2841            Price::from("190.00"),
2842            Price::from("205.00"),
2843            Quantity::from(50),
2844            UnixNanos::from(1_000),
2845            UnixNanos::from(1_000),
2846        );
2847
2848        builder.update_bar(bar_later, bar_later.volume, bar_later.ts_init);
2849        builder.update_bar(bar_earlier, bar_earlier.volume, bar_earlier.ts_init);
2850
2851        assert_eq!(builder.ts_last, 2_000);
2852        assert_eq!(builder.count, 1);
2853        assert_eq!(builder.volume, Quantity::from(10));
2854    }
2855
2856    #[rstest]
2857    fn test_bar_builder_build_promotes_close_above_high_from_previous_close(equity_aapl: Equity) {
2858        let instrument = InstrumentAny::Equity(equity_aapl);
2859        let bar_type = BarType::new(
2860            instrument.id(),
2861            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2862            AggregationSource::Internal,
2863        );
2864        let mut builder = BarBuilder::new(bar_type, 2, 0);
2865
2866        builder.update(
2867            Price::from("110.00"),
2868            Quantity::from(1),
2869            UnixNanos::from(100),
2870        );
2871        builder.build_now();
2872
2873        builder.update(
2874            Price::from("100.00"),
2875            Quantity::from(1),
2876            UnixNanos::from(200),
2877        );
2878        builder.update(
2879            Price::from("101.00"),
2880            Quantity::from(1),
2881            UnixNanos::from(300),
2882        );
2883        builder.update(
2884            Price::from("200.00"),
2885            Quantity::from(1),
2886            UnixNanos::from(400),
2887        );
2888
2889        let bar = builder.build_now();
2890        assert_eq!(bar.open, Price::from("100.00"));
2891        assert_eq!(bar.high, Price::from("200.00"));
2892        assert_eq!(bar.low, Price::from("100.00"));
2893        assert_eq!(bar.close, Price::from("200.00"));
2894    }
2895
2896    #[rstest]
2897    fn test_bar_builder_build_clamps_low_to_close(equity_aapl: Equity) {
2898        // Rust BarBuilder mirrors Cython: on `build`, if `close < low` the low is pulled down to close.
2899        // Reaching this branch requires bypassing `update`'s low tracking (e.g. via bar updates where
2900        // a later bar's close is below the accumulated low). We simulate by direct field assignment.
2901        let instrument = InstrumentAny::Equity(equity_aapl);
2902        let bar_type = BarType::new(
2903            instrument.id(),
2904            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2905            AggregationSource::Internal,
2906        );
2907        let mut builder = BarBuilder::new(bar_type, 2, 0);
2908
2909        builder.update(
2910            Price::from("100.00"),
2911            Quantity::from(1),
2912            UnixNanos::from(100),
2913        );
2914        builder.close = Some(Price::from("50.00"));
2915
2916        let bar = builder.build_now();
2917        assert_eq!(bar.low, Price::from("50.00"));
2918        assert_eq!(bar.close, Price::from("50.00"));
2919        assert!(bar.low <= bar.open);
2920    }
2921
2922    #[rstest]
2923    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
2924        let instrument = InstrumentAny::Equity(equity_aapl);
2925        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2926        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2927        let handler = Arc::new(Mutex::new(Vec::new()));
2928        let handler_clone = Arc::clone(&handler);
2929
2930        let mut aggregator = TickBarAggregator::new(
2931            bar_type,
2932            instrument.price_precision(),
2933            instrument.size_precision(),
2934            move |bar: Bar| {
2935                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2936                handler_guard.push(bar);
2937            },
2938        );
2939
2940        let trade = TradeTick::default();
2941        aggregator.handle_trade(trade);
2942
2943        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2944        assert_eq!(handler_guard.len(), 0);
2945    }
2946
2947    #[rstest]
2948    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
2949        let instrument = InstrumentAny::Equity(equity_aapl);
2950        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2951        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2952        let handler = Arc::new(Mutex::new(Vec::new()));
2953        let handler_clone = Arc::clone(&handler);
2954
2955        let mut aggregator = TickBarAggregator::new(
2956            bar_type,
2957            instrument.price_precision(),
2958            instrument.size_precision(),
2959            move |bar: Bar| {
2960                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2961                handler_guard.push(bar);
2962            },
2963        );
2964
2965        let trade = TradeTick::default();
2966        aggregator.handle_trade(trade);
2967        aggregator.handle_trade(trade);
2968        aggregator.handle_trade(trade);
2969
2970        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2971        let bar = handler_guard.first().unwrap();
2972        assert_eq!(handler_guard.len(), 1);
2973        assert_eq!(bar.open, trade.price);
2974        assert_eq!(bar.high, trade.price);
2975        assert_eq!(bar.low, trade.price);
2976        assert_eq!(bar.close, trade.price);
2977        assert_eq!(bar.volume, Quantity::from(300000));
2978        assert_eq!(bar.ts_event, trade.ts_event);
2979        assert_eq!(bar.ts_init, trade.ts_init);
2980    }
2981
2982    #[rstest]
2983    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
2984        let instrument = InstrumentAny::Equity(equity_aapl);
2985        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2986        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2987        let handler = Arc::new(Mutex::new(Vec::new()));
2988        let handler_clone = Arc::clone(&handler);
2989
2990        let mut aggregator = TickBarAggregator::new(
2991            bar_type,
2992            instrument.price_precision(),
2993            instrument.size_precision(),
2994            move |bar: Bar| {
2995                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2996                handler_guard.push(bar);
2997            },
2998        );
2999
3000        aggregator.update(
3001            Price::from("1.00001"),
3002            Quantity::from(1),
3003            UnixNanos::default(),
3004        );
3005        aggregator.update(
3006            Price::from("1.00002"),
3007            Quantity::from(1),
3008            UnixNanos::from(1000),
3009        );
3010        aggregator.update(
3011            Price::from("1.00003"),
3012            Quantity::from(1),
3013            UnixNanos::from(2000),
3014        );
3015
3016        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3017        assert_eq!(handler_guard.len(), 1);
3018
3019        let bar = handler_guard.first().unwrap();
3020        assert_eq!(bar.open, Price::from("1.00001"));
3021        assert_eq!(bar.high, Price::from("1.00003"));
3022        assert_eq!(bar.low, Price::from("1.00001"));
3023        assert_eq!(bar.close, Price::from("1.00003"));
3024        assert_eq!(bar.volume, Quantity::from(3));
3025    }
3026
3027    #[rstest]
3028    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
3029        let instrument = InstrumentAny::Equity(equity_aapl);
3030        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
3031        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3032        let handler = Arc::new(Mutex::new(Vec::new()));
3033        let handler_clone = Arc::clone(&handler);
3034
3035        let mut aggregator = TickBarAggregator::new(
3036            bar_type,
3037            instrument.price_precision(),
3038            instrument.size_precision(),
3039            move |bar: Bar| {
3040                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3041                handler_guard.push(bar);
3042            },
3043        );
3044
3045        aggregator.update(
3046            Price::from("1.00001"),
3047            Quantity::from(1),
3048            UnixNanos::default(),
3049        );
3050        aggregator.update(
3051            Price::from("1.00002"),
3052            Quantity::from(1),
3053            UnixNanos::from(1000),
3054        );
3055        aggregator.update(
3056            Price::from("1.00003"),
3057            Quantity::from(1),
3058            UnixNanos::from(2000),
3059        );
3060        aggregator.update(
3061            Price::from("1.00004"),
3062            Quantity::from(1),
3063            UnixNanos::from(3000),
3064        );
3065
3066        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3067        assert_eq!(handler_guard.len(), 2);
3068
3069        let bar1 = &handler_guard[0];
3070        assert_eq!(bar1.open, Price::from("1.00001"));
3071        assert_eq!(bar1.close, Price::from("1.00002"));
3072        assert_eq!(bar1.volume, Quantity::from(2));
3073
3074        let bar2 = &handler_guard[1];
3075        assert_eq!(bar2.open, Price::from("1.00003"));
3076        assert_eq!(bar2.close, Price::from("1.00004"));
3077        assert_eq!(bar2.volume, Quantity::from(2));
3078    }
3079
3080    #[rstest]
3081    fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
3082        let instrument = InstrumentAny::Equity(equity_aapl);
3083        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3084        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3085        let handler = Arc::new(Mutex::new(Vec::new()));
3086        let handler_clone = Arc::clone(&handler);
3087
3088        let mut aggregator = TickImbalanceBarAggregator::new(
3089            bar_type,
3090            instrument.price_precision(),
3091            instrument.size_precision(),
3092            move |bar: Bar| {
3093                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3094                handler_guard.push(bar);
3095            },
3096        );
3097
3098        let trade = TradeTick::default();
3099        aggregator.handle_trade(trade);
3100        aggregator.handle_trade(trade);
3101
3102        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3103        assert_eq!(handler_guard.len(), 1);
3104        let bar = handler_guard.first().unwrap();
3105        assert_eq!(bar.volume, Quantity::from(200000));
3106    }
3107
3108    #[rstest]
3109    fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
3110        let instrument = InstrumentAny::Equity(equity_aapl);
3111        let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
3112        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3113        let handler = Arc::new(Mutex::new(Vec::new()));
3114        let handler_clone = Arc::clone(&handler);
3115
3116        let mut aggregator = TickImbalanceBarAggregator::new(
3117            bar_type,
3118            instrument.price_precision(),
3119            instrument.size_precision(),
3120            move |bar: Bar| {
3121                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3122                handler_guard.push(bar);
3123            },
3124        );
3125
3126        let sell = TradeTick {
3127            aggressor_side: AggressorSide::Seller,
3128            ..TradeTick::default()
3129        };
3130
3131        aggregator.handle_trade(sell);
3132
3133        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3134        assert_eq!(handler_guard.len(), 1);
3135    }
3136
3137    #[rstest]
3138    fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
3139        let instrument = InstrumentAny::Equity(equity_aapl);
3140        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3141        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3142        let handler = Arc::new(Mutex::new(Vec::new()));
3143        let handler_clone = Arc::clone(&handler);
3144
3145        let mut aggregator = TickRunsBarAggregator::new(
3146            bar_type,
3147            instrument.price_precision(),
3148            instrument.size_precision(),
3149            move |bar: Bar| {
3150                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3151                handler_guard.push(bar);
3152            },
3153        );
3154
3155        let buy = TradeTick::default();
3156        let sell = TradeTick {
3157            aggressor_side: AggressorSide::Seller,
3158            ..buy
3159        };
3160
3161        aggregator.handle_trade(buy);
3162        aggregator.handle_trade(buy);
3163        aggregator.handle_trade(sell);
3164        aggregator.handle_trade(sell);
3165
3166        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3167        assert_eq!(handler_guard.len(), 2);
3168    }
3169
3170    #[rstest]
3171    fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
3172        let instrument = InstrumentAny::Equity(equity_aapl);
3173        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3174        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3175        let handler = Arc::new(Mutex::new(Vec::new()));
3176        let handler_clone = Arc::clone(&handler);
3177
3178        let mut aggregator = TickRunsBarAggregator::new(
3179            bar_type,
3180            instrument.price_precision(),
3181            instrument.size_precision(),
3182            move |bar: Bar| {
3183                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3184                handler_guard.push(bar);
3185            },
3186        );
3187
3188        let buy = TradeTick {
3189            size: Quantity::from(1),
3190            ..TradeTick::default()
3191        };
3192        let sell = TradeTick {
3193            aggressor_side: AggressorSide::Seller,
3194            size: Quantity::from(1),
3195            ..buy
3196        };
3197
3198        aggregator.handle_trade(buy);
3199        aggregator.handle_trade(buy);
3200        aggregator.handle_trade(sell);
3201        aggregator.handle_trade(sell);
3202
3203        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3204        assert_eq!(handler_guard.len(), 2);
3205        assert_eq!(handler_guard[0].volume, Quantity::from(2));
3206        assert_eq!(handler_guard[1].volume, Quantity::from(2));
3207    }
3208
3209    #[rstest]
3210    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
3211        let instrument = InstrumentAny::Equity(equity_aapl);
3212        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3213        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3214        let handler = Arc::new(Mutex::new(Vec::new()));
3215        let handler_clone = Arc::clone(&handler);
3216
3217        let mut aggregator = VolumeBarAggregator::new(
3218            bar_type,
3219            instrument.price_precision(),
3220            instrument.size_precision(),
3221            move |bar: Bar| {
3222                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3223                handler_guard.push(bar);
3224            },
3225        );
3226
3227        aggregator.update(
3228            Price::from("1.00001"),
3229            Quantity::from(25),
3230            UnixNanos::default(),
3231        );
3232
3233        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3234        assert_eq!(handler_guard.len(), 2);
3235        let bar1 = &handler_guard[0];
3236        assert_eq!(bar1.volume, Quantity::from(10));
3237        let bar2 = &handler_guard[1];
3238        assert_eq!(bar2.volume, Quantity::from(10));
3239    }
3240
3241    #[rstest]
3242    fn test_volume_bar_aggregator_zero_size_update_is_noop(equity_aapl: Equity) {
3243        let instrument = InstrumentAny::Equity(equity_aapl);
3244        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3245        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3246        let handler = Arc::new(Mutex::new(Vec::new()));
3247        let handler_clone = Arc::clone(&handler);
3248
3249        let mut aggregator = VolumeBarAggregator::new(
3250            bar_type,
3251            instrument.price_precision(),
3252            instrument.size_precision(),
3253            move |bar: Bar| {
3254                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3255                handler_guard.push(bar);
3256            },
3257        );
3258
3259        aggregator.update(
3260            Price::from("100.00"),
3261            Quantity::from(0),
3262            UnixNanos::default(),
3263        );
3264
3265        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3266        assert_eq!(handler_guard.len(), 0);
3267    }
3268
3269    #[rstest]
3270    fn test_volume_bar_aggregator_exact_threshold_emits_single_bar(equity_aapl: Equity) {
3271        let instrument = InstrumentAny::Equity(equity_aapl);
3272        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3273        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3274        let handler = Arc::new(Mutex::new(Vec::new()));
3275        let handler_clone = Arc::clone(&handler);
3276
3277        let mut aggregator = VolumeBarAggregator::new(
3278            bar_type,
3279            instrument.price_precision(),
3280            instrument.size_precision(),
3281            move |bar: Bar| {
3282                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3283                handler_guard.push(bar);
3284            },
3285        );
3286
3287        aggregator.update(
3288            Price::from("100.00"),
3289            Quantity::from(7),
3290            UnixNanos::from(1_000),
3291        );
3292        aggregator.update(
3293            Price::from("101.00"),
3294            Quantity::from(3),
3295            UnixNanos::from(2_000),
3296        );
3297
3298        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3299        assert_eq!(handler_guard.len(), 1);
3300        assert_eq!(handler_guard[0].volume, Quantity::from(10));
3301        assert_eq!(handler_guard[0].close, Price::from("101.00"));
3302    }
3303
3304    #[rstest]
3305    fn test_volume_bar_aggregator_step_of_one_emits_per_unit(equity_aapl: Equity) {
3306        let instrument = InstrumentAny::Equity(equity_aapl);
3307        let bar_spec = BarSpecification::new(1, BarAggregation::Volume, PriceType::Last);
3308        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3309        let handler = Arc::new(Mutex::new(Vec::new()));
3310        let handler_clone = Arc::clone(&handler);
3311
3312        let mut aggregator = VolumeBarAggregator::new(
3313            bar_type,
3314            instrument.price_precision(),
3315            instrument.size_precision(),
3316            move |bar: Bar| {
3317                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3318                handler_guard.push(bar);
3319            },
3320        );
3321
3322        aggregator.update(
3323            Price::from("100.00"),
3324            Quantity::from(1),
3325            UnixNanos::default(),
3326        );
3327
3328        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3329        assert_eq!(handler_guard.len(), 1);
3330        assert_eq!(handler_guard[0].volume, Quantity::from(1));
3331    }
3332
3333    #[rstest]
3334    fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
3335        let instrument = InstrumentAny::Equity(equity_aapl);
3336        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
3337        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3338        let handler = Arc::new(Mutex::new(Vec::new()));
3339        let handler_clone = Arc::clone(&handler);
3340
3341        let mut aggregator = VolumeRunsBarAggregator::new(
3342            bar_type,
3343            instrument.price_precision(),
3344            instrument.size_precision(),
3345            move |bar: Bar| {
3346                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3347                handler_guard.push(bar);
3348            },
3349        );
3350
3351        let buy = TradeTick {
3352            instrument_id: instrument.id(),
3353            price: Price::from("1.0"),
3354            size: Quantity::from(1),
3355            ..TradeTick::default()
3356        };
3357        let sell = TradeTick {
3358            aggressor_side: AggressorSide::Seller,
3359            ..buy
3360        };
3361
3362        aggregator.handle_trade(buy);
3363        aggregator.handle_trade(buy); // emit first bar at 2
3364        aggregator.handle_trade(sell);
3365        aggregator.handle_trade(sell); // emit second bar at 2 sell-side
3366
3367        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3368        assert!(handler_guard.len() >= 2);
3369        assert!(
3370            (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
3371                < f64::EPSILON
3372        );
3373    }
3374
3375    #[rstest]
3376    fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
3377        let instrument = InstrumentAny::Equity(equity_aapl);
3378        let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
3379        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3380        let handler = Arc::new(Mutex::new(Vec::new()));
3381        let handler_clone = Arc::clone(&handler);
3382
3383        let mut aggregator = VolumeRunsBarAggregator::new(
3384            bar_type,
3385            instrument.price_precision(),
3386            instrument.size_precision(),
3387            move |bar: Bar| {
3388                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3389                handler_guard.push(bar);
3390            },
3391        );
3392
3393        let trade = TradeTick {
3394            instrument_id: instrument.id(),
3395            price: Price::from("1.0"),
3396            size: Quantity::from(5),
3397            ..TradeTick::default()
3398        };
3399
3400        aggregator.handle_trade(trade);
3401
3402        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3403        assert!(!handler_guard.is_empty());
3404        assert!(handler_guard[0].volume.as_f64() > 0.0);
3405        assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
3406    }
3407
3408    #[rstest]
3409    fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
3410        let instrument = InstrumentAny::Equity(equity_aapl);
3411        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
3412        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3413        let handler = Arc::new(Mutex::new(Vec::new()));
3414        let handler_clone = Arc::clone(&handler);
3415
3416        let mut aggregator = VolumeImbalanceBarAggregator::new(
3417            bar_type,
3418            instrument.price_precision(),
3419            instrument.size_precision(),
3420            move |bar: Bar| {
3421                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3422                handler_guard.push(bar);
3423            },
3424        );
3425
3426        let trade_small = TradeTick {
3427            instrument_id: instrument.id(),
3428            price: Price::from("1.0"),
3429            size: Quantity::from(1),
3430            ..TradeTick::default()
3431        };
3432        let trade_large = TradeTick {
3433            size: Quantity::from(3),
3434            ..trade_small
3435        };
3436
3437        aggregator.handle_trade(trade_small);
3438        aggregator.handle_trade(trade_large);
3439
3440        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3441        assert_eq!(handler_guard.len(), 2);
3442        let total_output = handler_guard
3443            .iter()
3444            .map(|bar| bar.volume.as_f64())
3445            .sum::<f64>();
3446        let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
3447        assert!((total_output - total_input).abs() < f64::EPSILON);
3448    }
3449
3450    #[rstest]
3451    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
3452        let instrument = InstrumentAny::Equity(equity_aapl);
3453        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
3454        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3455        let handler = Arc::new(Mutex::new(Vec::new()));
3456        let handler_clone = Arc::clone(&handler);
3457
3458        let mut aggregator = ValueBarAggregator::new(
3459            bar_type,
3460            instrument.price_precision(),
3461            instrument.size_precision(),
3462            move |bar: Bar| {
3463                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3464                handler_guard.push(bar);
3465            },
3466        );
3467
3468        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
3469        aggregator.update(
3470            Price::from("100.00"),
3471            Quantity::from(5),
3472            UnixNanos::default(),
3473        );
3474        aggregator.update(
3475            Price::from("100.00"),
3476            Quantity::from(5),
3477            UnixNanos::from(1000),
3478        );
3479
3480        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3481        assert_eq!(handler_guard.len(), 1);
3482        let bar = handler_guard.first().unwrap();
3483        assert_eq!(bar.volume, Quantity::from(10));
3484    }
3485
3486    #[rstest]
3487    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
3488        let instrument = InstrumentAny::Equity(equity_aapl);
3489        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3490        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3491        let handler = Arc::new(Mutex::new(Vec::new()));
3492        let handler_clone = Arc::clone(&handler);
3493
3494        let mut aggregator = ValueBarAggregator::new(
3495            bar_type,
3496            instrument.price_precision(),
3497            instrument.size_precision(),
3498            move |bar: Bar| {
3499                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3500                handler_guard.push(bar);
3501            },
3502        );
3503
3504        // Single large update: $100 * 25 = $2500 (should create 2 bars)
3505        aggregator.update(
3506            Price::from("100.00"),
3507            Quantity::from(25),
3508            UnixNanos::default(),
3509        );
3510
3511        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3512        assert_eq!(handler_guard.len(), 2);
3513        let remaining_value = aggregator.get_cumulative_value();
3514        assert!(remaining_value < 1000.0); // Should be less than threshold
3515    }
3516
3517    #[rstest]
3518    fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
3519        let instrument = InstrumentAny::Equity(equity_aapl);
3520        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3521        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3522        let handler = Arc::new(Mutex::new(Vec::new()));
3523        let handler_clone = Arc::clone(&handler);
3524
3525        let mut aggregator = ValueBarAggregator::new(
3526            bar_type,
3527            instrument.price_precision(),
3528            instrument.size_precision(),
3529            move |bar: Bar| {
3530                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3531                handler_guard.push(bar);
3532            },
3533        );
3534
3535        // Update with zero price should not cause division by zero
3536        aggregator.update(
3537            Price::from("0.00"),
3538            Quantity::from(100),
3539            UnixNanos::default(),
3540        );
3541
3542        // No bars should be emitted since value is zero
3543        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3544        assert_eq!(handler_guard.len(), 0);
3545
3546        // Cumulative value should remain zero
3547        assert_eq!(aggregator.get_cumulative_value(), 0.0);
3548    }
3549
3550    #[rstest]
3551    fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
3552        let instrument = InstrumentAny::Equity(equity_aapl);
3553        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3554        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3555        let handler = Arc::new(Mutex::new(Vec::new()));
3556        let handler_clone = Arc::clone(&handler);
3557
3558        let mut aggregator = ValueBarAggregator::new(
3559            bar_type,
3560            instrument.price_precision(),
3561            instrument.size_precision(),
3562            move |bar: Bar| {
3563                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3564                handler_guard.push(bar);
3565            },
3566        );
3567
3568        // Update with zero size should not cause issues
3569        aggregator.update(
3570            Price::from("100.00"),
3571            Quantity::from(0),
3572            UnixNanos::default(),
3573        );
3574
3575        // No bars should be emitted
3576        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3577        assert_eq!(handler_guard.len(), 0);
3578
3579        // Cumulative value should remain zero
3580        assert_eq!(aggregator.get_cumulative_value(), 0.0);
3581    }
3582
3583    #[rstest]
3584    fn test_value_bar_aggregator_exact_threshold_emits_one_bar(equity_aapl: Equity) {
3585        let instrument = InstrumentAny::Equity(equity_aapl);
3586        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3587        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3588        let handler = Arc::new(Mutex::new(Vec::new()));
3589        let handler_clone = Arc::clone(&handler);
3590
3591        let mut aggregator = ValueBarAggregator::new(
3592            bar_type,
3593            instrument.price_precision(),
3594            instrument.size_precision(),
3595            move |bar: Bar| {
3596                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3597                handler_guard.push(bar);
3598            },
3599        );
3600
3601        aggregator.update(
3602            Price::from("100.00"),
3603            Quantity::from(5),
3604            UnixNanos::from(1_000),
3605        );
3606        aggregator.update(
3607            Price::from("100.00"),
3608            Quantity::from(5),
3609            UnixNanos::from(2_000),
3610        );
3611
3612        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3613        assert_eq!(handler_guard.len(), 1);
3614        assert_eq!(handler_guard[0].volume, Quantity::from(10));
3615        assert_eq!(aggregator.get_cumulative_value(), 0.0);
3616    }
3617
3618    #[rstest]
3619    fn test_value_bar_aggregator_precision_boundary_min_size_clamp(equity_aapl: Equity) {
3620        // step=100, price=100 per-unit value=100 with size_precision=0 lands the divided
3621        // size_chunk at the precision floor. Verifies the min-size clamp branch in update()
3622        // emits one bar per unit rather than looping on zero-volume chunks.
3623        let instrument = InstrumentAny::Equity(equity_aapl);
3624        let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
3625        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3626        let handler = Arc::new(Mutex::new(Vec::new()));
3627        let handler_clone = Arc::clone(&handler);
3628
3629        let mut aggregator = ValueBarAggregator::new(
3630            bar_type,
3631            instrument.price_precision(),
3632            instrument.size_precision(),
3633            move |bar: Bar| {
3634                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3635                handler_guard.push(bar);
3636            },
3637        );
3638
3639        // 4 units at $100 = $400 value, with step $100 gives 4 bars exactly.
3640        aggregator.update(
3641            Price::from("100.00"),
3642            Quantity::from(4),
3643            UnixNanos::default(),
3644        );
3645
3646        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3647        assert_eq!(handler_guard.len(), 4);
3648        for bar in handler_guard.iter() {
3649            assert_eq!(bar.volume, Quantity::from(1));
3650        }
3651    }
3652
3653    #[rstest]
3654    fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
3655        let instrument = InstrumentAny::Equity(equity_aapl);
3656        let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
3657        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3658        let handler = Arc::new(Mutex::new(Vec::new()));
3659        let handler_clone = Arc::clone(&handler);
3660
3661        let mut aggregator = ValueImbalanceBarAggregator::new(
3662            bar_type,
3663            instrument.price_precision(),
3664            instrument.size_precision(),
3665            move |bar: Bar| {
3666                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3667                handler_guard.push(bar);
3668            },
3669        );
3670
3671        let buy = TradeTick {
3672            price: Price::from("5.0"),
3673            size: Quantity::from(2), // value 10, should emit one bar
3674            instrument_id: instrument.id(),
3675            ..TradeTick::default()
3676        };
3677        let sell = TradeTick {
3678            price: Price::from("5.0"),
3679            size: Quantity::from(2), // value 10, should emit another bar
3680            aggressor_side: AggressorSide::Seller,
3681            instrument_id: instrument.id(),
3682            ..buy
3683        };
3684
3685        aggregator.handle_trade(buy);
3686        aggregator.handle_trade(sell);
3687
3688        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3689        assert_eq!(handler_guard.len(), 2);
3690    }
3691
3692    #[rstest]
3693    fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
3694        let instrument = InstrumentAny::Equity(equity_aapl);
3695        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
3696        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3697        let handler = Arc::new(Mutex::new(Vec::new()));
3698        let handler_clone = Arc::clone(&handler);
3699
3700        let mut aggregator = ValueRunsBarAggregator::new(
3701            bar_type,
3702            instrument.price_precision(),
3703            instrument.size_precision(),
3704            move |bar: Bar| {
3705                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3706                handler_guard.push(bar);
3707            },
3708        );
3709
3710        let trade = TradeTick {
3711            price: Price::from("10.0"),
3712            size: Quantity::from(5),
3713            instrument_id: instrument.id(),
3714            ..TradeTick::default()
3715        };
3716
3717        aggregator.handle_trade(trade);
3718        aggregator.handle_trade(trade);
3719
3720        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3721        assert_eq!(handler_guard.len(), 1);
3722        let bar = handler_guard.first().unwrap();
3723        assert_eq!(bar.volume, Quantity::from(10));
3724    }
3725
3726    #[rstest]
3727    fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
3728        let instrument = InstrumentAny::Equity(equity_aapl);
3729        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
3730        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3731        let handler = Arc::new(Mutex::new(Vec::new()));
3732        let handler_clone = Arc::clone(&handler);
3733
3734        let mut aggregator = ValueRunsBarAggregator::new(
3735            bar_type,
3736            instrument.price_precision(),
3737            instrument.size_precision(),
3738            move |bar: Bar| {
3739                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3740                handler_guard.push(bar);
3741            },
3742        );
3743
3744        let buy = TradeTick {
3745            price: Price::from("10.0"),
3746            size: Quantity::from(5),
3747            instrument_id: instrument.id(),
3748            ..TradeTick::default()
3749        }; // value 50
3750        let sell = TradeTick {
3751            price: Price::from("10.0"),
3752            size: Quantity::from(10),
3753            aggressor_side: AggressorSide::Seller,
3754            ..buy
3755        }; // value 100
3756
3757        aggregator.handle_trade(buy);
3758        aggregator.handle_trade(sell);
3759
3760        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3761        assert_eq!(handler_guard.len(), 1);
3762        assert_eq!(handler_guard[0].volume, Quantity::from(10));
3763    }
3764
3765    #[rstest]
3766    fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
3767        let instrument = InstrumentAny::Equity(equity_aapl);
3768        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3769        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3770        let handler = Arc::new(Mutex::new(Vec::new()));
3771        let handler_clone = Arc::clone(&handler);
3772
3773        let mut aggregator = TickRunsBarAggregator::new(
3774            bar_type,
3775            instrument.price_precision(),
3776            instrument.size_precision(),
3777            move |bar: Bar| {
3778                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3779                handler_guard.push(bar);
3780            },
3781        );
3782
3783        let buy = TradeTick::default();
3784
3785        aggregator.handle_trade(buy);
3786        aggregator.handle_trade(buy); // Emit bar 1 (run complete)
3787        aggregator.handle_trade(buy); // Start new run
3788        aggregator.handle_trade(buy); // Emit bar 2 (new run complete)
3789
3790        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3791        assert_eq!(handler_guard.len(), 2);
3792    }
3793
3794    #[rstest]
3795    fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
3796        let instrument = InstrumentAny::Equity(equity_aapl);
3797        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3798        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3799        let handler = Arc::new(Mutex::new(Vec::new()));
3800        let handler_clone = Arc::clone(&handler);
3801
3802        let mut aggregator = TickRunsBarAggregator::new(
3803            bar_type,
3804            instrument.price_precision(),
3805            instrument.size_precision(),
3806            move |bar: Bar| {
3807                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3808                handler_guard.push(bar);
3809            },
3810        );
3811
3812        let buy = TradeTick::default();
3813        let no_aggressor = TradeTick {
3814            aggressor_side: AggressorSide::NoAggressor,
3815            ..buy
3816        };
3817
3818        aggregator.handle_trade(buy);
3819        aggregator.handle_trade(no_aggressor); // Should not affect run count
3820        aggregator.handle_trade(no_aggressor); // Should not affect run count
3821        aggregator.handle_trade(buy); // Continue run to threshold
3822
3823        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3824        assert_eq!(handler_guard.len(), 1);
3825    }
3826
3827    #[rstest]
3828    fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
3829        let instrument = InstrumentAny::Equity(equity_aapl);
3830        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
3831        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3832        let handler = Arc::new(Mutex::new(Vec::new()));
3833        let handler_clone = Arc::clone(&handler);
3834
3835        let mut aggregator = VolumeRunsBarAggregator::new(
3836            bar_type,
3837            instrument.price_precision(),
3838            instrument.size_precision(),
3839            move |bar: Bar| {
3840                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3841                handler_guard.push(bar);
3842            },
3843        );
3844
3845        let buy = TradeTick {
3846            instrument_id: instrument.id(),
3847            price: Price::from("1.0"),
3848            size: Quantity::from(1),
3849            ..TradeTick::default()
3850        };
3851
3852        aggregator.handle_trade(buy);
3853        aggregator.handle_trade(buy); // Emit bar 1 (2.0 volume reached)
3854        aggregator.handle_trade(buy); // Start new run
3855        aggregator.handle_trade(buy); // Emit bar 2 (new 2.0 volume reached)
3856
3857        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3858        assert_eq!(handler_guard.len(), 2);
3859        assert_eq!(handler_guard[0].volume, Quantity::from(2));
3860        assert_eq!(handler_guard[1].volume, Quantity::from(2));
3861    }
3862
3863    #[rstest]
3864    fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
3865        let instrument = InstrumentAny::Equity(equity_aapl);
3866        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
3867        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3868        let handler = Arc::new(Mutex::new(Vec::new()));
3869        let handler_clone = Arc::clone(&handler);
3870
3871        let mut aggregator = ValueRunsBarAggregator::new(
3872            bar_type,
3873            instrument.price_precision(),
3874            instrument.size_precision(),
3875            move |bar: Bar| {
3876                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3877                handler_guard.push(bar);
3878            },
3879        );
3880
3881        let buy = TradeTick {
3882            instrument_id: instrument.id(),
3883            price: Price::from("10.0"),
3884            size: Quantity::from(5),
3885            ..TradeTick::default()
3886        }; // value 50 per trade
3887
3888        aggregator.handle_trade(buy);
3889        aggregator.handle_trade(buy); // Emit bar 1 (100 value reached)
3890        aggregator.handle_trade(buy); // Start new run
3891        aggregator.handle_trade(buy); // Emit bar 2 (new 100 value reached)
3892
3893        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3894        assert_eq!(handler_guard.len(), 2);
3895        assert_eq!(handler_guard[0].volume, Quantity::from(10));
3896        assert_eq!(handler_guard[1].volume, Quantity::from(10));
3897    }
3898
3899    #[rstest]
3900    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
3901        let instrument = InstrumentAny::Equity(equity_aapl);
3902        // One second bars
3903        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3904        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3905        let handler = Arc::new(Mutex::new(Vec::new()));
3906        let handler_clone = Arc::clone(&handler);
3907        let clock = Rc::new(RefCell::new(TestClock::new()));
3908
3909        let mut aggregator = TimeBarAggregator::new(
3910            bar_type,
3911            instrument.price_precision(),
3912            instrument.size_precision(),
3913            clock.clone(),
3914            move |bar: Bar| {
3915                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3916                handler_guard.push(bar);
3917            },
3918            true,  // build_with_no_updates
3919            false, // timestamp_on_close
3920            BarIntervalType::LeftOpen,
3921            None,  // time_bars_origin_offset
3922            15,    // bar_build_delay
3923            false, // skip_first_non_full_bar
3924        );
3925
3926        aggregator.update(
3927            Price::from("100.00"),
3928            Quantity::from(1),
3929            UnixNanos::default(),
3930        );
3931
3932        let next_sec = UnixNanos::from(1_000_000_000);
3933        clock.borrow_mut().set_time(next_sec);
3934
3935        let event = TimeEvent::new(
3936            Ustr::from("1-SECOND-LAST"),
3937            UUID4::new(),
3938            next_sec,
3939            next_sec,
3940        );
3941        aggregator.build_bar(&event);
3942
3943        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3944        assert_eq!(handler_guard.len(), 1);
3945        let bar = handler_guard.first().unwrap();
3946        assert_eq!(bar.ts_event, UnixNanos::default());
3947        assert_eq!(bar.ts_init, next_sec);
3948    }
3949
3950    #[rstest]
3951    fn test_time_bar_aggregator_stop_clears_timer_and_allows_restart(equity_aapl: Equity) {
3952        let instrument = InstrumentAny::Equity(equity_aapl);
3953        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3954        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3955        let timer_name = format!("TIME_BAR_{bar_type}");
3956        let clock = Rc::new(RefCell::new(TestClock::new()));
3957
3958        let aggregator = TimeBarAggregator::new(
3959            bar_type,
3960            instrument.price_precision(),
3961            instrument.size_precision(),
3962            clock.clone(),
3963            |_bar: Bar| {},
3964            true,
3965            false,
3966            BarIntervalType::LeftOpen,
3967            None,
3968            15,
3969            false,
3970        );
3971
3972        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
3973        let rc = Rc::new(RefCell::new(boxed));
3974
3975        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
3976        assert_eq!(clock.borrow().timer_names(), vec![timer_name.as_str()]);
3977
3978        rc.borrow_mut().stop();
3979        assert!(clock.borrow().timer_names().is_empty());
3980
3981        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
3982        assert_eq!(clock.borrow().timer_names(), vec![timer_name.as_str()]);
3983    }
3984
3985    #[rstest]
3986    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
3987        let instrument = InstrumentAny::Equity(equity_aapl);
3988        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3989        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3990        let handler = Arc::new(Mutex::new(Vec::new()));
3991        let handler_clone = Arc::clone(&handler);
3992        let clock = Rc::new(RefCell::new(TestClock::new()));
3993
3994        let mut aggregator = TimeBarAggregator::new(
3995            bar_type,
3996            instrument.price_precision(),
3997            instrument.size_precision(),
3998            clock.clone(),
3999            move |bar: Bar| {
4000                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4001                handler_guard.push(bar);
4002            },
4003            true, // build_with_no_updates
4004            true, // timestamp_on_close - changed to true to verify left-open behavior
4005            BarIntervalType::LeftOpen,
4006            None,
4007            15,
4008            false, // skip_first_non_full_bar
4009        );
4010
4011        // Update in first interval
4012        aggregator.update(
4013            Price::from("100.00"),
4014            Quantity::from(1),
4015            UnixNanos::default(),
4016        );
4017
4018        // First interval close
4019        let ts1 = UnixNanos::from(1_000_000_000);
4020        clock.borrow_mut().set_time(ts1);
4021        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4022        aggregator.build_bar(&event);
4023
4024        // Update in second interval
4025        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
4026
4027        // Second interval close
4028        let ts2 = UnixNanos::from(2_000_000_000);
4029        clock.borrow_mut().set_time(ts2);
4030        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4031        aggregator.build_bar(&event);
4032
4033        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4034        assert_eq!(handler_guard.len(), 2);
4035
4036        let bar1 = &handler_guard[0];
4037        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
4038        assert_eq!(bar1.ts_init, ts1);
4039        assert_eq!(bar1.close, Price::from("100.00"));
4040        let bar2 = &handler_guard[1];
4041        assert_eq!(bar2.ts_event, ts2);
4042        assert_eq!(bar2.ts_init, ts2);
4043        assert_eq!(bar2.close, Price::from("101.00"));
4044    }
4045
4046    #[rstest]
4047    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
4048        let instrument = InstrumentAny::Equity(equity_aapl);
4049        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4050        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4051        let handler = Arc::new(Mutex::new(Vec::new()));
4052        let handler_clone = Arc::clone(&handler);
4053        let clock = Rc::new(RefCell::new(TestClock::new()));
4054        let mut aggregator = TimeBarAggregator::new(
4055            bar_type,
4056            instrument.price_precision(),
4057            instrument.size_precision(),
4058            clock.clone(),
4059            move |bar: Bar| {
4060                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4061                handler_guard.push(bar);
4062            },
4063            true, // build_with_no_updates
4064            true, // timestamp_on_close
4065            BarIntervalType::RightOpen,
4066            None,
4067            15,
4068            false, // skip_first_non_full_bar
4069        );
4070
4071        // Update in first interval
4072        aggregator.update(
4073            Price::from("100.00"),
4074            Quantity::from(1),
4075            UnixNanos::default(),
4076        );
4077
4078        // First interval close
4079        let ts1 = UnixNanos::from(1_000_000_000);
4080        clock.borrow_mut().set_time(ts1);
4081        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4082        aggregator.build_bar(&event);
4083
4084        // Update in second interval
4085        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
4086
4087        // Second interval close
4088        let ts2 = UnixNanos::from(2_000_000_000);
4089        clock.borrow_mut().set_time(ts2);
4090        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4091        aggregator.build_bar(&event);
4092
4093        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4094        assert_eq!(handler_guard.len(), 2);
4095
4096        let bar1 = &handler_guard[0];
4097        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
4098        assert_eq!(bar1.ts_init, ts1);
4099        assert_eq!(bar1.close, Price::from("100.00"));
4100
4101        let bar2 = &handler_guard[1];
4102        assert_eq!(bar2.ts_event, ts1);
4103        assert_eq!(bar2.ts_init, ts2);
4104        assert_eq!(bar2.close, Price::from("101.00"));
4105    }
4106
4107    #[rstest]
4108    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
4109        let instrument = InstrumentAny::Equity(equity_aapl);
4110        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4111        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4112        let handler = Arc::new(Mutex::new(Vec::new()));
4113        let handler_clone = Arc::clone(&handler);
4114        let clock = Rc::new(RefCell::new(TestClock::new()));
4115
4116        // First test with build_with_no_updates = false
4117        let mut aggregator = TimeBarAggregator::new(
4118            bar_type,
4119            instrument.price_precision(),
4120            instrument.size_precision(),
4121            clock.clone(),
4122            move |bar: Bar| {
4123                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4124                handler_guard.push(bar);
4125            },
4126            false, // build_with_no_updates disabled
4127            true,  // timestamp_on_close
4128            BarIntervalType::LeftOpen,
4129            None,
4130            15,
4131            false, // skip_first_non_full_bar
4132        );
4133
4134        // No updates, just interval close
4135        let ts1 = UnixNanos::from(1_000_000_000);
4136        clock.borrow_mut().set_time(ts1);
4137        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4138        aggregator.build_bar(&event);
4139
4140        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4141        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
4142        drop(handler_guard);
4143
4144        // Now test with build_with_no_updates = true
4145        let handler = Arc::new(Mutex::new(Vec::new()));
4146        let handler_clone = Arc::clone(&handler);
4147        let mut aggregator = TimeBarAggregator::new(
4148            bar_type,
4149            instrument.price_precision(),
4150            instrument.size_precision(),
4151            clock.clone(),
4152            move |bar: Bar| {
4153                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4154                handler_guard.push(bar);
4155            },
4156            true, // build_with_no_updates enabled
4157            true, // timestamp_on_close
4158            BarIntervalType::LeftOpen,
4159            None,
4160            15,
4161            false, // skip_first_non_full_bar
4162        );
4163
4164        aggregator.update(
4165            Price::from("100.00"),
4166            Quantity::from(1),
4167            UnixNanos::default(),
4168        );
4169
4170        // First interval with update
4171        let ts1 = UnixNanos::from(1_000_000_000);
4172        clock.borrow_mut().set_time(ts1);
4173        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4174        aggregator.build_bar(&event);
4175
4176        // Second interval without updates
4177        let ts2 = UnixNanos::from(2_000_000_000);
4178        clock.borrow_mut().set_time(ts2);
4179        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4180        aggregator.build_bar(&event);
4181
4182        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4183        assert_eq!(handler_guard.len(), 2); // Both bars should be built
4184        let bar1 = &handler_guard[0];
4185        assert_eq!(bar1.close, Price::from("100.00"));
4186        let bar2 = &handler_guard[1];
4187        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
4188    }
4189
4190    #[rstest]
4191    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
4192        let instrument = InstrumentAny::Equity(equity_aapl);
4193        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4194        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4195        let clock = Rc::new(RefCell::new(TestClock::new()));
4196        let handler = Arc::new(Mutex::new(Vec::new()));
4197        let handler_clone = Arc::clone(&handler);
4198
4199        let mut aggregator = TimeBarAggregator::new(
4200            bar_type,
4201            instrument.price_precision(),
4202            instrument.size_precision(),
4203            clock.clone(),
4204            move |bar: Bar| {
4205                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4206                handler_guard.push(bar);
4207            },
4208            true, // build_with_no_updates
4209            true, // timestamp_on_close
4210            BarIntervalType::RightOpen,
4211            None,
4212            15,
4213            false, // skip_first_non_full_bar
4214        );
4215
4216        let ts1 = UnixNanos::from(1_000_000_000);
4217        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
4218
4219        let ts2 = UnixNanos::from(2_000_000_000);
4220        clock.borrow_mut().set_time(ts2);
4221
4222        // Simulate timestamp on close
4223        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4224        aggregator.build_bar(&event);
4225
4226        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4227        let bar = handler_guard.first().unwrap();
4228        assert_eq!(bar.ts_event, UnixNanos::default());
4229        assert_eq!(bar.ts_init, ts2);
4230    }
4231
4232    #[rstest]
4233    fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
4234        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4235        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4236        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4237        let handler = Arc::new(Mutex::new(Vec::new()));
4238        let handler_clone = Arc::clone(&handler);
4239
4240        let aggregator = RenkoBarAggregator::new(
4241            bar_type,
4242            instrument.price_precision(),
4243            instrument.size_precision(),
4244            instrument.price_increment(),
4245            move |bar: Bar| {
4246                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4247                handler_guard.push(bar);
4248            },
4249        );
4250
4251        assert_eq!(aggregator.bar_type(), bar_type);
4252        assert!(!aggregator.is_running());
4253        // 10 pips * price_increment.raw (depends on precision mode)
4254        let expected_brick_size = 10 * instrument.price_increment().raw;
4255        assert_eq!(aggregator.brick_size, expected_brick_size);
4256    }
4257
4258    #[rstest]
4259    fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
4260        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4261        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4262        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4263        let handler = Arc::new(Mutex::new(Vec::new()));
4264        let handler_clone = Arc::clone(&handler);
4265
4266        let mut aggregator = RenkoBarAggregator::new(
4267            bar_type,
4268            instrument.price_precision(),
4269            instrument.size_precision(),
4270            instrument.price_increment(),
4271            move |bar: Bar| {
4272                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4273                handler_guard.push(bar);
4274            },
4275        );
4276
4277        // Small price movement (5 pips, less than 10 pip brick size)
4278        aggregator.update(
4279            Price::from("1.00000"),
4280            Quantity::from(1),
4281            UnixNanos::default(),
4282        );
4283        aggregator.update(
4284            Price::from("1.00005"),
4285            Quantity::from(1),
4286            UnixNanos::from(1000),
4287        );
4288
4289        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4290        assert_eq!(handler_guard.len(), 0); // No bar created yet
4291    }
4292
4293    #[rstest]
4294    fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
4295        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4296        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4297        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4298        let handler = Arc::new(Mutex::new(Vec::new()));
4299        let handler_clone = Arc::clone(&handler);
4300
4301        let mut aggregator = RenkoBarAggregator::new(
4302            bar_type,
4303            instrument.price_precision(),
4304            instrument.size_precision(),
4305            instrument.price_increment(),
4306            move |bar: Bar| {
4307                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4308                handler_guard.push(bar);
4309            },
4310        );
4311
4312        // Price movement exceeding brick size (15 pips)
4313        aggregator.update(
4314            Price::from("1.00000"),
4315            Quantity::from(1),
4316            UnixNanos::default(),
4317        );
4318        aggregator.update(
4319            Price::from("1.00015"),
4320            Quantity::from(1),
4321            UnixNanos::from(1000),
4322        );
4323
4324        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4325        assert_eq!(handler_guard.len(), 1);
4326
4327        let bar = handler_guard.first().unwrap();
4328        assert_eq!(bar.open, Price::from("1.00000"));
4329        assert_eq!(bar.high, Price::from("1.00010"));
4330        assert_eq!(bar.low, Price::from("1.00000"));
4331        assert_eq!(bar.close, Price::from("1.00010"));
4332        assert_eq!(bar.volume, Quantity::from(2));
4333        assert_eq!(bar.ts_event, UnixNanos::from(1000));
4334        assert_eq!(bar.ts_init, UnixNanos::from(1000));
4335    }
4336
4337    #[rstest]
4338    fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
4339        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4340        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4341        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4342        let handler = Arc::new(Mutex::new(Vec::new()));
4343        let handler_clone = Arc::clone(&handler);
4344
4345        let mut aggregator = RenkoBarAggregator::new(
4346            bar_type,
4347            instrument.price_precision(),
4348            instrument.size_precision(),
4349            instrument.price_increment(),
4350            move |bar: Bar| {
4351                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4352                handler_guard.push(bar);
4353            },
4354        );
4355
4356        // Large price movement creating multiple bricks (25 pips = 2 bricks)
4357        aggregator.update(
4358            Price::from("1.00000"),
4359            Quantity::from(1),
4360            UnixNanos::default(),
4361        );
4362        aggregator.update(
4363            Price::from("1.00025"),
4364            Quantity::from(1),
4365            UnixNanos::from(1000),
4366        );
4367
4368        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4369        assert_eq!(handler_guard.len(), 2);
4370
4371        let bar1 = &handler_guard[0];
4372        assert_eq!(bar1.open, Price::from("1.00000"));
4373        assert_eq!(bar1.high, Price::from("1.00010"));
4374        assert_eq!(bar1.low, Price::from("1.00000"));
4375        assert_eq!(bar1.close, Price::from("1.00010"));
4376
4377        let bar2 = &handler_guard[1];
4378        assert_eq!(bar2.open, Price::from("1.00010"));
4379        assert_eq!(bar2.high, Price::from("1.00020"));
4380        assert_eq!(bar2.low, Price::from("1.00010"));
4381        assert_eq!(bar2.close, Price::from("1.00020"));
4382    }
4383
4384    #[rstest]
4385    fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
4386        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4387        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4388        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4389        let handler = Arc::new(Mutex::new(Vec::new()));
4390        let handler_clone = Arc::clone(&handler);
4391
4392        let mut aggregator = RenkoBarAggregator::new(
4393            bar_type,
4394            instrument.price_precision(),
4395            instrument.size_precision(),
4396            instrument.price_increment(),
4397            move |bar: Bar| {
4398                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4399                handler_guard.push(bar);
4400            },
4401        );
4402
4403        // Start at higher price and move down
4404        aggregator.update(
4405            Price::from("1.00020"),
4406            Quantity::from(1),
4407            UnixNanos::default(),
4408        );
4409        aggregator.update(
4410            Price::from("1.00005"),
4411            Quantity::from(1),
4412            UnixNanos::from(1000),
4413        );
4414
4415        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4416        assert_eq!(handler_guard.len(), 1);
4417
4418        let bar = handler_guard.first().unwrap();
4419        assert_eq!(bar.open, Price::from("1.00020"));
4420        assert_eq!(bar.high, Price::from("1.00020"));
4421        assert_eq!(bar.low, Price::from("1.00010"));
4422        assert_eq!(bar.close, Price::from("1.00010"));
4423        assert_eq!(bar.volume, Quantity::from(2));
4424    }
4425
4426    #[rstest]
4427    fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
4428        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4429        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4430        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4431        let handler = Arc::new(Mutex::new(Vec::new()));
4432        let handler_clone = Arc::clone(&handler);
4433
4434        let mut aggregator = RenkoBarAggregator::new(
4435            bar_type,
4436            instrument.price_precision(),
4437            instrument.size_precision(),
4438            instrument.price_increment(),
4439            move |bar: Bar| {
4440                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4441                handler_guard.push(bar);
4442            },
4443        );
4444
4445        // Create a bar with small price movement (5 pips)
4446        let input_bar = Bar::new(
4447            BarType::new(
4448                instrument.id(),
4449                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4450                AggregationSource::Internal,
4451            ),
4452            Price::from("1.00000"),
4453            Price::from("1.00005"),
4454            Price::from("0.99995"),
4455            Price::from("1.00005"), // 5 pip move up (less than 10 pip brick)
4456            Quantity::from(100),
4457            UnixNanos::default(),
4458            UnixNanos::from(1000),
4459        );
4460
4461        aggregator.handle_bar(input_bar);
4462
4463        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4464        assert_eq!(handler_guard.len(), 0); // No bar created yet
4465    }
4466
4467    #[rstest]
4468    fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
4469        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4470        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4471        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4472        let handler = Arc::new(Mutex::new(Vec::new()));
4473        let handler_clone = Arc::clone(&handler);
4474
4475        let mut aggregator = RenkoBarAggregator::new(
4476            bar_type,
4477            instrument.price_precision(),
4478            instrument.size_precision(),
4479            instrument.price_increment(),
4480            move |bar: Bar| {
4481                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4482                handler_guard.push(bar);
4483            },
4484        );
4485
4486        // First bar to establish baseline
4487        let bar1 = Bar::new(
4488            BarType::new(
4489                instrument.id(),
4490                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4491                AggregationSource::Internal,
4492            ),
4493            Price::from("1.00000"),
4494            Price::from("1.00005"),
4495            Price::from("0.99995"),
4496            Price::from("1.00000"),
4497            Quantity::from(100),
4498            UnixNanos::default(),
4499            UnixNanos::default(),
4500        );
4501
4502        // Second bar with price movement exceeding brick size (10 pips)
4503        let bar2 = Bar::new(
4504            BarType::new(
4505                instrument.id(),
4506                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4507                AggregationSource::Internal,
4508            ),
4509            Price::from("1.00000"),
4510            Price::from("1.00015"),
4511            Price::from("0.99995"),
4512            Price::from("1.00010"), // 10 pip move up (exactly 1 brick)
4513            Quantity::from(50),
4514            UnixNanos::from(60_000_000_000),
4515            UnixNanos::from(60_000_000_000),
4516        );
4517
4518        aggregator.handle_bar(bar1);
4519        aggregator.handle_bar(bar2);
4520
4521        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4522        assert_eq!(handler_guard.len(), 1);
4523
4524        let bar = handler_guard.first().unwrap();
4525        assert_eq!(bar.open, Price::from("1.00000"));
4526        assert_eq!(bar.high, Price::from("1.00010"));
4527        assert_eq!(bar.low, Price::from("1.00000"));
4528        assert_eq!(bar.close, Price::from("1.00010"));
4529        assert_eq!(bar.volume, Quantity::from(150));
4530    }
4531
4532    #[rstest]
4533    fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
4534        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4535        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4536        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4537        let handler = Arc::new(Mutex::new(Vec::new()));
4538        let handler_clone = Arc::clone(&handler);
4539
4540        let mut aggregator = RenkoBarAggregator::new(
4541            bar_type,
4542            instrument.price_precision(),
4543            instrument.size_precision(),
4544            instrument.price_increment(),
4545            move |bar: Bar| {
4546                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4547                handler_guard.push(bar);
4548            },
4549        );
4550
4551        // First bar to establish baseline
4552        let bar1 = Bar::new(
4553            BarType::new(
4554                instrument.id(),
4555                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4556                AggregationSource::Internal,
4557            ),
4558            Price::from("1.00000"),
4559            Price::from("1.00005"),
4560            Price::from("0.99995"),
4561            Price::from("1.00000"),
4562            Quantity::from(100),
4563            UnixNanos::default(),
4564            UnixNanos::default(),
4565        );
4566
4567        // Second bar with large price movement (30 pips = 3 bricks)
4568        let bar2 = Bar::new(
4569            BarType::new(
4570                instrument.id(),
4571                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4572                AggregationSource::Internal,
4573            ),
4574            Price::from("1.00000"),
4575            Price::from("1.00035"),
4576            Price::from("0.99995"),
4577            Price::from("1.00030"), // 30 pip move up (exactly 3 bricks)
4578            Quantity::from(50),
4579            UnixNanos::from(60_000_000_000),
4580            UnixNanos::from(60_000_000_000),
4581        );
4582
4583        aggregator.handle_bar(bar1);
4584        aggregator.handle_bar(bar2);
4585
4586        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4587        assert_eq!(handler_guard.len(), 3);
4588
4589        let bar1 = &handler_guard[0];
4590        assert_eq!(bar1.open, Price::from("1.00000"));
4591        assert_eq!(bar1.close, Price::from("1.00010"));
4592
4593        let bar2 = &handler_guard[1];
4594        assert_eq!(bar2.open, Price::from("1.00010"));
4595        assert_eq!(bar2.close, Price::from("1.00020"));
4596
4597        let bar3 = &handler_guard[2];
4598        assert_eq!(bar3.open, Price::from("1.00020"));
4599        assert_eq!(bar3.close, Price::from("1.00030"));
4600    }
4601
4602    #[rstest]
4603    fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
4604        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4605        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4606        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4607        let handler = Arc::new(Mutex::new(Vec::new()));
4608        let handler_clone = Arc::clone(&handler);
4609
4610        let mut aggregator = RenkoBarAggregator::new(
4611            bar_type,
4612            instrument.price_precision(),
4613            instrument.size_precision(),
4614            instrument.price_increment(),
4615            move |bar: Bar| {
4616                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4617                handler_guard.push(bar);
4618            },
4619        );
4620
4621        // First bar to establish baseline
4622        let bar1 = Bar::new(
4623            BarType::new(
4624                instrument.id(),
4625                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4626                AggregationSource::Internal,
4627            ),
4628            Price::from("1.00020"),
4629            Price::from("1.00025"),
4630            Price::from("1.00015"),
4631            Price::from("1.00020"),
4632            Quantity::from(100),
4633            UnixNanos::default(),
4634            UnixNanos::default(),
4635        );
4636
4637        // Second bar with downward price movement (10 pips down)
4638        let bar2 = Bar::new(
4639            BarType::new(
4640                instrument.id(),
4641                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4642                AggregationSource::Internal,
4643            ),
4644            Price::from("1.00020"),
4645            Price::from("1.00025"),
4646            Price::from("1.00005"),
4647            Price::from("1.00010"), // 10 pip move down (exactly 1 brick)
4648            Quantity::from(50),
4649            UnixNanos::from(60_000_000_000),
4650            UnixNanos::from(60_000_000_000),
4651        );
4652
4653        aggregator.handle_bar(bar1);
4654        aggregator.handle_bar(bar2);
4655
4656        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4657        assert_eq!(handler_guard.len(), 1);
4658
4659        let bar = handler_guard.first().unwrap();
4660        assert_eq!(bar.open, Price::from("1.00020"));
4661        assert_eq!(bar.high, Price::from("1.00020"));
4662        assert_eq!(bar.low, Price::from("1.00010"));
4663        assert_eq!(bar.close, Price::from("1.00010"));
4664        assert_eq!(bar.volume, Quantity::from(150));
4665    }
4666
4667    #[rstest]
4668    fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
4669        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4670
4671        // Test different brick sizes
4672        let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); // 5 pip brick size
4673        let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
4674        let handler = Arc::new(Mutex::new(Vec::new()));
4675        let handler_clone = Arc::clone(&handler);
4676
4677        let aggregator_5 = RenkoBarAggregator::new(
4678            bar_type_5,
4679            instrument.price_precision(),
4680            instrument.size_precision(),
4681            instrument.price_increment(),
4682            move |_bar: Bar| {
4683                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4684                handler_guard.push(_bar);
4685            },
4686        );
4687
4688        // 5 pips * price_increment.raw (depends on precision mode)
4689        let expected_brick_size_5 = 5 * instrument.price_increment().raw;
4690        assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
4691
4692        let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); // 20 pip brick size
4693        let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
4694        let handler2 = Arc::new(Mutex::new(Vec::new()));
4695        let handler2_clone = Arc::clone(&handler2);
4696
4697        let aggregator_20 = RenkoBarAggregator::new(
4698            bar_type_20,
4699            instrument.price_precision(),
4700            instrument.size_precision(),
4701            instrument.price_increment(),
4702            move |_bar: Bar| {
4703                let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
4704                handler_guard.push(_bar);
4705            },
4706        );
4707
4708        // 20 pips * price_increment.raw (depends on precision mode)
4709        let expected_brick_size_20 = 20 * instrument.price_increment().raw;
4710        assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
4711    }
4712
4713    #[rstest]
4714    fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
4715        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4716        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4717        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4718        let handler = Arc::new(Mutex::new(Vec::new()));
4719        let handler_clone = Arc::clone(&handler);
4720
4721        let mut aggregator = RenkoBarAggregator::new(
4722            bar_type,
4723            instrument.price_precision(),
4724            instrument.size_precision(),
4725            instrument.price_increment(),
4726            move |bar: Bar| {
4727                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4728                handler_guard.push(bar);
4729            },
4730        );
4731
4732        // Sequential updates creating multiple bars
4733        aggregator.update(
4734            Price::from("1.00000"),
4735            Quantity::from(1),
4736            UnixNanos::from(1000),
4737        );
4738        aggregator.update(
4739            Price::from("1.00010"),
4740            Quantity::from(1),
4741            UnixNanos::from(2000),
4742        ); // First brick
4743        aggregator.update(
4744            Price::from("1.00020"),
4745            Quantity::from(1),
4746            UnixNanos::from(3000),
4747        ); // Second brick
4748        aggregator.update(
4749            Price::from("1.00025"),
4750            Quantity::from(1),
4751            UnixNanos::from(4000),
4752        ); // Partial third brick
4753        aggregator.update(
4754            Price::from("1.00030"),
4755            Quantity::from(1),
4756            UnixNanos::from(5000),
4757        ); // Complete third brick
4758
4759        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4760        assert_eq!(handler_guard.len(), 3);
4761
4762        let bar1 = &handler_guard[0];
4763        assert_eq!(bar1.open, Price::from("1.00000"));
4764        assert_eq!(bar1.close, Price::from("1.00010"));
4765
4766        let bar2 = &handler_guard[1];
4767        assert_eq!(bar2.open, Price::from("1.00010"));
4768        assert_eq!(bar2.close, Price::from("1.00020"));
4769
4770        let bar3 = &handler_guard[2];
4771        assert_eq!(bar3.open, Price::from("1.00020"));
4772        assert_eq!(bar3.close, Price::from("1.00030"));
4773    }
4774
4775    #[rstest]
4776    fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
4777        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4778        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
4779        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4780        let handler = Arc::new(Mutex::new(Vec::new()));
4781        let handler_clone = Arc::clone(&handler);
4782
4783        let mut aggregator = RenkoBarAggregator::new(
4784            bar_type,
4785            instrument.price_precision(),
4786            instrument.size_precision(),
4787            instrument.price_increment(),
4788            move |bar: Bar| {
4789                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4790                handler_guard.push(bar);
4791            },
4792        );
4793
4794        // Mixed direction movement: up then down
4795        aggregator.update(
4796            Price::from("1.00000"),
4797            Quantity::from(1),
4798            UnixNanos::from(1000),
4799        );
4800        aggregator.update(
4801            Price::from("1.00010"),
4802            Quantity::from(1),
4803            UnixNanos::from(2000),
4804        ); // Up brick
4805        aggregator.update(
4806            Price::from("0.99990"),
4807            Quantity::from(1),
4808            UnixNanos::from(3000),
4809        ); // Down 2 bricks (20 pips)
4810
4811        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4812        assert_eq!(handler_guard.len(), 3);
4813
4814        let bar1 = &handler_guard[0]; // Up brick
4815        assert_eq!(bar1.open, Price::from("1.00000"));
4816        assert_eq!(bar1.high, Price::from("1.00010"));
4817        assert_eq!(bar1.low, Price::from("1.00000"));
4818        assert_eq!(bar1.close, Price::from("1.00010"));
4819
4820        let bar2 = &handler_guard[1]; // First down brick
4821        assert_eq!(bar2.open, Price::from("1.00010"));
4822        assert_eq!(bar2.high, Price::from("1.00010"));
4823        assert_eq!(bar2.low, Price::from("1.00000"));
4824        assert_eq!(bar2.close, Price::from("1.00000"));
4825
4826        let bar3 = &handler_guard[2]; // Second down brick
4827        assert_eq!(bar3.open, Price::from("1.00000"));
4828        assert_eq!(bar3.high, Price::from("1.00000"));
4829        assert_eq!(bar3.low, Price::from("0.99990"));
4830        assert_eq!(bar3.close, Price::from("0.99990"));
4831    }
4832
4833    #[rstest]
4834    fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
4835        let instrument = InstrumentAny::Equity(equity_aapl);
4836        let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
4837        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4838        let handler = Arc::new(Mutex::new(Vec::new()));
4839        let handler_clone = Arc::clone(&handler);
4840
4841        let mut aggregator = TickImbalanceBarAggregator::new(
4842            bar_type,
4843            instrument.price_precision(),
4844            instrument.size_precision(),
4845            move |bar: Bar| {
4846                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4847                handler_guard.push(bar);
4848            },
4849        );
4850
4851        let buy = TradeTick {
4852            aggressor_side: AggressorSide::Buyer,
4853            ..TradeTick::default()
4854        };
4855        let sell = TradeTick {
4856            aggressor_side: AggressorSide::Seller,
4857            ..TradeTick::default()
4858        };
4859
4860        aggregator.handle_trade(buy);
4861        aggregator.handle_trade(sell);
4862        aggregator.handle_trade(buy);
4863
4864        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4865        assert_eq!(handler_guard.len(), 0);
4866    }
4867
4868    #[rstest]
4869    fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
4870        let instrument = InstrumentAny::Equity(equity_aapl);
4871        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
4872        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4873        let handler = Arc::new(Mutex::new(Vec::new()));
4874        let handler_clone = Arc::clone(&handler);
4875
4876        let mut aggregator = TickImbalanceBarAggregator::new(
4877            bar_type,
4878            instrument.price_precision(),
4879            instrument.size_precision(),
4880            move |bar: Bar| {
4881                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4882                handler_guard.push(bar);
4883            },
4884        );
4885
4886        let buy = TradeTick {
4887            aggressor_side: AggressorSide::Buyer,
4888            ..TradeTick::default()
4889        };
4890        let no_aggressor = TradeTick {
4891            aggressor_side: AggressorSide::NoAggressor,
4892            ..TradeTick::default()
4893        };
4894
4895        aggregator.handle_trade(buy);
4896        aggregator.handle_trade(no_aggressor);
4897        aggregator.handle_trade(buy);
4898
4899        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4900        assert_eq!(handler_guard.len(), 1);
4901    }
4902
4903    #[rstest]
4904    fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
4905        let instrument = InstrumentAny::Equity(equity_aapl);
4906        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
4907        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4908        let handler = Arc::new(Mutex::new(Vec::new()));
4909        let handler_clone = Arc::clone(&handler);
4910
4911        let mut aggregator = TickRunsBarAggregator::new(
4912            bar_type,
4913            instrument.price_precision(),
4914            instrument.size_precision(),
4915            move |bar: Bar| {
4916                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4917                handler_guard.push(bar);
4918            },
4919        );
4920
4921        let buy = TradeTick {
4922            aggressor_side: AggressorSide::Buyer,
4923            ..TradeTick::default()
4924        };
4925        let sell = TradeTick {
4926            aggressor_side: AggressorSide::Seller,
4927            ..TradeTick::default()
4928        };
4929
4930        aggregator.handle_trade(buy);
4931        aggregator.handle_trade(buy);
4932        aggregator.handle_trade(sell);
4933        aggregator.handle_trade(sell);
4934
4935        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4936        assert_eq!(handler_guard.len(), 2);
4937    }
4938
4939    #[rstest]
4940    fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4941        let instrument = InstrumentAny::Equity(equity_aapl);
4942        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4943        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4944        let handler = Arc::new(Mutex::new(Vec::new()));
4945        let handler_clone = Arc::clone(&handler);
4946
4947        let mut aggregator = VolumeImbalanceBarAggregator::new(
4948            bar_type,
4949            instrument.price_precision(),
4950            instrument.size_precision(),
4951            move |bar: Bar| {
4952                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4953                handler_guard.push(bar);
4954            },
4955        );
4956
4957        let large_trade = TradeTick {
4958            size: Quantity::from(25),
4959            aggressor_side: AggressorSide::Buyer,
4960            ..TradeTick::default()
4961        };
4962
4963        aggregator.handle_trade(large_trade);
4964
4965        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4966        assert_eq!(handler_guard.len(), 2);
4967    }
4968
4969    #[rstest]
4970    fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
4971        equity_aapl: Equity,
4972    ) {
4973        let instrument = InstrumentAny::Equity(equity_aapl);
4974        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4975        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4976        let handler = Arc::new(Mutex::new(Vec::new()));
4977        let handler_clone = Arc::clone(&handler);
4978
4979        let mut aggregator = VolumeImbalanceBarAggregator::new(
4980            bar_type,
4981            instrument.price_precision(),
4982            instrument.size_precision(),
4983            move |bar: Bar| {
4984                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4985                handler_guard.push(bar);
4986            },
4987        );
4988
4989        let buy = TradeTick {
4990            size: Quantity::from(5),
4991            aggressor_side: AggressorSide::Buyer,
4992            ..TradeTick::default()
4993        };
4994        let no_aggressor = TradeTick {
4995            size: Quantity::from(3),
4996            aggressor_side: AggressorSide::NoAggressor,
4997            ..TradeTick::default()
4998        };
4999
5000        aggregator.handle_trade(buy);
5001        aggregator.handle_trade(no_aggressor);
5002        aggregator.handle_trade(buy);
5003
5004        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5005        assert_eq!(handler_guard.len(), 1);
5006    }
5007
5008    #[rstest]
5009    fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5010        let instrument = InstrumentAny::Equity(equity_aapl);
5011        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
5012        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5013        let handler = Arc::new(Mutex::new(Vec::new()));
5014        let handler_clone = Arc::clone(&handler);
5015
5016        let mut aggregator = VolumeRunsBarAggregator::new(
5017            bar_type,
5018            instrument.price_precision(),
5019            instrument.size_precision(),
5020            move |bar: Bar| {
5021                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5022                handler_guard.push(bar);
5023            },
5024        );
5025
5026        let large_trade = TradeTick {
5027            size: Quantity::from(25),
5028            aggressor_side: AggressorSide::Buyer,
5029            ..TradeTick::default()
5030        };
5031
5032        aggregator.handle_trade(large_trade);
5033
5034        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5035        assert_eq!(handler_guard.len(), 2);
5036    }
5037
5038    #[rstest]
5039    fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5040        let instrument = InstrumentAny::Equity(equity_aapl);
5041        let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
5042        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5043        let handler = Arc::new(Mutex::new(Vec::new()));
5044        let handler_clone = Arc::clone(&handler);
5045
5046        let mut aggregator = ValueRunsBarAggregator::new(
5047            bar_type,
5048            instrument.price_precision(),
5049            instrument.size_precision(),
5050            move |bar: Bar| {
5051                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5052                handler_guard.push(bar);
5053            },
5054        );
5055
5056        let large_trade = TradeTick {
5057            price: Price::from("5.00"),
5058            size: Quantity::from(25),
5059            aggressor_side: AggressorSide::Buyer,
5060            ..TradeTick::default()
5061        };
5062
5063        aggregator.handle_trade(large_trade);
5064
5065        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5066        assert_eq!(handler_guard.len(), 2);
5067    }
5068
5069    #[rstest]
5070    fn test_value_bar_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5071        let instrument = InstrumentAny::Equity(equity_aapl);
5072        let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
5073        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5074        let handler = Arc::new(Mutex::new(Vec::new()));
5075        let handler_clone = Arc::clone(&handler);
5076
5077        let mut aggregator = ValueBarAggregator::new(
5078            bar_type,
5079            instrument.price_precision(),
5080            instrument.size_precision(),
5081            move |bar: Bar| {
5082                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5083                handler_guard.push(bar);
5084            },
5085        );
5086
5087        // price=1000, size=3, value=3000, step=100 → size_chunk=0.1 rounds to 0 at precision 0
5088        aggregator.update(
5089            Price::from("1000.00"),
5090            Quantity::from(3),
5091            UnixNanos::default(),
5092        );
5093
5094        // 3 bars (one per min-size unit), not 30 zero-volume bars
5095        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5096        assert_eq!(handler_guard.len(), 3);
5097        for bar in handler_guard.iter() {
5098            assert_eq!(bar.volume, Quantity::from(1));
5099        }
5100    }
5101
5102    #[rstest]
5103    fn test_value_imbalance_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5104        let instrument = InstrumentAny::Equity(equity_aapl);
5105        let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
5106        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5107        let handler = Arc::new(Mutex::new(Vec::new()));
5108        let handler_clone = Arc::clone(&handler);
5109
5110        let mut aggregator = ValueImbalanceBarAggregator::new(
5111            bar_type,
5112            instrument.price_precision(),
5113            instrument.size_precision(),
5114            move |bar: Bar| {
5115                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5116                handler_guard.push(bar);
5117            },
5118        );
5119
5120        let trade = TradeTick {
5121            price: Price::from("1000.00"),
5122            size: Quantity::from(3),
5123            aggressor_side: AggressorSide::Buyer,
5124            instrument_id: instrument.id(),
5125            ..TradeTick::default()
5126        };
5127
5128        aggregator.handle_trade(trade);
5129
5130        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5131        assert_eq!(handler_guard.len(), 3);
5132        for bar in handler_guard.iter() {
5133            assert_eq!(bar.volume, Quantity::from(1));
5134        }
5135    }
5136
5137    #[rstest]
5138    fn test_value_imbalance_opposite_side_overshoot_emits_bar(equity_aapl: Equity) {
5139        let instrument = InstrumentAny::Equity(equity_aapl);
5140        let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
5141        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5142        let handler = Arc::new(Mutex::new(Vec::new()));
5143        let handler_clone = Arc::clone(&handler);
5144
5145        let mut aggregator = ValueImbalanceBarAggregator::new(
5146            bar_type,
5147            instrument.price_precision(),
5148            instrument.size_precision(),
5149            move |bar: Bar| {
5150                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5151                handler_guard.push(bar);
5152            },
5153        );
5154
5155        // Build seller imbalance of -50 (below step=100, no bar yet)
5156        let sell_tick = TradeTick {
5157            price: Price::from("10.00"),
5158            size: Quantity::from(5),
5159            aggressor_side: AggressorSide::Seller,
5160            instrument_id: instrument.id(),
5161            ..TradeTick::default()
5162        };
5163
5164        // Opposite-side buyer: flatten amount 50/1000=0.05 < min_size (1),
5165        // clamp overshoots imbalance from -50 to +950, crossing threshold
5166        let buy_tick = TradeTick {
5167            price: Price::from("1000.00"),
5168            size: Quantity::from(1),
5169            aggressor_side: AggressorSide::Buyer,
5170            instrument_id: instrument.id(),
5171            ts_init: UnixNanos::from(1),
5172            ts_event: UnixNanos::from(1),
5173            ..TradeTick::default()
5174        };
5175
5176        aggregator.handle_trade(sell_tick);
5177        aggregator.handle_trade(buy_tick);
5178
5179        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5180        assert_eq!(handler_guard.len(), 1);
5181        assert_eq!(handler_guard[0].volume, Quantity::from(6));
5182    }
5183
5184    #[rstest]
5185    fn test_value_runs_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5186        let instrument = InstrumentAny::Equity(equity_aapl);
5187        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
5188        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5189        let handler = Arc::new(Mutex::new(Vec::new()));
5190        let handler_clone = Arc::clone(&handler);
5191
5192        let mut aggregator = ValueRunsBarAggregator::new(
5193            bar_type,
5194            instrument.price_precision(),
5195            instrument.size_precision(),
5196            move |bar: Bar| {
5197                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5198                handler_guard.push(bar);
5199            },
5200        );
5201
5202        let trade = TradeTick {
5203            price: Price::from("1000.00"),
5204            size: Quantity::from(3),
5205            aggressor_side: AggressorSide::Buyer,
5206            instrument_id: instrument.id(),
5207            ..TradeTick::default()
5208        };
5209
5210        aggregator.handle_trade(trade);
5211
5212        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5213        assert_eq!(handler_guard.len(), 3);
5214        for bar in handler_guard.iter() {
5215            assert_eq!(bar.volume, Quantity::from(1));
5216        }
5217    }
5218
5219    #[rstest]
5220    #[case(1000_u64)]
5221    #[case(1500_u64)]
5222    fn test_volume_imbalance_bar_aggregator_large_step_no_overflow(
5223        equity_aapl: Equity,
5224        #[case] step: u64,
5225    ) {
5226        let instrument = InstrumentAny::Equity(equity_aapl);
5227        let bar_spec = BarSpecification::new(
5228            step as usize,
5229            BarAggregation::VolumeImbalance,
5230            PriceType::Last,
5231        );
5232        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5233        let handler = Arc::new(Mutex::new(Vec::new()));
5234        let handler_clone = Arc::clone(&handler);
5235
5236        let mut aggregator = VolumeImbalanceBarAggregator::new(
5237            bar_type,
5238            instrument.price_precision(),
5239            instrument.size_precision(),
5240            move |bar: Bar| {
5241                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5242                handler_guard.push(bar);
5243            },
5244        );
5245
5246        let trade = TradeTick {
5247            size: Quantity::from(step * 2),
5248            aggressor_side: AggressorSide::Buyer,
5249            ..TradeTick::default()
5250        };
5251
5252        aggregator.handle_trade(trade);
5253
5254        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5255        assert_eq!(handler_guard.len(), 2);
5256        for bar in handler_guard.iter() {
5257            assert_eq!(bar.volume.as_f64(), step as f64);
5258        }
5259    }
5260
5261    #[rstest]
5262    fn test_volume_imbalance_bar_aggregator_different_large_steps_produce_different_bar_counts(
5263        equity_aapl: Equity,
5264    ) {
5265        let instrument = InstrumentAny::Equity(equity_aapl);
5266        let total_volume = 3000_u64;
5267        let mut results = Vec::new();
5268
5269        for step in [1000_usize, 1500] {
5270            let bar_spec =
5271                BarSpecification::new(step, BarAggregation::VolumeImbalance, PriceType::Last);
5272            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5273            let handler = Arc::new(Mutex::new(Vec::new()));
5274            let handler_clone = Arc::clone(&handler);
5275
5276            let mut aggregator = VolumeImbalanceBarAggregator::new(
5277                bar_type,
5278                instrument.price_precision(),
5279                instrument.size_precision(),
5280                move |bar: Bar| {
5281                    let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5282                    handler_guard.push(bar);
5283                },
5284            );
5285
5286            let trade = TradeTick {
5287                size: Quantity::from(total_volume),
5288                aggressor_side: AggressorSide::Buyer,
5289                ..TradeTick::default()
5290            };
5291
5292            aggregator.handle_trade(trade);
5293
5294            let handler_guard = handler.lock().expect(MUTEX_POISONED);
5295            results.push(handler_guard.len());
5296        }
5297
5298        assert_eq!(results[0], 3); // 3000 / 1000
5299        assert_eq!(results[1], 2); // 3000 / 1500
5300        assert_ne!(results[0], results[1]);
5301    }
5302
5303    #[rstest]
5304    #[case(1000_u64)]
5305    #[case(1500_u64)]
5306    fn test_volume_runs_bar_aggregator_large_step_no_overflow(
5307        equity_aapl: Equity,
5308        #[case] step: u64,
5309    ) {
5310        let instrument = InstrumentAny::Equity(equity_aapl);
5311        let bar_spec =
5312            BarSpecification::new(step as usize, BarAggregation::VolumeRuns, PriceType::Last);
5313        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5314        let handler = Arc::new(Mutex::new(Vec::new()));
5315        let handler_clone = Arc::clone(&handler);
5316
5317        let mut aggregator = VolumeRunsBarAggregator::new(
5318            bar_type,
5319            instrument.price_precision(),
5320            instrument.size_precision(),
5321            move |bar: Bar| {
5322                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5323                handler_guard.push(bar);
5324            },
5325        );
5326
5327        let trade = TradeTick {
5328            size: Quantity::from(step * 2),
5329            aggressor_side: AggressorSide::Buyer,
5330            ..TradeTick::default()
5331        };
5332
5333        aggregator.handle_trade(trade);
5334
5335        let handler_guard = handler.lock().expect(MUTEX_POISONED);
5336        assert_eq!(handler_guard.len(), 2);
5337        for bar in handler_guard.iter() {
5338            assert_eq!(bar.volume.as_f64(), step as f64);
5339        }
5340    }
5341
5342    #[rstest]
5343    fn test_volume_runs_bar_aggregator_different_large_steps_produce_different_bar_counts(
5344        equity_aapl: Equity,
5345    ) {
5346        let instrument = InstrumentAny::Equity(equity_aapl);
5347        let total_volume = 3000_u64;
5348        let mut results = Vec::new();
5349
5350        for step in [1000_usize, 1500] {
5351            let bar_spec = BarSpecification::new(step, BarAggregation::VolumeRuns, PriceType::Last);
5352            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5353            let handler = Arc::new(Mutex::new(Vec::new()));
5354            let handler_clone = Arc::clone(&handler);
5355
5356            let mut aggregator = VolumeRunsBarAggregator::new(
5357                bar_type,
5358                instrument.price_precision(),
5359                instrument.size_precision(),
5360                move |bar: Bar| {
5361                    let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5362                    handler_guard.push(bar);
5363                },
5364            );
5365
5366            let trade = TradeTick {
5367                size: Quantity::from(total_volume),
5368                aggressor_side: AggressorSide::Buyer,
5369                ..TradeTick::default()
5370            };
5371
5372            aggregator.handle_trade(trade);
5373
5374            let handler_guard = handler.lock().expect(MUTEX_POISONED);
5375            results.push(handler_guard.len());
5376        }
5377
5378        assert_eq!(results[0], 3); // 3000 / 1000
5379        assert_eq!(results[1], 2); // 3000 / 1500
5380        assert_ne!(results[0], results[1]);
5381    }
5382
5383    /// Historical time-bar: event at ts_init is deferred until after the update (Cython parity).
5384    #[rstest]
5385    fn test_time_bar_historical_defers_event_at_ts_init_until_after_update(equity_aapl: Equity) {
5386        let instrument = InstrumentAny::Equity(equity_aapl);
5387        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
5388        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5389        let handler = Arc::new(Mutex::new(Vec::new()));
5390        let handler_clone = Arc::clone(&handler);
5391        let clock = Rc::new(RefCell::new(TestClock::new()));
5392
5393        let mut agg = TimeBarAggregator::new(
5394            bar_type,
5395            instrument.price_precision(),
5396            instrument.size_precision(),
5397            clock.clone(),
5398            move |bar: Bar| {
5399                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
5400                h.push(bar);
5401            },
5402            true,
5403            true,
5404            BarIntervalType::LeftOpen,
5405            None,
5406            0,
5407            false,
5408        );
5409        agg.historical_mode = true;
5410        agg.set_clock_internal(clock);
5411        let boxed: Box<dyn BarAggregator> = Box::new(agg);
5412        let rc = Rc::new(RefCell::new(boxed));
5413        rc.borrow_mut().set_aggregator_weak(Rc::downgrade(&rc));
5414
5415        rc.borrow_mut().update(
5416            Price::from("100.00"),
5417            Quantity::from(1),
5418            UnixNanos::default(),
5419        );
5420        rc.borrow_mut().update(
5421            Price::from("100.00"),
5422            Quantity::from(1),
5423            UnixNanos::from(1_000_000_000),
5424        );
5425
5426        let bars = handler.lock().expect(MUTEX_POISONED);
5427        assert!(
5428            !bars.is_empty(),
5429            "deferred event at ts_init should produce a bar that includes the update"
5430        );
5431        let last_bar = bars.last().unwrap();
5432        assert_eq!(last_bar.close, Price::from("100.00"));
5433        assert!(
5434            last_bar.volume.as_f64() >= 1.0,
5435            "bar built after deferred event should include the update at ts_init"
5436        );
5437    }
5438
5439    #[rstest]
5440    fn test_spread_quote_quote_driven_emits_when_all_legs_received(equity_aapl: Equity) {
5441        let instrument = InstrumentAny::Equity(equity_aapl);
5442        let leg1 = instrument.id();
5443        let leg2 = InstrumentId::from("MSFT.XNAS");
5444        let spread_id = InstrumentId::from("SPREAD.XNAS");
5445        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5446        let handler = Arc::new(Mutex::new(Vec::new()));
5447        let handler_clone = Arc::clone(&handler);
5448        let clock = Rc::new(RefCell::new(TestClock::new()));
5449
5450        let mut agg = SpreadQuoteAggregator::new(
5451            spread_id,
5452            &legs,
5453            true,
5454            instrument.price_precision(),
5455            0,
5456            Box::new(move |q: QuoteTick| {
5457                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5458            }),
5459            clock,
5460            false,
5461            None,
5462            0,
5463            None,
5464            None,
5465        );
5466
5467        let ts = UnixNanos::from(1_000_000_000);
5468        agg.handle_quote_tick(QuoteTick::new(
5469            leg1,
5470            Price::from("100.00"),
5471            Price::from("100.10"),
5472            Quantity::from(10),
5473            Quantity::from(10),
5474            ts,
5475            ts,
5476        ));
5477        assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5478
5479        agg.handle_quote_tick(QuoteTick::new(
5480            leg2,
5481            Price::from("99.00"),
5482            Price::from("99.10"),
5483            Quantity::from(10),
5484            Quantity::from(10),
5485            ts,
5486            ts,
5487        ));
5488        let quotes = handler.lock().expect(MUTEX_POISONED);
5489        assert_eq!(quotes.len(), 1);
5490        assert_eq!(quotes[0].instrument_id, spread_id);
5491        assert!(quotes[0].bid_price < quotes[0].ask_price);
5492    }
5493
5494    #[rstest]
5495    fn test_spread_quote_futures_pricing_signed_ratios(equity_aapl: Equity) {
5496        let instrument = InstrumentAny::Equity(equity_aapl);
5497        let leg1 = instrument.id();
5498        let leg2 = InstrumentId::from("MSFT.XNAS");
5499        let spread_id = InstrumentId::from("SPREAD.XNAS");
5500        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5501        let handler = Arc::new(Mutex::new(Vec::new()));
5502        let handler_clone = Arc::clone(&handler);
5503        let clock = Rc::new(RefCell::new(TestClock::new()));
5504
5505        let mut agg = SpreadQuoteAggregator::new(
5506            spread_id,
5507            &legs,
5508            true,
5509            instrument.price_precision(),
5510            0,
5511            Box::new(move |q: QuoteTick| {
5512                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5513            }),
5514            clock,
5515            false,
5516            None,
5517            0,
5518            None,
5519            None,
5520        );
5521
5522        let ts = UnixNanos::from(1_000_000_000);
5523        agg.handle_quote_tick(QuoteTick::new(
5524            leg1,
5525            Price::from("10.00"),
5526            Price::from("10.10"),
5527            Quantity::from(100),
5528            Quantity::from(100),
5529            ts,
5530            ts,
5531        ));
5532        agg.handle_quote_tick(QuoteTick::new(
5533            leg2,
5534            Price::from("20.00"),
5535            Price::from("20.10"),
5536            Quantity::from(100),
5537            Quantity::from(100),
5538            ts,
5539            ts,
5540        ));
5541        let quotes = handler.lock().expect(MUTEX_POISONED);
5542        assert_eq!(quotes.len(), 1);
5543        let q = &quotes[0];
5544        assert_eq!(q.instrument_id, spread_id);
5545        assert_eq!(q.bid_price, Price::from("-10.10"));
5546        assert_eq!(q.ask_price, Price::from("-9.90"));
5547    }
5548
5549    #[rstest]
5550    fn test_spread_quote_size_calculation_non_unit_ratios(equity_aapl: Equity) {
5551        let instrument = InstrumentAny::Equity(equity_aapl);
5552        let leg1 = instrument.id();
5553        let leg2 = InstrumentId::from("MSFT.XNAS");
5554        let spread_id = InstrumentId::from("SPREAD.XNAS");
5555        let legs = vec![(leg1, 2_i64), (leg2, -1_i64)];
5556        let handler = Arc::new(Mutex::new(Vec::new()));
5557        let handler_clone = Arc::clone(&handler);
5558        let clock = Rc::new(RefCell::new(TestClock::new()));
5559
5560        let mut agg = SpreadQuoteAggregator::new(
5561            spread_id,
5562            &legs,
5563            true,
5564            instrument.price_precision(),
5565            0,
5566            Box::new(move |q: QuoteTick| {
5567                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5568            }),
5569            clock,
5570            false,
5571            None,
5572            0,
5573            None,
5574            None,
5575        );
5576
5577        let ts = UnixNanos::from(1_000_000_000);
5578        agg.handle_quote_tick(QuoteTick::new(
5579            leg1,
5580            Price::from("10.00"),
5581            Price::from("10.10"),
5582            Quantity::from(100),
5583            Quantity::from(40),
5584            ts,
5585            ts,
5586        ));
5587        agg.handle_quote_tick(QuoteTick::new(
5588            leg2,
5589            Price::from("10.00"),
5590            Price::from("10.10"),
5591            Quantity::from(50),
5592            Quantity::from(30),
5593            ts,
5594            ts,
5595        ));
5596        let quotes = handler.lock().expect(MUTEX_POISONED);
5597        assert_eq!(quotes.len(), 1);
5598        let q = &quotes[0];
5599        assert_eq!(q.bid_size.as_f64(), 30.0);
5600        assert_eq!(q.ask_size.as_f64(), 20.0);
5601    }
5602
5603    #[rstest]
5604    fn test_spread_quote_timer_driven_emission_cadence(equity_aapl: Equity) {
5605        let instrument = InstrumentAny::Equity(equity_aapl);
5606        let leg1 = instrument.id();
5607        let leg2 = InstrumentId::from("MSFT.XNAS");
5608        let spread_id = InstrumentId::from("SPREAD.XNAS");
5609        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5610        let handler = Arc::new(Mutex::new(Vec::new()));
5611        let handler_clone = Arc::clone(&handler);
5612        let clock = Rc::new(RefCell::new(TestClock::new()));
5613        clock.borrow_mut().set_time(UnixNanos::from(0));
5614
5615        let agg = SpreadQuoteAggregator::new(
5616            spread_id,
5617            &legs,
5618            true,
5619            instrument.price_precision(),
5620            0,
5621            Box::new(move |q: QuoteTick| {
5622                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5623            }),
5624            clock.clone(),
5625            false,
5626            Some(1),
5627            0,
5628            None,
5629            None,
5630        );
5631        let rc = Rc::new(RefCell::new(agg));
5632        rc.borrow_mut().prepare_for_timer_mode(&rc);
5633        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
5634
5635        for event in clock.borrow_mut().advance_time(UnixNanos::from(0), true) {
5636            rc.borrow_mut().on_timer_fire(event.ts_event);
5637        }
5638        assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5639
5640        let ts1 = UnixNanos::from(1_000_000_000);
5641        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5642            leg1,
5643            Price::from("100.00"),
5644            Price::from("100.10"),
5645            Quantity::from(10),
5646            Quantity::from(10),
5647            ts1,
5648            ts1,
5649        ));
5650        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5651            leg2,
5652            Price::from("99.00"),
5653            Price::from("99.10"),
5654            Quantity::from(10),
5655            Quantity::from(10),
5656            ts1,
5657            ts1,
5658        ));
5659
5660        for event in clock.borrow_mut().advance_time(ts1, true) {
5661            rc.borrow_mut().on_timer_fire(event.ts_event);
5662        }
5663
5664        {
5665            let quotes = handler.lock().expect(MUTEX_POISONED);
5666            assert_eq!(quotes.len(), 1);
5667            assert_eq!(quotes[0].ts_event, ts1);
5668            assert_eq!(quotes[0].ts_init, ts1);
5669        }
5670
5671        let ts2 = UnixNanos::from(2_000_000_000);
5672        for event in clock.borrow_mut().advance_time(ts2, true) {
5673            rc.borrow_mut().on_timer_fire(event.ts_event);
5674        }
5675
5676        let quotes = handler.lock().expect(MUTEX_POISONED);
5677        assert_eq!(quotes.len(), 1);
5678    }
5679
5680    #[rstest]
5681    fn test_spread_quote_historical_timer_waits_for_all_legs(equity_aapl: Equity) {
5682        let instrument = InstrumentAny::Equity(equity_aapl);
5683        let leg1 = instrument.id();
5684        let leg2 = InstrumentId::from("MSFT.XNAS");
5685        let spread_id = InstrumentId::from("SPREAD.XNAS");
5686        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5687        let handler = Arc::new(Mutex::new(Vec::new()));
5688        let handler_clone = Arc::clone(&handler);
5689        let clock = Rc::new(RefCell::new(TestClock::new()));
5690
5691        let agg = SpreadQuoteAggregator::new(
5692            spread_id,
5693            &legs,
5694            true,
5695            instrument.price_precision(),
5696            0,
5697            Box::new(move |q: QuoteTick| {
5698                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5699            }),
5700            // need clock for set_clock after
5701            clock.clone(),
5702            true,
5703            Some(1),
5704            0,
5705            None,
5706            None,
5707        );
5708        let rc = Rc::new(RefCell::new(agg));
5709        rc.borrow_mut().prepare_for_timer_mode(&rc);
5710        rc.borrow_mut().set_clock(clock);
5711
5712        let ts1 = UnixNanos::from(1_000_000_000);
5713        let ts2 = UnixNanos::from(2_000_000_000);
5714        let ts3 = UnixNanos::from(3_000_000_000);
5715        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5716            leg1,
5717            Price::from("100.00"),
5718            Price::from("100.10"),
5719            Quantity::from(10),
5720            Quantity::from(10),
5721            ts1,
5722            ts1,
5723        ));
5724        assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5725
5726        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5727            leg2,
5728            Price::from("99.00"),
5729            Price::from("99.10"),
5730            Quantity::from(10),
5731            Quantity::from(10),
5732            ts2,
5733            ts2,
5734        ));
5735        assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5736
5737        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5738            leg1,
5739            Price::from("100.00"),
5740            Price::from("100.10"),
5741            Quantity::from(10),
5742            Quantity::from(10),
5743            ts3,
5744            ts3,
5745        ));
5746        let quotes = handler.lock().expect(MUTEX_POISONED);
5747        assert_eq!(
5748            quotes.len(),
5749            1,
5750            "deferred event at ts2 is processed when we have all legs and advance to ts3"
5751        );
5752    }
5753
5754    #[rstest]
5755    fn test_spread_quote_historical_flush_emits_pending_final_quote(equity_aapl: Equity) {
5756        let instrument = InstrumentAny::Equity(equity_aapl);
5757        let leg1 = instrument.id();
5758        let leg2 = InstrumentId::from("MSFT.XNAS");
5759        let spread_id = InstrumentId::from("SPREAD.XNAS");
5760        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5761        let handler = Arc::new(Mutex::new(Vec::new()));
5762        let handler_clone = Arc::clone(&handler);
5763        let clock = Rc::new(RefCell::new(TestClock::new()));
5764
5765        let agg = SpreadQuoteAggregator::new(
5766            spread_id,
5767            &legs,
5768            true,
5769            instrument.price_precision(),
5770            0,
5771            Box::new(move |q: QuoteTick| {
5772                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5773            }),
5774            // need clock for set_clock after
5775            clock.clone(),
5776            true,
5777            Some(1),
5778            0,
5779            None,
5780            None,
5781        );
5782        let rc = Rc::new(RefCell::new(agg));
5783        rc.borrow_mut().prepare_for_timer_mode(&rc);
5784        rc.borrow_mut().set_clock(clock);
5785
5786        let ts1 = UnixNanos::from(1_000_000_000);
5787        let ts2 = UnixNanos::from(2_000_000_000);
5788        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5789            leg1,
5790            Price::from("100.00"),
5791            Price::from("100.10"),
5792            Quantity::from(10),
5793            Quantity::from(10),
5794            ts1,
5795            ts1,
5796        ));
5797        rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5798            leg2,
5799            Price::from("99.00"),
5800            Price::from("99.10"),
5801            Quantity::from(10),
5802            Quantity::from(10),
5803            ts2,
5804            ts2,
5805        ));
5806
5807        assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5808
5809        rc.borrow_mut().flush_pending_historical_quote();
5810
5811        let quotes = handler.lock().expect(MUTEX_POISONED);
5812        assert_eq!(
5813            quotes.len(),
5814            1,
5815            "final historical quote should be emitted when the deferred event is flushed",
5816        );
5817        assert_eq!(quotes[0].ts_event, ts2);
5818    }
5819
5820    #[rstest]
5821    fn test_spread_quote_option_vega_weighting(equity_aapl: Equity) {
5822        let instrument = InstrumentAny::Equity(equity_aapl);
5823        let leg1 = instrument.id();
5824        let leg2 = InstrumentId::from("MSFT.XNAS");
5825        let spread_id = InstrumentId::from("SPREAD.XNAS");
5826        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5827        let handler = Arc::new(Mutex::new(Vec::new()));
5828        let handler_clone = Arc::clone(&handler);
5829        let clock = Rc::new(RefCell::new(TestClock::new()));
5830
5831        let mut vega_provider = MapVegaProvider::new();
5832        vega_provider.insert(leg1, 0.15);
5833        vega_provider.insert(leg2, 0.12);
5834
5835        let mut agg = SpreadQuoteAggregator::new(
5836            spread_id,
5837            &legs,
5838            false,
5839            instrument.price_precision(),
5840            0,
5841            Box::new(move |q: QuoteTick| {
5842                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5843            }),
5844            clock,
5845            false,
5846            None,
5847            0,
5848            Some(Box::new(vega_provider)),
5849            None,
5850        );
5851
5852        let ts = UnixNanos::from(1_000_000_000);
5853        agg.handle_quote_tick(QuoteTick::new(
5854            leg1,
5855            Price::from("10.00"),
5856            Price::from("10.20"),
5857            Quantity::from(100),
5858            Quantity::from(100),
5859            ts,
5860            ts,
5861        ));
5862        agg.handle_quote_tick(QuoteTick::new(
5863            leg2,
5864            Price::from("11.00"),
5865            Price::from("11.20"),
5866            Quantity::from(100),
5867            Quantity::from(100),
5868            ts,
5869            ts,
5870        ));
5871        let quotes = handler.lock().expect(MUTEX_POISONED);
5872        assert_eq!(quotes.len(), 1);
5873        let q = &quotes[0];
5874        assert!(q.bid_price < q.ask_price);
5875        assert!(q.ask_price.as_f64() - q.bid_price.as_f64() > 0.0);
5876    }
5877
5878    #[rstest]
5879    fn test_spread_quote_all_zero_vega_fallback(equity_aapl: Equity) {
5880        let instrument = InstrumentAny::Equity(equity_aapl);
5881        let leg1 = instrument.id();
5882        let leg2 = InstrumentId::from("MSFT.XNAS");
5883        let spread_id = InstrumentId::from("SPREAD.XNAS");
5884        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5885        let handler = Arc::new(Mutex::new(Vec::new()));
5886        let handler_clone = Arc::clone(&handler);
5887        let clock = Rc::new(RefCell::new(TestClock::new()));
5888
5889        let mut vega_provider = MapVegaProvider::new();
5890        vega_provider.insert(leg1, 0.0);
5891        vega_provider.insert(leg2, 0.0);
5892
5893        let mut agg = SpreadQuoteAggregator::new(
5894            spread_id,
5895            &legs,
5896            false,
5897            instrument.price_precision(),
5898            0,
5899            Box::new(move |q: QuoteTick| {
5900                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5901            }),
5902            clock,
5903            false,
5904            None,
5905            0,
5906            Some(Box::new(vega_provider)),
5907            None,
5908        );
5909
5910        let ts = UnixNanos::from(1_000_000_000);
5911        agg.handle_quote_tick(QuoteTick::new(
5912            leg1,
5913            Price::from("10.00"),
5914            Price::from("10.10"),
5915            Quantity::from(100),
5916            Quantity::from(100),
5917            ts,
5918            ts,
5919        ));
5920        agg.handle_quote_tick(QuoteTick::new(
5921            leg2,
5922            Price::from("20.00"),
5923            Price::from("20.10"),
5924            Quantity::from(100),
5925            Quantity::from(100),
5926            ts,
5927            ts,
5928        ));
5929        let quotes = handler.lock().expect(MUTEX_POISONED);
5930        assert_eq!(quotes.len(), 1);
5931        let q = &quotes[0];
5932        assert_eq!(q.bid_price, Price::from("-10.10"));
5933        assert_eq!(q.ask_price, Price::from("-9.90"));
5934    }
5935
5936    #[rstest]
5937    fn test_spread_quote_negative_prices_tick_scheme(equity_aapl: Equity) {
5938        let instrument = InstrumentAny::Equity(equity_aapl);
5939        let leg1 = instrument.id();
5940        let leg2 = InstrumentId::from("MSFT.XNAS");
5941        let spread_id = InstrumentId::from("SPREAD.XNAS");
5942        let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5943        let handler = Arc::new(Mutex::new(Vec::new()));
5944        let handler_clone = Arc::clone(&handler);
5945        let clock = Rc::new(RefCell::new(TestClock::new()));
5946        let rounder = FixedTickSchemeRounder::new(0.01).unwrap();
5947
5948        let mut agg = SpreadQuoteAggregator::new(
5949            spread_id,
5950            &legs,
5951            true,
5952            2,
5953            0,
5954            Box::new(move |q: QuoteTick| {
5955                handler_clone.lock().expect(MUTEX_POISONED).push(q);
5956            }),
5957            clock,
5958            false,
5959            None,
5960            0,
5961            None,
5962            Some(Box::new(rounder)),
5963        );
5964
5965        let ts = UnixNanos::from(1_000_000_000);
5966        agg.handle_quote_tick(QuoteTick::new(
5967            leg1,
5968            Price::from("10.00"),
5969            Price::from("10.10"),
5970            Quantity::from(100),
5971            Quantity::from(100),
5972            ts,
5973            ts,
5974        ));
5975        agg.handle_quote_tick(QuoteTick::new(
5976            leg2,
5977            Price::from("20.00"),
5978            Price::from("20.10"),
5979            Quantity::from(100),
5980            Quantity::from(100),
5981            ts,
5982            ts,
5983        ));
5984        let quotes = handler.lock().expect(MUTEX_POISONED);
5985        assert_eq!(quotes.len(), 1);
5986        let q = &quotes[0];
5987        assert!(q.bid_price.as_f64() < 0.0);
5988        assert!(q.ask_price.as_f64() < 0.0);
5989        assert!(q.bid_price < q.ask_price);
5990    }
5991
5992    #[rstest]
5993    #[case(BarIntervalType::LeftOpen)]
5994    #[case(BarIntervalType::RightOpen)]
5995    fn test_time_bar_skip_first_non_full_bar_noop_on_boundary(
5996        equity_aapl: Equity,
5997        #[case] interval_type: BarIntervalType,
5998    ) {
5999        // When the clock sits on a bar boundary, fire_immediately=true and
6000        // first_close_ns equals that boundary. Every subsequent bar closes
6001        // strictly after first_close_ns, so skip_first_non_full_bar never
6002        // triggers and both bars emit.
6003        let instrument = InstrumentAny::Equity(equity_aapl);
6004        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6005        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6006        let handler = Arc::new(Mutex::new(Vec::new()));
6007        let handler_clone = Arc::clone(&handler);
6008        let clock = Rc::new(RefCell::new(TestClock::new()));
6009        clock.borrow_mut().set_time(UnixNanos::from(1_000_000_000));
6010        let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6011
6012        let aggregator = TimeBarAggregator::new(
6013            bar_type,
6014            instrument.price_precision(),
6015            instrument.size_precision(),
6016            clock,
6017            move |bar: Bar| {
6018                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6019                h.push(bar);
6020            },
6021            false,
6022            false,
6023            interval_type,
6024            None,
6025            0,
6026            true, // skip_first_non_full_bar
6027        );
6028
6029        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6030        let rc = Rc::new(RefCell::new(boxed));
6031        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6032
6033        rc.borrow_mut().update(
6034            Price::from("100.00"),
6035            Quantity::from(1),
6036            UnixNanos::from(1_000_000_000),
6037        );
6038        rc.borrow_mut().build_bar(&TimeEvent::new(
6039            event_name,
6040            UUID4::new(),
6041            UnixNanos::from(2_000_000_000),
6042            UnixNanos::from(2_000_000_000),
6043        ));
6044        rc.borrow_mut().update(
6045            Price::from("101.00"),
6046            Quantity::from(1),
6047            UnixNanos::from(2_500_000_000),
6048        );
6049        rc.borrow_mut().build_bar(&TimeEvent::new(
6050            event_name,
6051            UUID4::new(),
6052            UnixNanos::from(3_000_000_000),
6053            UnixNanos::from(3_000_000_000),
6054        ));
6055
6056        let bars = handler.lock().expect(MUTEX_POISONED);
6057        assert_eq!(bars.len(), 2);
6058        assert_eq!(bars[0].close, Price::from("100.00"));
6059        assert_eq!(bars[1].close, Price::from("101.00"));
6060    }
6061
6062    #[rstest]
6063    #[case(BarIntervalType::LeftOpen)]
6064    #[case(BarIntervalType::RightOpen)]
6065    fn test_time_bar_skip_first_non_full_bar_drops_partial_bar(
6066        equity_aapl: Equity,
6067        #[case] interval_type: BarIntervalType,
6068    ) {
6069        // When the clock starts past a boundary (mid-interval), first_close_ns
6070        // is the upcoming boundary. The bar closing at first_close_ns is partial,
6071        // so skip_first_non_full_bar drops it; subsequent full bars emit.
6072        let instrument = InstrumentAny::Equity(equity_aapl);
6073        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6074        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6075        let handler = Arc::new(Mutex::new(Vec::new()));
6076        let handler_clone = Arc::clone(&handler);
6077        let clock = Rc::new(RefCell::new(TestClock::new()));
6078        clock.borrow_mut().set_time(UnixNanos::from(1_500_000_000));
6079        let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6080
6081        let aggregator = TimeBarAggregator::new(
6082            bar_type,
6083            instrument.price_precision(),
6084            instrument.size_precision(),
6085            clock,
6086            move |bar: Bar| {
6087                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6088                h.push(bar);
6089            },
6090            false,
6091            false,
6092            interval_type,
6093            None,
6094            0,
6095            true, // skip_first_non_full_bar
6096        );
6097
6098        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6099        let rc = Rc::new(RefCell::new(boxed));
6100        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6101
6102        rc.borrow_mut().update(
6103            Price::from("100.00"),
6104            Quantity::from(1),
6105            UnixNanos::from(1_500_000_000),
6106        );
6107        rc.borrow_mut().build_bar(&TimeEvent::new(
6108            event_name,
6109            UUID4::new(),
6110            UnixNanos::from(2_000_000_000),
6111            UnixNanos::from(2_000_000_000),
6112        ));
6113        rc.borrow_mut().update(
6114            Price::from("101.00"),
6115            Quantity::from(1),
6116            UnixNanos::from(2_500_000_000),
6117        );
6118        rc.borrow_mut().build_bar(&TimeEvent::new(
6119            event_name,
6120            UUID4::new(),
6121            UnixNanos::from(3_000_000_000),
6122            UnixNanos::from(3_000_000_000),
6123        ));
6124
6125        let bars = handler.lock().expect(MUTEX_POISONED);
6126        assert_eq!(bars.len(), 1);
6127        assert_eq!(bars[0].close, Price::from("101.00"));
6128    }
6129
6130    #[rstest]
6131    fn test_time_bar_skip_first_non_full_bar_skips_every_call_before_first_close(
6132        equity_aapl: Equity,
6133    ) {
6134        // The flag must remain set across every build_and_send call whose
6135        // ts_init <= first_close_ns, and only flip once a bar actually emits.
6136        // Catches a mutation that flips skip_first_non_full_bar early.
6137        let instrument = InstrumentAny::Equity(equity_aapl);
6138        let bar_spec = BarSpecification::new(10, BarAggregation::Second, PriceType::Last);
6139        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6140        let handler = Arc::new(Mutex::new(Vec::new()));
6141        let handler_clone = Arc::clone(&handler);
6142        let clock = Rc::new(RefCell::new(TestClock::new()));
6143        clock.borrow_mut().set_time(UnixNanos::from(5_000_000_000));
6144        let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6145
6146        let aggregator = TimeBarAggregator::new(
6147            bar_type,
6148            instrument.price_precision(),
6149            instrument.size_precision(),
6150            clock,
6151            move |bar: Bar| {
6152                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6153                h.push(bar);
6154            },
6155            false,
6156            false,
6157            BarIntervalType::LeftOpen,
6158            None,
6159            0,
6160            true, // skip_first_non_full_bar
6161        );
6162
6163        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6164        let rc = Rc::new(RefCell::new(boxed));
6165        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6166
6167        // first_close_ns is 10_000_000_000 (first 10s boundary after start).
6168        // Drive three build_bar calls at ts <= first_close_ns, each preceded by a
6169        // distinct update. Every one of them must be skipped.
6170        for (price, update_ts, event_ts) in [
6171            ("100.00", 5_500_000_000_u64, 7_000_000_000_u64),
6172            ("101.00", 7_500_000_000_u64, 8_000_000_000_u64),
6173            ("102.00", 9_000_000_000_u64, 10_000_000_000_u64),
6174        ] {
6175            rc.borrow_mut().update(
6176                Price::from(price),
6177                Quantity::from(1),
6178                UnixNanos::from(update_ts),
6179            );
6180            rc.borrow_mut().build_bar(&TimeEvent::new(
6181                event_name,
6182                UUID4::new(),
6183                UnixNanos::from(event_ts),
6184                UnixNanos::from(event_ts),
6185            ));
6186        }
6187
6188        // Final update + build past first_close_ns emits for the first time.
6189        rc.borrow_mut().update(
6190            Price::from("103.00"),
6191            Quantity::from(1),
6192            UnixNanos::from(10_500_000_000),
6193        );
6194        rc.borrow_mut().build_bar(&TimeEvent::new(
6195            event_name,
6196            UUID4::new(),
6197            UnixNanos::from(11_000_000_000),
6198            UnixNanos::from(11_000_000_000),
6199        ));
6200
6201        let bars = handler.lock().expect(MUTEX_POISONED);
6202        assert_eq!(bars.len(), 1);
6203        assert_eq!(bars[0].close, Price::from("103.00"));
6204    }
6205
6206    #[rstest]
6207    fn test_time_bar_skip_first_non_full_bar_skips_when_build_delay_shifts_start(
6208        equity_aapl: Equity,
6209    ) {
6210        // Cython parity: when bar_build_delay > 0 pushes start_time past a
6211        // boundary (even if `now` is on a boundary), first_close_ns is set and
6212        // the first bar is skipped. The previous Rust `now > start_time` guard
6213        // incorrectly kept this first bar.
6214        let instrument = InstrumentAny::Equity(equity_aapl);
6215        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6216        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6217        let handler = Arc::new(Mutex::new(Vec::new()));
6218        let handler_clone = Arc::clone(&handler);
6219        let clock = Rc::new(RefCell::new(TestClock::new()));
6220        clock.borrow_mut().set_time(UnixNanos::from(2_000_000_000));
6221        let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6222
6223        let aggregator = TimeBarAggregator::new(
6224            bar_type,
6225            instrument.price_precision(),
6226            instrument.size_precision(),
6227            clock,
6228            move |bar: Bar| {
6229                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6230                h.push(bar);
6231            },
6232            false,
6233            false,
6234            BarIntervalType::LeftOpen,
6235            None,
6236            100,  // bar_build_delay (microseconds)
6237            true, // skip_first_non_full_bar
6238        );
6239
6240        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6241        let rc = Rc::new(RefCell::new(boxed));
6242        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6243
6244        // start_time = 2s + 100us = 2_000_100_000 ns; first_close_ns = 3_000_100_000 ns.
6245        rc.borrow_mut().update(
6246            Price::from("100.00"),
6247            Quantity::from(1),
6248            UnixNanos::from(2_500_000_000),
6249        );
6250        rc.borrow_mut().build_bar(&TimeEvent::new(
6251            event_name,
6252            UUID4::new(),
6253            UnixNanos::from(3_000_100_000),
6254            UnixNanos::from(3_000_100_000),
6255        ));
6256        rc.borrow_mut().update(
6257            Price::from("101.00"),
6258            Quantity::from(1),
6259            UnixNanos::from(3_500_000_000),
6260        );
6261        rc.borrow_mut().build_bar(&TimeEvent::new(
6262            event_name,
6263            UUID4::new(),
6264            UnixNanos::from(4_000_100_000),
6265            UnixNanos::from(4_000_100_000),
6266        ));
6267
6268        let bars = handler.lock().expect(MUTEX_POISONED);
6269        assert_eq!(bars.len(), 1);
6270        assert_eq!(bars[0].close, Price::from("101.00"));
6271    }
6272
6273    #[rstest]
6274    #[case(
6275        BarAggregation::Month,
6276        1_735_689_600_000_000_000_u64,
6277        1_733_011_200_000_000_000_u64
6278    )]
6279    #[case(
6280        BarAggregation::Year,
6281        1_735_689_600_000_000_000_u64,
6282        1_704_067_200_000_000_000_u64
6283    )]
6284    fn test_time_bar_fire_immediately_month_year_stored_open_points_to_previous_period(
6285        equity_aapl: Equity,
6286        #[case] aggregation: BarAggregation,
6287        #[case] start_ns: u64,
6288        #[case] expected_stored_open_ns: u64,
6289    ) {
6290        // When the clock is exactly on a month/year boundary, fire_immediately=true.
6291        // stored_open_ns must resolve to one step before start_time (mirrors Cython
6292        // close_time - step arithmetic) so the first bar's open timestamp marks
6293        // the true start of the in-progress interval.
6294        let instrument = InstrumentAny::Equity(equity_aapl);
6295        let bar_spec = BarSpecification::new(1, aggregation, PriceType::Last);
6296        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6297        let handler = Arc::new(Mutex::new(Vec::new()));
6298        let handler_clone = Arc::clone(&handler);
6299        let clock = Rc::new(RefCell::new(TestClock::new()));
6300        clock.borrow_mut().set_time(UnixNanos::from(start_ns));
6301        let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6302
6303        let aggregator = TimeBarAggregator::new(
6304            bar_type,
6305            instrument.price_precision(),
6306            instrument.size_precision(),
6307            clock,
6308            move |bar: Bar| {
6309                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6310                h.push(bar);
6311            },
6312            false,
6313            false,
6314            BarIntervalType::RightOpen, // ts_event = stored_open_ns
6315            None,
6316            0,
6317            false, // skip_first_non_full_bar
6318        );
6319
6320        let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6321        let rc = Rc::new(RefCell::new(boxed));
6322        rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6323
6324        rc.borrow_mut().update(
6325            Price::from("100.00"),
6326            Quantity::from(1),
6327            UnixNanos::from(start_ns),
6328        );
6329        rc.borrow_mut().build_bar(&TimeEvent::new(
6330            event_name,
6331            UUID4::new(),
6332            UnixNanos::from(start_ns),
6333            UnixNanos::from(start_ns),
6334        ));
6335
6336        let bars = handler.lock().expect(MUTEX_POISONED);
6337        assert_eq!(bars.len(), 1);
6338        assert_eq!(bars[0].ts_event, UnixNanos::from(expected_stored_open_ns));
6339        assert_eq!(bars[0].ts_init, UnixNanos::from(start_ns));
6340    }
6341
6342    #[rstest]
6343    fn test_time_bar_historical_prevents_bars_for_timer_before_last_data(equity_aapl: Equity) {
6344        let instrument = InstrumentAny::Equity(equity_aapl);
6345        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6346        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6347        let handler = Arc::new(Mutex::new(Vec::new()));
6348        let handler_clone = Arc::clone(&handler);
6349        let clock = Rc::new(RefCell::new(TestClock::new()));
6350
6351        let mut agg = TimeBarAggregator::new(
6352            bar_type,
6353            instrument.price_precision(),
6354            instrument.size_precision(),
6355            clock.clone(),
6356            move |bar: Bar| {
6357                let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6358                h.push(bar);
6359            },
6360            true,
6361            true,
6362            BarIntervalType::LeftOpen,
6363            None,
6364            0,
6365            false,
6366        );
6367        agg.historical_mode = true;
6368        agg.set_clock_internal(clock);
6369        let boxed: Box<dyn BarAggregator> = Box::new(agg);
6370        let rc = Rc::new(RefCell::new(boxed));
6371        rc.borrow_mut().set_aggregator_weak(Rc::downgrade(&rc));
6372
6373        let ts1 = UnixNanos::from(2_000_000_000);
6374        rc.borrow_mut()
6375            .update(Price::from("100.00"), Quantity::from(1), ts1);
6376
6377        let ts2 = UnixNanos::from(3_000_000_000);
6378        rc.borrow_mut()
6379            .update(Price::from("101.00"), Quantity::from(1), ts2);
6380
6381        let bars = handler.lock().expect(MUTEX_POISONED);
6382        assert!(
6383            !bars.is_empty(),
6384            "advancing time from ts1 to ts2 should produce at least one bar"
6385        );
6386        assert_eq!(bars[0].close, Price::from("100.00"));
6387    }
6388}
6389
6390#[cfg(test)]
6391mod property_tests {
6392    use std::{
6393        cell::RefCell,
6394        rc::Rc,
6395        sync::{Arc, Mutex},
6396    };
6397
6398    use nautilus_common::{clock::TestClock, timer::TimeEvent};
6399    use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
6400    use nautilus_model::{
6401        data::{Bar, BarSpecification, BarType, bar::get_bar_interval_ns},
6402        enums::{AggregationSource, BarAggregation, BarIntervalType, PriceType},
6403        instruments::{Instrument, InstrumentAny, stubs::equity_aapl},
6404        types::{Price, Quantity},
6405    };
6406    use proptest::prelude::*;
6407    use rstest::rstest;
6408    use ustr::Ustr;
6409
6410    use super::*;
6411
6412    fn aggregation_strategy() -> impl Strategy<Value = BarAggregation> {
6413        prop_oneof![
6414            Just(BarAggregation::Second),
6415            Just(BarAggregation::Minute),
6416            Just(BarAggregation::Hour),
6417        ]
6418    }
6419
6420    fn interval_type_strategy() -> impl Strategy<Value = BarIntervalType> {
6421        prop_oneof![
6422            Just(BarIntervalType::LeftOpen),
6423            Just(BarIntervalType::RightOpen),
6424        ]
6425    }
6426
6427    proptest! {
6428        #[rstest]
6429        fn prop_skip_first_drops_partial_then_emits(
6430            aggregation in aggregation_strategy(),
6431            step in 1usize..=5,
6432            interval_type in interval_type_strategy(),
6433            skip_first in any::<bool>(),
6434        ) {
6435            let instrument = InstrumentAny::Equity(equity_aapl());
6436            let bar_spec = BarSpecification::new(step, aggregation, PriceType::Last);
6437            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6438            let interval_ns = get_bar_interval_ns(&bar_type).as_u64();
6439
6440            // Anchor the clock one full interval past epoch plus a half-interval offset
6441            // so start_time lands mid-interval and fire_immediately is false.
6442            let now_ns = interval_ns + interval_ns / 2;
6443
6444            let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6445            let handler_clone = Arc::clone(&handler);
6446            let clock = Rc::new(RefCell::new(TestClock::new()));
6447            clock.borrow_mut().set_time(UnixNanos::from(now_ns));
6448            let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6449
6450            let aggregator = TimeBarAggregator::new(
6451                bar_type,
6452                instrument.price_precision(),
6453                instrument.size_precision(),
6454                clock,
6455                move |bar: Bar| {
6456                    let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6457                    h.push(bar);
6458                },
6459                false,
6460                false,
6461                interval_type,
6462                None,
6463                0,
6464                skip_first,
6465            );
6466
6467            let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6468            let rc = Rc::new(RefCell::new(boxed));
6469            rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6470
6471            // First tick + first close event. start_time = 1 * interval, first_close
6472            // = 2 * interval. ts_init == first_close_ns: partial bar.
6473            rc.borrow_mut().update(
6474                Price::from("100.00"),
6475                Quantity::from(1),
6476                UnixNanos::from(now_ns),
6477            );
6478            let first_close = 2 * interval_ns;
6479            rc.borrow_mut().build_bar(&TimeEvent::new(
6480                event_name,
6481                UUID4::new(),
6482                UnixNanos::from(first_close),
6483                UnixNanos::from(first_close),
6484            ));
6485
6486            // Second tick + later close; emits unconditionally.
6487            rc.borrow_mut().update(
6488                Price::from("101.00"),
6489                Quantity::from(1),
6490                UnixNanos::from(first_close + interval_ns / 2),
6491            );
6492            let second_close = first_close + interval_ns;
6493            rc.borrow_mut().build_bar(&TimeEvent::new(
6494                event_name,
6495                UUID4::new(),
6496                UnixNanos::from(second_close),
6497                UnixNanos::from(second_close),
6498            ));
6499
6500            let bars = handler.lock().expect(MUTEX_POISONED);
6501            let expected = if skip_first { 1 } else { 2 };
6502            prop_assert_eq!(bars.len(), expected);
6503            prop_assert_eq!(bars.last().unwrap().close, Price::from("101.00"));
6504            for bar in bars.iter() {
6505                prop_assert!(bar.high >= bar.open);
6506                prop_assert!(bar.high >= bar.close);
6507                prop_assert!(bar.low <= bar.open);
6508                prop_assert!(bar.low <= bar.close);
6509            }
6510        }
6511
6512        #[rstest]
6513        fn prop_skip_first_noop_on_exact_boundary(
6514            aggregation in aggregation_strategy(),
6515            step in 1usize..=5,
6516            interval_type in interval_type_strategy(),
6517        ) {
6518            let instrument = InstrumentAny::Equity(equity_aapl());
6519            let bar_spec = BarSpecification::new(step, aggregation, PriceType::Last);
6520            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6521            let interval_ns = get_bar_interval_ns(&bar_type).as_u64();
6522
6523            // Clock exactly on a bar boundary: fire_immediately=true, so the first
6524            // bar that reaches build_and_send must emit regardless of skip_first.
6525            let now_ns = interval_ns;
6526            let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6527            let handler_clone = Arc::clone(&handler);
6528            let clock = Rc::new(RefCell::new(TestClock::new()));
6529            clock.borrow_mut().set_time(UnixNanos::from(now_ns));
6530            let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6531
6532            let aggregator = TimeBarAggregator::new(
6533                bar_type,
6534                instrument.price_precision(),
6535                instrument.size_precision(),
6536                clock,
6537                move |bar: Bar| {
6538                    let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6539                    h.push(bar);
6540                },
6541                false,
6542                false,
6543                interval_type,
6544                None,
6545                0,
6546                true, // skip_first_non_full_bar
6547            );
6548
6549            let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6550            let rc = Rc::new(RefCell::new(boxed));
6551            rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6552
6553            rc.borrow_mut().update(
6554                Price::from("100.00"),
6555                Quantity::from(1),
6556                UnixNanos::from(now_ns),
6557            );
6558            let next_close = now_ns + interval_ns;
6559            rc.borrow_mut().build_bar(&TimeEvent::new(
6560                event_name,
6561                UUID4::new(),
6562                UnixNanos::from(next_close),
6563                UnixNanos::from(next_close),
6564            ));
6565
6566            let bars = handler.lock().expect(MUTEX_POISONED);
6567            prop_assert_eq!(bars.len(), 1);
6568            prop_assert_eq!(bars[0].close, Price::from("100.00"));
6569        }
6570
6571        #[rstest]
6572        fn prop_bar_builder_ohlc_invariants(
6573            updates in prop::collection::vec((1i64..=100_000i64, 1u64..=1_000u64), 1..=50),
6574        ) {
6575            let instrument = InstrumentAny::Equity(equity_aapl());
6576            let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
6577            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6578            let mut builder = BarBuilder::new(bar_type, 2, 0);
6579
6580            let mut total_volume: u64 = 0;
6581
6582            for (i, (price_cents, size)) in updates.iter().enumerate() {
6583                let price = Price::new((*price_cents as f64) / 100.0, 2);
6584                let qty = Quantity::new(*size as f64, 0);
6585                let ts = UnixNanos::from((i as u64 + 1) * 1_000);
6586                total_volume += *size;
6587                builder.update(price, qty, ts);
6588            }
6589
6590            let bar = builder.build_now();
6591            prop_assert!(bar.low <= bar.open);
6592            prop_assert!(bar.low <= bar.close);
6593            prop_assert!(bar.high >= bar.open);
6594            prop_assert!(bar.high >= bar.close);
6595            prop_assert!(bar.low <= bar.high);
6596            prop_assert_eq!(bar.volume.as_f64(), total_volume as f64);
6597        }
6598
6599        #[rstest]
6600        fn prop_tick_bar_aggregator_volume_conservation(
6601            ticks in prop::collection::vec((1i64..=1_000i64, 1u64..=100u64), 3..=60),
6602            step in 1usize..=5,
6603        ) {
6604            let instrument = InstrumentAny::Equity(equity_aapl());
6605            let bar_spec = BarSpecification::new(step, BarAggregation::Tick, PriceType::Last);
6606            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6607            let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6608            let handler_clone = Arc::clone(&handler);
6609
6610            let mut aggregator = TickBarAggregator::new(
6611                bar_type,
6612                instrument.price_precision(),
6613                instrument.size_precision(),
6614                move |bar: Bar| {
6615                    handler_clone.lock().expect(MUTEX_POISONED).push(bar);
6616                },
6617            );
6618
6619            let mut total_input: u64 = 0;
6620
6621            for (i, (price_cents, size)) in ticks.iter().enumerate() {
6622                let price = Price::new((*price_cents as f64) / 100.0, 2);
6623                let qty = Quantity::new(*size as f64, 0);
6624                aggregator.update(price, qty, UnixNanos::from((i as u64 + 1) * 1_000));
6625                total_input += *size;
6626            }
6627
6628            let bars = handler.lock().expect(MUTEX_POISONED);
6629            let emitted_count = bars.len();
6630            prop_assert_eq!(emitted_count, ticks.len() / step);
6631
6632            let mut sum_emitted: f64 = 0.0;
6633
6634            for bar in bars.iter() {
6635                prop_assert!(bar.low <= bar.open);
6636                prop_assert!(bar.low <= bar.close);
6637                prop_assert!(bar.high >= bar.open);
6638                prop_assert!(bar.high >= bar.close);
6639                sum_emitted += bar.volume.as_f64();
6640            }
6641
6642            // Unemitted pending size remains in the builder for the remainder `ticks.len() % step` ticks.
6643            let pending_size: u64 = ticks.iter()
6644                .skip(emitted_count * step)
6645                .map(|(_, s)| *s)
6646                .sum();
6647            prop_assert!((sum_emitted + pending_size as f64 - total_input as f64).abs() < 1e-6);
6648        }
6649
6650        #[rstest]
6651        fn prop_volume_bar_aggregator_conservation(
6652            sizes in prop::collection::vec(1u64..=50u64, 3..=40),
6653            step in 2u64..=10u64,
6654        ) {
6655            let instrument = InstrumentAny::Equity(equity_aapl());
6656            let bar_spec = BarSpecification::new(step as usize, BarAggregation::Volume, PriceType::Last);
6657            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6658            let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6659            let handler_clone = Arc::clone(&handler);
6660
6661            let mut aggregator = VolumeBarAggregator::new(
6662                bar_type,
6663                instrument.price_precision(),
6664                instrument.size_precision(),
6665                move |bar: Bar| {
6666                    handler_clone.lock().expect(MUTEX_POISONED).push(bar);
6667                },
6668            );
6669
6670            let mut total_input: u64 = 0;
6671
6672            for (i, size) in sizes.iter().enumerate() {
6673                aggregator.update(
6674                    Price::from("100.00"),
6675                    Quantity::new(*size as f64, 0),
6676                    UnixNanos::from((i as u64 + 1) * 1_000),
6677                );
6678                total_input += *size;
6679            }
6680
6681            let bars = handler.lock().expect(MUTEX_POISONED);
6682
6683            // Every emitted bar has exactly `step` volume and OHLC ordering holds.
6684            for bar in bars.iter() {
6685                prop_assert_eq!(bar.volume, Quantity::from(step));
6686                prop_assert!(bar.low <= bar.open);
6687                prop_assert!(bar.low <= bar.close);
6688                prop_assert!(bar.high >= bar.open);
6689                prop_assert!(bar.high >= bar.close);
6690            }
6691
6692            // Conservation: total emitted + pending builder volume equals total input.
6693            let emitted_total: u64 = bars.len() as u64 * step;
6694            let pending = aggregator.core.builder.volume.as_f64();
6695            prop_assert!((emitted_total as f64 + pending - total_input as f64).abs() < 1e-6);
6696        }
6697
6698        #[rstest]
6699        fn prop_value_bar_aggregator_ohlc_invariants(
6700            ticks in prop::collection::vec((50i64..=500i64, 1u64..=20u64), 2..=30),
6701            step in 100u64..=2_000u64,
6702        ) {
6703            let instrument = InstrumentAny::Equity(equity_aapl());
6704            let bar_spec = BarSpecification::new(step as usize, BarAggregation::Value, PriceType::Last);
6705            let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6706            let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6707            let handler_clone = Arc::clone(&handler);
6708
6709            let mut aggregator = ValueBarAggregator::new(
6710                bar_type,
6711                instrument.price_precision(),
6712                instrument.size_precision(),
6713                move |bar: Bar| {
6714                    handler_clone.lock().expect(MUTEX_POISONED).push(bar);
6715                },
6716            );
6717
6718            for (i, (price_cents, size)) in ticks.iter().enumerate() {
6719                aggregator.update(
6720                    Price::new((*price_cents as f64) / 100.0, 2),
6721                    Quantity::new(*size as f64, 0),
6722                    UnixNanos::from((i as u64 + 1) * 1_000),
6723                );
6724            }
6725
6726            let bars = handler.lock().expect(MUTEX_POISONED);
6727            for bar in bars.iter() {
6728                prop_assert!(bar.low <= bar.open);
6729                prop_assert!(bar.low <= bar.close);
6730                prop_assert!(bar.high >= bar.open);
6731                prop_assert!(bar.high >= bar.close);
6732                prop_assert!(bar.volume.as_f64() > 0.0);
6733            }
6734        }
6735    }
6736}