1use std::{
22 any::Any,
23 cell::RefCell,
24 fmt::Debug,
25 ops::Add,
26 rc::{Rc, Weak},
27};
28
29use ahash::AHashMap;
30use chrono::{Duration, TimeDelta};
31use nautilus_common::{
32 clock::{Clock, TestClock},
33 timer::{TimeEvent, TimeEventCallback},
34};
35use nautilus_core::{
36 UnixNanos,
37 correctness::{self, FAILED},
38 datetime::{
39 add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos, subtract_n_months_nanos,
40 subtract_n_years_nanos,
41 },
42};
43use nautilus_model::{
44 data::{
45 QuoteTick, TradeTick,
46 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
47 },
48 enums::{AggregationSource, AggressorSide, BarAggregation, BarIntervalType},
49 identifiers::InstrumentId,
50 instruments::{FixedTickScheme, TickSchemeRule},
51 types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
52};
53
54type BarHandler = Box<dyn FnMut(Bar)>;
56
57pub trait BarAggregator: Any + Debug {
61 fn bar_type(&self) -> BarType;
63 fn is_running(&self) -> bool;
65 fn set_is_running(&mut self, value: bool);
67 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
69 fn handle_quote(&mut self, quote: QuoteTick) {
71 let spec = self.bar_type().spec();
72 self.update(
73 quote.extract_price(spec.price_type),
74 quote.extract_size(spec.price_type),
75 quote.ts_init,
76 );
77 }
78 fn handle_trade(&mut self, trade: TradeTick) {
80 self.update(trade.price, trade.size, trade.ts_init);
81 }
82 fn handle_bar(&mut self, bar: Bar) {
84 self.update_bar(bar, bar.volume, bar.ts_init);
85 }
86 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
87 fn stop(&mut self) {}
89 fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
91 fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
93 fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
95 fn build_bar(&mut self, _event: &TimeEvent) {}
97 fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
101 fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
104}
105
106impl dyn BarAggregator {
107 pub fn as_any(&self) -> &dyn Any {
109 self
110 }
111 pub fn as_any_mut(&mut self) -> &mut dyn Any {
113 self
114 }
115}
116
117#[derive(Debug)]
119pub struct BarBuilder {
120 bar_type: BarType,
121 price_precision: u8,
122 size_precision: u8,
123 initialized: bool,
124 ts_last: UnixNanos,
125 count: usize,
126 last_close: Option<Price>,
127 open: Option<Price>,
128 high: Option<Price>,
129 low: Option<Price>,
130 close: Option<Price>,
131 volume: Quantity,
132}
133
134impl BarBuilder {
135 #[must_use]
141 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
142 correctness::check_equal(
143 &bar_type.aggregation_source(),
144 &AggregationSource::Internal,
145 "bar_type.aggregation_source",
146 "AggregationSource::Internal",
147 )
148 .expect(FAILED);
149
150 Self {
151 bar_type,
152 price_precision,
153 size_precision,
154 initialized: false,
155 ts_last: UnixNanos::default(),
156 count: 0,
157 last_close: None,
158 open: None,
159 high: None,
160 low: None,
161 close: None,
162 volume: Quantity::zero(size_precision),
163 }
164 }
165
166 pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
172 if ts_init < self.ts_last {
173 return; }
175
176 if self.open.is_none() {
177 self.open = Some(price);
178 self.high = Some(price);
179 self.low = Some(price);
180 self.initialized = true;
181 } else {
182 if price > self.high.unwrap() {
183 self.high = Some(price);
184 }
185
186 if price < self.low.unwrap() {
187 self.low = Some(price);
188 }
189 }
190
191 self.close = Some(price);
192 self.volume = self.volume.add(size);
193 self.count += 1;
194 self.ts_last = ts_init;
195 }
196
197 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
203 if ts_init < self.ts_last {
204 return; }
206
207 if self.open.is_none() {
208 self.open = Some(bar.open);
209 self.high = Some(bar.high);
210 self.low = Some(bar.low);
211 self.initialized = true;
212 } else {
213 if bar.high > self.high.unwrap() {
214 self.high = Some(bar.high);
215 }
216
217 if bar.low < self.low.unwrap() {
218 self.low = Some(bar.low);
219 }
220 }
221
222 self.close = Some(bar.close);
223 self.volume = self.volume.add(volume);
224 self.count += 1;
225 self.ts_last = ts_init;
226 }
227
228 pub fn reset(&mut self) {
232 self.open = None;
233 self.high = None;
234 self.low = None;
235 self.volume = Quantity::zero(self.size_precision);
236 self.count = 0;
237 }
238
239 pub fn build_now(&mut self) -> Bar {
241 self.build(self.ts_last, self.ts_last)
242 }
243
244 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
250 if self.open.is_none() {
251 self.open = self.last_close;
252 self.high = self.last_close;
253 self.low = self.last_close;
254 self.close = self.last_close;
255 }
256
257 if let (Some(close), Some(low)) = (self.close, self.low)
258 && close < low
259 {
260 self.low = Some(close);
261 }
262
263 if let (Some(close), Some(high)) = (self.close, self.high)
264 && close > high
265 {
266 self.high = Some(close);
267 }
268
269 let bar = Bar::new(
271 self.bar_type,
272 self.open.unwrap(),
273 self.high.unwrap(),
274 self.low.unwrap(),
275 self.close.unwrap(),
276 self.volume,
277 ts_event,
278 ts_init,
279 );
280
281 self.last_close = self.close;
282 self.reset();
283 bar
284 }
285}
286
287pub struct BarAggregatorCore {
289 bar_type: BarType,
290 builder: BarBuilder,
291 handler: BarHandler,
292 is_running: bool,
293}
294
295impl Debug for BarAggregatorCore {
296 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297 f.debug_struct(stringify!(BarAggregatorCore))
298 .field("bar_type", &self.bar_type)
299 .field("builder", &self.builder)
300 .field("is_running", &self.is_running)
301 .finish()
302 }
303}
304
305impl BarAggregatorCore {
306 pub fn new<H: FnMut(Bar) + 'static>(
312 bar_type: BarType,
313 price_precision: u8,
314 size_precision: u8,
315 handler: H,
316 ) -> Self {
317 Self {
318 bar_type,
319 builder: BarBuilder::new(bar_type, price_precision, size_precision),
320 handler: Box::new(handler),
321 is_running: false,
322 }
323 }
324
325 pub const fn set_is_running(&mut self, value: bool) {
327 self.is_running = value;
328 }
329 fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
330 self.builder.update(price, size, ts_init);
331 }
332
333 fn build_now_and_send(&mut self) {
334 let bar = self.builder.build_now();
335 (self.handler)(bar);
336 }
337
338 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
339 let bar = self.builder.build(ts_event, ts_init);
340 (self.handler)(bar);
341 }
342}
343
344pub struct TickBarAggregator {
349 core: BarAggregatorCore,
350}
351
352impl Debug for TickBarAggregator {
353 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
354 f.debug_struct(stringify!(TickBarAggregator))
355 .field("core", &self.core)
356 .finish()
357 }
358}
359
360impl TickBarAggregator {
361 pub fn new<H: FnMut(Bar) + 'static>(
367 bar_type: BarType,
368 price_precision: u8,
369 size_precision: u8,
370 handler: H,
371 ) -> Self {
372 Self {
373 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
374 }
375 }
376}
377
378impl BarAggregator for TickBarAggregator {
379 fn bar_type(&self) -> BarType {
380 self.core.bar_type
381 }
382
383 fn is_running(&self) -> bool {
384 self.core.is_running
385 }
386
387 fn set_is_running(&mut self, value: bool) {
388 self.core.set_is_running(value);
389 }
390
391 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
393 self.core.apply_update(price, size, ts_init);
394 let spec = self.core.bar_type.spec();
395
396 if self.core.builder.count >= spec.step.get() {
397 self.core.build_now_and_send();
398 }
399 }
400
401 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
402 self.core.builder.update_bar(bar, volume, ts_init);
403 let spec = self.core.bar_type.spec();
404
405 if self.core.builder.count >= spec.step.get() {
406 self.core.build_now_and_send();
407 }
408 }
409}
410
411pub struct TickImbalanceBarAggregator {
416 core: BarAggregatorCore,
417 imbalance: isize,
418}
419
420impl Debug for TickImbalanceBarAggregator {
421 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
422 f.debug_struct(stringify!(TickImbalanceBarAggregator))
423 .field("core", &self.core)
424 .field("imbalance", &self.imbalance)
425 .finish()
426 }
427}
428
429impl TickImbalanceBarAggregator {
430 pub fn new<H: FnMut(Bar) + 'static>(
436 bar_type: BarType,
437 price_precision: u8,
438 size_precision: u8,
439 handler: H,
440 ) -> Self {
441 Self {
442 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
443 imbalance: 0,
444 }
445 }
446}
447
448impl BarAggregator for TickImbalanceBarAggregator {
449 fn bar_type(&self) -> BarType {
450 self.core.bar_type
451 }
452
453 fn is_running(&self) -> bool {
454 self.core.is_running
455 }
456
457 fn set_is_running(&mut self, value: bool) {
458 self.core.set_is_running(value);
459 }
460
461 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
466 self.core.apply_update(price, size, ts_init);
467 }
468
469 fn handle_trade(&mut self, trade: TradeTick) {
470 self.core
471 .apply_update(trade.price, trade.size, trade.ts_init);
472
473 let delta = match trade.aggressor_side {
474 AggressorSide::Buyer => 1,
475 AggressorSide::Seller => -1,
476 AggressorSide::NoAggressor => 0,
477 };
478
479 if delta == 0 {
480 return;
481 }
482
483 self.imbalance += delta;
484 let threshold = self.core.bar_type.spec().step.get();
485 if self.imbalance.unsigned_abs() >= threshold {
486 self.core.build_now_and_send();
487 self.imbalance = 0;
488 }
489 }
490
491 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
492 self.core.builder.update_bar(bar, volume, ts_init);
493 }
494}
495
496pub struct TickRunsBarAggregator {
498 core: BarAggregatorCore,
499 current_run_side: Option<AggressorSide>,
500 run_count: usize,
501}
502
503impl Debug for TickRunsBarAggregator {
504 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 f.debug_struct(stringify!(TickRunsBarAggregator))
506 .field("core", &self.core)
507 .field("current_run_side", &self.current_run_side)
508 .field("run_count", &self.run_count)
509 .finish()
510 }
511}
512
513impl TickRunsBarAggregator {
514 pub fn new<H: FnMut(Bar) + 'static>(
520 bar_type: BarType,
521 price_precision: u8,
522 size_precision: u8,
523 handler: H,
524 ) -> Self {
525 Self {
526 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
527 current_run_side: None,
528 run_count: 0,
529 }
530 }
531}
532
533impl BarAggregator for TickRunsBarAggregator {
534 fn bar_type(&self) -> BarType {
535 self.core.bar_type
536 }
537
538 fn is_running(&self) -> bool {
539 self.core.is_running
540 }
541
542 fn set_is_running(&mut self, value: bool) {
543 self.core.set_is_running(value);
544 }
545
546 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
551 self.core.apply_update(price, size, ts_init);
552 }
553
554 fn handle_trade(&mut self, trade: TradeTick) {
555 let side = match trade.aggressor_side {
556 AggressorSide::Buyer => Some(AggressorSide::Buyer),
557 AggressorSide::Seller => Some(AggressorSide::Seller),
558 AggressorSide::NoAggressor => None,
559 };
560
561 if let Some(side) = side {
562 if self.current_run_side != Some(side) {
563 self.current_run_side = Some(side);
564 self.run_count = 0;
565 self.core.builder.reset();
566 }
567
568 self.core
569 .apply_update(trade.price, trade.size, trade.ts_init);
570 self.run_count += 1;
571
572 let threshold = self.core.bar_type.spec().step.get();
573 if self.run_count >= threshold {
574 self.core.build_now_and_send();
575 self.run_count = 0;
576 self.current_run_side = None;
577 }
578 } else {
579 self.core
580 .apply_update(trade.price, trade.size, trade.ts_init);
581 }
582 }
583
584 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
585 self.core.builder.update_bar(bar, volume, ts_init);
586 }
587}
588
589pub struct VolumeBarAggregator {
591 core: BarAggregatorCore,
592}
593
594impl Debug for VolumeBarAggregator {
595 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
596 f.debug_struct(stringify!(VolumeBarAggregator))
597 .field("core", &self.core)
598 .finish()
599 }
600}
601
602impl VolumeBarAggregator {
603 pub fn new<H: FnMut(Bar) + 'static>(
609 bar_type: BarType,
610 price_precision: u8,
611 size_precision: u8,
612 handler: H,
613 ) -> Self {
614 Self {
615 core: BarAggregatorCore::new(
616 bar_type.standard(),
617 price_precision,
618 size_precision,
619 handler,
620 ),
621 }
622 }
623}
624
625impl BarAggregator for VolumeBarAggregator {
626 fn bar_type(&self) -> BarType {
627 self.core.bar_type
628 }
629
630 fn is_running(&self) -> bool {
631 self.core.is_running
632 }
633
634 fn set_is_running(&mut self, value: bool) {
635 self.core.set_is_running(value);
636 }
637
638 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
640 let mut raw_size_update = size.raw;
641 let spec = self.core.bar_type.spec();
642 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
643
644 while raw_size_update > 0 {
645 if self.core.builder.volume.raw + raw_size_update < raw_step {
646 self.core.apply_update(
647 price,
648 Quantity::from_raw(raw_size_update, size.precision),
649 ts_init,
650 );
651 break;
652 }
653
654 let raw_size_diff = raw_step - self.core.builder.volume.raw;
655 self.core.apply_update(
656 price,
657 Quantity::from_raw(raw_size_diff, size.precision),
658 ts_init,
659 );
660
661 self.core.build_now_and_send();
662 raw_size_update -= raw_size_diff;
663 }
664 }
665
666 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
667 let mut raw_volume_update = volume.raw;
668 let spec = self.core.bar_type.spec();
669 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
670
671 while raw_volume_update > 0 {
672 if self.core.builder.volume.raw + raw_volume_update < raw_step {
673 self.core.builder.update_bar(
674 bar,
675 Quantity::from_raw(raw_volume_update, volume.precision),
676 ts_init,
677 );
678 break;
679 }
680
681 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
682 self.core.builder.update_bar(
683 bar,
684 Quantity::from_raw(raw_volume_diff, volume.precision),
685 ts_init,
686 );
687
688 self.core.build_now_and_send();
689 raw_volume_update -= raw_volume_diff;
690 }
691 }
692}
693
694pub struct VolumeImbalanceBarAggregator {
696 core: BarAggregatorCore,
697 imbalance_raw: i128,
698 raw_step: i128,
699}
700
701impl Debug for VolumeImbalanceBarAggregator {
702 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
703 f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
704 .field("core", &self.core)
705 .field("imbalance_raw", &self.imbalance_raw)
706 .field("raw_step", &self.raw_step)
707 .finish()
708 }
709}
710
711impl VolumeImbalanceBarAggregator {
712 pub fn new<H: FnMut(Bar) + 'static>(
718 bar_type: BarType,
719 price_precision: u8,
720 size_precision: u8,
721 handler: H,
722 ) -> Self {
723 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
724 Self {
725 core: BarAggregatorCore::new(
726 bar_type.standard(),
727 price_precision,
728 size_precision,
729 handler,
730 ),
731 imbalance_raw: 0,
732 raw_step,
733 }
734 }
735}
736
737impl BarAggregator for VolumeImbalanceBarAggregator {
738 fn bar_type(&self) -> BarType {
739 self.core.bar_type
740 }
741
742 fn is_running(&self) -> bool {
743 self.core.is_running
744 }
745
746 fn set_is_running(&mut self, value: bool) {
747 self.core.set_is_running(value);
748 }
749
750 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
755 self.core.apply_update(price, size, ts_init);
756 }
757
758 fn handle_trade(&mut self, trade: TradeTick) {
759 let side = match trade.aggressor_side {
760 AggressorSide::Buyer => 1,
761 AggressorSide::Seller => -1,
762 AggressorSide::NoAggressor => {
763 self.core
764 .apply_update(trade.price, trade.size, trade.ts_init);
765 return;
766 }
767 };
768
769 let mut raw_remaining = trade.size.raw as i128;
770 while raw_remaining > 0 {
771 let imbalance_abs = self.imbalance_raw.abs();
772 let needed = (self.raw_step - imbalance_abs).max(1);
773 let raw_chunk = raw_remaining.min(needed);
774 let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
775
776 self.core
777 .apply_update(trade.price, qty_chunk, trade.ts_init);
778
779 self.imbalance_raw += side * raw_chunk;
780 raw_remaining -= raw_chunk;
781
782 if self.imbalance_raw.abs() >= self.raw_step {
783 self.core.build_now_and_send();
784 self.imbalance_raw = 0;
785 }
786 }
787 }
788
789 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
790 self.core.builder.update_bar(bar, volume, ts_init);
791 }
792}
793
794pub struct VolumeRunsBarAggregator {
796 core: BarAggregatorCore,
797 current_run_side: Option<AggressorSide>,
798 run_volume_raw: QuantityRaw,
799 raw_step: QuantityRaw,
800}
801
802impl Debug for VolumeRunsBarAggregator {
803 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
804 f.debug_struct(stringify!(VolumeRunsBarAggregator))
805 .field("core", &self.core)
806 .field("current_run_side", &self.current_run_side)
807 .field("run_volume_raw", &self.run_volume_raw)
808 .field("raw_step", &self.raw_step)
809 .finish()
810 }
811}
812
813impl VolumeRunsBarAggregator {
814 pub fn new<H: FnMut(Bar) + 'static>(
820 bar_type: BarType,
821 price_precision: u8,
822 size_precision: u8,
823 handler: H,
824 ) -> Self {
825 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
826 Self {
827 core: BarAggregatorCore::new(
828 bar_type.standard(),
829 price_precision,
830 size_precision,
831 handler,
832 ),
833 current_run_side: None,
834 run_volume_raw: 0,
835 raw_step,
836 }
837 }
838}
839
840impl BarAggregator for VolumeRunsBarAggregator {
841 fn bar_type(&self) -> BarType {
842 self.core.bar_type
843 }
844
845 fn is_running(&self) -> bool {
846 self.core.is_running
847 }
848
849 fn set_is_running(&mut self, value: bool) {
850 self.core.set_is_running(value);
851 }
852
853 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
858 self.core.apply_update(price, size, ts_init);
859 }
860
861 fn handle_trade(&mut self, trade: TradeTick) {
862 let side = match trade.aggressor_side {
863 AggressorSide::Buyer => Some(AggressorSide::Buyer),
864 AggressorSide::Seller => Some(AggressorSide::Seller),
865 AggressorSide::NoAggressor => None,
866 };
867
868 let Some(side) = side else {
869 self.core
870 .apply_update(trade.price, trade.size, trade.ts_init);
871 return;
872 };
873
874 if self.current_run_side != Some(side) {
875 self.current_run_side = Some(side);
876 self.run_volume_raw = 0;
877 self.core.builder.reset();
878 }
879
880 let mut raw_remaining = trade.size.raw;
881 while raw_remaining > 0 {
882 let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
883 let raw_chunk = raw_remaining.min(needed);
884
885 self.core.apply_update(
886 trade.price,
887 Quantity::from_raw(raw_chunk, trade.size.precision),
888 trade.ts_init,
889 );
890
891 self.run_volume_raw += raw_chunk;
892 raw_remaining -= raw_chunk;
893
894 if self.run_volume_raw >= self.raw_step {
895 self.core.build_now_and_send();
896 self.run_volume_raw = 0;
897 self.current_run_side = None;
898 }
899 }
900 }
901
902 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
903 self.core.builder.update_bar(bar, volume, ts_init);
904 }
905}
906
907pub struct ValueBarAggregator {
912 core: BarAggregatorCore,
913 cum_value: f64,
914}
915
916impl Debug for ValueBarAggregator {
917 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
918 f.debug_struct(stringify!(ValueBarAggregator))
919 .field("core", &self.core)
920 .field("cum_value", &self.cum_value)
921 .finish()
922 }
923}
924
925impl ValueBarAggregator {
926 pub fn new<H: FnMut(Bar) + 'static>(
932 bar_type: BarType,
933 price_precision: u8,
934 size_precision: u8,
935 handler: H,
936 ) -> Self {
937 Self {
938 core: BarAggregatorCore::new(
939 bar_type.standard(),
940 price_precision,
941 size_precision,
942 handler,
943 ),
944 cum_value: 0.0,
945 }
946 }
947
948 #[must_use]
949 pub const fn get_cumulative_value(&self) -> f64 {
951 self.cum_value
952 }
953}
954
955impl BarAggregator for ValueBarAggregator {
956 fn bar_type(&self) -> BarType {
957 self.core.bar_type
958 }
959
960 fn is_running(&self) -> bool {
961 self.core.is_running
962 }
963
964 fn set_is_running(&mut self, value: bool) {
965 self.core.set_is_running(value);
966 }
967
968 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
970 let mut size_update = size.as_f64();
971 let spec = self.core.bar_type.spec();
972
973 while size_update > 0.0 {
974 let value_update = price.as_f64() * size_update;
975 if value_update == 0.0 {
976 self.core
978 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
979 break;
980 }
981
982 if self.cum_value + value_update < spec.step.get() as f64 {
983 self.cum_value += value_update;
984 self.core
985 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
986 break;
987 }
988
989 let value_diff = spec.step.get() as f64 - self.cum_value;
990 let mut size_diff = size_update * (value_diff / value_update);
991
992 if is_below_min_size(size_diff, size.precision) {
994 if is_below_min_size(size_update, size.precision) {
995 break;
996 }
997 size_diff = min_size_f64(size.precision);
998 }
999
1000 self.core
1001 .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
1002
1003 self.core.build_now_and_send();
1004 self.cum_value = 0.0;
1005 size_update -= size_diff;
1006 }
1007 }
1008
1009 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1010 let mut volume_update = volume;
1011 let average_price = Price::new(
1012 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1013 self.core.builder.price_precision,
1014 );
1015
1016 while volume_update.as_f64() > 0.0 {
1017 let value_update = average_price.as_f64() * volume_update.as_f64();
1018 if value_update == 0.0 {
1019 self.core.builder.update_bar(bar, volume_update, ts_init);
1021 break;
1022 }
1023
1024 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1025 self.cum_value += value_update;
1026 self.core.builder.update_bar(bar, volume_update, ts_init);
1027 break;
1028 }
1029
1030 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1031 let mut volume_diff = volume_update.as_f64() * (value_diff / value_update);
1032
1033 if is_below_min_size(volume_diff, volume_update.precision) {
1035 if is_below_min_size(volume_update.as_f64(), volume_update.precision) {
1036 break;
1037 }
1038 volume_diff = min_size_f64(volume_update.precision);
1039 }
1040
1041 self.core.builder.update_bar(
1042 bar,
1043 Quantity::new(volume_diff, volume_update.precision),
1044 ts_init,
1045 );
1046
1047 self.core.build_now_and_send();
1048 self.cum_value = 0.0;
1049 volume_update = Quantity::new(
1050 volume_update.as_f64() - volume_diff,
1051 volume_update.precision,
1052 );
1053 }
1054 }
1055}
1056
1057pub struct ValueImbalanceBarAggregator {
1059 core: BarAggregatorCore,
1060 imbalance_value: f64,
1061 step_value: f64,
1062}
1063
1064impl Debug for ValueImbalanceBarAggregator {
1065 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1066 f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1067 .field("core", &self.core)
1068 .field("imbalance_value", &self.imbalance_value)
1069 .field("step_value", &self.step_value)
1070 .finish()
1071 }
1072}
1073
1074impl ValueImbalanceBarAggregator {
1075 pub fn new<H: FnMut(Bar) + 'static>(
1081 bar_type: BarType,
1082 price_precision: u8,
1083 size_precision: u8,
1084 handler: H,
1085 ) -> Self {
1086 Self {
1087 core: BarAggregatorCore::new(
1088 bar_type.standard(),
1089 price_precision,
1090 size_precision,
1091 handler,
1092 ),
1093 imbalance_value: 0.0,
1094 step_value: bar_type.spec().step.get() as f64,
1095 }
1096 }
1097}
1098
1099impl BarAggregator for ValueImbalanceBarAggregator {
1100 fn bar_type(&self) -> BarType {
1101 self.core.bar_type
1102 }
1103
1104 fn is_running(&self) -> bool {
1105 self.core.is_running
1106 }
1107
1108 fn set_is_running(&mut self, value: bool) {
1109 self.core.set_is_running(value);
1110 }
1111
1112 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1117 self.core.apply_update(price, size, ts_init);
1118 }
1119
1120 fn handle_trade(&mut self, trade: TradeTick) {
1121 let price_f64 = trade.price.as_f64();
1122 if price_f64 == 0.0 {
1123 self.core
1124 .apply_update(trade.price, trade.size, trade.ts_init);
1125 return;
1126 }
1127
1128 let side_sign = match trade.aggressor_side {
1129 AggressorSide::Buyer => 1.0,
1130 AggressorSide::Seller => -1.0,
1131 AggressorSide::NoAggressor => {
1132 self.core
1133 .apply_update(trade.price, trade.size, trade.ts_init);
1134 return;
1135 }
1136 };
1137
1138 let mut size_remaining = trade.size.as_f64();
1139 while size_remaining > 0.0 {
1140 let value_remaining = price_f64 * size_remaining;
1141
1142 if self.imbalance_value == 0.0 || self.imbalance_value.signum() == side_sign {
1143 let needed = self.step_value - self.imbalance_value.abs();
1144 if value_remaining <= needed {
1145 self.imbalance_value += side_sign * value_remaining;
1146 self.core.apply_update(
1147 trade.price,
1148 Quantity::new(size_remaining, trade.size.precision),
1149 trade.ts_init,
1150 );
1151
1152 if self.imbalance_value.abs() >= self.step_value {
1153 self.core.build_now_and_send();
1154 self.imbalance_value = 0.0;
1155 }
1156 break;
1157 }
1158
1159 let mut value_chunk = needed;
1160 let mut size_chunk = value_chunk / price_f64;
1161
1162 if is_below_min_size(size_chunk, trade.size.precision) {
1164 if is_below_min_size(size_remaining, trade.size.precision) {
1165 break;
1166 }
1167 size_chunk = min_size_f64(trade.size.precision);
1168 value_chunk = price_f64 * size_chunk;
1169 }
1170
1171 self.core.apply_update(
1172 trade.price,
1173 Quantity::new(size_chunk, trade.size.precision),
1174 trade.ts_init,
1175 );
1176 self.imbalance_value += side_sign * value_chunk;
1177 size_remaining -= size_chunk;
1178
1179 if self.imbalance_value.abs() >= self.step_value {
1180 self.core.build_now_and_send();
1181 self.imbalance_value = 0.0;
1182 }
1183 } else {
1184 let mut value_to_flatten = self.imbalance_value.abs().min(value_remaining);
1186 let mut size_chunk = value_to_flatten / price_f64;
1187
1188 if is_below_min_size(size_chunk, trade.size.precision) {
1190 if is_below_min_size(size_remaining, trade.size.precision) {
1191 break;
1192 }
1193 size_chunk = min_size_f64(trade.size.precision);
1194 value_to_flatten = price_f64 * size_chunk;
1195 }
1196
1197 self.core.apply_update(
1198 trade.price,
1199 Quantity::new(size_chunk, trade.size.precision),
1200 trade.ts_init,
1201 );
1202 self.imbalance_value += side_sign * value_to_flatten;
1203
1204 if self.imbalance_value.abs() >= self.step_value {
1206 self.core.build_now_and_send();
1207 self.imbalance_value = 0.0;
1208 }
1209 size_remaining -= size_chunk;
1210 }
1211 }
1212 }
1213
1214 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1215 self.core.builder.update_bar(bar, volume, ts_init);
1216 }
1217}
1218
1219pub struct ValueRunsBarAggregator {
1221 core: BarAggregatorCore,
1222 current_run_side: Option<AggressorSide>,
1223 run_value: f64,
1224 step_value: f64,
1225}
1226
1227impl Debug for ValueRunsBarAggregator {
1228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1229 f.debug_struct(stringify!(ValueRunsBarAggregator))
1230 .field("core", &self.core)
1231 .field("current_run_side", &self.current_run_side)
1232 .field("run_value", &self.run_value)
1233 .field("step_value", &self.step_value)
1234 .finish()
1235 }
1236}
1237
1238impl ValueRunsBarAggregator {
1239 pub fn new<H: FnMut(Bar) + 'static>(
1245 bar_type: BarType,
1246 price_precision: u8,
1247 size_precision: u8,
1248 handler: H,
1249 ) -> Self {
1250 Self {
1251 core: BarAggregatorCore::new(
1252 bar_type.standard(),
1253 price_precision,
1254 size_precision,
1255 handler,
1256 ),
1257 current_run_side: None,
1258 run_value: 0.0,
1259 step_value: bar_type.spec().step.get() as f64,
1260 }
1261 }
1262}
1263
1264impl BarAggregator for ValueRunsBarAggregator {
1265 fn bar_type(&self) -> BarType {
1266 self.core.bar_type
1267 }
1268
1269 fn is_running(&self) -> bool {
1270 self.core.is_running
1271 }
1272
1273 fn set_is_running(&mut self, value: bool) {
1274 self.core.set_is_running(value);
1275 }
1276
1277 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1282 self.core.apply_update(price, size, ts_init);
1283 }
1284
1285 fn handle_trade(&mut self, trade: TradeTick) {
1286 let price_f64 = trade.price.as_f64();
1287 if price_f64 == 0.0 {
1288 self.core
1289 .apply_update(trade.price, trade.size, trade.ts_init);
1290 return;
1291 }
1292
1293 let side = match trade.aggressor_side {
1294 AggressorSide::Buyer => Some(AggressorSide::Buyer),
1295 AggressorSide::Seller => Some(AggressorSide::Seller),
1296 AggressorSide::NoAggressor => None,
1297 };
1298
1299 let Some(side) = side else {
1300 self.core
1301 .apply_update(trade.price, trade.size, trade.ts_init);
1302 return;
1303 };
1304
1305 if self.current_run_side != Some(side) {
1306 self.current_run_side = Some(side);
1307 self.run_value = 0.0;
1308 self.core.builder.reset();
1309 }
1310
1311 let mut size_remaining = trade.size.as_f64();
1312 while size_remaining > 0.0 {
1313 let value_update = price_f64 * size_remaining;
1314 if self.run_value + value_update < self.step_value {
1315 self.run_value += value_update;
1316 self.core.apply_update(
1317 trade.price,
1318 Quantity::new(size_remaining, trade.size.precision),
1319 trade.ts_init,
1320 );
1321 break;
1322 }
1323
1324 let value_needed = self.step_value - self.run_value;
1325 let mut size_chunk = value_needed / price_f64;
1326
1327 if is_below_min_size(size_chunk, trade.size.precision) {
1329 if is_below_min_size(size_remaining, trade.size.precision) {
1330 break;
1331 }
1332 size_chunk = min_size_f64(trade.size.precision);
1333 }
1334
1335 self.core.apply_update(
1336 trade.price,
1337 Quantity::new(size_chunk, trade.size.precision),
1338 trade.ts_init,
1339 );
1340
1341 self.core.build_now_and_send();
1342 self.run_value = 0.0;
1343 self.current_run_side = None;
1344 size_remaining -= size_chunk;
1345 }
1346 }
1347
1348 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1349 self.core.builder.update_bar(bar, volume, ts_init);
1350 }
1351}
1352
1353pub struct RenkoBarAggregator {
1359 core: BarAggregatorCore,
1360 pub brick_size: PriceRaw,
1361 last_close: Option<Price>,
1362}
1363
1364impl Debug for RenkoBarAggregator {
1365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1366 f.debug_struct(stringify!(RenkoBarAggregator))
1367 .field("core", &self.core)
1368 .field("brick_size", &self.brick_size)
1369 .field("last_close", &self.last_close)
1370 .finish()
1371 }
1372}
1373
1374impl RenkoBarAggregator {
1375 pub fn new<H: FnMut(Bar) + 'static>(
1381 bar_type: BarType,
1382 price_precision: u8,
1383 size_precision: u8,
1384 price_increment: Price,
1385 handler: H,
1386 ) -> Self {
1387 let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1389
1390 Self {
1391 core: BarAggregatorCore::new(
1392 bar_type.standard(),
1393 price_precision,
1394 size_precision,
1395 handler,
1396 ),
1397 brick_size,
1398 last_close: None,
1399 }
1400 }
1401}
1402
1403impl BarAggregator for RenkoBarAggregator {
1404 fn bar_type(&self) -> BarType {
1405 self.core.bar_type
1406 }
1407
1408 fn is_running(&self) -> bool {
1409 self.core.is_running
1410 }
1411
1412 fn set_is_running(&mut self, value: bool) {
1413 self.core.set_is_running(value);
1414 }
1415
1416 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1421 self.core.apply_update(price, size, ts_init);
1423
1424 if self.last_close.is_none() {
1426 self.last_close = Some(price);
1427 return;
1428 }
1429
1430 let last_close = self.last_close.unwrap();
1431
1432 let current_raw = price.raw;
1434 let last_close_raw = last_close.raw;
1435 let price_diff_raw = current_raw - last_close_raw;
1436 let abs_price_diff_raw = price_diff_raw.abs();
1437
1438 if abs_price_diff_raw >= self.brick_size {
1440 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1441 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1442 let mut current_close = last_close;
1443
1444 let total_volume = self.core.builder.volume;
1446
1447 for _i in 0..num_bricks {
1448 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1450 let brick_close = Price::from_raw(brick_close_raw, price.precision);
1451
1452 let (brick_high, brick_low) = if direction > 0.0 {
1454 (brick_close, current_close)
1455 } else {
1456 (current_close, brick_close)
1457 };
1458
1459 self.core.builder.reset();
1461 self.core.builder.open = Some(current_close);
1462 self.core.builder.high = Some(brick_high);
1463 self.core.builder.low = Some(brick_low);
1464 self.core.builder.close = Some(brick_close);
1465 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1467 self.core.builder.ts_last = ts_init;
1468 self.core.builder.initialized = true;
1469
1470 self.core.build_and_send(ts_init, ts_init);
1472
1473 current_close = brick_close;
1475 self.last_close = Some(brick_close);
1476 }
1477 }
1478 }
1479
1480 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1481 self.core.builder.update_bar(bar, volume, ts_init);
1483
1484 if self.last_close.is_none() {
1486 self.last_close = Some(bar.close);
1487 return;
1488 }
1489
1490 let last_close = self.last_close.unwrap();
1491
1492 let current_raw = bar.close.raw;
1494 let last_close_raw = last_close.raw;
1495 let price_diff_raw = current_raw - last_close_raw;
1496 let abs_price_diff_raw = price_diff_raw.abs();
1497
1498 if abs_price_diff_raw >= self.brick_size {
1500 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1501 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1502 let mut current_close = last_close;
1503
1504 let total_volume = self.core.builder.volume;
1506
1507 for _i in 0..num_bricks {
1508 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1510 let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1511
1512 let (brick_high, brick_low) = if direction > 0.0 {
1514 (brick_close, current_close)
1515 } else {
1516 (current_close, brick_close)
1517 };
1518
1519 self.core.builder.reset();
1521 self.core.builder.open = Some(current_close);
1522 self.core.builder.high = Some(brick_high);
1523 self.core.builder.low = Some(brick_low);
1524 self.core.builder.close = Some(brick_close);
1525 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1527 self.core.builder.ts_last = ts_init;
1528 self.core.builder.initialized = true;
1529
1530 self.core.build_and_send(ts_init, ts_init);
1532
1533 current_close = brick_close;
1535 self.last_close = Some(brick_close);
1536 }
1537 }
1538 }
1539}
1540
1541pub struct TimeBarAggregator {
1545 core: BarAggregatorCore,
1546 clock: Rc<RefCell<dyn Clock>>,
1547 build_with_no_updates: bool,
1548 timestamp_on_close: bool,
1549 is_left_open: bool,
1550 stored_open_ns: UnixNanos,
1551 timer_name: String,
1552 interval_ns: UnixNanos,
1553 next_close_ns: UnixNanos,
1554 first_close_ns: UnixNanos,
1555 bar_build_delay: u64,
1556 time_bars_origin_offset: Option<TimeDelta>,
1557 skip_first_non_full_bar: bool,
1558 pub historical_mode: bool,
1559 historical_events: Vec<TimeEvent>,
1560 historical_event_at_ts_init: Option<TimeEvent>,
1561 aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
1562}
1563
1564impl Debug for TimeBarAggregator {
1565 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1566 f.debug_struct(stringify!(TimeBarAggregator))
1567 .field("core", &self.core)
1568 .field("build_with_no_updates", &self.build_with_no_updates)
1569 .field("timestamp_on_close", &self.timestamp_on_close)
1570 .field("is_left_open", &self.is_left_open)
1571 .field("timer_name", &self.timer_name)
1572 .field("interval_ns", &self.interval_ns)
1573 .field("bar_build_delay", &self.bar_build_delay)
1574 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1575 .finish()
1576 }
1577}
1578
1579impl TimeBarAggregator {
1580 #[expect(clippy::too_many_arguments)]
1586 pub fn new<H: FnMut(Bar) + 'static>(
1587 bar_type: BarType,
1588 price_precision: u8,
1589 size_precision: u8,
1590 clock: Rc<RefCell<dyn Clock>>,
1591 handler: H,
1592 build_with_no_updates: bool,
1593 timestamp_on_close: bool,
1594 interval_type: BarIntervalType,
1595 time_bars_origin_offset: Option<TimeDelta>,
1596 bar_build_delay: u64,
1597 skip_first_non_full_bar: bool,
1598 ) -> Self {
1599 let is_left_open = match interval_type {
1600 BarIntervalType::LeftOpen => true,
1601 BarIntervalType::RightOpen => false,
1602 };
1603
1604 let core = BarAggregatorCore::new(
1605 bar_type.standard(),
1606 price_precision,
1607 size_precision,
1608 handler,
1609 );
1610
1611 Self {
1612 core,
1613 clock,
1614 build_with_no_updates,
1615 timestamp_on_close,
1616 is_left_open,
1617 stored_open_ns: UnixNanos::default(),
1618 timer_name: format!("TIME_BAR_{bar_type}"),
1619 interval_ns: get_bar_interval_ns(&bar_type),
1620 next_close_ns: UnixNanos::default(),
1621 first_close_ns: UnixNanos::default(),
1622 bar_build_delay,
1623 time_bars_origin_offset,
1624 skip_first_non_full_bar,
1625 historical_mode: false,
1626 historical_events: Vec::new(),
1627 historical_event_at_ts_init: None,
1628 aggregator_weak: None,
1629 }
1630 }
1631
1632 pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1634 self.clock = clock;
1635 }
1636
1637 pub fn start_timer_internal(
1646 &mut self,
1647 aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1648 ) {
1649 let aggregator_weak = if let Some(rc) = aggregator_rc {
1651 let weak = Rc::downgrade(&rc);
1653 self.aggregator_weak = Some(weak.clone());
1654 weak
1655 } else {
1656 self.aggregator_weak
1658 .as_ref()
1659 .expect("Aggregator weak reference must be set before calling start_timer()")
1660 .clone()
1661 };
1662
1663 let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
1664 if let Some(agg) = aggregator_weak.upgrade() {
1665 agg.borrow_mut().build_bar(&event);
1666 }
1667 }));
1668
1669 let now = self.clock.borrow().utc_now();
1671 let mut start_time =
1672 get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1673 start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1674
1675 let fire_immediately = start_time == now;
1677
1678 let spec = &self.bar_type().spec();
1679 let start_time_ns = UnixNanos::from(start_time);
1680 let step = spec.step.get() as u32;
1681
1682 if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1683 self.clock
1684 .borrow_mut()
1685 .set_timer_ns(
1686 &self.timer_name,
1687 self.interval_ns.as_u64(),
1688 Some(start_time_ns),
1689 None,
1690 Some(callback),
1691 Some(true), Some(fire_immediately),
1693 )
1694 .expect(FAILED);
1695
1696 if fire_immediately {
1697 self.next_close_ns = start_time_ns;
1698 } else {
1699 let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1700 self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1701 }
1702
1703 self.stored_open_ns = self.next_close_ns.saturating_sub_ns(self.interval_ns);
1704 } else {
1705 let alert_time = if fire_immediately {
1707 start_time
1708 } else if spec.aggregation == BarAggregation::Month {
1709 add_n_months(start_time, step).expect(FAILED)
1710 } else {
1711 add_n_years(start_time, step).expect(FAILED)
1712 };
1713
1714 self.clock
1715 .borrow_mut()
1716 .set_time_alert_ns(
1717 &self.timer_name,
1718 UnixNanos::from(alert_time),
1719 Some(callback),
1720 Some(true), )
1722 .expect(FAILED);
1723
1724 self.next_close_ns = UnixNanos::from(alert_time);
1725 self.stored_open_ns = if fire_immediately {
1728 if spec.aggregation == BarAggregation::Month {
1729 subtract_n_months_nanos(start_time_ns, step).expect(FAILED)
1730 } else {
1731 subtract_n_years_nanos(start_time_ns, step).expect(FAILED)
1732 }
1733 } else {
1734 start_time_ns
1735 };
1736 }
1737
1738 if self.skip_first_non_full_bar {
1739 self.first_close_ns = self.next_close_ns;
1740 }
1741
1742 log::debug!(
1743 "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1744 self.timer_name,
1745 start_time,
1746 self.historical_mode,
1747 fire_immediately,
1748 now,
1749 self.bar_build_delay
1750 );
1751 }
1752
1753 pub fn stop(&mut self) {
1755 self.clock.borrow_mut().cancel_timer(&self.timer_name);
1756 }
1757
1758 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1759 if self.skip_first_non_full_bar && ts_init <= self.first_close_ns {
1760 self.core.builder.reset();
1761 } else {
1762 self.skip_first_non_full_bar = false;
1765 self.core.build_and_send(ts_event, ts_init);
1766 }
1767 }
1768
1769 fn build_bar(&mut self, event: &TimeEvent) {
1770 if !self.core.builder.initialized {
1771 return;
1772 }
1773
1774 if !self.build_with_no_updates && self.core.builder.count == 0 {
1775 return; }
1777
1778 let ts_init = event.ts_event;
1779 let ts_event = if self.is_left_open {
1780 if self.timestamp_on_close {
1781 event.ts_event
1782 } else {
1783 self.stored_open_ns
1784 }
1785 } else {
1786 self.stored_open_ns
1787 };
1788
1789 self.build_and_send(ts_event, ts_init);
1790
1791 self.stored_open_ns = event.ts_event;
1793
1794 if self.bar_type().spec().aggregation == BarAggregation::Month {
1795 let step = self.bar_type().spec().step.get() as u32;
1796 let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1797
1798 self.clock
1799 .borrow_mut()
1800 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1801 .expect(FAILED);
1802
1803 self.next_close_ns = alert_time_ns;
1804 } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1805 let step = self.bar_type().spec().step.get() as u32;
1806 let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1807
1808 self.clock
1809 .borrow_mut()
1810 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1811 .expect(FAILED);
1812
1813 self.next_close_ns = alert_time_ns;
1814 } else {
1815 self.next_close_ns = self
1817 .clock
1818 .borrow()
1819 .next_time_ns(&self.timer_name)
1820 .unwrap_or_default();
1821 }
1822 }
1823
1824 fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1825 if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1826 {
1828 let mut clock_borrow = self.clock.borrow_mut();
1829 let test_clock = clock_borrow
1830 .as_any_mut()
1831 .downcast_mut::<TestClock>()
1832 .expect("Expected TestClock in historical mode");
1833 test_clock.set_time(ts_init);
1834 }
1835 self.start_timer_internal(None);
1837 }
1838
1839 let events = {
1841 let mut clock_borrow = self.clock.borrow_mut();
1842 let test_clock = clock_borrow
1843 .as_any_mut()
1844 .downcast_mut::<TestClock>()
1845 .expect("Expected TestClock in historical mode");
1846 test_clock.advance_time(ts_init, true)
1847 };
1848
1849 for event in events {
1850 if event.ts_event == ts_init {
1851 self.historical_event_at_ts_init = Some(event);
1852 } else {
1853 self.build_bar(&event);
1854 }
1855 }
1856 }
1857
1858 fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1859 if let Some(ref event) = self.historical_event_at_ts_init.take() {
1860 self.build_bar(event);
1861 }
1862 }
1863
1864 pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
1866 self.historical_events = events;
1867 }
1868}
1869
1870impl BarAggregator for TimeBarAggregator {
1871 fn bar_type(&self) -> BarType {
1872 self.core.bar_type
1873 }
1874
1875 fn is_running(&self) -> bool {
1876 self.core.is_running
1877 }
1878
1879 fn set_is_running(&mut self, value: bool) {
1880 self.core.set_is_running(value);
1881 }
1882
1883 fn stop(&mut self) {
1885 Self::stop(self);
1886 }
1887
1888 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1889 if self.historical_mode {
1890 self.preprocess_historical_events(ts_init);
1891 }
1892
1893 self.core.apply_update(price, size, ts_init);
1894
1895 if self.historical_mode {
1896 self.postprocess_historical_events(ts_init);
1897 }
1898 }
1899
1900 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1901 if self.historical_mode {
1902 self.preprocess_historical_events(ts_init);
1903 }
1904
1905 self.core.builder.update_bar(bar, volume, ts_init);
1906
1907 if self.historical_mode {
1908 self.postprocess_historical_events(ts_init);
1909 }
1910 }
1911
1912 fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1913 self.historical_mode = historical_mode;
1914 self.core.handler = handler;
1915 }
1916
1917 fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
1918 self.set_historical_events_internal(events);
1919 }
1920
1921 fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1922 self.set_clock_internal(clock);
1923 }
1924
1925 fn build_bar(&mut self, event: &TimeEvent) {
1926 {
1929 #[expect(clippy::use_self)]
1930 TimeBarAggregator::build_bar(self, event);
1931 }
1932 }
1933
1934 fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
1935 self.aggregator_weak = Some(weak);
1936 }
1937
1938 fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
1939 self.start_timer_internal(aggregator_rc);
1940 }
1941}
1942
1943fn is_below_min_size(size: f64, precision: u8) -> bool {
1944 Quantity::new(size, precision).raw == 0
1945}
1946
1947fn min_size_f64(precision: u8) -> f64 {
1948 10_f64.powi(-(precision as i32))
1949}
1950
1951pub trait VegaProvider {
1953 fn vega_for_leg(&self, instrument_id: InstrumentId) -> Option<f64>;
1955}
1956
1957pub trait SpreadPriceRounder {
1959 fn round_prices(&self, raw_bid: f64, raw_ask: f64, precision: u8) -> (Price, Price);
1961}
1962
1963#[derive(Debug, Default)]
1965pub struct MapVegaProvider {
1966 vegas: AHashMap<InstrumentId, f64>,
1967}
1968
1969impl MapVegaProvider {
1970 pub fn new() -> Self {
1971 Self {
1972 vegas: AHashMap::new(),
1973 }
1974 }
1975
1976 pub fn insert(&mut self, instrument_id: InstrumentId, vega: f64) {
1977 self.vegas.insert(instrument_id, vega);
1978 }
1979
1980 pub fn get(&self, instrument_id: &InstrumentId) -> Option<f64> {
1981 self.vegas.get(instrument_id).copied()
1982 }
1983}
1984
1985impl VegaProvider for MapVegaProvider {
1986 fn vega_for_leg(&self, instrument_id: InstrumentId) -> Option<f64> {
1987 self.vegas.get(&instrument_id).copied()
1988 }
1989}
1990
1991#[derive(Debug)]
1993pub struct FixedTickSchemeRounder {
1994 scheme: FixedTickScheme,
1995}
1996
1997impl FixedTickSchemeRounder {
1998 pub fn new(tick: f64) -> anyhow::Result<Self> {
2004 Ok(Self {
2005 scheme: FixedTickScheme::new(tick)?,
2006 })
2007 }
2008
2009 fn round_one(&self, raw: f64, precision: u8, use_bid_rounding: bool) -> Price {
2010 if raw >= 0.0 {
2011 let p = if use_bid_rounding {
2012 self.scheme.next_bid_price(raw, 0, precision)
2013 } else {
2014 self.scheme.next_ask_price(raw, 0, precision)
2015 };
2016 p.unwrap_or_else(|| price_from_f64(raw, precision))
2017 } else {
2018 let p = if use_bid_rounding {
2019 self.scheme.next_ask_price(-raw, 0, precision)
2020 } else {
2021 self.scheme.next_bid_price(-raw, 0, precision)
2022 };
2023 p.map_or_else(
2024 || price_from_f64(raw, precision),
2025 |q| price_from_f64(-q.as_f64(), precision),
2026 )
2027 }
2028 }
2029}
2030
2031impl SpreadPriceRounder for FixedTickSchemeRounder {
2032 fn round_prices(&self, raw_bid: f64, raw_ask: f64, precision: u8) -> (Price, Price) {
2033 let bid = self.round_one(raw_bid, precision, true);
2034 let ask = self.round_one(raw_ask, precision, false);
2035 (bid, ask)
2036 }
2037}
2038
2039pub struct SpreadQuoteAggregator {
2045 spread_instrument_id: InstrumentId,
2046 leg_ids: Vec<InstrumentId>,
2047 ratios: Vec<i64>,
2048 n_legs: usize,
2049 is_futures_spread: bool,
2050 price_precision: u8,
2051 size_precision: u8,
2052 last_quotes: AHashMap<InstrumentId, QuoteTick>,
2053 mid_prices: Vec<f64>,
2054 bid_prices: Vec<f64>,
2055 ask_prices: Vec<f64>,
2056 vegas: Vec<f64>,
2057 bid_ask_spreads: Vec<f64>,
2058 bid_sizes: Vec<f64>,
2059 ask_sizes: Vec<f64>,
2060 handler: Box<dyn FnMut(QuoteTick)>,
2061 clock: Rc<RefCell<dyn Clock>>,
2062 historical_mode: bool,
2063 update_interval_seconds: Option<u64>,
2064 quote_build_delay: u64,
2065 has_update: bool,
2066 timer_name: String,
2067 historical_event_at_ts_init: Option<TimeEvent>,
2068 vega_provider: Option<Box<dyn VegaProvider>>,
2069 price_rounder: Option<Box<dyn SpreadPriceRounder>>,
2070 is_running: bool,
2071 aggregator_weak: Option<Weak<RefCell<Self>>>,
2072}
2073
2074impl Debug for SpreadQuoteAggregator {
2075 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2076 f.debug_struct(stringify!(SpreadQuoteAggregator))
2077 .field("spread_instrument_id", &self.spread_instrument_id)
2078 .field("n_legs", &self.n_legs)
2079 .field("is_futures_spread", &self.is_futures_spread)
2080 .field("update_interval_seconds", &self.update_interval_seconds)
2081 .finish()
2082 }
2083}
2084
2085impl SpreadQuoteAggregator {
2086 #[expect(clippy::too_many_arguments)]
2092 pub fn new(
2093 spread_instrument_id: InstrumentId,
2094 legs: &[(InstrumentId, i64)],
2095 is_futures_spread: bool,
2096 price_precision: u8,
2097 size_precision: u8,
2098 handler: Box<dyn FnMut(QuoteTick)>,
2099 clock: Rc<RefCell<dyn Clock>>,
2100 historical_mode: bool,
2101 update_interval_seconds: Option<u64>,
2102 quote_build_delay: u64,
2103 vega_provider: Option<Box<dyn VegaProvider>>,
2104 price_rounder: Option<Box<dyn SpreadPriceRounder>>,
2105 ) -> Self {
2106 assert!(legs.len() >= 2, "Spread must have more than one leg");
2107 let n_legs = legs.len();
2108 let leg_ids: Vec<InstrumentId> = legs.iter().map(|(id, _)| *id).collect();
2109 let ratios: Vec<i64> = legs.iter().map(|(_, r)| *r).collect();
2110 for &r in &ratios {
2111 assert!(r != 0, "Ratio cannot be zero");
2112 }
2113 let timer_name = format!("SPREAD_QUOTE_{spread_instrument_id}");
2114 Self {
2115 spread_instrument_id,
2116 leg_ids,
2117 ratios,
2118 n_legs,
2119 is_futures_spread,
2120 price_precision,
2121 size_precision,
2122 last_quotes: AHashMap::new(),
2123 mid_prices: vec![0.0; n_legs],
2124 bid_prices: vec![0.0; n_legs],
2125 ask_prices: vec![0.0; n_legs],
2126 vegas: vec![0.0; n_legs],
2127 bid_ask_spreads: vec![0.0; n_legs],
2128 bid_sizes: vec![0.0; n_legs],
2129 ask_sizes: vec![0.0; n_legs],
2130 handler,
2131 clock,
2132 historical_mode,
2133 update_interval_seconds,
2134 quote_build_delay,
2135 has_update: false,
2136 timer_name,
2137 historical_event_at_ts_init: None,
2138 vega_provider,
2139 price_rounder,
2140 is_running: false,
2141 aggregator_weak: None,
2142 }
2143 }
2144
2145 pub fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Self>>) {
2148 self.aggregator_weak = Some(weak);
2149 }
2150
2151 pub fn prepare_for_timer_mode(&mut self, self_rc: &Rc<RefCell<Self>>) {
2156 self.aggregator_weak = Some(Rc::downgrade(self_rc));
2157 }
2158
2159 pub fn set_historical_mode(
2161 &mut self,
2162 historical_mode: bool,
2163 handler: Box<dyn FnMut(QuoteTick)>,
2164 vega_provider: Option<Box<dyn VegaProvider>>,
2165 ) {
2166 self.historical_mode = historical_mode;
2167 self.handler = handler;
2168
2169 if let Some(vp) = vega_provider {
2170 self.vega_provider = Some(vp);
2171 }
2172 }
2173
2174 pub fn set_running(&mut self, is_running: bool) {
2175 self.is_running = is_running;
2176 }
2177
2178 pub fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
2179 self.clock = clock;
2180 }
2181
2182 pub fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Self>>>) {
2191 let Some(interval_secs) = self.update_interval_seconds else {
2192 return;
2193 };
2194 let aggregator_weak = if let Some(rc) = aggregator_rc {
2195 let weak = Rc::downgrade(&rc);
2196 self.aggregator_weak = Some(weak.clone());
2197 weak
2198 } else {
2199 self.aggregator_weak.as_ref().cloned().expect(
2200 "SpreadQuoteAggregator: timer mode requires prepare_for_timer_mode(rc) to be \
2201 called first with the Rc that wraps this aggregator (before feeding quotes in \
2202 historical mode or before start_timer(None)).",
2203 )
2204 };
2205
2206 let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
2207 if let Some(agg) = aggregator_weak.upgrade() {
2208 agg.borrow_mut().on_timer_fire(event.ts_event);
2209 }
2210 }));
2211
2212 let now_ns = self.clock.borrow().timestamp_ns();
2213 let interval_ns = interval_secs * 1_000_000_000;
2214 let start_ns = (now_ns.as_u64() / interval_ns) * interval_ns;
2215 let start_ns = start_ns + self.quote_build_delay * 1_000; let start_time = UnixNanos::from(start_ns);
2217 let fire_immediately = now_ns == start_time;
2218 self.clock
2219 .borrow_mut()
2220 .set_timer_ns(
2221 &self.timer_name,
2222 interval_ns,
2223 Some(start_time),
2224 None,
2225 Some(callback),
2226 Some(true),
2227 Some(fire_immediately),
2228 )
2229 .expect("Failed to set spread quote timer");
2230 }
2231
2232 pub fn on_timer_fire(&mut self, ts_event: UnixNanos) {
2234 if self.last_quotes.len() == self.n_legs {
2235 self.build_and_send_quote(ts_event);
2236 }
2237 }
2238
2239 pub fn stop_timer(&mut self) {
2241 if self.update_interval_seconds.is_none() {
2242 return;
2243 }
2244
2245 if self
2246 .clock
2247 .borrow()
2248 .timer_names()
2249 .contains(&self.timer_name.as_str())
2250 {
2251 self.clock.borrow_mut().cancel_timer(&self.timer_name);
2252 }
2253 }
2254
2255 pub fn handle_quote_tick(&mut self, tick: QuoteTick) {
2257 let ts_init = tick.ts_init;
2258
2259 if self.update_interval_seconds.is_some() && self.historical_mode {
2260 self.process_historical_events(ts_init);
2261 }
2262 self.last_quotes.insert(tick.instrument_id, tick);
2263 self.has_update = true;
2264
2265 if self.update_interval_seconds.is_none() && self.last_quotes.len() == self.n_legs {
2266 self.build_and_send_quote(ts_init);
2267 }
2268 }
2269
2270 pub fn flush_pending_historical_quote(&mut self) {
2276 if self.update_interval_seconds.is_none() || !self.historical_mode {
2277 return;
2278 }
2279
2280 let Some(event) = self.historical_event_at_ts_init.take() else {
2281 return;
2282 };
2283
2284 if self.last_quotes.len() == self.n_legs {
2285 self.build_and_send_quote(event.ts_event);
2286 }
2287 }
2288
2289 fn process_historical_events(&mut self, ts_init: UnixNanos) {
2295 if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
2296 let mut clock_borrow = self.clock.borrow_mut();
2297 let test_clock = clock_borrow
2298 .as_any_mut()
2299 .downcast_mut::<TestClock>()
2300 .expect("Expected TestClock in historical mode");
2301 test_clock.set_time(ts_init);
2302 drop(clock_borrow);
2303 self.start_timer(None);
2304 }
2305
2306 if self.last_quotes.len() == self.n_legs
2307 && let Some(ref event) = self.historical_event_at_ts_init
2308 && event.ts_event < ts_init
2309 {
2310 let event = self.historical_event_at_ts_init.take().unwrap();
2312 self.build_and_send_quote(event.ts_event);
2313 }
2314
2315 let events = {
2316 let mut clock_borrow = self.clock.borrow_mut();
2317 let test_clock = clock_borrow
2318 .as_any_mut()
2319 .downcast_mut::<TestClock>()
2320 .expect("Expected TestClock in historical mode");
2321 test_clock.advance_time(ts_init, true)
2322 };
2323
2324 for event in events {
2325 if event.ts_event == ts_init {
2326 self.historical_event_at_ts_init = Some(event);
2327 } else if self.last_quotes.len() == self.n_legs {
2328 self.build_and_send_quote(event.ts_event);
2329 }
2330 }
2331 }
2332
2333 fn build_and_send_quote(&mut self, ts_event: UnixNanos) {
2335 if !self.has_update {
2336 return;
2337 }
2338
2339 for (idx, &leg_id) in self.leg_ids.iter().enumerate() {
2340 let Some(tick) = self.last_quotes.get(&leg_id) else {
2341 log::error!(
2342 "SpreadQuoteAggregator[{}]: Missing quote for leg {}",
2343 self.spread_instrument_id,
2344 leg_id
2345 );
2346 return;
2347 };
2348 let ask_price = tick.ask_price.as_f64();
2349 let bid_price = tick.bid_price.as_f64();
2350 self.bid_prices[idx] = bid_price;
2351 self.ask_prices[idx] = ask_price;
2352 self.bid_sizes[idx] = tick.bid_size.as_f64();
2353 self.ask_sizes[idx] = tick.ask_size.as_f64();
2354
2355 if !self.is_futures_spread {
2356 self.mid_prices[idx] = (ask_price + bid_price) * 0.5;
2357 self.bid_ask_spreads[idx] = ask_price - bid_price;
2358
2359 if let Some(ref vp) = self.vega_provider
2360 && let Some(vega) = vp.vega_for_leg(leg_id)
2361 {
2362 self.vegas[idx] = vega;
2363 }
2364 }
2365 }
2366 let (raw_bid, raw_ask) = if self.is_futures_spread {
2367 self.create_futures_spread_prices()
2368 } else {
2369 self.create_option_spread_prices()
2370 };
2371 let spread_quote = self.create_quote_tick_from_raw_prices(raw_bid, raw_ask, ts_event);
2372 self.has_update = false;
2373 (self.handler)(spread_quote);
2374 }
2375
2376 fn create_option_spread_prices(&self) -> (f64, f64) {
2377 let vega_multipliers: Vec<f64> = (0..self.n_legs)
2378 .map(|i| {
2379 if self.vegas[i] == 0.0 {
2380 0.0
2381 } else {
2382 self.bid_ask_spreads[i] / self.vegas[i]
2383 }
2384 })
2385 .collect();
2386 let non_zero: Vec<f64> = vega_multipliers
2387 .iter()
2388 .copied()
2389 .filter(|&x| x != 0.0)
2390 .collect();
2391
2392 if non_zero.is_empty() {
2393 log::warn!(
2394 "No vega information available for the components of {}. Will generate spread quote using component quotes only",
2395 self.spread_instrument_id
2396 );
2397 return self.create_futures_spread_prices();
2398 }
2399 let vega_multiplier = non_zero.iter().map(|x| x.abs()).sum::<f64>() / non_zero.len() as f64;
2400 let spread_vega = self
2401 .vegas
2402 .iter()
2403 .zip(self.ratios.iter())
2404 .map(|(v, r)| v * (*r as f64))
2405 .sum::<f64>()
2406 .abs();
2407 let bid_ask_spread = spread_vega * vega_multiplier;
2408 let spread_mid_price: f64 = self
2409 .mid_prices
2410 .iter()
2411 .zip(self.ratios.iter())
2412 .map(|(m, r)| m * (*r as f64))
2413 .sum();
2414 let raw_bid = spread_mid_price - bid_ask_spread * 0.5;
2415 let raw_ask = spread_mid_price + bid_ask_spread * 0.5;
2416 (raw_bid, raw_ask)
2417 }
2418
2419 fn create_futures_spread_prices(&self) -> (f64, f64) {
2420 let mut raw_ask = 0.0_f64;
2421 let mut raw_bid = 0.0_f64;
2422
2423 for i in 0..self.n_legs {
2424 let r = self.ratios[i] as f64;
2425 if self.ratios[i] >= 0 {
2426 raw_ask += r * self.ask_prices[i];
2427 raw_bid += r * self.bid_prices[i];
2428 } else {
2429 raw_ask += r * self.bid_prices[i];
2430 raw_bid += r * self.ask_prices[i];
2431 }
2432 }
2433 (raw_bid, raw_ask)
2434 }
2435
2436 fn create_quote_tick_from_raw_prices(
2437 &self,
2438 raw_bid_price: f64,
2439 raw_ask_price: f64,
2440 ts_event: UnixNanos,
2441 ) -> QuoteTick {
2442 let (bid_price, ask_price) = if let Some(ref rounder) = self.price_rounder {
2443 rounder.round_prices(raw_bid_price, raw_ask_price, self.price_precision)
2444 } else {
2445 let bid = price_from_f64(raw_bid_price, self.price_precision);
2446 let ask = price_from_f64(raw_ask_price, self.price_precision);
2447 (bid, ask)
2448 };
2449 let mut min_bid_size = f64::INFINITY;
2450 let mut min_ask_size = f64::INFINITY;
2451 for i in 0..self.n_legs {
2452 let abs_ratio = self.ratios[i].unsigned_abs() as f64;
2453 if self.ratios[i] >= 0 {
2454 let b = self.bid_sizes[i] / abs_ratio;
2455 if b < min_bid_size {
2456 min_bid_size = b;
2457 }
2458 let a = self.ask_sizes[i] / abs_ratio;
2459 if a < min_ask_size {
2460 min_ask_size = a;
2461 }
2462 } else {
2463 let b = self.ask_sizes[i] / abs_ratio;
2464 if b < min_bid_size {
2465 min_bid_size = b;
2466 }
2467 let a = self.bid_sizes[i] / abs_ratio;
2468 if a < min_ask_size {
2469 min_ask_size = a;
2470 }
2471 }
2472 }
2473 let bid_size = Quantity::new(min_bid_size, self.size_precision);
2474 let ask_size = Quantity::new(min_ask_size, self.size_precision);
2475 QuoteTick::new(
2476 self.spread_instrument_id,
2477 bid_price,
2478 ask_price,
2479 bid_size,
2480 ask_size,
2481 ts_event,
2482 ts_event,
2483 )
2484 }
2485}
2486
2487fn price_from_f64(v: f64, precision: u8) -> Price {
2488 Price::new(v, precision)
2489}
2490
2491#[cfg(test)]
2492mod tests {
2493 use std::sync::{Arc, Mutex};
2494
2495 use nautilus_common::{clock::TestClock, timer::TimeEvent};
2496 use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
2497 use nautilus_model::{
2498 data::{BarSpecification, BarType, QuoteTick},
2499 enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
2500 identifiers::InstrumentId,
2501 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
2502 types::{Price, Quantity},
2503 };
2504 use rstest::rstest;
2505 use ustr::Ustr;
2506
2507 use super::*;
2508
2509 #[rstest]
2510 fn test_bar_builder_initialization(equity_aapl: Equity) {
2511 let instrument = InstrumentAny::Equity(equity_aapl);
2512 let bar_type = BarType::new(
2513 instrument.id(),
2514 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2515 AggregationSource::Internal,
2516 );
2517 let builder = BarBuilder::new(
2518 bar_type,
2519 instrument.price_precision(),
2520 instrument.size_precision(),
2521 );
2522
2523 assert!(!builder.initialized);
2524 assert_eq!(builder.ts_last, 0);
2525 assert_eq!(builder.count, 0);
2526 }
2527
2528 #[rstest]
2529 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
2530 let instrument = InstrumentAny::Equity(equity_aapl);
2531 let bar_type = BarType::new(
2532 instrument.id(),
2533 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2534 AggregationSource::Internal,
2535 );
2536 let mut builder = BarBuilder::new(
2537 bar_type,
2538 instrument.price_precision(),
2539 instrument.size_precision(),
2540 );
2541
2542 builder.update(
2543 Price::from("100.00"),
2544 Quantity::from(1),
2545 UnixNanos::from(1000),
2546 );
2547 builder.update(
2548 Price::from("95.00"),
2549 Quantity::from(1),
2550 UnixNanos::from(2000),
2551 );
2552 builder.update(
2553 Price::from("105.00"),
2554 Quantity::from(1),
2555 UnixNanos::from(3000),
2556 );
2557
2558 let bar = builder.build_now();
2559 assert!(bar.high > bar.low);
2560 assert_eq!(bar.open, Price::from("100.00"));
2561 assert_eq!(bar.high, Price::from("105.00"));
2562 assert_eq!(bar.low, Price::from("95.00"));
2563 assert_eq!(bar.close, Price::from("105.00"));
2564 }
2565
2566 #[rstest]
2567 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
2568 let instrument = InstrumentAny::Equity(equity_aapl);
2569 let bar_type = BarType::new(
2570 instrument.id(),
2571 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
2572 AggregationSource::Internal,
2573 );
2574 let mut builder = BarBuilder::new(
2575 bar_type,
2576 instrument.price_precision(),
2577 instrument.size_precision(),
2578 );
2579
2580 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
2581 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
2582
2583 assert_eq!(builder.ts_last, 1_000);
2584 assert_eq!(builder.count, 1);
2585 }
2586
2587 #[rstest]
2588 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
2589 let instrument = InstrumentAny::Equity(equity_aapl);
2590 let bar_type = BarType::new(
2591 instrument.id(),
2592 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2593 AggregationSource::Internal,
2594 );
2595 let mut builder = BarBuilder::new(
2596 bar_type,
2597 instrument.price_precision(),
2598 instrument.size_precision(),
2599 );
2600
2601 builder.update(
2602 Price::from("1.00000"),
2603 Quantity::from(1),
2604 UnixNanos::default(),
2605 );
2606
2607 assert!(builder.initialized);
2608 assert_eq!(builder.ts_last, 0);
2609 assert_eq!(builder.count, 1);
2610 }
2611
2612 #[rstest]
2613 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2614 equity_aapl: Equity,
2615 ) {
2616 let instrument = InstrumentAny::Equity(equity_aapl);
2617 let bar_type = BarType::new(
2618 instrument.id(),
2619 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2620 AggregationSource::Internal,
2621 );
2622 let mut builder = BarBuilder::new(bar_type, 2, 0);
2623
2624 builder.update(
2625 Price::from("1.00000"),
2626 Quantity::from(1),
2627 UnixNanos::from(1_000),
2628 );
2629 builder.update(
2630 Price::from("1.00001"),
2631 Quantity::from(1),
2632 UnixNanos::from(500),
2633 );
2634
2635 assert!(builder.initialized);
2636 assert_eq!(builder.ts_last, 1_000);
2637 assert_eq!(builder.count, 1);
2638 }
2639
2640 #[rstest]
2641 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2642 let instrument = InstrumentAny::Equity(equity_aapl);
2643 let bar_type = BarType::new(
2644 instrument.id(),
2645 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2646 AggregationSource::Internal,
2647 );
2648 let mut builder = BarBuilder::new(
2649 bar_type,
2650 instrument.price_precision(),
2651 instrument.size_precision(),
2652 );
2653
2654 for _ in 0..5 {
2655 builder.update(
2656 Price::from("1.00000"),
2657 Quantity::from(1),
2658 UnixNanos::from(1_000),
2659 );
2660 }
2661
2662 assert_eq!(builder.count, 5);
2663 }
2664
2665 #[rstest]
2666 #[should_panic]
2667 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2668 let instrument = InstrumentAny::Equity(equity_aapl);
2669 let bar_type = BarType::new(
2670 instrument.id(),
2671 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2672 AggregationSource::Internal,
2673 );
2674 let mut builder = BarBuilder::new(
2675 bar_type,
2676 instrument.price_precision(),
2677 instrument.size_precision(),
2678 );
2679 let _ = builder.build_now();
2680 }
2681
2682 #[rstest]
2683 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2684 let instrument = InstrumentAny::Equity(equity_aapl);
2685 let bar_type = BarType::new(
2686 instrument.id(),
2687 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2688 AggregationSource::Internal,
2689 );
2690 let mut builder = BarBuilder::new(
2691 bar_type,
2692 instrument.price_precision(),
2693 instrument.size_precision(),
2694 );
2695
2696 builder.update(
2697 Price::from("1.00001"),
2698 Quantity::from(2),
2699 UnixNanos::default(),
2700 );
2701 builder.update(
2702 Price::from("1.00002"),
2703 Quantity::from(2),
2704 UnixNanos::default(),
2705 );
2706 builder.update(
2707 Price::from("1.00000"),
2708 Quantity::from(1),
2709 UnixNanos::from(1_000_000_000),
2710 );
2711
2712 let bar = builder.build_now();
2713
2714 assert_eq!(bar.open, Price::from("1.00001"));
2715 assert_eq!(bar.high, Price::from("1.00002"));
2716 assert_eq!(bar.low, Price::from("1.00000"));
2717 assert_eq!(bar.close, Price::from("1.00000"));
2718 assert_eq!(bar.volume, Quantity::from(5));
2719 assert_eq!(bar.ts_init, 1_000_000_000);
2720 assert_eq!(builder.ts_last, 1_000_000_000);
2721 assert_eq!(builder.count, 0);
2722 }
2723
2724 #[rstest]
2725 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2726 let instrument = InstrumentAny::Equity(equity_aapl);
2727 let bar_type = BarType::new(
2728 instrument.id(),
2729 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2730 AggregationSource::Internal,
2731 );
2732 let mut builder = BarBuilder::new(bar_type, 2, 0);
2733
2734 builder.update(
2735 Price::from("1.00001"),
2736 Quantity::from(1),
2737 UnixNanos::default(),
2738 );
2739 builder.build_now();
2740
2741 builder.update(
2742 Price::from("1.00000"),
2743 Quantity::from(1),
2744 UnixNanos::default(),
2745 );
2746 builder.update(
2747 Price::from("1.00003"),
2748 Quantity::from(1),
2749 UnixNanos::default(),
2750 );
2751 builder.update(
2752 Price::from("1.00002"),
2753 Quantity::from(1),
2754 UnixNanos::default(),
2755 );
2756
2757 let bar = builder.build_now();
2758
2759 assert_eq!(bar.open, Price::from("1.00000"));
2760 assert_eq!(bar.high, Price::from("1.00003"));
2761 assert_eq!(bar.low, Price::from("1.00000"));
2762 assert_eq!(bar.close, Price::from("1.00002"));
2763 assert_eq!(bar.volume, Quantity::from(3));
2764 }
2765
2766 #[rstest]
2767 fn test_bar_builder_update_bar_initializes_then_accumulates(equity_aapl: Equity) {
2768 let instrument = InstrumentAny::Equity(equity_aapl);
2769 let bar_type = BarType::new(
2770 instrument.id(),
2771 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2772 AggregationSource::Internal,
2773 );
2774 let mut builder = BarBuilder::new(
2775 bar_type,
2776 instrument.price_precision(),
2777 instrument.size_precision(),
2778 );
2779
2780 let bar_one = Bar::new(
2781 bar_type,
2782 Price::from("100.00"),
2783 Price::from("102.00"),
2784 Price::from("99.00"),
2785 Price::from("101.00"),
2786 Quantity::from(10),
2787 UnixNanos::from(1_000),
2788 UnixNanos::from(1_000),
2789 );
2790 let bar_two = Bar::new(
2791 bar_type,
2792 Price::from("101.00"),
2793 Price::from("103.00"),
2794 Price::from("98.00"),
2795 Price::from("102.00"),
2796 Quantity::from(5),
2797 UnixNanos::from(2_000),
2798 UnixNanos::from(2_000),
2799 );
2800
2801 builder.update_bar(bar_one, bar_one.volume, bar_one.ts_init);
2802 builder.update_bar(bar_two, bar_two.volume, bar_two.ts_init);
2803 let bar = builder.build_now();
2804
2805 assert_eq!(bar.open, Price::from("100.00"));
2806 assert_eq!(bar.high, Price::from("103.00"));
2807 assert_eq!(bar.low, Price::from("98.00"));
2808 assert_eq!(bar.close, Price::from("102.00"));
2809 assert_eq!(bar.volume, Quantity::from(15));
2810 assert_eq!(builder.count, 0);
2811 }
2812
2813 #[rstest]
2814 fn test_bar_builder_update_bar_ignores_earlier_timestamp(equity_aapl: Equity) {
2815 let instrument = InstrumentAny::Equity(equity_aapl);
2816 let bar_type = BarType::new(
2817 instrument.id(),
2818 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2819 AggregationSource::Internal,
2820 );
2821 let mut builder = BarBuilder::new(
2822 bar_type,
2823 instrument.price_precision(),
2824 instrument.size_precision(),
2825 );
2826
2827 let bar_later = Bar::new(
2828 bar_type,
2829 Price::from("100.00"),
2830 Price::from("101.00"),
2831 Price::from("99.00"),
2832 Price::from("100.50"),
2833 Quantity::from(10),
2834 UnixNanos::from(2_000),
2835 UnixNanos::from(2_000),
2836 );
2837 let bar_earlier = Bar::new(
2838 bar_type,
2839 Price::from("200.00"),
2840 Price::from("210.00"),
2841 Price::from("190.00"),
2842 Price::from("205.00"),
2843 Quantity::from(50),
2844 UnixNanos::from(1_000),
2845 UnixNanos::from(1_000),
2846 );
2847
2848 builder.update_bar(bar_later, bar_later.volume, bar_later.ts_init);
2849 builder.update_bar(bar_earlier, bar_earlier.volume, bar_earlier.ts_init);
2850
2851 assert_eq!(builder.ts_last, 2_000);
2852 assert_eq!(builder.count, 1);
2853 assert_eq!(builder.volume, Quantity::from(10));
2854 }
2855
2856 #[rstest]
2857 fn test_bar_builder_build_promotes_close_above_high_from_previous_close(equity_aapl: Equity) {
2858 let instrument = InstrumentAny::Equity(equity_aapl);
2859 let bar_type = BarType::new(
2860 instrument.id(),
2861 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2862 AggregationSource::Internal,
2863 );
2864 let mut builder = BarBuilder::new(bar_type, 2, 0);
2865
2866 builder.update(
2867 Price::from("110.00"),
2868 Quantity::from(1),
2869 UnixNanos::from(100),
2870 );
2871 builder.build_now();
2872
2873 builder.update(
2874 Price::from("100.00"),
2875 Quantity::from(1),
2876 UnixNanos::from(200),
2877 );
2878 builder.update(
2879 Price::from("101.00"),
2880 Quantity::from(1),
2881 UnixNanos::from(300),
2882 );
2883 builder.update(
2884 Price::from("200.00"),
2885 Quantity::from(1),
2886 UnixNanos::from(400),
2887 );
2888
2889 let bar = builder.build_now();
2890 assert_eq!(bar.open, Price::from("100.00"));
2891 assert_eq!(bar.high, Price::from("200.00"));
2892 assert_eq!(bar.low, Price::from("100.00"));
2893 assert_eq!(bar.close, Price::from("200.00"));
2894 }
2895
2896 #[rstest]
2897 fn test_bar_builder_build_clamps_low_to_close(equity_aapl: Equity) {
2898 let instrument = InstrumentAny::Equity(equity_aapl);
2902 let bar_type = BarType::new(
2903 instrument.id(),
2904 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2905 AggregationSource::Internal,
2906 );
2907 let mut builder = BarBuilder::new(bar_type, 2, 0);
2908
2909 builder.update(
2910 Price::from("100.00"),
2911 Quantity::from(1),
2912 UnixNanos::from(100),
2913 );
2914 builder.close = Some(Price::from("50.00"));
2915
2916 let bar = builder.build_now();
2917 assert_eq!(bar.low, Price::from("50.00"));
2918 assert_eq!(bar.close, Price::from("50.00"));
2919 assert!(bar.low <= bar.open);
2920 }
2921
2922 #[rstest]
2923 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
2924 let instrument = InstrumentAny::Equity(equity_aapl);
2925 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2926 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2927 let handler = Arc::new(Mutex::new(Vec::new()));
2928 let handler_clone = Arc::clone(&handler);
2929
2930 let mut aggregator = TickBarAggregator::new(
2931 bar_type,
2932 instrument.price_precision(),
2933 instrument.size_precision(),
2934 move |bar: Bar| {
2935 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2936 handler_guard.push(bar);
2937 },
2938 );
2939
2940 let trade = TradeTick::default();
2941 aggregator.handle_trade(trade);
2942
2943 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2944 assert_eq!(handler_guard.len(), 0);
2945 }
2946
2947 #[rstest]
2948 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
2949 let instrument = InstrumentAny::Equity(equity_aapl);
2950 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2951 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2952 let handler = Arc::new(Mutex::new(Vec::new()));
2953 let handler_clone = Arc::clone(&handler);
2954
2955 let mut aggregator = TickBarAggregator::new(
2956 bar_type,
2957 instrument.price_precision(),
2958 instrument.size_precision(),
2959 move |bar: Bar| {
2960 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2961 handler_guard.push(bar);
2962 },
2963 );
2964
2965 let trade = TradeTick::default();
2966 aggregator.handle_trade(trade);
2967 aggregator.handle_trade(trade);
2968 aggregator.handle_trade(trade);
2969
2970 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2971 let bar = handler_guard.first().unwrap();
2972 assert_eq!(handler_guard.len(), 1);
2973 assert_eq!(bar.open, trade.price);
2974 assert_eq!(bar.high, trade.price);
2975 assert_eq!(bar.low, trade.price);
2976 assert_eq!(bar.close, trade.price);
2977 assert_eq!(bar.volume, Quantity::from(300000));
2978 assert_eq!(bar.ts_event, trade.ts_event);
2979 assert_eq!(bar.ts_init, trade.ts_init);
2980 }
2981
2982 #[rstest]
2983 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
2984 let instrument = InstrumentAny::Equity(equity_aapl);
2985 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2986 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2987 let handler = Arc::new(Mutex::new(Vec::new()));
2988 let handler_clone = Arc::clone(&handler);
2989
2990 let mut aggregator = TickBarAggregator::new(
2991 bar_type,
2992 instrument.price_precision(),
2993 instrument.size_precision(),
2994 move |bar: Bar| {
2995 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2996 handler_guard.push(bar);
2997 },
2998 );
2999
3000 aggregator.update(
3001 Price::from("1.00001"),
3002 Quantity::from(1),
3003 UnixNanos::default(),
3004 );
3005 aggregator.update(
3006 Price::from("1.00002"),
3007 Quantity::from(1),
3008 UnixNanos::from(1000),
3009 );
3010 aggregator.update(
3011 Price::from("1.00003"),
3012 Quantity::from(1),
3013 UnixNanos::from(2000),
3014 );
3015
3016 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3017 assert_eq!(handler_guard.len(), 1);
3018
3019 let bar = handler_guard.first().unwrap();
3020 assert_eq!(bar.open, Price::from("1.00001"));
3021 assert_eq!(bar.high, Price::from("1.00003"));
3022 assert_eq!(bar.low, Price::from("1.00001"));
3023 assert_eq!(bar.close, Price::from("1.00003"));
3024 assert_eq!(bar.volume, Quantity::from(3));
3025 }
3026
3027 #[rstest]
3028 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
3029 let instrument = InstrumentAny::Equity(equity_aapl);
3030 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
3031 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3032 let handler = Arc::new(Mutex::new(Vec::new()));
3033 let handler_clone = Arc::clone(&handler);
3034
3035 let mut aggregator = TickBarAggregator::new(
3036 bar_type,
3037 instrument.price_precision(),
3038 instrument.size_precision(),
3039 move |bar: Bar| {
3040 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3041 handler_guard.push(bar);
3042 },
3043 );
3044
3045 aggregator.update(
3046 Price::from("1.00001"),
3047 Quantity::from(1),
3048 UnixNanos::default(),
3049 );
3050 aggregator.update(
3051 Price::from("1.00002"),
3052 Quantity::from(1),
3053 UnixNanos::from(1000),
3054 );
3055 aggregator.update(
3056 Price::from("1.00003"),
3057 Quantity::from(1),
3058 UnixNanos::from(2000),
3059 );
3060 aggregator.update(
3061 Price::from("1.00004"),
3062 Quantity::from(1),
3063 UnixNanos::from(3000),
3064 );
3065
3066 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3067 assert_eq!(handler_guard.len(), 2);
3068
3069 let bar1 = &handler_guard[0];
3070 assert_eq!(bar1.open, Price::from("1.00001"));
3071 assert_eq!(bar1.close, Price::from("1.00002"));
3072 assert_eq!(bar1.volume, Quantity::from(2));
3073
3074 let bar2 = &handler_guard[1];
3075 assert_eq!(bar2.open, Price::from("1.00003"));
3076 assert_eq!(bar2.close, Price::from("1.00004"));
3077 assert_eq!(bar2.volume, Quantity::from(2));
3078 }
3079
3080 #[rstest]
3081 fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
3082 let instrument = InstrumentAny::Equity(equity_aapl);
3083 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3084 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3085 let handler = Arc::new(Mutex::new(Vec::new()));
3086 let handler_clone = Arc::clone(&handler);
3087
3088 let mut aggregator = TickImbalanceBarAggregator::new(
3089 bar_type,
3090 instrument.price_precision(),
3091 instrument.size_precision(),
3092 move |bar: Bar| {
3093 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3094 handler_guard.push(bar);
3095 },
3096 );
3097
3098 let trade = TradeTick::default();
3099 aggregator.handle_trade(trade);
3100 aggregator.handle_trade(trade);
3101
3102 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3103 assert_eq!(handler_guard.len(), 1);
3104 let bar = handler_guard.first().unwrap();
3105 assert_eq!(bar.volume, Quantity::from(200000));
3106 }
3107
3108 #[rstest]
3109 fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
3110 let instrument = InstrumentAny::Equity(equity_aapl);
3111 let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
3112 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3113 let handler = Arc::new(Mutex::new(Vec::new()));
3114 let handler_clone = Arc::clone(&handler);
3115
3116 let mut aggregator = TickImbalanceBarAggregator::new(
3117 bar_type,
3118 instrument.price_precision(),
3119 instrument.size_precision(),
3120 move |bar: Bar| {
3121 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3122 handler_guard.push(bar);
3123 },
3124 );
3125
3126 let sell = TradeTick {
3127 aggressor_side: AggressorSide::Seller,
3128 ..TradeTick::default()
3129 };
3130
3131 aggregator.handle_trade(sell);
3132
3133 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3134 assert_eq!(handler_guard.len(), 1);
3135 }
3136
3137 #[rstest]
3138 fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
3139 let instrument = InstrumentAny::Equity(equity_aapl);
3140 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3141 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3142 let handler = Arc::new(Mutex::new(Vec::new()));
3143 let handler_clone = Arc::clone(&handler);
3144
3145 let mut aggregator = TickRunsBarAggregator::new(
3146 bar_type,
3147 instrument.price_precision(),
3148 instrument.size_precision(),
3149 move |bar: Bar| {
3150 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3151 handler_guard.push(bar);
3152 },
3153 );
3154
3155 let buy = TradeTick::default();
3156 let sell = TradeTick {
3157 aggressor_side: AggressorSide::Seller,
3158 ..buy
3159 };
3160
3161 aggregator.handle_trade(buy);
3162 aggregator.handle_trade(buy);
3163 aggregator.handle_trade(sell);
3164 aggregator.handle_trade(sell);
3165
3166 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3167 assert_eq!(handler_guard.len(), 2);
3168 }
3169
3170 #[rstest]
3171 fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
3172 let instrument = InstrumentAny::Equity(equity_aapl);
3173 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3174 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3175 let handler = Arc::new(Mutex::new(Vec::new()));
3176 let handler_clone = Arc::clone(&handler);
3177
3178 let mut aggregator = TickRunsBarAggregator::new(
3179 bar_type,
3180 instrument.price_precision(),
3181 instrument.size_precision(),
3182 move |bar: Bar| {
3183 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3184 handler_guard.push(bar);
3185 },
3186 );
3187
3188 let buy = TradeTick {
3189 size: Quantity::from(1),
3190 ..TradeTick::default()
3191 };
3192 let sell = TradeTick {
3193 aggressor_side: AggressorSide::Seller,
3194 size: Quantity::from(1),
3195 ..buy
3196 };
3197
3198 aggregator.handle_trade(buy);
3199 aggregator.handle_trade(buy);
3200 aggregator.handle_trade(sell);
3201 aggregator.handle_trade(sell);
3202
3203 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3204 assert_eq!(handler_guard.len(), 2);
3205 assert_eq!(handler_guard[0].volume, Quantity::from(2));
3206 assert_eq!(handler_guard[1].volume, Quantity::from(2));
3207 }
3208
3209 #[rstest]
3210 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
3211 let instrument = InstrumentAny::Equity(equity_aapl);
3212 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3213 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3214 let handler = Arc::new(Mutex::new(Vec::new()));
3215 let handler_clone = Arc::clone(&handler);
3216
3217 let mut aggregator = VolumeBarAggregator::new(
3218 bar_type,
3219 instrument.price_precision(),
3220 instrument.size_precision(),
3221 move |bar: Bar| {
3222 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3223 handler_guard.push(bar);
3224 },
3225 );
3226
3227 aggregator.update(
3228 Price::from("1.00001"),
3229 Quantity::from(25),
3230 UnixNanos::default(),
3231 );
3232
3233 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3234 assert_eq!(handler_guard.len(), 2);
3235 let bar1 = &handler_guard[0];
3236 assert_eq!(bar1.volume, Quantity::from(10));
3237 let bar2 = &handler_guard[1];
3238 assert_eq!(bar2.volume, Quantity::from(10));
3239 }
3240
3241 #[rstest]
3242 fn test_volume_bar_aggregator_zero_size_update_is_noop(equity_aapl: Equity) {
3243 let instrument = InstrumentAny::Equity(equity_aapl);
3244 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3245 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3246 let handler = Arc::new(Mutex::new(Vec::new()));
3247 let handler_clone = Arc::clone(&handler);
3248
3249 let mut aggregator = VolumeBarAggregator::new(
3250 bar_type,
3251 instrument.price_precision(),
3252 instrument.size_precision(),
3253 move |bar: Bar| {
3254 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3255 handler_guard.push(bar);
3256 },
3257 );
3258
3259 aggregator.update(
3260 Price::from("100.00"),
3261 Quantity::from(0),
3262 UnixNanos::default(),
3263 );
3264
3265 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3266 assert_eq!(handler_guard.len(), 0);
3267 }
3268
3269 #[rstest]
3270 fn test_volume_bar_aggregator_exact_threshold_emits_single_bar(equity_aapl: Equity) {
3271 let instrument = InstrumentAny::Equity(equity_aapl);
3272 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
3273 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3274 let handler = Arc::new(Mutex::new(Vec::new()));
3275 let handler_clone = Arc::clone(&handler);
3276
3277 let mut aggregator = VolumeBarAggregator::new(
3278 bar_type,
3279 instrument.price_precision(),
3280 instrument.size_precision(),
3281 move |bar: Bar| {
3282 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3283 handler_guard.push(bar);
3284 },
3285 );
3286
3287 aggregator.update(
3288 Price::from("100.00"),
3289 Quantity::from(7),
3290 UnixNanos::from(1_000),
3291 );
3292 aggregator.update(
3293 Price::from("101.00"),
3294 Quantity::from(3),
3295 UnixNanos::from(2_000),
3296 );
3297
3298 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3299 assert_eq!(handler_guard.len(), 1);
3300 assert_eq!(handler_guard[0].volume, Quantity::from(10));
3301 assert_eq!(handler_guard[0].close, Price::from("101.00"));
3302 }
3303
3304 #[rstest]
3305 fn test_volume_bar_aggregator_step_of_one_emits_per_unit(equity_aapl: Equity) {
3306 let instrument = InstrumentAny::Equity(equity_aapl);
3307 let bar_spec = BarSpecification::new(1, BarAggregation::Volume, PriceType::Last);
3308 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3309 let handler = Arc::new(Mutex::new(Vec::new()));
3310 let handler_clone = Arc::clone(&handler);
3311
3312 let mut aggregator = VolumeBarAggregator::new(
3313 bar_type,
3314 instrument.price_precision(),
3315 instrument.size_precision(),
3316 move |bar: Bar| {
3317 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3318 handler_guard.push(bar);
3319 },
3320 );
3321
3322 aggregator.update(
3323 Price::from("100.00"),
3324 Quantity::from(1),
3325 UnixNanos::default(),
3326 );
3327
3328 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3329 assert_eq!(handler_guard.len(), 1);
3330 assert_eq!(handler_guard[0].volume, Quantity::from(1));
3331 }
3332
3333 #[rstest]
3334 fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
3335 let instrument = InstrumentAny::Equity(equity_aapl);
3336 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
3337 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3338 let handler = Arc::new(Mutex::new(Vec::new()));
3339 let handler_clone = Arc::clone(&handler);
3340
3341 let mut aggregator = VolumeRunsBarAggregator::new(
3342 bar_type,
3343 instrument.price_precision(),
3344 instrument.size_precision(),
3345 move |bar: Bar| {
3346 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3347 handler_guard.push(bar);
3348 },
3349 );
3350
3351 let buy = TradeTick {
3352 instrument_id: instrument.id(),
3353 price: Price::from("1.0"),
3354 size: Quantity::from(1),
3355 ..TradeTick::default()
3356 };
3357 let sell = TradeTick {
3358 aggressor_side: AggressorSide::Seller,
3359 ..buy
3360 };
3361
3362 aggregator.handle_trade(buy);
3363 aggregator.handle_trade(buy); aggregator.handle_trade(sell);
3365 aggregator.handle_trade(sell); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3368 assert!(handler_guard.len() >= 2);
3369 assert!(
3370 (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
3371 < f64::EPSILON
3372 );
3373 }
3374
3375 #[rstest]
3376 fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
3377 let instrument = InstrumentAny::Equity(equity_aapl);
3378 let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
3379 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3380 let handler = Arc::new(Mutex::new(Vec::new()));
3381 let handler_clone = Arc::clone(&handler);
3382
3383 let mut aggregator = VolumeRunsBarAggregator::new(
3384 bar_type,
3385 instrument.price_precision(),
3386 instrument.size_precision(),
3387 move |bar: Bar| {
3388 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3389 handler_guard.push(bar);
3390 },
3391 );
3392
3393 let trade = TradeTick {
3394 instrument_id: instrument.id(),
3395 price: Price::from("1.0"),
3396 size: Quantity::from(5),
3397 ..TradeTick::default()
3398 };
3399
3400 aggregator.handle_trade(trade);
3401
3402 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3403 assert!(!handler_guard.is_empty());
3404 assert!(handler_guard[0].volume.as_f64() > 0.0);
3405 assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
3406 }
3407
3408 #[rstest]
3409 fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
3410 let instrument = InstrumentAny::Equity(equity_aapl);
3411 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
3412 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3413 let handler = Arc::new(Mutex::new(Vec::new()));
3414 let handler_clone = Arc::clone(&handler);
3415
3416 let mut aggregator = VolumeImbalanceBarAggregator::new(
3417 bar_type,
3418 instrument.price_precision(),
3419 instrument.size_precision(),
3420 move |bar: Bar| {
3421 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3422 handler_guard.push(bar);
3423 },
3424 );
3425
3426 let trade_small = TradeTick {
3427 instrument_id: instrument.id(),
3428 price: Price::from("1.0"),
3429 size: Quantity::from(1),
3430 ..TradeTick::default()
3431 };
3432 let trade_large = TradeTick {
3433 size: Quantity::from(3),
3434 ..trade_small
3435 };
3436
3437 aggregator.handle_trade(trade_small);
3438 aggregator.handle_trade(trade_large);
3439
3440 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3441 assert_eq!(handler_guard.len(), 2);
3442 let total_output = handler_guard
3443 .iter()
3444 .map(|bar| bar.volume.as_f64())
3445 .sum::<f64>();
3446 let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
3447 assert!((total_output - total_input).abs() < f64::EPSILON);
3448 }
3449
3450 #[rstest]
3451 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
3452 let instrument = InstrumentAny::Equity(equity_aapl);
3453 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3455 let handler = Arc::new(Mutex::new(Vec::new()));
3456 let handler_clone = Arc::clone(&handler);
3457
3458 let mut aggregator = ValueBarAggregator::new(
3459 bar_type,
3460 instrument.price_precision(),
3461 instrument.size_precision(),
3462 move |bar: Bar| {
3463 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3464 handler_guard.push(bar);
3465 },
3466 );
3467
3468 aggregator.update(
3470 Price::from("100.00"),
3471 Quantity::from(5),
3472 UnixNanos::default(),
3473 );
3474 aggregator.update(
3475 Price::from("100.00"),
3476 Quantity::from(5),
3477 UnixNanos::from(1000),
3478 );
3479
3480 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3481 assert_eq!(handler_guard.len(), 1);
3482 let bar = handler_guard.first().unwrap();
3483 assert_eq!(bar.volume, Quantity::from(10));
3484 }
3485
3486 #[rstest]
3487 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
3488 let instrument = InstrumentAny::Equity(equity_aapl);
3489 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3490 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3491 let handler = Arc::new(Mutex::new(Vec::new()));
3492 let handler_clone = Arc::clone(&handler);
3493
3494 let mut aggregator = ValueBarAggregator::new(
3495 bar_type,
3496 instrument.price_precision(),
3497 instrument.size_precision(),
3498 move |bar: Bar| {
3499 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3500 handler_guard.push(bar);
3501 },
3502 );
3503
3504 aggregator.update(
3506 Price::from("100.00"),
3507 Quantity::from(25),
3508 UnixNanos::default(),
3509 );
3510
3511 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3512 assert_eq!(handler_guard.len(), 2);
3513 let remaining_value = aggregator.get_cumulative_value();
3514 assert!(remaining_value < 1000.0); }
3516
3517 #[rstest]
3518 fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
3519 let instrument = InstrumentAny::Equity(equity_aapl);
3520 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3521 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3522 let handler = Arc::new(Mutex::new(Vec::new()));
3523 let handler_clone = Arc::clone(&handler);
3524
3525 let mut aggregator = ValueBarAggregator::new(
3526 bar_type,
3527 instrument.price_precision(),
3528 instrument.size_precision(),
3529 move |bar: Bar| {
3530 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3531 handler_guard.push(bar);
3532 },
3533 );
3534
3535 aggregator.update(
3537 Price::from("0.00"),
3538 Quantity::from(100),
3539 UnixNanos::default(),
3540 );
3541
3542 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3544 assert_eq!(handler_guard.len(), 0);
3545
3546 assert_eq!(aggregator.get_cumulative_value(), 0.0);
3548 }
3549
3550 #[rstest]
3551 fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
3552 let instrument = InstrumentAny::Equity(equity_aapl);
3553 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3554 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3555 let handler = Arc::new(Mutex::new(Vec::new()));
3556 let handler_clone = Arc::clone(&handler);
3557
3558 let mut aggregator = ValueBarAggregator::new(
3559 bar_type,
3560 instrument.price_precision(),
3561 instrument.size_precision(),
3562 move |bar: Bar| {
3563 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3564 handler_guard.push(bar);
3565 },
3566 );
3567
3568 aggregator.update(
3570 Price::from("100.00"),
3571 Quantity::from(0),
3572 UnixNanos::default(),
3573 );
3574
3575 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3577 assert_eq!(handler_guard.len(), 0);
3578
3579 assert_eq!(aggregator.get_cumulative_value(), 0.0);
3581 }
3582
3583 #[rstest]
3584 fn test_value_bar_aggregator_exact_threshold_emits_one_bar(equity_aapl: Equity) {
3585 let instrument = InstrumentAny::Equity(equity_aapl);
3586 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
3587 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3588 let handler = Arc::new(Mutex::new(Vec::new()));
3589 let handler_clone = Arc::clone(&handler);
3590
3591 let mut aggregator = ValueBarAggregator::new(
3592 bar_type,
3593 instrument.price_precision(),
3594 instrument.size_precision(),
3595 move |bar: Bar| {
3596 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3597 handler_guard.push(bar);
3598 },
3599 );
3600
3601 aggregator.update(
3602 Price::from("100.00"),
3603 Quantity::from(5),
3604 UnixNanos::from(1_000),
3605 );
3606 aggregator.update(
3607 Price::from("100.00"),
3608 Quantity::from(5),
3609 UnixNanos::from(2_000),
3610 );
3611
3612 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3613 assert_eq!(handler_guard.len(), 1);
3614 assert_eq!(handler_guard[0].volume, Quantity::from(10));
3615 assert_eq!(aggregator.get_cumulative_value(), 0.0);
3616 }
3617
3618 #[rstest]
3619 fn test_value_bar_aggregator_precision_boundary_min_size_clamp(equity_aapl: Equity) {
3620 let instrument = InstrumentAny::Equity(equity_aapl);
3624 let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
3625 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3626 let handler = Arc::new(Mutex::new(Vec::new()));
3627 let handler_clone = Arc::clone(&handler);
3628
3629 let mut aggregator = ValueBarAggregator::new(
3630 bar_type,
3631 instrument.price_precision(),
3632 instrument.size_precision(),
3633 move |bar: Bar| {
3634 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3635 handler_guard.push(bar);
3636 },
3637 );
3638
3639 aggregator.update(
3641 Price::from("100.00"),
3642 Quantity::from(4),
3643 UnixNanos::default(),
3644 );
3645
3646 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3647 assert_eq!(handler_guard.len(), 4);
3648 for bar in handler_guard.iter() {
3649 assert_eq!(bar.volume, Quantity::from(1));
3650 }
3651 }
3652
3653 #[rstest]
3654 fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
3655 let instrument = InstrumentAny::Equity(equity_aapl);
3656 let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
3657 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3658 let handler = Arc::new(Mutex::new(Vec::new()));
3659 let handler_clone = Arc::clone(&handler);
3660
3661 let mut aggregator = ValueImbalanceBarAggregator::new(
3662 bar_type,
3663 instrument.price_precision(),
3664 instrument.size_precision(),
3665 move |bar: Bar| {
3666 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3667 handler_guard.push(bar);
3668 },
3669 );
3670
3671 let buy = TradeTick {
3672 price: Price::from("5.0"),
3673 size: Quantity::from(2), instrument_id: instrument.id(),
3675 ..TradeTick::default()
3676 };
3677 let sell = TradeTick {
3678 price: Price::from("5.0"),
3679 size: Quantity::from(2), aggressor_side: AggressorSide::Seller,
3681 instrument_id: instrument.id(),
3682 ..buy
3683 };
3684
3685 aggregator.handle_trade(buy);
3686 aggregator.handle_trade(sell);
3687
3688 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3689 assert_eq!(handler_guard.len(), 2);
3690 }
3691
3692 #[rstest]
3693 fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
3694 let instrument = InstrumentAny::Equity(equity_aapl);
3695 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
3696 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3697 let handler = Arc::new(Mutex::new(Vec::new()));
3698 let handler_clone = Arc::clone(&handler);
3699
3700 let mut aggregator = ValueRunsBarAggregator::new(
3701 bar_type,
3702 instrument.price_precision(),
3703 instrument.size_precision(),
3704 move |bar: Bar| {
3705 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3706 handler_guard.push(bar);
3707 },
3708 );
3709
3710 let trade = TradeTick {
3711 price: Price::from("10.0"),
3712 size: Quantity::from(5),
3713 instrument_id: instrument.id(),
3714 ..TradeTick::default()
3715 };
3716
3717 aggregator.handle_trade(trade);
3718 aggregator.handle_trade(trade);
3719
3720 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3721 assert_eq!(handler_guard.len(), 1);
3722 let bar = handler_guard.first().unwrap();
3723 assert_eq!(bar.volume, Quantity::from(10));
3724 }
3725
3726 #[rstest]
3727 fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
3728 let instrument = InstrumentAny::Equity(equity_aapl);
3729 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
3730 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3731 let handler = Arc::new(Mutex::new(Vec::new()));
3732 let handler_clone = Arc::clone(&handler);
3733
3734 let mut aggregator = ValueRunsBarAggregator::new(
3735 bar_type,
3736 instrument.price_precision(),
3737 instrument.size_precision(),
3738 move |bar: Bar| {
3739 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3740 handler_guard.push(bar);
3741 },
3742 );
3743
3744 let buy = TradeTick {
3745 price: Price::from("10.0"),
3746 size: Quantity::from(5),
3747 instrument_id: instrument.id(),
3748 ..TradeTick::default()
3749 }; let sell = TradeTick {
3751 price: Price::from("10.0"),
3752 size: Quantity::from(10),
3753 aggressor_side: AggressorSide::Seller,
3754 ..buy
3755 }; aggregator.handle_trade(buy);
3758 aggregator.handle_trade(sell);
3759
3760 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3761 assert_eq!(handler_guard.len(), 1);
3762 assert_eq!(handler_guard[0].volume, Quantity::from(10));
3763 }
3764
3765 #[rstest]
3766 fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
3767 let instrument = InstrumentAny::Equity(equity_aapl);
3768 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3769 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3770 let handler = Arc::new(Mutex::new(Vec::new()));
3771 let handler_clone = Arc::clone(&handler);
3772
3773 let mut aggregator = TickRunsBarAggregator::new(
3774 bar_type,
3775 instrument.price_precision(),
3776 instrument.size_precision(),
3777 move |bar: Bar| {
3778 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3779 handler_guard.push(bar);
3780 },
3781 );
3782
3783 let buy = TradeTick::default();
3784
3785 aggregator.handle_trade(buy);
3786 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3791 assert_eq!(handler_guard.len(), 2);
3792 }
3793
3794 #[rstest]
3795 fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
3796 let instrument = InstrumentAny::Equity(equity_aapl);
3797 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3798 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3799 let handler = Arc::new(Mutex::new(Vec::new()));
3800 let handler_clone = Arc::clone(&handler);
3801
3802 let mut aggregator = TickRunsBarAggregator::new(
3803 bar_type,
3804 instrument.price_precision(),
3805 instrument.size_precision(),
3806 move |bar: Bar| {
3807 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3808 handler_guard.push(bar);
3809 },
3810 );
3811
3812 let buy = TradeTick::default();
3813 let no_aggressor = TradeTick {
3814 aggressor_side: AggressorSide::NoAggressor,
3815 ..buy
3816 };
3817
3818 aggregator.handle_trade(buy);
3819 aggregator.handle_trade(no_aggressor); aggregator.handle_trade(no_aggressor); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3824 assert_eq!(handler_guard.len(), 1);
3825 }
3826
3827 #[rstest]
3828 fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
3829 let instrument = InstrumentAny::Equity(equity_aapl);
3830 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
3831 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3832 let handler = Arc::new(Mutex::new(Vec::new()));
3833 let handler_clone = Arc::clone(&handler);
3834
3835 let mut aggregator = VolumeRunsBarAggregator::new(
3836 bar_type,
3837 instrument.price_precision(),
3838 instrument.size_precision(),
3839 move |bar: Bar| {
3840 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3841 handler_guard.push(bar);
3842 },
3843 );
3844
3845 let buy = TradeTick {
3846 instrument_id: instrument.id(),
3847 price: Price::from("1.0"),
3848 size: Quantity::from(1),
3849 ..TradeTick::default()
3850 };
3851
3852 aggregator.handle_trade(buy);
3853 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3858 assert_eq!(handler_guard.len(), 2);
3859 assert_eq!(handler_guard[0].volume, Quantity::from(2));
3860 assert_eq!(handler_guard[1].volume, Quantity::from(2));
3861 }
3862
3863 #[rstest]
3864 fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
3865 let instrument = InstrumentAny::Equity(equity_aapl);
3866 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
3867 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3868 let handler = Arc::new(Mutex::new(Vec::new()));
3869 let handler_clone = Arc::clone(&handler);
3870
3871 let mut aggregator = ValueRunsBarAggregator::new(
3872 bar_type,
3873 instrument.price_precision(),
3874 instrument.size_precision(),
3875 move |bar: Bar| {
3876 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3877 handler_guard.push(bar);
3878 },
3879 );
3880
3881 let buy = TradeTick {
3882 instrument_id: instrument.id(),
3883 price: Price::from("10.0"),
3884 size: Quantity::from(5),
3885 ..TradeTick::default()
3886 }; aggregator.handle_trade(buy);
3889 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3894 assert_eq!(handler_guard.len(), 2);
3895 assert_eq!(handler_guard[0].volume, Quantity::from(10));
3896 assert_eq!(handler_guard[1].volume, Quantity::from(10));
3897 }
3898
3899 #[rstest]
3900 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
3901 let instrument = InstrumentAny::Equity(equity_aapl);
3902 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3904 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3905 let handler = Arc::new(Mutex::new(Vec::new()));
3906 let handler_clone = Arc::clone(&handler);
3907 let clock = Rc::new(RefCell::new(TestClock::new()));
3908
3909 let mut aggregator = TimeBarAggregator::new(
3910 bar_type,
3911 instrument.price_precision(),
3912 instrument.size_precision(),
3913 clock.clone(),
3914 move |bar: Bar| {
3915 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3916 handler_guard.push(bar);
3917 },
3918 true, false, BarIntervalType::LeftOpen,
3921 None, 15, false, );
3925
3926 aggregator.update(
3927 Price::from("100.00"),
3928 Quantity::from(1),
3929 UnixNanos::default(),
3930 );
3931
3932 let next_sec = UnixNanos::from(1_000_000_000);
3933 clock.borrow_mut().set_time(next_sec);
3934
3935 let event = TimeEvent::new(
3936 Ustr::from("1-SECOND-LAST"),
3937 UUID4::new(),
3938 next_sec,
3939 next_sec,
3940 );
3941 aggregator.build_bar(&event);
3942
3943 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3944 assert_eq!(handler_guard.len(), 1);
3945 let bar = handler_guard.first().unwrap();
3946 assert_eq!(bar.ts_event, UnixNanos::default());
3947 assert_eq!(bar.ts_init, next_sec);
3948 }
3949
3950 #[rstest]
3951 fn test_time_bar_aggregator_stop_clears_timer_and_allows_restart(equity_aapl: Equity) {
3952 let instrument = InstrumentAny::Equity(equity_aapl);
3953 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3954 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3955 let timer_name = format!("TIME_BAR_{bar_type}");
3956 let clock = Rc::new(RefCell::new(TestClock::new()));
3957
3958 let aggregator = TimeBarAggregator::new(
3959 bar_type,
3960 instrument.price_precision(),
3961 instrument.size_precision(),
3962 clock.clone(),
3963 |_bar: Bar| {},
3964 true,
3965 false,
3966 BarIntervalType::LeftOpen,
3967 None,
3968 15,
3969 false,
3970 );
3971
3972 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
3973 let rc = Rc::new(RefCell::new(boxed));
3974
3975 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
3976 assert_eq!(clock.borrow().timer_names(), vec![timer_name.as_str()]);
3977
3978 rc.borrow_mut().stop();
3979 assert!(clock.borrow().timer_names().is_empty());
3980
3981 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
3982 assert_eq!(clock.borrow().timer_names(), vec![timer_name.as_str()]);
3983 }
3984
3985 #[rstest]
3986 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
3987 let instrument = InstrumentAny::Equity(equity_aapl);
3988 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3989 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3990 let handler = Arc::new(Mutex::new(Vec::new()));
3991 let handler_clone = Arc::clone(&handler);
3992 let clock = Rc::new(RefCell::new(TestClock::new()));
3993
3994 let mut aggregator = TimeBarAggregator::new(
3995 bar_type,
3996 instrument.price_precision(),
3997 instrument.size_precision(),
3998 clock.clone(),
3999 move |bar: Bar| {
4000 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4001 handler_guard.push(bar);
4002 },
4003 true, true, BarIntervalType::LeftOpen,
4006 None,
4007 15,
4008 false, );
4010
4011 aggregator.update(
4013 Price::from("100.00"),
4014 Quantity::from(1),
4015 UnixNanos::default(),
4016 );
4017
4018 let ts1 = UnixNanos::from(1_000_000_000);
4020 clock.borrow_mut().set_time(ts1);
4021 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4022 aggregator.build_bar(&event);
4023
4024 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
4026
4027 let ts2 = UnixNanos::from(2_000_000_000);
4029 clock.borrow_mut().set_time(ts2);
4030 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4031 aggregator.build_bar(&event);
4032
4033 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4034 assert_eq!(handler_guard.len(), 2);
4035
4036 let bar1 = &handler_guard[0];
4037 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
4039 assert_eq!(bar1.close, Price::from("100.00"));
4040 let bar2 = &handler_guard[1];
4041 assert_eq!(bar2.ts_event, ts2);
4042 assert_eq!(bar2.ts_init, ts2);
4043 assert_eq!(bar2.close, Price::from("101.00"));
4044 }
4045
4046 #[rstest]
4047 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
4048 let instrument = InstrumentAny::Equity(equity_aapl);
4049 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4050 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4051 let handler = Arc::new(Mutex::new(Vec::new()));
4052 let handler_clone = Arc::clone(&handler);
4053 let clock = Rc::new(RefCell::new(TestClock::new()));
4054 let mut aggregator = TimeBarAggregator::new(
4055 bar_type,
4056 instrument.price_precision(),
4057 instrument.size_precision(),
4058 clock.clone(),
4059 move |bar: Bar| {
4060 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4061 handler_guard.push(bar);
4062 },
4063 true, true, BarIntervalType::RightOpen,
4066 None,
4067 15,
4068 false, );
4070
4071 aggregator.update(
4073 Price::from("100.00"),
4074 Quantity::from(1),
4075 UnixNanos::default(),
4076 );
4077
4078 let ts1 = UnixNanos::from(1_000_000_000);
4080 clock.borrow_mut().set_time(ts1);
4081 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4082 aggregator.build_bar(&event);
4083
4084 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
4086
4087 let ts2 = UnixNanos::from(2_000_000_000);
4089 clock.borrow_mut().set_time(ts2);
4090 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4091 aggregator.build_bar(&event);
4092
4093 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4094 assert_eq!(handler_guard.len(), 2);
4095
4096 let bar1 = &handler_guard[0];
4097 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
4099 assert_eq!(bar1.close, Price::from("100.00"));
4100
4101 let bar2 = &handler_guard[1];
4102 assert_eq!(bar2.ts_event, ts1);
4103 assert_eq!(bar2.ts_init, ts2);
4104 assert_eq!(bar2.close, Price::from("101.00"));
4105 }
4106
4107 #[rstest]
4108 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
4109 let instrument = InstrumentAny::Equity(equity_aapl);
4110 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4111 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4112 let handler = Arc::new(Mutex::new(Vec::new()));
4113 let handler_clone = Arc::clone(&handler);
4114 let clock = Rc::new(RefCell::new(TestClock::new()));
4115
4116 let mut aggregator = TimeBarAggregator::new(
4118 bar_type,
4119 instrument.price_precision(),
4120 instrument.size_precision(),
4121 clock.clone(),
4122 move |bar: Bar| {
4123 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4124 handler_guard.push(bar);
4125 },
4126 false, true, BarIntervalType::LeftOpen,
4129 None,
4130 15,
4131 false, );
4133
4134 let ts1 = UnixNanos::from(1_000_000_000);
4136 clock.borrow_mut().set_time(ts1);
4137 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4138 aggregator.build_bar(&event);
4139
4140 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4141 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
4143
4144 let handler = Arc::new(Mutex::new(Vec::new()));
4146 let handler_clone = Arc::clone(&handler);
4147 let mut aggregator = TimeBarAggregator::new(
4148 bar_type,
4149 instrument.price_precision(),
4150 instrument.size_precision(),
4151 clock.clone(),
4152 move |bar: Bar| {
4153 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4154 handler_guard.push(bar);
4155 },
4156 true, true, BarIntervalType::LeftOpen,
4159 None,
4160 15,
4161 false, );
4163
4164 aggregator.update(
4165 Price::from("100.00"),
4166 Quantity::from(1),
4167 UnixNanos::default(),
4168 );
4169
4170 let ts1 = UnixNanos::from(1_000_000_000);
4172 clock.borrow_mut().set_time(ts1);
4173 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
4174 aggregator.build_bar(&event);
4175
4176 let ts2 = UnixNanos::from(2_000_000_000);
4178 clock.borrow_mut().set_time(ts2);
4179 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4180 aggregator.build_bar(&event);
4181
4182 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4183 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
4185 assert_eq!(bar1.close, Price::from("100.00"));
4186 let bar2 = &handler_guard[1];
4187 assert_eq!(bar2.close, Price::from("100.00")); }
4189
4190 #[rstest]
4191 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
4192 let instrument = InstrumentAny::Equity(equity_aapl);
4193 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
4194 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4195 let clock = Rc::new(RefCell::new(TestClock::new()));
4196 let handler = Arc::new(Mutex::new(Vec::new()));
4197 let handler_clone = Arc::clone(&handler);
4198
4199 let mut aggregator = TimeBarAggregator::new(
4200 bar_type,
4201 instrument.price_precision(),
4202 instrument.size_precision(),
4203 clock.clone(),
4204 move |bar: Bar| {
4205 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4206 handler_guard.push(bar);
4207 },
4208 true, true, BarIntervalType::RightOpen,
4211 None,
4212 15,
4213 false, );
4215
4216 let ts1 = UnixNanos::from(1_000_000_000);
4217 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
4218
4219 let ts2 = UnixNanos::from(2_000_000_000);
4220 clock.borrow_mut().set_time(ts2);
4221
4222 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
4224 aggregator.build_bar(&event);
4225
4226 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4227 let bar = handler_guard.first().unwrap();
4228 assert_eq!(bar.ts_event, UnixNanos::default());
4229 assert_eq!(bar.ts_init, ts2);
4230 }
4231
4232 #[rstest]
4233 fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
4234 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4235 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4237 let handler = Arc::new(Mutex::new(Vec::new()));
4238 let handler_clone = Arc::clone(&handler);
4239
4240 let aggregator = RenkoBarAggregator::new(
4241 bar_type,
4242 instrument.price_precision(),
4243 instrument.size_precision(),
4244 instrument.price_increment(),
4245 move |bar: Bar| {
4246 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4247 handler_guard.push(bar);
4248 },
4249 );
4250
4251 assert_eq!(aggregator.bar_type(), bar_type);
4252 assert!(!aggregator.is_running());
4253 let expected_brick_size = 10 * instrument.price_increment().raw;
4255 assert_eq!(aggregator.brick_size, expected_brick_size);
4256 }
4257
4258 #[rstest]
4259 fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
4260 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4261 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4263 let handler = Arc::new(Mutex::new(Vec::new()));
4264 let handler_clone = Arc::clone(&handler);
4265
4266 let mut aggregator = RenkoBarAggregator::new(
4267 bar_type,
4268 instrument.price_precision(),
4269 instrument.size_precision(),
4270 instrument.price_increment(),
4271 move |bar: Bar| {
4272 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4273 handler_guard.push(bar);
4274 },
4275 );
4276
4277 aggregator.update(
4279 Price::from("1.00000"),
4280 Quantity::from(1),
4281 UnixNanos::default(),
4282 );
4283 aggregator.update(
4284 Price::from("1.00005"),
4285 Quantity::from(1),
4286 UnixNanos::from(1000),
4287 );
4288
4289 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4290 assert_eq!(handler_guard.len(), 0); }
4292
4293 #[rstest]
4294 fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
4295 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4296 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4298 let handler = Arc::new(Mutex::new(Vec::new()));
4299 let handler_clone = Arc::clone(&handler);
4300
4301 let mut aggregator = RenkoBarAggregator::new(
4302 bar_type,
4303 instrument.price_precision(),
4304 instrument.size_precision(),
4305 instrument.price_increment(),
4306 move |bar: Bar| {
4307 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4308 handler_guard.push(bar);
4309 },
4310 );
4311
4312 aggregator.update(
4314 Price::from("1.00000"),
4315 Quantity::from(1),
4316 UnixNanos::default(),
4317 );
4318 aggregator.update(
4319 Price::from("1.00015"),
4320 Quantity::from(1),
4321 UnixNanos::from(1000),
4322 );
4323
4324 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4325 assert_eq!(handler_guard.len(), 1);
4326
4327 let bar = handler_guard.first().unwrap();
4328 assert_eq!(bar.open, Price::from("1.00000"));
4329 assert_eq!(bar.high, Price::from("1.00010"));
4330 assert_eq!(bar.low, Price::from("1.00000"));
4331 assert_eq!(bar.close, Price::from("1.00010"));
4332 assert_eq!(bar.volume, Quantity::from(2));
4333 assert_eq!(bar.ts_event, UnixNanos::from(1000));
4334 assert_eq!(bar.ts_init, UnixNanos::from(1000));
4335 }
4336
4337 #[rstest]
4338 fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
4339 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4340 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4342 let handler = Arc::new(Mutex::new(Vec::new()));
4343 let handler_clone = Arc::clone(&handler);
4344
4345 let mut aggregator = RenkoBarAggregator::new(
4346 bar_type,
4347 instrument.price_precision(),
4348 instrument.size_precision(),
4349 instrument.price_increment(),
4350 move |bar: Bar| {
4351 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4352 handler_guard.push(bar);
4353 },
4354 );
4355
4356 aggregator.update(
4358 Price::from("1.00000"),
4359 Quantity::from(1),
4360 UnixNanos::default(),
4361 );
4362 aggregator.update(
4363 Price::from("1.00025"),
4364 Quantity::from(1),
4365 UnixNanos::from(1000),
4366 );
4367
4368 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4369 assert_eq!(handler_guard.len(), 2);
4370
4371 let bar1 = &handler_guard[0];
4372 assert_eq!(bar1.open, Price::from("1.00000"));
4373 assert_eq!(bar1.high, Price::from("1.00010"));
4374 assert_eq!(bar1.low, Price::from("1.00000"));
4375 assert_eq!(bar1.close, Price::from("1.00010"));
4376
4377 let bar2 = &handler_guard[1];
4378 assert_eq!(bar2.open, Price::from("1.00010"));
4379 assert_eq!(bar2.high, Price::from("1.00020"));
4380 assert_eq!(bar2.low, Price::from("1.00010"));
4381 assert_eq!(bar2.close, Price::from("1.00020"));
4382 }
4383
4384 #[rstest]
4385 fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
4386 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4387 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4389 let handler = Arc::new(Mutex::new(Vec::new()));
4390 let handler_clone = Arc::clone(&handler);
4391
4392 let mut aggregator = RenkoBarAggregator::new(
4393 bar_type,
4394 instrument.price_precision(),
4395 instrument.size_precision(),
4396 instrument.price_increment(),
4397 move |bar: Bar| {
4398 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4399 handler_guard.push(bar);
4400 },
4401 );
4402
4403 aggregator.update(
4405 Price::from("1.00020"),
4406 Quantity::from(1),
4407 UnixNanos::default(),
4408 );
4409 aggregator.update(
4410 Price::from("1.00005"),
4411 Quantity::from(1),
4412 UnixNanos::from(1000),
4413 );
4414
4415 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4416 assert_eq!(handler_guard.len(), 1);
4417
4418 let bar = handler_guard.first().unwrap();
4419 assert_eq!(bar.open, Price::from("1.00020"));
4420 assert_eq!(bar.high, Price::from("1.00020"));
4421 assert_eq!(bar.low, Price::from("1.00010"));
4422 assert_eq!(bar.close, Price::from("1.00010"));
4423 assert_eq!(bar.volume, Quantity::from(2));
4424 }
4425
4426 #[rstest]
4427 fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
4428 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4429 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4431 let handler = Arc::new(Mutex::new(Vec::new()));
4432 let handler_clone = Arc::clone(&handler);
4433
4434 let mut aggregator = RenkoBarAggregator::new(
4435 bar_type,
4436 instrument.price_precision(),
4437 instrument.size_precision(),
4438 instrument.price_increment(),
4439 move |bar: Bar| {
4440 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4441 handler_guard.push(bar);
4442 },
4443 );
4444
4445 let input_bar = Bar::new(
4447 BarType::new(
4448 instrument.id(),
4449 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4450 AggregationSource::Internal,
4451 ),
4452 Price::from("1.00000"),
4453 Price::from("1.00005"),
4454 Price::from("0.99995"),
4455 Price::from("1.00005"), Quantity::from(100),
4457 UnixNanos::default(),
4458 UnixNanos::from(1000),
4459 );
4460
4461 aggregator.handle_bar(input_bar);
4462
4463 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4464 assert_eq!(handler_guard.len(), 0); }
4466
4467 #[rstest]
4468 fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
4469 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4470 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4472 let handler = Arc::new(Mutex::new(Vec::new()));
4473 let handler_clone = Arc::clone(&handler);
4474
4475 let mut aggregator = RenkoBarAggregator::new(
4476 bar_type,
4477 instrument.price_precision(),
4478 instrument.size_precision(),
4479 instrument.price_increment(),
4480 move |bar: Bar| {
4481 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4482 handler_guard.push(bar);
4483 },
4484 );
4485
4486 let bar1 = Bar::new(
4488 BarType::new(
4489 instrument.id(),
4490 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4491 AggregationSource::Internal,
4492 ),
4493 Price::from("1.00000"),
4494 Price::from("1.00005"),
4495 Price::from("0.99995"),
4496 Price::from("1.00000"),
4497 Quantity::from(100),
4498 UnixNanos::default(),
4499 UnixNanos::default(),
4500 );
4501
4502 let bar2 = Bar::new(
4504 BarType::new(
4505 instrument.id(),
4506 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4507 AggregationSource::Internal,
4508 ),
4509 Price::from("1.00000"),
4510 Price::from("1.00015"),
4511 Price::from("0.99995"),
4512 Price::from("1.00010"), Quantity::from(50),
4514 UnixNanos::from(60_000_000_000),
4515 UnixNanos::from(60_000_000_000),
4516 );
4517
4518 aggregator.handle_bar(bar1);
4519 aggregator.handle_bar(bar2);
4520
4521 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4522 assert_eq!(handler_guard.len(), 1);
4523
4524 let bar = handler_guard.first().unwrap();
4525 assert_eq!(bar.open, Price::from("1.00000"));
4526 assert_eq!(bar.high, Price::from("1.00010"));
4527 assert_eq!(bar.low, Price::from("1.00000"));
4528 assert_eq!(bar.close, Price::from("1.00010"));
4529 assert_eq!(bar.volume, Quantity::from(150));
4530 }
4531
4532 #[rstest]
4533 fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
4534 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4535 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4537 let handler = Arc::new(Mutex::new(Vec::new()));
4538 let handler_clone = Arc::clone(&handler);
4539
4540 let mut aggregator = RenkoBarAggregator::new(
4541 bar_type,
4542 instrument.price_precision(),
4543 instrument.size_precision(),
4544 instrument.price_increment(),
4545 move |bar: Bar| {
4546 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4547 handler_guard.push(bar);
4548 },
4549 );
4550
4551 let bar1 = Bar::new(
4553 BarType::new(
4554 instrument.id(),
4555 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4556 AggregationSource::Internal,
4557 ),
4558 Price::from("1.00000"),
4559 Price::from("1.00005"),
4560 Price::from("0.99995"),
4561 Price::from("1.00000"),
4562 Quantity::from(100),
4563 UnixNanos::default(),
4564 UnixNanos::default(),
4565 );
4566
4567 let bar2 = Bar::new(
4569 BarType::new(
4570 instrument.id(),
4571 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4572 AggregationSource::Internal,
4573 ),
4574 Price::from("1.00000"),
4575 Price::from("1.00035"),
4576 Price::from("0.99995"),
4577 Price::from("1.00030"), Quantity::from(50),
4579 UnixNanos::from(60_000_000_000),
4580 UnixNanos::from(60_000_000_000),
4581 );
4582
4583 aggregator.handle_bar(bar1);
4584 aggregator.handle_bar(bar2);
4585
4586 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4587 assert_eq!(handler_guard.len(), 3);
4588
4589 let bar1 = &handler_guard[0];
4590 assert_eq!(bar1.open, Price::from("1.00000"));
4591 assert_eq!(bar1.close, Price::from("1.00010"));
4592
4593 let bar2 = &handler_guard[1];
4594 assert_eq!(bar2.open, Price::from("1.00010"));
4595 assert_eq!(bar2.close, Price::from("1.00020"));
4596
4597 let bar3 = &handler_guard[2];
4598 assert_eq!(bar3.open, Price::from("1.00020"));
4599 assert_eq!(bar3.close, Price::from("1.00030"));
4600 }
4601
4602 #[rstest]
4603 fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
4604 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4605 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4607 let handler = Arc::new(Mutex::new(Vec::new()));
4608 let handler_clone = Arc::clone(&handler);
4609
4610 let mut aggregator = RenkoBarAggregator::new(
4611 bar_type,
4612 instrument.price_precision(),
4613 instrument.size_precision(),
4614 instrument.price_increment(),
4615 move |bar: Bar| {
4616 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4617 handler_guard.push(bar);
4618 },
4619 );
4620
4621 let bar1 = Bar::new(
4623 BarType::new(
4624 instrument.id(),
4625 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4626 AggregationSource::Internal,
4627 ),
4628 Price::from("1.00020"),
4629 Price::from("1.00025"),
4630 Price::from("1.00015"),
4631 Price::from("1.00020"),
4632 Quantity::from(100),
4633 UnixNanos::default(),
4634 UnixNanos::default(),
4635 );
4636
4637 let bar2 = Bar::new(
4639 BarType::new(
4640 instrument.id(),
4641 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
4642 AggregationSource::Internal,
4643 ),
4644 Price::from("1.00020"),
4645 Price::from("1.00025"),
4646 Price::from("1.00005"),
4647 Price::from("1.00010"), Quantity::from(50),
4649 UnixNanos::from(60_000_000_000),
4650 UnixNanos::from(60_000_000_000),
4651 );
4652
4653 aggregator.handle_bar(bar1);
4654 aggregator.handle_bar(bar2);
4655
4656 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4657 assert_eq!(handler_guard.len(), 1);
4658
4659 let bar = handler_guard.first().unwrap();
4660 assert_eq!(bar.open, Price::from("1.00020"));
4661 assert_eq!(bar.high, Price::from("1.00020"));
4662 assert_eq!(bar.low, Price::from("1.00010"));
4663 assert_eq!(bar.close, Price::from("1.00010"));
4664 assert_eq!(bar.volume, Quantity::from(150));
4665 }
4666
4667 #[rstest]
4668 fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
4669 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4670
4671 let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
4674 let handler = Arc::new(Mutex::new(Vec::new()));
4675 let handler_clone = Arc::clone(&handler);
4676
4677 let aggregator_5 = RenkoBarAggregator::new(
4678 bar_type_5,
4679 instrument.price_precision(),
4680 instrument.size_precision(),
4681 instrument.price_increment(),
4682 move |_bar: Bar| {
4683 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4684 handler_guard.push(_bar);
4685 },
4686 );
4687
4688 let expected_brick_size_5 = 5 * instrument.price_increment().raw;
4690 assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
4691
4692 let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
4694 let handler2 = Arc::new(Mutex::new(Vec::new()));
4695 let handler2_clone = Arc::clone(&handler2);
4696
4697 let aggregator_20 = RenkoBarAggregator::new(
4698 bar_type_20,
4699 instrument.price_precision(),
4700 instrument.size_precision(),
4701 instrument.price_increment(),
4702 move |_bar: Bar| {
4703 let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
4704 handler_guard.push(_bar);
4705 },
4706 );
4707
4708 let expected_brick_size_20 = 20 * instrument.price_increment().raw;
4710 assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
4711 }
4712
4713 #[rstest]
4714 fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
4715 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4716 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4718 let handler = Arc::new(Mutex::new(Vec::new()));
4719 let handler_clone = Arc::clone(&handler);
4720
4721 let mut aggregator = RenkoBarAggregator::new(
4722 bar_type,
4723 instrument.price_precision(),
4724 instrument.size_precision(),
4725 instrument.price_increment(),
4726 move |bar: Bar| {
4727 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4728 handler_guard.push(bar);
4729 },
4730 );
4731
4732 aggregator.update(
4734 Price::from("1.00000"),
4735 Quantity::from(1),
4736 UnixNanos::from(1000),
4737 );
4738 aggregator.update(
4739 Price::from("1.00010"),
4740 Quantity::from(1),
4741 UnixNanos::from(2000),
4742 ); aggregator.update(
4744 Price::from("1.00020"),
4745 Quantity::from(1),
4746 UnixNanos::from(3000),
4747 ); aggregator.update(
4749 Price::from("1.00025"),
4750 Quantity::from(1),
4751 UnixNanos::from(4000),
4752 ); aggregator.update(
4754 Price::from("1.00030"),
4755 Quantity::from(1),
4756 UnixNanos::from(5000),
4757 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
4760 assert_eq!(handler_guard.len(), 3);
4761
4762 let bar1 = &handler_guard[0];
4763 assert_eq!(bar1.open, Price::from("1.00000"));
4764 assert_eq!(bar1.close, Price::from("1.00010"));
4765
4766 let bar2 = &handler_guard[1];
4767 assert_eq!(bar2.open, Price::from("1.00010"));
4768 assert_eq!(bar2.close, Price::from("1.00020"));
4769
4770 let bar3 = &handler_guard[2];
4771 assert_eq!(bar3.open, Price::from("1.00020"));
4772 assert_eq!(bar3.close, Price::from("1.00030"));
4773 }
4774
4775 #[rstest]
4776 fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
4777 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
4778 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4780 let handler = Arc::new(Mutex::new(Vec::new()));
4781 let handler_clone = Arc::clone(&handler);
4782
4783 let mut aggregator = RenkoBarAggregator::new(
4784 bar_type,
4785 instrument.price_precision(),
4786 instrument.size_precision(),
4787 instrument.price_increment(),
4788 move |bar: Bar| {
4789 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4790 handler_guard.push(bar);
4791 },
4792 );
4793
4794 aggregator.update(
4796 Price::from("1.00000"),
4797 Quantity::from(1),
4798 UnixNanos::from(1000),
4799 );
4800 aggregator.update(
4801 Price::from("1.00010"),
4802 Quantity::from(1),
4803 UnixNanos::from(2000),
4804 ); aggregator.update(
4806 Price::from("0.99990"),
4807 Quantity::from(1),
4808 UnixNanos::from(3000),
4809 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
4812 assert_eq!(handler_guard.len(), 3);
4813
4814 let bar1 = &handler_guard[0]; assert_eq!(bar1.open, Price::from("1.00000"));
4816 assert_eq!(bar1.high, Price::from("1.00010"));
4817 assert_eq!(bar1.low, Price::from("1.00000"));
4818 assert_eq!(bar1.close, Price::from("1.00010"));
4819
4820 let bar2 = &handler_guard[1]; assert_eq!(bar2.open, Price::from("1.00010"));
4822 assert_eq!(bar2.high, Price::from("1.00010"));
4823 assert_eq!(bar2.low, Price::from("1.00000"));
4824 assert_eq!(bar2.close, Price::from("1.00000"));
4825
4826 let bar3 = &handler_guard[2]; assert_eq!(bar3.open, Price::from("1.00000"));
4828 assert_eq!(bar3.high, Price::from("1.00000"));
4829 assert_eq!(bar3.low, Price::from("0.99990"));
4830 assert_eq!(bar3.close, Price::from("0.99990"));
4831 }
4832
4833 #[rstest]
4834 fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
4835 let instrument = InstrumentAny::Equity(equity_aapl);
4836 let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
4837 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4838 let handler = Arc::new(Mutex::new(Vec::new()));
4839 let handler_clone = Arc::clone(&handler);
4840
4841 let mut aggregator = TickImbalanceBarAggregator::new(
4842 bar_type,
4843 instrument.price_precision(),
4844 instrument.size_precision(),
4845 move |bar: Bar| {
4846 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4847 handler_guard.push(bar);
4848 },
4849 );
4850
4851 let buy = TradeTick {
4852 aggressor_side: AggressorSide::Buyer,
4853 ..TradeTick::default()
4854 };
4855 let sell = TradeTick {
4856 aggressor_side: AggressorSide::Seller,
4857 ..TradeTick::default()
4858 };
4859
4860 aggregator.handle_trade(buy);
4861 aggregator.handle_trade(sell);
4862 aggregator.handle_trade(buy);
4863
4864 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4865 assert_eq!(handler_guard.len(), 0);
4866 }
4867
4868 #[rstest]
4869 fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
4870 let instrument = InstrumentAny::Equity(equity_aapl);
4871 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
4872 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4873 let handler = Arc::new(Mutex::new(Vec::new()));
4874 let handler_clone = Arc::clone(&handler);
4875
4876 let mut aggregator = TickImbalanceBarAggregator::new(
4877 bar_type,
4878 instrument.price_precision(),
4879 instrument.size_precision(),
4880 move |bar: Bar| {
4881 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4882 handler_guard.push(bar);
4883 },
4884 );
4885
4886 let buy = TradeTick {
4887 aggressor_side: AggressorSide::Buyer,
4888 ..TradeTick::default()
4889 };
4890 let no_aggressor = TradeTick {
4891 aggressor_side: AggressorSide::NoAggressor,
4892 ..TradeTick::default()
4893 };
4894
4895 aggregator.handle_trade(buy);
4896 aggregator.handle_trade(no_aggressor);
4897 aggregator.handle_trade(buy);
4898
4899 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4900 assert_eq!(handler_guard.len(), 1);
4901 }
4902
4903 #[rstest]
4904 fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
4905 let instrument = InstrumentAny::Equity(equity_aapl);
4906 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
4907 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4908 let handler = Arc::new(Mutex::new(Vec::new()));
4909 let handler_clone = Arc::clone(&handler);
4910
4911 let mut aggregator = TickRunsBarAggregator::new(
4912 bar_type,
4913 instrument.price_precision(),
4914 instrument.size_precision(),
4915 move |bar: Bar| {
4916 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4917 handler_guard.push(bar);
4918 },
4919 );
4920
4921 let buy = TradeTick {
4922 aggressor_side: AggressorSide::Buyer,
4923 ..TradeTick::default()
4924 };
4925 let sell = TradeTick {
4926 aggressor_side: AggressorSide::Seller,
4927 ..TradeTick::default()
4928 };
4929
4930 aggregator.handle_trade(buy);
4931 aggregator.handle_trade(buy);
4932 aggregator.handle_trade(sell);
4933 aggregator.handle_trade(sell);
4934
4935 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4936 assert_eq!(handler_guard.len(), 2);
4937 }
4938
4939 #[rstest]
4940 fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4941 let instrument = InstrumentAny::Equity(equity_aapl);
4942 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4943 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4944 let handler = Arc::new(Mutex::new(Vec::new()));
4945 let handler_clone = Arc::clone(&handler);
4946
4947 let mut aggregator = VolumeImbalanceBarAggregator::new(
4948 bar_type,
4949 instrument.price_precision(),
4950 instrument.size_precision(),
4951 move |bar: Bar| {
4952 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4953 handler_guard.push(bar);
4954 },
4955 );
4956
4957 let large_trade = TradeTick {
4958 size: Quantity::from(25),
4959 aggressor_side: AggressorSide::Buyer,
4960 ..TradeTick::default()
4961 };
4962
4963 aggregator.handle_trade(large_trade);
4964
4965 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4966 assert_eq!(handler_guard.len(), 2);
4967 }
4968
4969 #[rstest]
4970 fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
4971 equity_aapl: Equity,
4972 ) {
4973 let instrument = InstrumentAny::Equity(equity_aapl);
4974 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4975 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4976 let handler = Arc::new(Mutex::new(Vec::new()));
4977 let handler_clone = Arc::clone(&handler);
4978
4979 let mut aggregator = VolumeImbalanceBarAggregator::new(
4980 bar_type,
4981 instrument.price_precision(),
4982 instrument.size_precision(),
4983 move |bar: Bar| {
4984 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4985 handler_guard.push(bar);
4986 },
4987 );
4988
4989 let buy = TradeTick {
4990 size: Quantity::from(5),
4991 aggressor_side: AggressorSide::Buyer,
4992 ..TradeTick::default()
4993 };
4994 let no_aggressor = TradeTick {
4995 size: Quantity::from(3),
4996 aggressor_side: AggressorSide::NoAggressor,
4997 ..TradeTick::default()
4998 };
4999
5000 aggregator.handle_trade(buy);
5001 aggregator.handle_trade(no_aggressor);
5002 aggregator.handle_trade(buy);
5003
5004 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5005 assert_eq!(handler_guard.len(), 1);
5006 }
5007
5008 #[rstest]
5009 fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5010 let instrument = InstrumentAny::Equity(equity_aapl);
5011 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
5012 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5013 let handler = Arc::new(Mutex::new(Vec::new()));
5014 let handler_clone = Arc::clone(&handler);
5015
5016 let mut aggregator = VolumeRunsBarAggregator::new(
5017 bar_type,
5018 instrument.price_precision(),
5019 instrument.size_precision(),
5020 move |bar: Bar| {
5021 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5022 handler_guard.push(bar);
5023 },
5024 );
5025
5026 let large_trade = TradeTick {
5027 size: Quantity::from(25),
5028 aggressor_side: AggressorSide::Buyer,
5029 ..TradeTick::default()
5030 };
5031
5032 aggregator.handle_trade(large_trade);
5033
5034 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5035 assert_eq!(handler_guard.len(), 2);
5036 }
5037
5038 #[rstest]
5039 fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
5040 let instrument = InstrumentAny::Equity(equity_aapl);
5041 let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
5042 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5043 let handler = Arc::new(Mutex::new(Vec::new()));
5044 let handler_clone = Arc::clone(&handler);
5045
5046 let mut aggregator = ValueRunsBarAggregator::new(
5047 bar_type,
5048 instrument.price_precision(),
5049 instrument.size_precision(),
5050 move |bar: Bar| {
5051 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5052 handler_guard.push(bar);
5053 },
5054 );
5055
5056 let large_trade = TradeTick {
5057 price: Price::from("5.00"),
5058 size: Quantity::from(25),
5059 aggressor_side: AggressorSide::Buyer,
5060 ..TradeTick::default()
5061 };
5062
5063 aggregator.handle_trade(large_trade);
5064
5065 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5066 assert_eq!(handler_guard.len(), 2);
5067 }
5068
5069 #[rstest]
5070 fn test_value_bar_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5071 let instrument = InstrumentAny::Equity(equity_aapl);
5072 let bar_spec = BarSpecification::new(100, BarAggregation::Value, PriceType::Last);
5073 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5074 let handler = Arc::new(Mutex::new(Vec::new()));
5075 let handler_clone = Arc::clone(&handler);
5076
5077 let mut aggregator = ValueBarAggregator::new(
5078 bar_type,
5079 instrument.price_precision(),
5080 instrument.size_precision(),
5081 move |bar: Bar| {
5082 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5083 handler_guard.push(bar);
5084 },
5085 );
5086
5087 aggregator.update(
5089 Price::from("1000.00"),
5090 Quantity::from(3),
5091 UnixNanos::default(),
5092 );
5093
5094 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5096 assert_eq!(handler_guard.len(), 3);
5097 for bar in handler_guard.iter() {
5098 assert_eq!(bar.volume, Quantity::from(1));
5099 }
5100 }
5101
5102 #[rstest]
5103 fn test_value_imbalance_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5104 let instrument = InstrumentAny::Equity(equity_aapl);
5105 let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
5106 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5107 let handler = Arc::new(Mutex::new(Vec::new()));
5108 let handler_clone = Arc::clone(&handler);
5109
5110 let mut aggregator = ValueImbalanceBarAggregator::new(
5111 bar_type,
5112 instrument.price_precision(),
5113 instrument.size_precision(),
5114 move |bar: Bar| {
5115 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5116 handler_guard.push(bar);
5117 },
5118 );
5119
5120 let trade = TradeTick {
5121 price: Price::from("1000.00"),
5122 size: Quantity::from(3),
5123 aggressor_side: AggressorSide::Buyer,
5124 instrument_id: instrument.id(),
5125 ..TradeTick::default()
5126 };
5127
5128 aggregator.handle_trade(trade);
5129
5130 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5131 assert_eq!(handler_guard.len(), 3);
5132 for bar in handler_guard.iter() {
5133 assert_eq!(bar.volume, Quantity::from(1));
5134 }
5135 }
5136
5137 #[rstest]
5138 fn test_value_imbalance_opposite_side_overshoot_emits_bar(equity_aapl: Equity) {
5139 let instrument = InstrumentAny::Equity(equity_aapl);
5140 let bar_spec = BarSpecification::new(100, BarAggregation::ValueImbalance, PriceType::Last);
5141 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5142 let handler = Arc::new(Mutex::new(Vec::new()));
5143 let handler_clone = Arc::clone(&handler);
5144
5145 let mut aggregator = ValueImbalanceBarAggregator::new(
5146 bar_type,
5147 instrument.price_precision(),
5148 instrument.size_precision(),
5149 move |bar: Bar| {
5150 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5151 handler_guard.push(bar);
5152 },
5153 );
5154
5155 let sell_tick = TradeTick {
5157 price: Price::from("10.00"),
5158 size: Quantity::from(5),
5159 aggressor_side: AggressorSide::Seller,
5160 instrument_id: instrument.id(),
5161 ..TradeTick::default()
5162 };
5163
5164 let buy_tick = TradeTick {
5167 price: Price::from("1000.00"),
5168 size: Quantity::from(1),
5169 aggressor_side: AggressorSide::Buyer,
5170 instrument_id: instrument.id(),
5171 ts_init: UnixNanos::from(1),
5172 ts_event: UnixNanos::from(1),
5173 ..TradeTick::default()
5174 };
5175
5176 aggregator.handle_trade(sell_tick);
5177 aggregator.handle_trade(buy_tick);
5178
5179 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5180 assert_eq!(handler_guard.len(), 1);
5181 assert_eq!(handler_guard[0].volume, Quantity::from(6));
5182 }
5183
5184 #[rstest]
5185 fn test_value_runs_high_price_low_step_no_zero_volume_bars(equity_aapl: Equity) {
5186 let instrument = InstrumentAny::Equity(equity_aapl);
5187 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
5188 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5189 let handler = Arc::new(Mutex::new(Vec::new()));
5190 let handler_clone = Arc::clone(&handler);
5191
5192 let mut aggregator = ValueRunsBarAggregator::new(
5193 bar_type,
5194 instrument.price_precision(),
5195 instrument.size_precision(),
5196 move |bar: Bar| {
5197 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5198 handler_guard.push(bar);
5199 },
5200 );
5201
5202 let trade = TradeTick {
5203 price: Price::from("1000.00"),
5204 size: Quantity::from(3),
5205 aggressor_side: AggressorSide::Buyer,
5206 instrument_id: instrument.id(),
5207 ..TradeTick::default()
5208 };
5209
5210 aggregator.handle_trade(trade);
5211
5212 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5213 assert_eq!(handler_guard.len(), 3);
5214 for bar in handler_guard.iter() {
5215 assert_eq!(bar.volume, Quantity::from(1));
5216 }
5217 }
5218
5219 #[rstest]
5220 #[case(1000_u64)]
5221 #[case(1500_u64)]
5222 fn test_volume_imbalance_bar_aggregator_large_step_no_overflow(
5223 equity_aapl: Equity,
5224 #[case] step: u64,
5225 ) {
5226 let instrument = InstrumentAny::Equity(equity_aapl);
5227 let bar_spec = BarSpecification::new(
5228 step as usize,
5229 BarAggregation::VolumeImbalance,
5230 PriceType::Last,
5231 );
5232 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5233 let handler = Arc::new(Mutex::new(Vec::new()));
5234 let handler_clone = Arc::clone(&handler);
5235
5236 let mut aggregator = VolumeImbalanceBarAggregator::new(
5237 bar_type,
5238 instrument.price_precision(),
5239 instrument.size_precision(),
5240 move |bar: Bar| {
5241 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5242 handler_guard.push(bar);
5243 },
5244 );
5245
5246 let trade = TradeTick {
5247 size: Quantity::from(step * 2),
5248 aggressor_side: AggressorSide::Buyer,
5249 ..TradeTick::default()
5250 };
5251
5252 aggregator.handle_trade(trade);
5253
5254 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5255 assert_eq!(handler_guard.len(), 2);
5256 for bar in handler_guard.iter() {
5257 assert_eq!(bar.volume.as_f64(), step as f64);
5258 }
5259 }
5260
5261 #[rstest]
5262 fn test_volume_imbalance_bar_aggregator_different_large_steps_produce_different_bar_counts(
5263 equity_aapl: Equity,
5264 ) {
5265 let instrument = InstrumentAny::Equity(equity_aapl);
5266 let total_volume = 3000_u64;
5267 let mut results = Vec::new();
5268
5269 for step in [1000_usize, 1500] {
5270 let bar_spec =
5271 BarSpecification::new(step, BarAggregation::VolumeImbalance, PriceType::Last);
5272 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5273 let handler = Arc::new(Mutex::new(Vec::new()));
5274 let handler_clone = Arc::clone(&handler);
5275
5276 let mut aggregator = VolumeImbalanceBarAggregator::new(
5277 bar_type,
5278 instrument.price_precision(),
5279 instrument.size_precision(),
5280 move |bar: Bar| {
5281 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5282 handler_guard.push(bar);
5283 },
5284 );
5285
5286 let trade = TradeTick {
5287 size: Quantity::from(total_volume),
5288 aggressor_side: AggressorSide::Buyer,
5289 ..TradeTick::default()
5290 };
5291
5292 aggregator.handle_trade(trade);
5293
5294 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5295 results.push(handler_guard.len());
5296 }
5297
5298 assert_eq!(results[0], 3); assert_eq!(results[1], 2); assert_ne!(results[0], results[1]);
5301 }
5302
5303 #[rstest]
5304 #[case(1000_u64)]
5305 #[case(1500_u64)]
5306 fn test_volume_runs_bar_aggregator_large_step_no_overflow(
5307 equity_aapl: Equity,
5308 #[case] step: u64,
5309 ) {
5310 let instrument = InstrumentAny::Equity(equity_aapl);
5311 let bar_spec =
5312 BarSpecification::new(step as usize, BarAggregation::VolumeRuns, PriceType::Last);
5313 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5314 let handler = Arc::new(Mutex::new(Vec::new()));
5315 let handler_clone = Arc::clone(&handler);
5316
5317 let mut aggregator = VolumeRunsBarAggregator::new(
5318 bar_type,
5319 instrument.price_precision(),
5320 instrument.size_precision(),
5321 move |bar: Bar| {
5322 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5323 handler_guard.push(bar);
5324 },
5325 );
5326
5327 let trade = TradeTick {
5328 size: Quantity::from(step * 2),
5329 aggressor_side: AggressorSide::Buyer,
5330 ..TradeTick::default()
5331 };
5332
5333 aggregator.handle_trade(trade);
5334
5335 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5336 assert_eq!(handler_guard.len(), 2);
5337 for bar in handler_guard.iter() {
5338 assert_eq!(bar.volume.as_f64(), step as f64);
5339 }
5340 }
5341
5342 #[rstest]
5343 fn test_volume_runs_bar_aggregator_different_large_steps_produce_different_bar_counts(
5344 equity_aapl: Equity,
5345 ) {
5346 let instrument = InstrumentAny::Equity(equity_aapl);
5347 let total_volume = 3000_u64;
5348 let mut results = Vec::new();
5349
5350 for step in [1000_usize, 1500] {
5351 let bar_spec = BarSpecification::new(step, BarAggregation::VolumeRuns, PriceType::Last);
5352 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5353 let handler = Arc::new(Mutex::new(Vec::new()));
5354 let handler_clone = Arc::clone(&handler);
5355
5356 let mut aggregator = VolumeRunsBarAggregator::new(
5357 bar_type,
5358 instrument.price_precision(),
5359 instrument.size_precision(),
5360 move |bar: Bar| {
5361 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
5362 handler_guard.push(bar);
5363 },
5364 );
5365
5366 let trade = TradeTick {
5367 size: Quantity::from(total_volume),
5368 aggressor_side: AggressorSide::Buyer,
5369 ..TradeTick::default()
5370 };
5371
5372 aggregator.handle_trade(trade);
5373
5374 let handler_guard = handler.lock().expect(MUTEX_POISONED);
5375 results.push(handler_guard.len());
5376 }
5377
5378 assert_eq!(results[0], 3); assert_eq!(results[1], 2); assert_ne!(results[0], results[1]);
5381 }
5382
5383 #[rstest]
5385 fn test_time_bar_historical_defers_event_at_ts_init_until_after_update(equity_aapl: Equity) {
5386 let instrument = InstrumentAny::Equity(equity_aapl);
5387 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
5388 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
5389 let handler = Arc::new(Mutex::new(Vec::new()));
5390 let handler_clone = Arc::clone(&handler);
5391 let clock = Rc::new(RefCell::new(TestClock::new()));
5392
5393 let mut agg = TimeBarAggregator::new(
5394 bar_type,
5395 instrument.price_precision(),
5396 instrument.size_precision(),
5397 clock.clone(),
5398 move |bar: Bar| {
5399 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
5400 h.push(bar);
5401 },
5402 true,
5403 true,
5404 BarIntervalType::LeftOpen,
5405 None,
5406 0,
5407 false,
5408 );
5409 agg.historical_mode = true;
5410 agg.set_clock_internal(clock);
5411 let boxed: Box<dyn BarAggregator> = Box::new(agg);
5412 let rc = Rc::new(RefCell::new(boxed));
5413 rc.borrow_mut().set_aggregator_weak(Rc::downgrade(&rc));
5414
5415 rc.borrow_mut().update(
5416 Price::from("100.00"),
5417 Quantity::from(1),
5418 UnixNanos::default(),
5419 );
5420 rc.borrow_mut().update(
5421 Price::from("100.00"),
5422 Quantity::from(1),
5423 UnixNanos::from(1_000_000_000),
5424 );
5425
5426 let bars = handler.lock().expect(MUTEX_POISONED);
5427 assert!(
5428 !bars.is_empty(),
5429 "deferred event at ts_init should produce a bar that includes the update"
5430 );
5431 let last_bar = bars.last().unwrap();
5432 assert_eq!(last_bar.close, Price::from("100.00"));
5433 assert!(
5434 last_bar.volume.as_f64() >= 1.0,
5435 "bar built after deferred event should include the update at ts_init"
5436 );
5437 }
5438
5439 #[rstest]
5440 fn test_spread_quote_quote_driven_emits_when_all_legs_received(equity_aapl: Equity) {
5441 let instrument = InstrumentAny::Equity(equity_aapl);
5442 let leg1 = instrument.id();
5443 let leg2 = InstrumentId::from("MSFT.XNAS");
5444 let spread_id = InstrumentId::from("SPREAD.XNAS");
5445 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5446 let handler = Arc::new(Mutex::new(Vec::new()));
5447 let handler_clone = Arc::clone(&handler);
5448 let clock = Rc::new(RefCell::new(TestClock::new()));
5449
5450 let mut agg = SpreadQuoteAggregator::new(
5451 spread_id,
5452 &legs,
5453 true,
5454 instrument.price_precision(),
5455 0,
5456 Box::new(move |q: QuoteTick| {
5457 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5458 }),
5459 clock,
5460 false,
5461 None,
5462 0,
5463 None,
5464 None,
5465 );
5466
5467 let ts = UnixNanos::from(1_000_000_000);
5468 agg.handle_quote_tick(QuoteTick::new(
5469 leg1,
5470 Price::from("100.00"),
5471 Price::from("100.10"),
5472 Quantity::from(10),
5473 Quantity::from(10),
5474 ts,
5475 ts,
5476 ));
5477 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5478
5479 agg.handle_quote_tick(QuoteTick::new(
5480 leg2,
5481 Price::from("99.00"),
5482 Price::from("99.10"),
5483 Quantity::from(10),
5484 Quantity::from(10),
5485 ts,
5486 ts,
5487 ));
5488 let quotes = handler.lock().expect(MUTEX_POISONED);
5489 assert_eq!(quotes.len(), 1);
5490 assert_eq!(quotes[0].instrument_id, spread_id);
5491 assert!(quotes[0].bid_price < quotes[0].ask_price);
5492 }
5493
5494 #[rstest]
5495 fn test_spread_quote_futures_pricing_signed_ratios(equity_aapl: Equity) {
5496 let instrument = InstrumentAny::Equity(equity_aapl);
5497 let leg1 = instrument.id();
5498 let leg2 = InstrumentId::from("MSFT.XNAS");
5499 let spread_id = InstrumentId::from("SPREAD.XNAS");
5500 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5501 let handler = Arc::new(Mutex::new(Vec::new()));
5502 let handler_clone = Arc::clone(&handler);
5503 let clock = Rc::new(RefCell::new(TestClock::new()));
5504
5505 let mut agg = SpreadQuoteAggregator::new(
5506 spread_id,
5507 &legs,
5508 true,
5509 instrument.price_precision(),
5510 0,
5511 Box::new(move |q: QuoteTick| {
5512 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5513 }),
5514 clock,
5515 false,
5516 None,
5517 0,
5518 None,
5519 None,
5520 );
5521
5522 let ts = UnixNanos::from(1_000_000_000);
5523 agg.handle_quote_tick(QuoteTick::new(
5524 leg1,
5525 Price::from("10.00"),
5526 Price::from("10.10"),
5527 Quantity::from(100),
5528 Quantity::from(100),
5529 ts,
5530 ts,
5531 ));
5532 agg.handle_quote_tick(QuoteTick::new(
5533 leg2,
5534 Price::from("20.00"),
5535 Price::from("20.10"),
5536 Quantity::from(100),
5537 Quantity::from(100),
5538 ts,
5539 ts,
5540 ));
5541 let quotes = handler.lock().expect(MUTEX_POISONED);
5542 assert_eq!(quotes.len(), 1);
5543 let q = "es[0];
5544 assert_eq!(q.instrument_id, spread_id);
5545 assert_eq!(q.bid_price, Price::from("-10.10"));
5546 assert_eq!(q.ask_price, Price::from("-9.90"));
5547 }
5548
5549 #[rstest]
5550 fn test_spread_quote_size_calculation_non_unit_ratios(equity_aapl: Equity) {
5551 let instrument = InstrumentAny::Equity(equity_aapl);
5552 let leg1 = instrument.id();
5553 let leg2 = InstrumentId::from("MSFT.XNAS");
5554 let spread_id = InstrumentId::from("SPREAD.XNAS");
5555 let legs = vec![(leg1, 2_i64), (leg2, -1_i64)];
5556 let handler = Arc::new(Mutex::new(Vec::new()));
5557 let handler_clone = Arc::clone(&handler);
5558 let clock = Rc::new(RefCell::new(TestClock::new()));
5559
5560 let mut agg = SpreadQuoteAggregator::new(
5561 spread_id,
5562 &legs,
5563 true,
5564 instrument.price_precision(),
5565 0,
5566 Box::new(move |q: QuoteTick| {
5567 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5568 }),
5569 clock,
5570 false,
5571 None,
5572 0,
5573 None,
5574 None,
5575 );
5576
5577 let ts = UnixNanos::from(1_000_000_000);
5578 agg.handle_quote_tick(QuoteTick::new(
5579 leg1,
5580 Price::from("10.00"),
5581 Price::from("10.10"),
5582 Quantity::from(100),
5583 Quantity::from(40),
5584 ts,
5585 ts,
5586 ));
5587 agg.handle_quote_tick(QuoteTick::new(
5588 leg2,
5589 Price::from("10.00"),
5590 Price::from("10.10"),
5591 Quantity::from(50),
5592 Quantity::from(30),
5593 ts,
5594 ts,
5595 ));
5596 let quotes = handler.lock().expect(MUTEX_POISONED);
5597 assert_eq!(quotes.len(), 1);
5598 let q = "es[0];
5599 assert_eq!(q.bid_size.as_f64(), 30.0);
5600 assert_eq!(q.ask_size.as_f64(), 20.0);
5601 }
5602
5603 #[rstest]
5604 fn test_spread_quote_timer_driven_emission_cadence(equity_aapl: Equity) {
5605 let instrument = InstrumentAny::Equity(equity_aapl);
5606 let leg1 = instrument.id();
5607 let leg2 = InstrumentId::from("MSFT.XNAS");
5608 let spread_id = InstrumentId::from("SPREAD.XNAS");
5609 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5610 let handler = Arc::new(Mutex::new(Vec::new()));
5611 let handler_clone = Arc::clone(&handler);
5612 let clock = Rc::new(RefCell::new(TestClock::new()));
5613 clock.borrow_mut().set_time(UnixNanos::from(0));
5614
5615 let agg = SpreadQuoteAggregator::new(
5616 spread_id,
5617 &legs,
5618 true,
5619 instrument.price_precision(),
5620 0,
5621 Box::new(move |q: QuoteTick| {
5622 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5623 }),
5624 clock.clone(),
5625 false,
5626 Some(1),
5627 0,
5628 None,
5629 None,
5630 );
5631 let rc = Rc::new(RefCell::new(agg));
5632 rc.borrow_mut().prepare_for_timer_mode(&rc);
5633 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
5634
5635 for event in clock.borrow_mut().advance_time(UnixNanos::from(0), true) {
5636 rc.borrow_mut().on_timer_fire(event.ts_event);
5637 }
5638 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5639
5640 let ts1 = UnixNanos::from(1_000_000_000);
5641 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5642 leg1,
5643 Price::from("100.00"),
5644 Price::from("100.10"),
5645 Quantity::from(10),
5646 Quantity::from(10),
5647 ts1,
5648 ts1,
5649 ));
5650 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5651 leg2,
5652 Price::from("99.00"),
5653 Price::from("99.10"),
5654 Quantity::from(10),
5655 Quantity::from(10),
5656 ts1,
5657 ts1,
5658 ));
5659
5660 for event in clock.borrow_mut().advance_time(ts1, true) {
5661 rc.borrow_mut().on_timer_fire(event.ts_event);
5662 }
5663
5664 {
5665 let quotes = handler.lock().expect(MUTEX_POISONED);
5666 assert_eq!(quotes.len(), 1);
5667 assert_eq!(quotes[0].ts_event, ts1);
5668 assert_eq!(quotes[0].ts_init, ts1);
5669 }
5670
5671 let ts2 = UnixNanos::from(2_000_000_000);
5672 for event in clock.borrow_mut().advance_time(ts2, true) {
5673 rc.borrow_mut().on_timer_fire(event.ts_event);
5674 }
5675
5676 let quotes = handler.lock().expect(MUTEX_POISONED);
5677 assert_eq!(quotes.len(), 1);
5678 }
5679
5680 #[rstest]
5681 fn test_spread_quote_historical_timer_waits_for_all_legs(equity_aapl: Equity) {
5682 let instrument = InstrumentAny::Equity(equity_aapl);
5683 let leg1 = instrument.id();
5684 let leg2 = InstrumentId::from("MSFT.XNAS");
5685 let spread_id = InstrumentId::from("SPREAD.XNAS");
5686 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5687 let handler = Arc::new(Mutex::new(Vec::new()));
5688 let handler_clone = Arc::clone(&handler);
5689 let clock = Rc::new(RefCell::new(TestClock::new()));
5690
5691 let agg = SpreadQuoteAggregator::new(
5692 spread_id,
5693 &legs,
5694 true,
5695 instrument.price_precision(),
5696 0,
5697 Box::new(move |q: QuoteTick| {
5698 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5699 }),
5700 clock.clone(),
5702 true,
5703 Some(1),
5704 0,
5705 None,
5706 None,
5707 );
5708 let rc = Rc::new(RefCell::new(agg));
5709 rc.borrow_mut().prepare_for_timer_mode(&rc);
5710 rc.borrow_mut().set_clock(clock);
5711
5712 let ts1 = UnixNanos::from(1_000_000_000);
5713 let ts2 = UnixNanos::from(2_000_000_000);
5714 let ts3 = UnixNanos::from(3_000_000_000);
5715 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5716 leg1,
5717 Price::from("100.00"),
5718 Price::from("100.10"),
5719 Quantity::from(10),
5720 Quantity::from(10),
5721 ts1,
5722 ts1,
5723 ));
5724 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5725
5726 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5727 leg2,
5728 Price::from("99.00"),
5729 Price::from("99.10"),
5730 Quantity::from(10),
5731 Quantity::from(10),
5732 ts2,
5733 ts2,
5734 ));
5735 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5736
5737 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5738 leg1,
5739 Price::from("100.00"),
5740 Price::from("100.10"),
5741 Quantity::from(10),
5742 Quantity::from(10),
5743 ts3,
5744 ts3,
5745 ));
5746 let quotes = handler.lock().expect(MUTEX_POISONED);
5747 assert_eq!(
5748 quotes.len(),
5749 1,
5750 "deferred event at ts2 is processed when we have all legs and advance to ts3"
5751 );
5752 }
5753
5754 #[rstest]
5755 fn test_spread_quote_historical_flush_emits_pending_final_quote(equity_aapl: Equity) {
5756 let instrument = InstrumentAny::Equity(equity_aapl);
5757 let leg1 = instrument.id();
5758 let leg2 = InstrumentId::from("MSFT.XNAS");
5759 let spread_id = InstrumentId::from("SPREAD.XNAS");
5760 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5761 let handler = Arc::new(Mutex::new(Vec::new()));
5762 let handler_clone = Arc::clone(&handler);
5763 let clock = Rc::new(RefCell::new(TestClock::new()));
5764
5765 let agg = SpreadQuoteAggregator::new(
5766 spread_id,
5767 &legs,
5768 true,
5769 instrument.price_precision(),
5770 0,
5771 Box::new(move |q: QuoteTick| {
5772 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5773 }),
5774 clock.clone(),
5776 true,
5777 Some(1),
5778 0,
5779 None,
5780 None,
5781 );
5782 let rc = Rc::new(RefCell::new(agg));
5783 rc.borrow_mut().prepare_for_timer_mode(&rc);
5784 rc.borrow_mut().set_clock(clock);
5785
5786 let ts1 = UnixNanos::from(1_000_000_000);
5787 let ts2 = UnixNanos::from(2_000_000_000);
5788 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5789 leg1,
5790 Price::from("100.00"),
5791 Price::from("100.10"),
5792 Quantity::from(10),
5793 Quantity::from(10),
5794 ts1,
5795 ts1,
5796 ));
5797 rc.borrow_mut().handle_quote_tick(QuoteTick::new(
5798 leg2,
5799 Price::from("99.00"),
5800 Price::from("99.10"),
5801 Quantity::from(10),
5802 Quantity::from(10),
5803 ts2,
5804 ts2,
5805 ));
5806
5807 assert_eq!(handler.lock().expect(MUTEX_POISONED).len(), 0);
5808
5809 rc.borrow_mut().flush_pending_historical_quote();
5810
5811 let quotes = handler.lock().expect(MUTEX_POISONED);
5812 assert_eq!(
5813 quotes.len(),
5814 1,
5815 "final historical quote should be emitted when the deferred event is flushed",
5816 );
5817 assert_eq!(quotes[0].ts_event, ts2);
5818 }
5819
5820 #[rstest]
5821 fn test_spread_quote_option_vega_weighting(equity_aapl: Equity) {
5822 let instrument = InstrumentAny::Equity(equity_aapl);
5823 let leg1 = instrument.id();
5824 let leg2 = InstrumentId::from("MSFT.XNAS");
5825 let spread_id = InstrumentId::from("SPREAD.XNAS");
5826 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5827 let handler = Arc::new(Mutex::new(Vec::new()));
5828 let handler_clone = Arc::clone(&handler);
5829 let clock = Rc::new(RefCell::new(TestClock::new()));
5830
5831 let mut vega_provider = MapVegaProvider::new();
5832 vega_provider.insert(leg1, 0.15);
5833 vega_provider.insert(leg2, 0.12);
5834
5835 let mut agg = SpreadQuoteAggregator::new(
5836 spread_id,
5837 &legs,
5838 false,
5839 instrument.price_precision(),
5840 0,
5841 Box::new(move |q: QuoteTick| {
5842 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5843 }),
5844 clock,
5845 false,
5846 None,
5847 0,
5848 Some(Box::new(vega_provider)),
5849 None,
5850 );
5851
5852 let ts = UnixNanos::from(1_000_000_000);
5853 agg.handle_quote_tick(QuoteTick::new(
5854 leg1,
5855 Price::from("10.00"),
5856 Price::from("10.20"),
5857 Quantity::from(100),
5858 Quantity::from(100),
5859 ts,
5860 ts,
5861 ));
5862 agg.handle_quote_tick(QuoteTick::new(
5863 leg2,
5864 Price::from("11.00"),
5865 Price::from("11.20"),
5866 Quantity::from(100),
5867 Quantity::from(100),
5868 ts,
5869 ts,
5870 ));
5871 let quotes = handler.lock().expect(MUTEX_POISONED);
5872 assert_eq!(quotes.len(), 1);
5873 let q = "es[0];
5874 assert!(q.bid_price < q.ask_price);
5875 assert!(q.ask_price.as_f64() - q.bid_price.as_f64() > 0.0);
5876 }
5877
5878 #[rstest]
5879 fn test_spread_quote_all_zero_vega_fallback(equity_aapl: Equity) {
5880 let instrument = InstrumentAny::Equity(equity_aapl);
5881 let leg1 = instrument.id();
5882 let leg2 = InstrumentId::from("MSFT.XNAS");
5883 let spread_id = InstrumentId::from("SPREAD.XNAS");
5884 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5885 let handler = Arc::new(Mutex::new(Vec::new()));
5886 let handler_clone = Arc::clone(&handler);
5887 let clock = Rc::new(RefCell::new(TestClock::new()));
5888
5889 let mut vega_provider = MapVegaProvider::new();
5890 vega_provider.insert(leg1, 0.0);
5891 vega_provider.insert(leg2, 0.0);
5892
5893 let mut agg = SpreadQuoteAggregator::new(
5894 spread_id,
5895 &legs,
5896 false,
5897 instrument.price_precision(),
5898 0,
5899 Box::new(move |q: QuoteTick| {
5900 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5901 }),
5902 clock,
5903 false,
5904 None,
5905 0,
5906 Some(Box::new(vega_provider)),
5907 None,
5908 );
5909
5910 let ts = UnixNanos::from(1_000_000_000);
5911 agg.handle_quote_tick(QuoteTick::new(
5912 leg1,
5913 Price::from("10.00"),
5914 Price::from("10.10"),
5915 Quantity::from(100),
5916 Quantity::from(100),
5917 ts,
5918 ts,
5919 ));
5920 agg.handle_quote_tick(QuoteTick::new(
5921 leg2,
5922 Price::from("20.00"),
5923 Price::from("20.10"),
5924 Quantity::from(100),
5925 Quantity::from(100),
5926 ts,
5927 ts,
5928 ));
5929 let quotes = handler.lock().expect(MUTEX_POISONED);
5930 assert_eq!(quotes.len(), 1);
5931 let q = "es[0];
5932 assert_eq!(q.bid_price, Price::from("-10.10"));
5933 assert_eq!(q.ask_price, Price::from("-9.90"));
5934 }
5935
5936 #[rstest]
5937 fn test_spread_quote_negative_prices_tick_scheme(equity_aapl: Equity) {
5938 let instrument = InstrumentAny::Equity(equity_aapl);
5939 let leg1 = instrument.id();
5940 let leg2 = InstrumentId::from("MSFT.XNAS");
5941 let spread_id = InstrumentId::from("SPREAD.XNAS");
5942 let legs = vec![(leg1, 1_i64), (leg2, -1_i64)];
5943 let handler = Arc::new(Mutex::new(Vec::new()));
5944 let handler_clone = Arc::clone(&handler);
5945 let clock = Rc::new(RefCell::new(TestClock::new()));
5946 let rounder = FixedTickSchemeRounder::new(0.01).unwrap();
5947
5948 let mut agg = SpreadQuoteAggregator::new(
5949 spread_id,
5950 &legs,
5951 true,
5952 2,
5953 0,
5954 Box::new(move |q: QuoteTick| {
5955 handler_clone.lock().expect(MUTEX_POISONED).push(q);
5956 }),
5957 clock,
5958 false,
5959 None,
5960 0,
5961 None,
5962 Some(Box::new(rounder)),
5963 );
5964
5965 let ts = UnixNanos::from(1_000_000_000);
5966 agg.handle_quote_tick(QuoteTick::new(
5967 leg1,
5968 Price::from("10.00"),
5969 Price::from("10.10"),
5970 Quantity::from(100),
5971 Quantity::from(100),
5972 ts,
5973 ts,
5974 ));
5975 agg.handle_quote_tick(QuoteTick::new(
5976 leg2,
5977 Price::from("20.00"),
5978 Price::from("20.10"),
5979 Quantity::from(100),
5980 Quantity::from(100),
5981 ts,
5982 ts,
5983 ));
5984 let quotes = handler.lock().expect(MUTEX_POISONED);
5985 assert_eq!(quotes.len(), 1);
5986 let q = "es[0];
5987 assert!(q.bid_price.as_f64() < 0.0);
5988 assert!(q.ask_price.as_f64() < 0.0);
5989 assert!(q.bid_price < q.ask_price);
5990 }
5991
5992 #[rstest]
5993 #[case(BarIntervalType::LeftOpen)]
5994 #[case(BarIntervalType::RightOpen)]
5995 fn test_time_bar_skip_first_non_full_bar_noop_on_boundary(
5996 equity_aapl: Equity,
5997 #[case] interval_type: BarIntervalType,
5998 ) {
5999 let instrument = InstrumentAny::Equity(equity_aapl);
6004 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6005 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6006 let handler = Arc::new(Mutex::new(Vec::new()));
6007 let handler_clone = Arc::clone(&handler);
6008 let clock = Rc::new(RefCell::new(TestClock::new()));
6009 clock.borrow_mut().set_time(UnixNanos::from(1_000_000_000));
6010 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6011
6012 let aggregator = TimeBarAggregator::new(
6013 bar_type,
6014 instrument.price_precision(),
6015 instrument.size_precision(),
6016 clock,
6017 move |bar: Bar| {
6018 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6019 h.push(bar);
6020 },
6021 false,
6022 false,
6023 interval_type,
6024 None,
6025 0,
6026 true, );
6028
6029 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6030 let rc = Rc::new(RefCell::new(boxed));
6031 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6032
6033 rc.borrow_mut().update(
6034 Price::from("100.00"),
6035 Quantity::from(1),
6036 UnixNanos::from(1_000_000_000),
6037 );
6038 rc.borrow_mut().build_bar(&TimeEvent::new(
6039 event_name,
6040 UUID4::new(),
6041 UnixNanos::from(2_000_000_000),
6042 UnixNanos::from(2_000_000_000),
6043 ));
6044 rc.borrow_mut().update(
6045 Price::from("101.00"),
6046 Quantity::from(1),
6047 UnixNanos::from(2_500_000_000),
6048 );
6049 rc.borrow_mut().build_bar(&TimeEvent::new(
6050 event_name,
6051 UUID4::new(),
6052 UnixNanos::from(3_000_000_000),
6053 UnixNanos::from(3_000_000_000),
6054 ));
6055
6056 let bars = handler.lock().expect(MUTEX_POISONED);
6057 assert_eq!(bars.len(), 2);
6058 assert_eq!(bars[0].close, Price::from("100.00"));
6059 assert_eq!(bars[1].close, Price::from("101.00"));
6060 }
6061
6062 #[rstest]
6063 #[case(BarIntervalType::LeftOpen)]
6064 #[case(BarIntervalType::RightOpen)]
6065 fn test_time_bar_skip_first_non_full_bar_drops_partial_bar(
6066 equity_aapl: Equity,
6067 #[case] interval_type: BarIntervalType,
6068 ) {
6069 let instrument = InstrumentAny::Equity(equity_aapl);
6073 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6074 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6075 let handler = Arc::new(Mutex::new(Vec::new()));
6076 let handler_clone = Arc::clone(&handler);
6077 let clock = Rc::new(RefCell::new(TestClock::new()));
6078 clock.borrow_mut().set_time(UnixNanos::from(1_500_000_000));
6079 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6080
6081 let aggregator = TimeBarAggregator::new(
6082 bar_type,
6083 instrument.price_precision(),
6084 instrument.size_precision(),
6085 clock,
6086 move |bar: Bar| {
6087 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6088 h.push(bar);
6089 },
6090 false,
6091 false,
6092 interval_type,
6093 None,
6094 0,
6095 true, );
6097
6098 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6099 let rc = Rc::new(RefCell::new(boxed));
6100 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6101
6102 rc.borrow_mut().update(
6103 Price::from("100.00"),
6104 Quantity::from(1),
6105 UnixNanos::from(1_500_000_000),
6106 );
6107 rc.borrow_mut().build_bar(&TimeEvent::new(
6108 event_name,
6109 UUID4::new(),
6110 UnixNanos::from(2_000_000_000),
6111 UnixNanos::from(2_000_000_000),
6112 ));
6113 rc.borrow_mut().update(
6114 Price::from("101.00"),
6115 Quantity::from(1),
6116 UnixNanos::from(2_500_000_000),
6117 );
6118 rc.borrow_mut().build_bar(&TimeEvent::new(
6119 event_name,
6120 UUID4::new(),
6121 UnixNanos::from(3_000_000_000),
6122 UnixNanos::from(3_000_000_000),
6123 ));
6124
6125 let bars = handler.lock().expect(MUTEX_POISONED);
6126 assert_eq!(bars.len(), 1);
6127 assert_eq!(bars[0].close, Price::from("101.00"));
6128 }
6129
6130 #[rstest]
6131 fn test_time_bar_skip_first_non_full_bar_skips_every_call_before_first_close(
6132 equity_aapl: Equity,
6133 ) {
6134 let instrument = InstrumentAny::Equity(equity_aapl);
6138 let bar_spec = BarSpecification::new(10, BarAggregation::Second, PriceType::Last);
6139 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6140 let handler = Arc::new(Mutex::new(Vec::new()));
6141 let handler_clone = Arc::clone(&handler);
6142 let clock = Rc::new(RefCell::new(TestClock::new()));
6143 clock.borrow_mut().set_time(UnixNanos::from(5_000_000_000));
6144 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6145
6146 let aggregator = TimeBarAggregator::new(
6147 bar_type,
6148 instrument.price_precision(),
6149 instrument.size_precision(),
6150 clock,
6151 move |bar: Bar| {
6152 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6153 h.push(bar);
6154 },
6155 false,
6156 false,
6157 BarIntervalType::LeftOpen,
6158 None,
6159 0,
6160 true, );
6162
6163 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6164 let rc = Rc::new(RefCell::new(boxed));
6165 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6166
6167 for (price, update_ts, event_ts) in [
6171 ("100.00", 5_500_000_000_u64, 7_000_000_000_u64),
6172 ("101.00", 7_500_000_000_u64, 8_000_000_000_u64),
6173 ("102.00", 9_000_000_000_u64, 10_000_000_000_u64),
6174 ] {
6175 rc.borrow_mut().update(
6176 Price::from(price),
6177 Quantity::from(1),
6178 UnixNanos::from(update_ts),
6179 );
6180 rc.borrow_mut().build_bar(&TimeEvent::new(
6181 event_name,
6182 UUID4::new(),
6183 UnixNanos::from(event_ts),
6184 UnixNanos::from(event_ts),
6185 ));
6186 }
6187
6188 rc.borrow_mut().update(
6190 Price::from("103.00"),
6191 Quantity::from(1),
6192 UnixNanos::from(10_500_000_000),
6193 );
6194 rc.borrow_mut().build_bar(&TimeEvent::new(
6195 event_name,
6196 UUID4::new(),
6197 UnixNanos::from(11_000_000_000),
6198 UnixNanos::from(11_000_000_000),
6199 ));
6200
6201 let bars = handler.lock().expect(MUTEX_POISONED);
6202 assert_eq!(bars.len(), 1);
6203 assert_eq!(bars[0].close, Price::from("103.00"));
6204 }
6205
6206 #[rstest]
6207 fn test_time_bar_skip_first_non_full_bar_skips_when_build_delay_shifts_start(
6208 equity_aapl: Equity,
6209 ) {
6210 let instrument = InstrumentAny::Equity(equity_aapl);
6215 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6216 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6217 let handler = Arc::new(Mutex::new(Vec::new()));
6218 let handler_clone = Arc::clone(&handler);
6219 let clock = Rc::new(RefCell::new(TestClock::new()));
6220 clock.borrow_mut().set_time(UnixNanos::from(2_000_000_000));
6221 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6222
6223 let aggregator = TimeBarAggregator::new(
6224 bar_type,
6225 instrument.price_precision(),
6226 instrument.size_precision(),
6227 clock,
6228 move |bar: Bar| {
6229 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6230 h.push(bar);
6231 },
6232 false,
6233 false,
6234 BarIntervalType::LeftOpen,
6235 None,
6236 100, true, );
6239
6240 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6241 let rc = Rc::new(RefCell::new(boxed));
6242 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6243
6244 rc.borrow_mut().update(
6246 Price::from("100.00"),
6247 Quantity::from(1),
6248 UnixNanos::from(2_500_000_000),
6249 );
6250 rc.borrow_mut().build_bar(&TimeEvent::new(
6251 event_name,
6252 UUID4::new(),
6253 UnixNanos::from(3_000_100_000),
6254 UnixNanos::from(3_000_100_000),
6255 ));
6256 rc.borrow_mut().update(
6257 Price::from("101.00"),
6258 Quantity::from(1),
6259 UnixNanos::from(3_500_000_000),
6260 );
6261 rc.borrow_mut().build_bar(&TimeEvent::new(
6262 event_name,
6263 UUID4::new(),
6264 UnixNanos::from(4_000_100_000),
6265 UnixNanos::from(4_000_100_000),
6266 ));
6267
6268 let bars = handler.lock().expect(MUTEX_POISONED);
6269 assert_eq!(bars.len(), 1);
6270 assert_eq!(bars[0].close, Price::from("101.00"));
6271 }
6272
6273 #[rstest]
6274 #[case(
6275 BarAggregation::Month,
6276 1_735_689_600_000_000_000_u64,
6277 1_733_011_200_000_000_000_u64
6278 )]
6279 #[case(
6280 BarAggregation::Year,
6281 1_735_689_600_000_000_000_u64,
6282 1_704_067_200_000_000_000_u64
6283 )]
6284 fn test_time_bar_fire_immediately_month_year_stored_open_points_to_previous_period(
6285 equity_aapl: Equity,
6286 #[case] aggregation: BarAggregation,
6287 #[case] start_ns: u64,
6288 #[case] expected_stored_open_ns: u64,
6289 ) {
6290 let instrument = InstrumentAny::Equity(equity_aapl);
6295 let bar_spec = BarSpecification::new(1, aggregation, PriceType::Last);
6296 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6297 let handler = Arc::new(Mutex::new(Vec::new()));
6298 let handler_clone = Arc::clone(&handler);
6299 let clock = Rc::new(RefCell::new(TestClock::new()));
6300 clock.borrow_mut().set_time(UnixNanos::from(start_ns));
6301 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6302
6303 let aggregator = TimeBarAggregator::new(
6304 bar_type,
6305 instrument.price_precision(),
6306 instrument.size_precision(),
6307 clock,
6308 move |bar: Bar| {
6309 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6310 h.push(bar);
6311 },
6312 false,
6313 false,
6314 BarIntervalType::RightOpen, None,
6316 0,
6317 false, );
6319
6320 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6321 let rc = Rc::new(RefCell::new(boxed));
6322 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6323
6324 rc.borrow_mut().update(
6325 Price::from("100.00"),
6326 Quantity::from(1),
6327 UnixNanos::from(start_ns),
6328 );
6329 rc.borrow_mut().build_bar(&TimeEvent::new(
6330 event_name,
6331 UUID4::new(),
6332 UnixNanos::from(start_ns),
6333 UnixNanos::from(start_ns),
6334 ));
6335
6336 let bars = handler.lock().expect(MUTEX_POISONED);
6337 assert_eq!(bars.len(), 1);
6338 assert_eq!(bars[0].ts_event, UnixNanos::from(expected_stored_open_ns));
6339 assert_eq!(bars[0].ts_init, UnixNanos::from(start_ns));
6340 }
6341
6342 #[rstest]
6343 fn test_time_bar_historical_prevents_bars_for_timer_before_last_data(equity_aapl: Equity) {
6344 let instrument = InstrumentAny::Equity(equity_aapl);
6345 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
6346 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6347 let handler = Arc::new(Mutex::new(Vec::new()));
6348 let handler_clone = Arc::clone(&handler);
6349 let clock = Rc::new(RefCell::new(TestClock::new()));
6350
6351 let mut agg = TimeBarAggregator::new(
6352 bar_type,
6353 instrument.price_precision(),
6354 instrument.size_precision(),
6355 clock.clone(),
6356 move |bar: Bar| {
6357 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6358 h.push(bar);
6359 },
6360 true,
6361 true,
6362 BarIntervalType::LeftOpen,
6363 None,
6364 0,
6365 false,
6366 );
6367 agg.historical_mode = true;
6368 agg.set_clock_internal(clock);
6369 let boxed: Box<dyn BarAggregator> = Box::new(agg);
6370 let rc = Rc::new(RefCell::new(boxed));
6371 rc.borrow_mut().set_aggregator_weak(Rc::downgrade(&rc));
6372
6373 let ts1 = UnixNanos::from(2_000_000_000);
6374 rc.borrow_mut()
6375 .update(Price::from("100.00"), Quantity::from(1), ts1);
6376
6377 let ts2 = UnixNanos::from(3_000_000_000);
6378 rc.borrow_mut()
6379 .update(Price::from("101.00"), Quantity::from(1), ts2);
6380
6381 let bars = handler.lock().expect(MUTEX_POISONED);
6382 assert!(
6383 !bars.is_empty(),
6384 "advancing time from ts1 to ts2 should produce at least one bar"
6385 );
6386 assert_eq!(bars[0].close, Price::from("100.00"));
6387 }
6388}
6389
6390#[cfg(test)]
6391mod property_tests {
6392 use std::{
6393 cell::RefCell,
6394 rc::Rc,
6395 sync::{Arc, Mutex},
6396 };
6397
6398 use nautilus_common::{clock::TestClock, timer::TimeEvent};
6399 use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
6400 use nautilus_model::{
6401 data::{Bar, BarSpecification, BarType, bar::get_bar_interval_ns},
6402 enums::{AggregationSource, BarAggregation, BarIntervalType, PriceType},
6403 instruments::{Instrument, InstrumentAny, stubs::equity_aapl},
6404 types::{Price, Quantity},
6405 };
6406 use proptest::prelude::*;
6407 use rstest::rstest;
6408 use ustr::Ustr;
6409
6410 use super::*;
6411
6412 fn aggregation_strategy() -> impl Strategy<Value = BarAggregation> {
6413 prop_oneof![
6414 Just(BarAggregation::Second),
6415 Just(BarAggregation::Minute),
6416 Just(BarAggregation::Hour),
6417 ]
6418 }
6419
6420 fn interval_type_strategy() -> impl Strategy<Value = BarIntervalType> {
6421 prop_oneof![
6422 Just(BarIntervalType::LeftOpen),
6423 Just(BarIntervalType::RightOpen),
6424 ]
6425 }
6426
6427 proptest! {
6428 #[rstest]
6429 fn prop_skip_first_drops_partial_then_emits(
6430 aggregation in aggregation_strategy(),
6431 step in 1usize..=5,
6432 interval_type in interval_type_strategy(),
6433 skip_first in any::<bool>(),
6434 ) {
6435 let instrument = InstrumentAny::Equity(equity_aapl());
6436 let bar_spec = BarSpecification::new(step, aggregation, PriceType::Last);
6437 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6438 let interval_ns = get_bar_interval_ns(&bar_type).as_u64();
6439
6440 let now_ns = interval_ns + interval_ns / 2;
6443
6444 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6445 let handler_clone = Arc::clone(&handler);
6446 let clock = Rc::new(RefCell::new(TestClock::new()));
6447 clock.borrow_mut().set_time(UnixNanos::from(now_ns));
6448 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6449
6450 let aggregator = TimeBarAggregator::new(
6451 bar_type,
6452 instrument.price_precision(),
6453 instrument.size_precision(),
6454 clock,
6455 move |bar: Bar| {
6456 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6457 h.push(bar);
6458 },
6459 false,
6460 false,
6461 interval_type,
6462 None,
6463 0,
6464 skip_first,
6465 );
6466
6467 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6468 let rc = Rc::new(RefCell::new(boxed));
6469 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6470
6471 rc.borrow_mut().update(
6474 Price::from("100.00"),
6475 Quantity::from(1),
6476 UnixNanos::from(now_ns),
6477 );
6478 let first_close = 2 * interval_ns;
6479 rc.borrow_mut().build_bar(&TimeEvent::new(
6480 event_name,
6481 UUID4::new(),
6482 UnixNanos::from(first_close),
6483 UnixNanos::from(first_close),
6484 ));
6485
6486 rc.borrow_mut().update(
6488 Price::from("101.00"),
6489 Quantity::from(1),
6490 UnixNanos::from(first_close + interval_ns / 2),
6491 );
6492 let second_close = first_close + interval_ns;
6493 rc.borrow_mut().build_bar(&TimeEvent::new(
6494 event_name,
6495 UUID4::new(),
6496 UnixNanos::from(second_close),
6497 UnixNanos::from(second_close),
6498 ));
6499
6500 let bars = handler.lock().expect(MUTEX_POISONED);
6501 let expected = if skip_first { 1 } else { 2 };
6502 prop_assert_eq!(bars.len(), expected);
6503 prop_assert_eq!(bars.last().unwrap().close, Price::from("101.00"));
6504 for bar in bars.iter() {
6505 prop_assert!(bar.high >= bar.open);
6506 prop_assert!(bar.high >= bar.close);
6507 prop_assert!(bar.low <= bar.open);
6508 prop_assert!(bar.low <= bar.close);
6509 }
6510 }
6511
6512 #[rstest]
6513 fn prop_skip_first_noop_on_exact_boundary(
6514 aggregation in aggregation_strategy(),
6515 step in 1usize..=5,
6516 interval_type in interval_type_strategy(),
6517 ) {
6518 let instrument = InstrumentAny::Equity(equity_aapl());
6519 let bar_spec = BarSpecification::new(step, aggregation, PriceType::Last);
6520 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6521 let interval_ns = get_bar_interval_ns(&bar_type).as_u64();
6522
6523 let now_ns = interval_ns;
6526 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6527 let handler_clone = Arc::clone(&handler);
6528 let clock = Rc::new(RefCell::new(TestClock::new()));
6529 clock.borrow_mut().set_time(UnixNanos::from(now_ns));
6530 let event_name = Ustr::from(&format!("TIME_BAR_{bar_type}"));
6531
6532 let aggregator = TimeBarAggregator::new(
6533 bar_type,
6534 instrument.price_precision(),
6535 instrument.size_precision(),
6536 clock,
6537 move |bar: Bar| {
6538 let mut h = handler_clone.lock().expect(MUTEX_POISONED);
6539 h.push(bar);
6540 },
6541 false,
6542 false,
6543 interval_type,
6544 None,
6545 0,
6546 true, );
6548
6549 let boxed: Box<dyn BarAggregator> = Box::new(aggregator);
6550 let rc = Rc::new(RefCell::new(boxed));
6551 rc.borrow_mut().start_timer(Some(Rc::clone(&rc)));
6552
6553 rc.borrow_mut().update(
6554 Price::from("100.00"),
6555 Quantity::from(1),
6556 UnixNanos::from(now_ns),
6557 );
6558 let next_close = now_ns + interval_ns;
6559 rc.borrow_mut().build_bar(&TimeEvent::new(
6560 event_name,
6561 UUID4::new(),
6562 UnixNanos::from(next_close),
6563 UnixNanos::from(next_close),
6564 ));
6565
6566 let bars = handler.lock().expect(MUTEX_POISONED);
6567 prop_assert_eq!(bars.len(), 1);
6568 prop_assert_eq!(bars[0].close, Price::from("100.00"));
6569 }
6570
6571 #[rstest]
6572 fn prop_bar_builder_ohlc_invariants(
6573 updates in prop::collection::vec((1i64..=100_000i64, 1u64..=1_000u64), 1..=50),
6574 ) {
6575 let instrument = InstrumentAny::Equity(equity_aapl());
6576 let bar_spec = BarSpecification::new(1, BarAggregation::Tick, PriceType::Last);
6577 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6578 let mut builder = BarBuilder::new(bar_type, 2, 0);
6579
6580 let mut total_volume: u64 = 0;
6581
6582 for (i, (price_cents, size)) in updates.iter().enumerate() {
6583 let price = Price::new((*price_cents as f64) / 100.0, 2);
6584 let qty = Quantity::new(*size as f64, 0);
6585 let ts = UnixNanos::from((i as u64 + 1) * 1_000);
6586 total_volume += *size;
6587 builder.update(price, qty, ts);
6588 }
6589
6590 let bar = builder.build_now();
6591 prop_assert!(bar.low <= bar.open);
6592 prop_assert!(bar.low <= bar.close);
6593 prop_assert!(bar.high >= bar.open);
6594 prop_assert!(bar.high >= bar.close);
6595 prop_assert!(bar.low <= bar.high);
6596 prop_assert_eq!(bar.volume.as_f64(), total_volume as f64);
6597 }
6598
6599 #[rstest]
6600 fn prop_tick_bar_aggregator_volume_conservation(
6601 ticks in prop::collection::vec((1i64..=1_000i64, 1u64..=100u64), 3..=60),
6602 step in 1usize..=5,
6603 ) {
6604 let instrument = InstrumentAny::Equity(equity_aapl());
6605 let bar_spec = BarSpecification::new(step, BarAggregation::Tick, PriceType::Last);
6606 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6607 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6608 let handler_clone = Arc::clone(&handler);
6609
6610 let mut aggregator = TickBarAggregator::new(
6611 bar_type,
6612 instrument.price_precision(),
6613 instrument.size_precision(),
6614 move |bar: Bar| {
6615 handler_clone.lock().expect(MUTEX_POISONED).push(bar);
6616 },
6617 );
6618
6619 let mut total_input: u64 = 0;
6620
6621 for (i, (price_cents, size)) in ticks.iter().enumerate() {
6622 let price = Price::new((*price_cents as f64) / 100.0, 2);
6623 let qty = Quantity::new(*size as f64, 0);
6624 aggregator.update(price, qty, UnixNanos::from((i as u64 + 1) * 1_000));
6625 total_input += *size;
6626 }
6627
6628 let bars = handler.lock().expect(MUTEX_POISONED);
6629 let emitted_count = bars.len();
6630 prop_assert_eq!(emitted_count, ticks.len() / step);
6631
6632 let mut sum_emitted: f64 = 0.0;
6633
6634 for bar in bars.iter() {
6635 prop_assert!(bar.low <= bar.open);
6636 prop_assert!(bar.low <= bar.close);
6637 prop_assert!(bar.high >= bar.open);
6638 prop_assert!(bar.high >= bar.close);
6639 sum_emitted += bar.volume.as_f64();
6640 }
6641
6642 let pending_size: u64 = ticks.iter()
6644 .skip(emitted_count * step)
6645 .map(|(_, s)| *s)
6646 .sum();
6647 prop_assert!((sum_emitted + pending_size as f64 - total_input as f64).abs() < 1e-6);
6648 }
6649
6650 #[rstest]
6651 fn prop_volume_bar_aggregator_conservation(
6652 sizes in prop::collection::vec(1u64..=50u64, 3..=40),
6653 step in 2u64..=10u64,
6654 ) {
6655 let instrument = InstrumentAny::Equity(equity_aapl());
6656 let bar_spec = BarSpecification::new(step as usize, BarAggregation::Volume, PriceType::Last);
6657 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6658 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6659 let handler_clone = Arc::clone(&handler);
6660
6661 let mut aggregator = VolumeBarAggregator::new(
6662 bar_type,
6663 instrument.price_precision(),
6664 instrument.size_precision(),
6665 move |bar: Bar| {
6666 handler_clone.lock().expect(MUTEX_POISONED).push(bar);
6667 },
6668 );
6669
6670 let mut total_input: u64 = 0;
6671
6672 for (i, size) in sizes.iter().enumerate() {
6673 aggregator.update(
6674 Price::from("100.00"),
6675 Quantity::new(*size as f64, 0),
6676 UnixNanos::from((i as u64 + 1) * 1_000),
6677 );
6678 total_input += *size;
6679 }
6680
6681 let bars = handler.lock().expect(MUTEX_POISONED);
6682
6683 for bar in bars.iter() {
6685 prop_assert_eq!(bar.volume, Quantity::from(step));
6686 prop_assert!(bar.low <= bar.open);
6687 prop_assert!(bar.low <= bar.close);
6688 prop_assert!(bar.high >= bar.open);
6689 prop_assert!(bar.high >= bar.close);
6690 }
6691
6692 let emitted_total: u64 = bars.len() as u64 * step;
6694 let pending = aggregator.core.builder.volume.as_f64();
6695 prop_assert!((emitted_total as f64 + pending - total_input as f64).abs() < 1e-6);
6696 }
6697
6698 #[rstest]
6699 fn prop_value_bar_aggregator_ohlc_invariants(
6700 ticks in prop::collection::vec((50i64..=500i64, 1u64..=20u64), 2..=30),
6701 step in 100u64..=2_000u64,
6702 ) {
6703 let instrument = InstrumentAny::Equity(equity_aapl());
6704 let bar_spec = BarSpecification::new(step as usize, BarAggregation::Value, PriceType::Last);
6705 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
6706 let handler = Arc::new(Mutex::new(Vec::<Bar>::new()));
6707 let handler_clone = Arc::clone(&handler);
6708
6709 let mut aggregator = ValueBarAggregator::new(
6710 bar_type,
6711 instrument.price_precision(),
6712 instrument.size_precision(),
6713 move |bar: Bar| {
6714 handler_clone.lock().expect(MUTEX_POISONED).push(bar);
6715 },
6716 );
6717
6718 for (i, (price_cents, size)) in ticks.iter().enumerate() {
6719 aggregator.update(
6720 Price::new((*price_cents as f64) / 100.0, 2),
6721 Quantity::new(*size as f64, 0),
6722 UnixNanos::from((i as u64 + 1) * 1_000),
6723 );
6724 }
6725
6726 let bars = handler.lock().expect(MUTEX_POISONED);
6727 for bar in bars.iter() {
6728 prop_assert!(bar.low <= bar.open);
6729 prop_assert!(bar.low <= bar.close);
6730 prop_assert!(bar.high >= bar.open);
6731 prop_assert!(bar.high >= bar.close);
6732 prop_assert!(bar.volume.as_f64() > 0.0);
6733 }
6734 }
6735 }
6736}