1use indexmap::IndexMap;
24use nautilus_core::UnixNanos;
25use nautilus_model::{
26 enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified, TimeInForce},
27 identifiers::{AccountId, InstrumentId, VenueOrderId},
28 instruments::{Instrument, InstrumentAny},
29 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
30 types::{Money, Price, Quantity},
31};
32use rust_decimal::Decimal;
33
34use super::{
35 ids::{create_synthetic_trade_id, create_synthetic_venue_order_id},
36 types::{FillAdjustmentResult, FillSnapshot, ReconciliationResult, VenuePositionSnapshot},
37};
38
39const DEFAULT_TOLERANCE: Decimal = Decimal::from_parts(1, 0, 0, false, 4); #[must_use]
47pub fn simulate_position(fills: &[FillSnapshot]) -> (Decimal, Decimal) {
48 let mut qty = Decimal::ZERO;
49 let mut value = Decimal::ZERO;
50
51 for fill in fills {
52 debug_assert!(
53 fill.qty > Decimal::ZERO,
54 "fill snapshot qty must be positive, received {}",
55 fill.qty,
56 );
57 let direction = Decimal::from(fill.direction());
58 let new_qty = qty + (direction * fill.qty);
59
60 if (qty >= Decimal::ZERO && direction > Decimal::ZERO)
62 || (qty <= Decimal::ZERO && direction < Decimal::ZERO)
63 {
64 value += fill.qty * fill.px;
66 qty = new_qty;
67 } else {
68 if qty.abs() >= fill.qty {
70 let close_ratio = fill.qty / qty.abs();
72 value *= Decimal::ONE - close_ratio;
73 qty = new_qty;
74 } else {
75 let remaining = fill.qty - qty.abs();
77 qty = direction * remaining;
78 value = remaining * fill.px;
79 }
80 }
81 }
82
83 debug_assert!(
84 value >= Decimal::ZERO,
85 "simulated position value must be non-negative, was {value}",
86 );
87 debug_assert!(
88 !(qty != Decimal::ZERO && value.is_sign_negative()),
89 "simulated avg price invariant: qty={qty}, value={value}",
90 );
91
92 (qty, value)
93}
94
95#[must_use]
104pub fn detect_zero_crossings(fills: &[FillSnapshot]) -> Vec<u64> {
105 let mut running_qty = Decimal::ZERO;
106 let mut zero_crossings = Vec::new();
107
108 for fill in fills {
109 let prev_qty = running_qty;
110 running_qty += Decimal::from(fill.direction()) * fill.qty;
111
112 if prev_qty != Decimal::ZERO {
114 if running_qty == Decimal::ZERO {
115 zero_crossings.push(fill.ts_event);
117 } else if (prev_qty > Decimal::ZERO) != (running_qty > Decimal::ZERO) {
118 zero_crossings.push(fill.ts_event);
120 }
121 }
122 }
123
124 zero_crossings
125}
126
127#[must_use]
133pub fn check_position_match(
134 simulated_qty: Decimal,
135 simulated_value: Decimal,
136 venue_qty: Decimal,
137 venue_avg_px: Decimal,
138 tolerance: Decimal,
139) -> bool {
140 if simulated_qty != venue_qty {
141 return false;
142 }
143
144 if simulated_qty == Decimal::ZERO {
145 return true; }
147
148 let abs_qty = simulated_qty.abs();
150 if abs_qty == Decimal::ZERO {
151 return false;
152 }
153
154 let simulated_avg_px = simulated_value / abs_qty;
155
156 if venue_avg_px == Decimal::ZERO {
158 return false;
159 }
160
161 let relative_diff = (simulated_avg_px - venue_avg_px).abs() / venue_avg_px;
162
163 relative_diff <= tolerance
164}
165
166pub fn calculate_reconciliation_price(
184 current_position_qty: Decimal,
185 current_position_avg_px: Option<Decimal>,
186 target_position_qty: Decimal,
187 target_position_avg_px: Option<Decimal>,
188) -> Option<Decimal> {
189 let qty_diff = target_position_qty - current_position_qty;
190
191 if qty_diff == Decimal::ZERO {
192 return None; }
194
195 if target_position_qty == Decimal::ZERO {
198 return current_position_avg_px;
199 }
200
201 let target_avg_px = target_position_avg_px?;
203 if target_avg_px == Decimal::ZERO {
204 return None;
205 }
206
207 if current_position_qty == Decimal::ZERO || current_position_avg_px.is_none() {
209 return Some(target_avg_px);
210 }
211
212 let current_avg_px = current_position_avg_px?;
213
214 let is_flip = (current_position_qty > Decimal::ZERO) != (target_position_qty > Decimal::ZERO)
217 && target_position_qty != Decimal::ZERO;
218
219 if is_flip {
220 return Some(target_avg_px);
221 }
222
223 let target_value = target_position_qty * target_avg_px;
226 let current_value = current_position_qty * current_avg_px;
227 let diff_value = target_value - current_value;
228
229 let reconciliation_px = diff_value / qty_diff;
231
232 if reconciliation_px > Decimal::ZERO {
234 return Some(reconciliation_px);
235 }
236
237 None
238}
239
240#[must_use]
250#[expect(clippy::missing_panics_doc)] pub fn adjust_fills_for_partial_window(
252 fills: &[FillSnapshot],
253 venue_position: &VenuePositionSnapshot,
254 _instrument: &InstrumentAny,
255 tolerance: Decimal,
256) -> FillAdjustmentResult {
257 if fills.is_empty() {
259 return FillAdjustmentResult::NoAdjustment;
260 }
261
262 if venue_position.qty == Decimal::ZERO {
264 return FillAdjustmentResult::NoAdjustment;
265 }
266
267 let zero_crossings = detect_zero_crossings(fills);
269
270 let venue_qty_signed = match venue_position.side {
272 OrderSide::Buy => venue_position.qty,
273 OrderSide::Sell => -venue_position.qty,
274 _ => Decimal::ZERO,
275 };
276
277 if !zero_crossings.is_empty() {
279 let mut last_flat_crossing_ts = None;
282 let mut running_qty = Decimal::ZERO;
283
284 for fill in fills {
285 let prev_qty = running_qty;
286 running_qty += Decimal::from(fill.direction()) * fill.qty;
287
288 if prev_qty != Decimal::ZERO && running_qty == Decimal::ZERO {
289 last_flat_crossing_ts = Some(fill.ts_event);
290 }
291 }
292
293 let lifecycle_boundary_ts =
294 last_flat_crossing_ts.unwrap_or(*zero_crossings.last().unwrap());
295
296 let current_lifecycle_fills: Vec<FillSnapshot> = fills
298 .iter()
299 .filter(|f| f.ts_event > lifecycle_boundary_ts)
300 .cloned()
301 .collect();
302
303 if current_lifecycle_fills.is_empty() {
304 return FillAdjustmentResult::NoAdjustment;
305 }
306
307 let (current_qty, current_value) = simulate_position(¤t_lifecycle_fills);
309
310 if check_position_match(
312 current_qty,
313 current_value,
314 venue_qty_signed,
315 venue_position.avg_px,
316 tolerance,
317 ) {
318 return FillAdjustmentResult::FilterToCurrentLifecycle {
320 last_zero_crossing_ts: lifecycle_boundary_ts,
321 current_lifecycle_fills,
322 };
323 }
324
325 if let Some(first_fill) = current_lifecycle_fills.first() {
327 let synthetic_fill = FillSnapshot::new(
328 first_fill.ts_event.saturating_sub(1), venue_position.side,
330 venue_position.qty,
331 venue_position.avg_px,
332 first_fill.venue_order_id,
333 );
334
335 return FillAdjustmentResult::ReplaceCurrentLifecycle {
336 synthetic_fill,
337 first_venue_order_id: first_fill.venue_order_id,
338 };
339 }
340
341 return FillAdjustmentResult::NoAdjustment;
342 }
343
344 let oldest_lifecycle_fills: Vec<FillSnapshot> =
347 if let Some(&first_zero_crossing_ts) = zero_crossings.first() {
348 fills
350 .iter()
351 .filter(|f| f.ts_event <= first_zero_crossing_ts)
352 .cloned()
353 .collect()
354 } else {
355 fills.to_vec()
357 };
358
359 if oldest_lifecycle_fills.is_empty() {
360 return FillAdjustmentResult::NoAdjustment;
361 }
362
363 let (oldest_qty, oldest_value) = simulate_position(&oldest_lifecycle_fills);
365
366 if zero_crossings.is_empty() {
368 if check_position_match(
370 oldest_qty,
371 oldest_value,
372 venue_qty_signed,
373 venue_position.avg_px,
374 tolerance,
375 ) {
376 return FillAdjustmentResult::NoAdjustment;
377 }
378
379 if let Some(first_fill) = oldest_lifecycle_fills.first() {
381 let oldest_avg_px = if oldest_qty == Decimal::ZERO {
384 None
385 } else {
386 Some(oldest_value / oldest_qty.abs())
387 };
388
389 let reconciliation_price = calculate_reconciliation_price(
390 oldest_qty,
391 oldest_avg_px,
392 venue_qty_signed,
393 Some(venue_position.avg_px),
394 );
395
396 if let Some(opening_px) = reconciliation_price {
397 let opening_qty = if oldest_qty == Decimal::ZERO {
399 venue_qty_signed
400 } else {
401 venue_qty_signed - oldest_qty
403 };
404
405 if opening_qty.abs() > Decimal::ZERO {
406 let synthetic_side = if opening_qty > Decimal::ZERO {
407 OrderSide::Buy
408 } else {
409 OrderSide::Sell
410 };
411
412 let synthetic_fill = FillSnapshot::new(
413 first_fill.ts_event.saturating_sub(1),
414 synthetic_side,
415 opening_qty.abs(),
416 opening_px,
417 first_fill.venue_order_id,
418 );
419
420 return FillAdjustmentResult::AddSyntheticOpening {
421 synthetic_fill,
422 existing_fills: oldest_lifecycle_fills,
423 };
424 }
425 }
426 }
427
428 return FillAdjustmentResult::NoAdjustment;
429 }
430
431 if oldest_qty == Decimal::ZERO {
433 return FillAdjustmentResult::NoAdjustment;
435 }
436
437 if !oldest_lifecycle_fills.is_empty()
439 && let Some(&first_zero_crossing_ts) = zero_crossings.first()
440 {
441 let current_lifecycle_fills: Vec<FillSnapshot> = fills
443 .iter()
444 .filter(|f| f.ts_event > first_zero_crossing_ts)
445 .cloned()
446 .collect();
447
448 if !current_lifecycle_fills.is_empty()
449 && let Some(first_current_fill) = current_lifecycle_fills.first()
450 {
451 let synthetic_fill = FillSnapshot::new(
452 first_current_fill.ts_event.saturating_sub(1),
453 venue_position.side,
454 venue_position.qty,
455 venue_position.avg_px,
456 first_current_fill.venue_order_id,
457 );
458
459 return FillAdjustmentResult::AddSyntheticOpening {
460 synthetic_fill,
461 existing_fills: oldest_lifecycle_fills,
462 };
463 }
464 }
465
466 FillAdjustmentResult::NoAdjustment
467}
468
469pub fn create_synthetic_order_report(
479 fill: &FillSnapshot,
480 account_id: AccountId,
481 instrument_id: InstrumentId,
482 instrument: &InstrumentAny,
483 venue_order_id: VenueOrderId,
484) -> anyhow::Result<OrderStatusReport> {
485 let order_qty = Quantity::from_decimal_dp(fill.qty, instrument.size_precision())?;
486
487 let mut report = OrderStatusReport::new(
488 account_id,
489 instrument_id,
490 None, venue_order_id,
492 fill.side,
493 OrderType::Market,
494 TimeInForce::Gtc,
495 OrderStatus::Filled,
496 order_qty,
497 order_qty, UnixNanos::from(fill.ts_event),
499 UnixNanos::from(fill.ts_event),
500 UnixNanos::from(fill.ts_event),
501 None, );
503 report.avg_px = Some(fill.px);
504 Ok(report)
505}
506
507pub fn create_synthetic_fill_report(
513 fill: &FillSnapshot,
514 account_id: AccountId,
515 instrument_id: InstrumentId,
516 instrument: &InstrumentAny,
517 venue_order_id: VenueOrderId,
518) -> anyhow::Result<FillReport> {
519 let trade_id = create_synthetic_trade_id(fill);
520 let qty = Quantity::from_decimal_dp(fill.qty, instrument.size_precision())?;
521 let px = Price::from_decimal_dp(fill.px, instrument.price_precision())?;
522
523 Ok(FillReport::new(
524 account_id,
525 instrument_id,
526 venue_order_id,
527 trade_id,
528 fill.side,
529 qty,
530 px,
531 Money::new(0.0, instrument.quote_currency()),
532 LiquiditySide::NoLiquiditySide,
533 None, None, fill.ts_event.into(),
536 fill.ts_event.into(),
537 None, ))
539}
540
541pub fn process_mass_status_for_reconciliation(
552 mass_status: &ExecutionMassStatus,
553 instrument: &InstrumentAny,
554 tolerance: Option<Decimal>,
555) -> anyhow::Result<ReconciliationResult> {
556 let instrument_id = instrument.id();
557 let account_id = mass_status.account_id;
558 let tol = tolerance.unwrap_or(DEFAULT_TOLERANCE);
559
560 let position_reports = mass_status.position_reports();
562 let venue_position = match position_reports.get(&instrument_id).and_then(|r| r.first()) {
563 Some(report) => position_report_to_snapshot(report),
564 None => {
565 return Ok(extract_instrument_reports(mass_status, instrument_id));
567 }
568 };
569
570 let extracted = extract_fills_for_instrument(mass_status, instrument_id);
572 let fill_snapshots = extracted.snapshots;
573 let mut order_map = extracted.orders;
574 let mut fill_map = extracted.fills;
575
576 if fill_snapshots.is_empty() {
577 return Ok(ReconciliationResult {
578 orders: order_map,
579 fills: fill_map,
580 });
581 }
582
583 let result = adjust_fills_for_partial_window(&fill_snapshots, &venue_position, instrument, tol);
585
586 match result {
588 FillAdjustmentResult::NoAdjustment => {}
589
590 FillAdjustmentResult::AddSyntheticOpening {
591 synthetic_fill,
592 existing_fills: _,
593 } => {
594 let venue_order_id = create_synthetic_venue_order_id(&synthetic_fill, instrument_id);
595 let order = create_synthetic_order_report(
596 &synthetic_fill,
597 account_id,
598 instrument_id,
599 instrument,
600 venue_order_id,
601 )?;
602 let fill = create_synthetic_fill_report(
603 &synthetic_fill,
604 account_id,
605 instrument_id,
606 instrument,
607 venue_order_id,
608 )?;
609
610 order_map.insert(venue_order_id, order);
611 fill_map.entry(venue_order_id).or_default().insert(0, fill);
612 }
613
614 FillAdjustmentResult::ReplaceCurrentLifecycle {
615 synthetic_fill,
616 first_venue_order_id,
617 } => {
618 let order = create_synthetic_order_report(
619 &synthetic_fill,
620 account_id,
621 instrument_id,
622 instrument,
623 first_venue_order_id,
624 )?;
625 let fill = create_synthetic_fill_report(
626 &synthetic_fill,
627 account_id,
628 instrument_id,
629 instrument,
630 first_venue_order_id,
631 )?;
632
633 order_map.clear();
635 fill_map.clear();
636 order_map.insert(first_venue_order_id, order);
637 fill_map.insert(first_venue_order_id, vec![fill]);
638 }
639
640 FillAdjustmentResult::FilterToCurrentLifecycle {
641 last_zero_crossing_ts,
642 current_lifecycle_fills: _,
643 } => {
644 for fills in fill_map.values_mut() {
646 fills.retain(|f| f.ts_event.as_u64() > last_zero_crossing_ts);
647 }
648 fill_map.retain(|_, fills| !fills.is_empty());
649
650 let orders_with_fills: ahash::AHashSet<VenueOrderId> =
652 fill_map.keys().copied().collect();
653 order_map.retain(|id, order| {
654 orders_with_fills.contains(id)
655 || !matches!(
656 order.order_status,
657 OrderStatus::Denied
658 | OrderStatus::Rejected
659 | OrderStatus::Canceled
660 | OrderStatus::Expired
661 | OrderStatus::Filled
662 )
663 });
664 }
665 }
666
667 Ok(ReconciliationResult {
668 orders: order_map,
669 fills: fill_map,
670 })
671}
672
673fn position_report_to_snapshot(report: &PositionStatusReport) -> VenuePositionSnapshot {
675 let side = match report.position_side {
676 PositionSideSpecified::Long => OrderSide::Buy,
677 PositionSideSpecified::Short => OrderSide::Sell,
678 PositionSideSpecified::Flat => OrderSide::Buy,
679 };
680
681 VenuePositionSnapshot {
682 side,
683 qty: report.quantity.into(),
684 avg_px: report.avg_px_open.unwrap_or(Decimal::ZERO),
685 }
686}
687
688fn extract_instrument_reports(
690 mass_status: &ExecutionMassStatus,
691 instrument_id: InstrumentId,
692) -> ReconciliationResult {
693 let mut orders = IndexMap::new();
694 let mut fills = IndexMap::new();
695
696 for (id, order) in mass_status.order_reports() {
697 if order.instrument_id == instrument_id {
698 orders.insert(id, order.clone());
699 }
700 }
701
702 for (id, fill_list) in mass_status.fill_reports() {
703 let filtered: Vec<_> = fill_list
704 .iter()
705 .filter(|f| f.instrument_id == instrument_id)
706 .cloned()
707 .collect();
708
709 if !filtered.is_empty() {
710 fills.insert(id, filtered);
711 }
712 }
713
714 ReconciliationResult { orders, fills }
715}
716
/// Reports of a single instrument pulled out of a mass status, as produced by
/// `extract_fills_for_instrument`.
struct ExtractedFills {
    /// Simplified view of every fill, sorted by event timestamp.
    snapshots: Vec<FillSnapshot>,
    /// Order reports for the instrument, keyed by venue order ID.
    orders: IndexMap<VenueOrderId, OrderStatusReport>,
    /// Fill reports for the instrument, grouped by venue order ID.
    fills: IndexMap<VenueOrderId, Vec<FillReport>>,
}
723
724fn extract_fills_for_instrument(
726 mass_status: &ExecutionMassStatus,
727 instrument_id: InstrumentId,
728) -> ExtractedFills {
729 let mut snapshots = Vec::new();
730 let mut order_map = IndexMap::new();
731 let mut fill_map = IndexMap::new();
732
733 for (id, order) in mass_status.order_reports() {
735 if order.instrument_id == instrument_id {
736 order_map.insert(id, order.clone());
737 }
738 }
739
740 for (venue_order_id, fill_reports) in mass_status.fill_reports() {
742 for fill in fill_reports {
743 if fill.instrument_id == instrument_id {
744 let side = mass_status
745 .order_reports()
746 .get(&venue_order_id)
747 .map_or(fill.order_side, |o| o.order_side);
748
749 snapshots.push(FillSnapshot::new(
750 fill.ts_event.as_u64(),
751 side,
752 fill.last_qty.into(),
753 fill.last_px.into(),
754 venue_order_id,
755 ));
756
757 fill_map
758 .entry(venue_order_id)
759 .or_insert_with(Vec::new)
760 .push(fill.clone());
761 }
762 }
763 }
764
765 snapshots.sort_by_key(|f| f.ts_event);
767
768 ExtractedFills {
769 snapshots,
770 orders: order_map,
771 fills: fill_map,
772 }
773}
774
775pub fn check_position_reconciliation(
781 report: &PositionStatusReport,
782 cached_signed_qty: Decimal,
783 size_precision: Option<u8>,
784) -> bool {
785 let venue_signed_qty = report.signed_decimal_qty;
786
787 if venue_signed_qty == Decimal::ZERO && cached_signed_qty == Decimal::ZERO {
788 return true;
789 }
790
791 if let Some(precision) = size_precision
792 && is_within_single_unit_tolerance(cached_signed_qty, venue_signed_qty, precision)
793 {
794 log::debug!(
795 "Position for {} within tolerance: cached={}, venue={}",
796 report.instrument_id,
797 cached_signed_qty,
798 venue_signed_qty
799 );
800 return true;
801 }
802
803 if cached_signed_qty == venue_signed_qty {
804 return true;
805 }
806
807 log::warn!(
808 "Position discrepancy for {}: cached={}, venue={}",
809 report.instrument_id,
810 cached_signed_qty,
811 venue_signed_qty
812 );
813
814 false
815}
816
817#[must_use]
822pub fn is_within_single_unit_tolerance(value1: Decimal, value2: Decimal, precision: u8) -> bool {
823 if precision == 0 {
824 return value1 == value2;
825 }
826
827 let tolerance = Decimal::new(1, u32::from(precision));
828 let difference = (value1 - value2).abs();
829 difference <= tolerance
830}