Skip to main content

nautilus_execution/reconciliation/
positions.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Position-state reconciliation.
17//!
18//! Position simulation, partial-window fill reconstruction, mass-status processing,
19//! and final position-match checks. The core invariant maintained here is that the
20//! reconstructed position matches the venue's reported position within tolerance
21//! (default 0.01%) after reconciliation is applied.
22
23use indexmap::IndexMap;
24use nautilus_core::UnixNanos;
25use nautilus_model::{
26    enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified, TimeInForce},
27    identifiers::{AccountId, InstrumentId, VenueOrderId},
28    instruments::{Instrument, InstrumentAny},
29    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
30    types::{Money, Price, Quantity},
31};
32use rust_decimal::Decimal;
33
34use super::{
35    ids::{create_synthetic_trade_id, create_synthetic_venue_order_id},
36    types::{FillAdjustmentResult, FillSnapshot, ReconciliationResult, VenuePositionSnapshot},
37};
38
39const DEFAULT_TOLERANCE: Decimal = Decimal::from_parts(1, 0, 0, false, 4); // 0.0001
40
41/// Simulate position from chronologically ordered fills using netting logic.
42///
43/// # Returns
44///
45/// Returns a tuple of (quantity, value) after applying all fills.
46#[must_use]
47pub fn simulate_position(fills: &[FillSnapshot]) -> (Decimal, Decimal) {
48    let mut qty = Decimal::ZERO;
49    let mut value = Decimal::ZERO;
50
51    for fill in fills {
52        debug_assert!(
53            fill.qty > Decimal::ZERO,
54            "fill snapshot qty must be positive, received {}",
55            fill.qty,
56        );
57        let direction = Decimal::from(fill.direction());
58        let new_qty = qty + (direction * fill.qty);
59
60        // Check if we're accumulating or crossing zero (flip/close)
61        if (qty >= Decimal::ZERO && direction > Decimal::ZERO)
62            || (qty <= Decimal::ZERO && direction < Decimal::ZERO)
63        {
64            // Accumulating in same direction
65            value += fill.qty * fill.px;
66            qty = new_qty;
67        } else {
68            // Closing or flipping position
69            if qty.abs() >= fill.qty {
70                // Partial close - maintain average price by reducing value proportionally
71                let close_ratio = fill.qty / qty.abs();
72                value *= Decimal::ONE - close_ratio;
73                qty = new_qty;
74            } else {
75                // Close and flip - reset value to opening position
76                let remaining = fill.qty - qty.abs();
77                qty = direction * remaining;
78                value = remaining * fill.px;
79            }
80        }
81    }
82
83    debug_assert!(
84        value >= Decimal::ZERO,
85        "simulated position value must be non-negative, was {value}",
86    );
87    debug_assert!(
88        !(qty != Decimal::ZERO && value.is_sign_negative()),
89        "simulated avg price invariant: qty={qty}, value={value}",
90    );
91
92    (qty, value)
93}
94
95/// Detect zero-crossing timestamps in a sequence of fills.
96///
97/// A zero-crossing occurs when position quantity crosses through zero (FLAT).
98/// This includes both landing exactly on zero and flipping from long to short or vice versa.
99///
100/// # Returns
101///
102/// Returns a list of timestamps where position crosses through zero.
103#[must_use]
104pub fn detect_zero_crossings(fills: &[FillSnapshot]) -> Vec<u64> {
105    let mut running_qty = Decimal::ZERO;
106    let mut zero_crossings = Vec::new();
107
108    for fill in fills {
109        let prev_qty = running_qty;
110        running_qty += Decimal::from(fill.direction()) * fill.qty;
111
112        // Detect when position crosses zero
113        if prev_qty != Decimal::ZERO {
114            if running_qty == Decimal::ZERO {
115                // Landed exactly on zero
116                zero_crossings.push(fill.ts_event);
117            } else if (prev_qty > Decimal::ZERO) != (running_qty > Decimal::ZERO) {
118                // Sign changed - crossed through zero (flip)
119                zero_crossings.push(fill.ts_event);
120            }
121        }
122    }
123
124    zero_crossings
125}
126
127/// Check if simulated position matches venue position within tolerance.
128///
129/// # Returns
130///
131/// Returns true if quantities and average prices match within tolerance.
132#[must_use]
133pub fn check_position_match(
134    simulated_qty: Decimal,
135    simulated_value: Decimal,
136    venue_qty: Decimal,
137    venue_avg_px: Decimal,
138    tolerance: Decimal,
139) -> bool {
140    if simulated_qty != venue_qty {
141        return false;
142    }
143
144    if simulated_qty == Decimal::ZERO {
145        return true; // Both FLAT
146    }
147
148    // Guard against division by zero
149    let abs_qty = simulated_qty.abs();
150    if abs_qty == Decimal::ZERO {
151        return false;
152    }
153
154    let simulated_avg_px = simulated_value / abs_qty;
155
156    // If venue avg px is zero, we cannot calculate relative difference
157    if venue_avg_px == Decimal::ZERO {
158        return false;
159    }
160
161    let relative_diff = (simulated_avg_px - venue_avg_px).abs() / venue_avg_px;
162
163    relative_diff <= tolerance
164}
165
166/// Calculate the price needed for a reconciliation order to achieve target position.
167///
168/// This is a pure function that calculates what price a fill would need to have
169/// to move from the current position state to the target position state with the
170/// correct average price, accounting for the netting simulation logic.
171///
172/// # Returns
173///
174/// Returns `Some(Decimal)` if a valid reconciliation price can be calculated, `None` otherwise.
175///
176/// # Notes
177///
178/// The function handles four scenarios:
179/// 1. Position to flat: reconciliation_px = current_avg_px (close at current average)
180/// 2. Flat to position: reconciliation_px = target_avg_px
181/// 3. Position flip (sign change): reconciliation_px = target_avg_px (due to value reset in simulation)
182/// 4. Accumulation/reduction: weighted average formula
183pub fn calculate_reconciliation_price(
184    current_position_qty: Decimal,
185    current_position_avg_px: Option<Decimal>,
186    target_position_qty: Decimal,
187    target_position_avg_px: Option<Decimal>,
188) -> Option<Decimal> {
189    let qty_diff = target_position_qty - current_position_qty;
190
191    if qty_diff == Decimal::ZERO {
192        return None; // No reconciliation needed
193    }
194
195    // Special case: closing to flat (target_position_qty == 0)
196    // When flattening, the reconciliation price equals the current position's average price
197    if target_position_qty == Decimal::ZERO {
198        return current_position_avg_px;
199    }
200
201    // If target average price is not provided or zero, we cannot calculate
202    let target_avg_px = target_position_avg_px?;
203    if target_avg_px == Decimal::ZERO {
204        return None;
205    }
206
207    // If current position is flat, the reconciliation price equals target avg price
208    if current_position_qty == Decimal::ZERO || current_position_avg_px.is_none() {
209        return Some(target_avg_px);
210    }
211
212    let current_avg_px = current_position_avg_px?;
213
214    // Check if this is a flip scenario (sign change)
215    // In simulation, flips reset value to remaining * px, so reconciliation_px = target_avg_px
216    let is_flip = (current_position_qty > Decimal::ZERO) != (target_position_qty > Decimal::ZERO)
217        && target_position_qty != Decimal::ZERO;
218
219    if is_flip {
220        return Some(target_avg_px);
221    }
222
223    // For accumulation or reduction (same side), use weighted average formula
224    // Formula: (target_qty * target_avg_px) = (current_qty * current_avg_px) + (qty_diff * reconciliation_px)
225    let target_value = target_position_qty * target_avg_px;
226    let current_value = current_position_qty * current_avg_px;
227    let diff_value = target_value - current_value;
228
229    // qty_diff is guaranteed to be non-zero here due to early return at line 270
230    let reconciliation_px = diff_value / qty_diff;
231
232    // Ensure price is positive
233    if reconciliation_px > Decimal::ZERO {
234        return Some(reconciliation_px);
235    }
236
237    None
238}
239
240/// Adjust fills for partial reconciliation window to handle incomplete position lifecycles.
241///
242/// This function analyzes fills and determines if adjustments are needed when the reconciliation
243/// window doesn't capture the complete position history (missing opening fills).
244///
245/// # Returns
246///
247/// Returns `FillAdjustmentResult` indicating what adjustments (if any) are needed.
248///
249#[must_use]
250#[expect(clippy::missing_panics_doc)] // All unwraps guarded by prior checks
251pub fn adjust_fills_for_partial_window(
252    fills: &[FillSnapshot],
253    venue_position: &VenuePositionSnapshot,
254    _instrument: &InstrumentAny,
255    tolerance: Decimal,
256) -> FillAdjustmentResult {
257    // If no fills, nothing to adjust
258    if fills.is_empty() {
259        return FillAdjustmentResult::NoAdjustment;
260    }
261
262    // If venue position is FLAT, return unchanged
263    if venue_position.qty == Decimal::ZERO {
264        return FillAdjustmentResult::NoAdjustment;
265    }
266
267    // Detect zero-crossings
268    let zero_crossings = detect_zero_crossings(fills);
269
270    // Convert venue position to signed quantity
271    let venue_qty_signed = match venue_position.side {
272        OrderSide::Buy => venue_position.qty,
273        OrderSide::Sell => -venue_position.qty,
274        _ => Decimal::ZERO,
275    };
276
277    // Case 1: Has zero-crossings - focus on current lifecycle after last zero-crossing
278    if !zero_crossings.is_empty() {
279        // Find the last zero-crossing that lands on FLAT (qty==0)
280        // This separates lifecycles; flips within a lifecycle don't count
281        let mut last_flat_crossing_ts = None;
282        let mut running_qty = Decimal::ZERO;
283
284        for fill in fills {
285            let prev_qty = running_qty;
286            running_qty += Decimal::from(fill.direction()) * fill.qty;
287
288            if prev_qty != Decimal::ZERO && running_qty == Decimal::ZERO {
289                last_flat_crossing_ts = Some(fill.ts_event);
290            }
291        }
292
293        let lifecycle_boundary_ts =
294            last_flat_crossing_ts.unwrap_or(*zero_crossings.last().unwrap());
295
296        // Get fills from current lifecycle (after lifecycle boundary)
297        let current_lifecycle_fills: Vec<FillSnapshot> = fills
298            .iter()
299            .filter(|f| f.ts_event > lifecycle_boundary_ts)
300            .cloned()
301            .collect();
302
303        if current_lifecycle_fills.is_empty() {
304            return FillAdjustmentResult::NoAdjustment;
305        }
306
307        // Simulate current lifecycle
308        let (current_qty, current_value) = simulate_position(&current_lifecycle_fills);
309
310        // Check if current lifecycle matches venue
311        if check_position_match(
312            current_qty,
313            current_value,
314            venue_qty_signed,
315            venue_position.avg_px,
316            tolerance,
317        ) {
318            // Current lifecycle matches - filter out old lifecycles
319            return FillAdjustmentResult::FilterToCurrentLifecycle {
320                last_zero_crossing_ts: lifecycle_boundary_ts,
321                current_lifecycle_fills,
322            };
323        }
324
325        // Current lifecycle doesn't match - replace with synthetic fill
326        if let Some(first_fill) = current_lifecycle_fills.first() {
327            let synthetic_fill = FillSnapshot::new(
328                first_fill.ts_event.saturating_sub(1), // Timestamp before first fill
329                venue_position.side,
330                venue_position.qty,
331                venue_position.avg_px,
332                first_fill.venue_order_id,
333            );
334
335            return FillAdjustmentResult::ReplaceCurrentLifecycle {
336                synthetic_fill,
337                first_venue_order_id: first_fill.venue_order_id,
338            };
339        }
340
341        return FillAdjustmentResult::NoAdjustment;
342    }
343
344    // Case 2: Single lifecycle or one zero-crossing
345    // Determine which fills to analyze
346    let oldest_lifecycle_fills: Vec<FillSnapshot> =
347        if let Some(&first_zero_crossing_ts) = zero_crossings.first() {
348            // Get fills before first zero-crossing
349            fills
350                .iter()
351                .filter(|f| f.ts_event <= first_zero_crossing_ts)
352                .cloned()
353                .collect()
354        } else {
355            // No zero-crossings - all fills are in single lifecycle
356            fills.to_vec()
357        };
358
359    if oldest_lifecycle_fills.is_empty() {
360        return FillAdjustmentResult::NoAdjustment;
361    }
362
363    // Simulate oldest lifecycle
364    let (oldest_qty, oldest_value) = simulate_position(&oldest_lifecycle_fills);
365
366    // If single lifecycle (no zero-crossings)
367    if zero_crossings.is_empty() {
368        // Check if simulated position matches venue
369        if check_position_match(
370            oldest_qty,
371            oldest_value,
372            venue_qty_signed,
373            venue_position.avg_px,
374            tolerance,
375        ) {
376            return FillAdjustmentResult::NoAdjustment;
377        }
378
379        // Doesn't match - need to add synthetic opening fill
380        if let Some(first_fill) = oldest_lifecycle_fills.first() {
381            // Calculate what opening fill is needed
382            // Use simulated position as current, venue position as target
383            let oldest_avg_px = if oldest_qty == Decimal::ZERO {
384                None
385            } else {
386                Some(oldest_value / oldest_qty.abs())
387            };
388
389            let reconciliation_price = calculate_reconciliation_price(
390                oldest_qty,
391                oldest_avg_px,
392                venue_qty_signed,
393                Some(venue_position.avg_px),
394            );
395
396            if let Some(opening_px) = reconciliation_price {
397                // Calculate opening quantity needed
398                let opening_qty = if oldest_qty == Decimal::ZERO {
399                    venue_qty_signed
400                } else {
401                    // Work backwards: venue = opening + current fills
402                    venue_qty_signed - oldest_qty
403                };
404
405                if opening_qty.abs() > Decimal::ZERO {
406                    let synthetic_side = if opening_qty > Decimal::ZERO {
407                        OrderSide::Buy
408                    } else {
409                        OrderSide::Sell
410                    };
411
412                    let synthetic_fill = FillSnapshot::new(
413                        first_fill.ts_event.saturating_sub(1),
414                        synthetic_side,
415                        opening_qty.abs(),
416                        opening_px,
417                        first_fill.venue_order_id,
418                    );
419
420                    return FillAdjustmentResult::AddSyntheticOpening {
421                        synthetic_fill,
422                        existing_fills: oldest_lifecycle_fills,
423                    };
424                }
425            }
426        }
427
428        return FillAdjustmentResult::NoAdjustment;
429    }
430
431    // Has one zero-crossing - check if oldest lifecycle closes at zero
432    if oldest_qty == Decimal::ZERO {
433        // Lifecycle closes correctly - no adjustment needed
434        return FillAdjustmentResult::NoAdjustment;
435    }
436
437    // Oldest lifecycle doesn't close at zero - add synthetic opening fill
438    if !oldest_lifecycle_fills.is_empty()
439        && let Some(&first_zero_crossing_ts) = zero_crossings.first()
440    {
441        // Need to add opening fill that makes position close at zero-crossing
442        let current_lifecycle_fills: Vec<FillSnapshot> = fills
443            .iter()
444            .filter(|f| f.ts_event > first_zero_crossing_ts)
445            .cloned()
446            .collect();
447
448        if !current_lifecycle_fills.is_empty()
449            && let Some(first_current_fill) = current_lifecycle_fills.first()
450        {
451            let synthetic_fill = FillSnapshot::new(
452                first_current_fill.ts_event.saturating_sub(1),
453                venue_position.side,
454                venue_position.qty,
455                venue_position.avg_px,
456                first_current_fill.venue_order_id,
457            );
458
459            return FillAdjustmentResult::AddSyntheticOpening {
460                synthetic_fill,
461                existing_fills: oldest_lifecycle_fills,
462            };
463        }
464    }
465
466    FillAdjustmentResult::NoAdjustment
467}
468
469/// Create a synthetic `OrderStatusReport` from a `FillSnapshot`.
470///
471/// Populates `avg_px` from the fill's price so downstream reconciliation paths
472/// (e.g. [`crate::reconciliation::orders::create_inferred_fill`]) can resolve a
473/// fill price without falling back to the "no avg_px or price available" warning.
474///
475/// # Errors
476///
477/// Returns an error if the fill quantity cannot be converted to f64.
478pub fn create_synthetic_order_report(
479    fill: &FillSnapshot,
480    account_id: AccountId,
481    instrument_id: InstrumentId,
482    instrument: &InstrumentAny,
483    venue_order_id: VenueOrderId,
484) -> anyhow::Result<OrderStatusReport> {
485    let order_qty = Quantity::from_decimal_dp(fill.qty, instrument.size_precision())?;
486
487    let mut report = OrderStatusReport::new(
488        account_id,
489        instrument_id,
490        None, // client_order_id
491        venue_order_id,
492        fill.side,
493        OrderType::Market,
494        TimeInForce::Gtc,
495        OrderStatus::Filled,
496        order_qty,
497        order_qty, // filled_qty = order_qty (fully filled)
498        UnixNanos::from(fill.ts_event),
499        UnixNanos::from(fill.ts_event),
500        UnixNanos::from(fill.ts_event),
501        None, // report_id
502    );
503    report.avg_px = Some(fill.px);
504    Ok(report)
505}
506
507/// Create a synthetic `FillReport` from a `FillSnapshot`.
508///
509/// # Errors
510///
511/// Returns an error if the fill quantity or price cannot be converted.
512pub fn create_synthetic_fill_report(
513    fill: &FillSnapshot,
514    account_id: AccountId,
515    instrument_id: InstrumentId,
516    instrument: &InstrumentAny,
517    venue_order_id: VenueOrderId,
518) -> anyhow::Result<FillReport> {
519    let trade_id = create_synthetic_trade_id(fill);
520    let qty = Quantity::from_decimal_dp(fill.qty, instrument.size_precision())?;
521    let px = Price::from_decimal_dp(fill.px, instrument.price_precision())?;
522
523    Ok(FillReport::new(
524        account_id,
525        instrument_id,
526        venue_order_id,
527        trade_id,
528        fill.side,
529        qty,
530        px,
531        Money::new(0.0, instrument.quote_currency()),
532        LiquiditySide::NoLiquiditySide,
533        None, // client_order_id
534        None, // venue_position_id
535        fill.ts_event.into(),
536        fill.ts_event.into(),
537        None, // report_id
538    ))
539}
540
541/// Process fill reports from a mass status for position reconciliation.
542///
543/// This is the main entry point for position reconciliation. It:
544/// 1. Extracts fills and position for the given instrument
545/// 2. Detects position discrepancies
546/// 3. Returns adjusted order/fill reports ready for processing
547///
548/// # Errors
549///
550/// Returns an error if synthetic report creation fails.
551pub fn process_mass_status_for_reconciliation(
552    mass_status: &ExecutionMassStatus,
553    instrument: &InstrumentAny,
554    tolerance: Option<Decimal>,
555) -> anyhow::Result<ReconciliationResult> {
556    let instrument_id = instrument.id();
557    let account_id = mass_status.account_id;
558    let tol = tolerance.unwrap_or(DEFAULT_TOLERANCE);
559
560    // Get position report for this instrument
561    let position_reports = mass_status.position_reports();
562    let venue_position = match position_reports.get(&instrument_id).and_then(|r| r.first()) {
563        Some(report) => position_report_to_snapshot(report),
564        None => {
565            // No position report - return orders/fills unchanged
566            return Ok(extract_instrument_reports(mass_status, instrument_id));
567        }
568    };
569
570    // Extract and convert fills to snapshots
571    let extracted = extract_fills_for_instrument(mass_status, instrument_id);
572    let fill_snapshots = extracted.snapshots;
573    let mut order_map = extracted.orders;
574    let mut fill_map = extracted.fills;
575
576    if fill_snapshots.is_empty() {
577        return Ok(ReconciliationResult {
578            orders: order_map,
579            fills: fill_map,
580        });
581    }
582
583    // Run adjustment logic
584    let result = adjust_fills_for_partial_window(&fill_snapshots, &venue_position, instrument, tol);
585
586    // Apply adjustments
587    match result {
588        FillAdjustmentResult::NoAdjustment => {}
589
590        FillAdjustmentResult::AddSyntheticOpening {
591            synthetic_fill,
592            existing_fills: _,
593        } => {
594            let venue_order_id = create_synthetic_venue_order_id(&synthetic_fill, instrument_id);
595            let order = create_synthetic_order_report(
596                &synthetic_fill,
597                account_id,
598                instrument_id,
599                instrument,
600                venue_order_id,
601            )?;
602            let fill = create_synthetic_fill_report(
603                &synthetic_fill,
604                account_id,
605                instrument_id,
606                instrument,
607                venue_order_id,
608            )?;
609
610            order_map.insert(venue_order_id, order);
611            fill_map.entry(venue_order_id).or_default().insert(0, fill);
612        }
613
614        FillAdjustmentResult::ReplaceCurrentLifecycle {
615            synthetic_fill,
616            first_venue_order_id,
617        } => {
618            let order = create_synthetic_order_report(
619                &synthetic_fill,
620                account_id,
621                instrument_id,
622                instrument,
623                first_venue_order_id,
624            )?;
625            let fill = create_synthetic_fill_report(
626                &synthetic_fill,
627                account_id,
628                instrument_id,
629                instrument,
630                first_venue_order_id,
631            )?;
632
633            // Replace with only synthetic
634            order_map.clear();
635            fill_map.clear();
636            order_map.insert(first_venue_order_id, order);
637            fill_map.insert(first_venue_order_id, vec![fill]);
638        }
639
640        FillAdjustmentResult::FilterToCurrentLifecycle {
641            last_zero_crossing_ts,
642            current_lifecycle_fills: _,
643        } => {
644            // Filter fills to current lifecycle
645            for fills in fill_map.values_mut() {
646                fills.retain(|f| f.ts_event.as_u64() > last_zero_crossing_ts);
647            }
648            fill_map.retain(|_, fills| !fills.is_empty());
649
650            // Keep only orders that have fills or are still working
651            let orders_with_fills: ahash::AHashSet<VenueOrderId> =
652                fill_map.keys().copied().collect();
653            order_map.retain(|id, order| {
654                orders_with_fills.contains(id)
655                    || !matches!(
656                        order.order_status,
657                        OrderStatus::Denied
658                            | OrderStatus::Rejected
659                            | OrderStatus::Canceled
660                            | OrderStatus::Expired
661                            | OrderStatus::Filled
662                    )
663            });
664        }
665    }
666
667    Ok(ReconciliationResult {
668        orders: order_map,
669        fills: fill_map,
670    })
671}
672
673/// Convert a position status report to a venue position snapshot.
674fn position_report_to_snapshot(report: &PositionStatusReport) -> VenuePositionSnapshot {
675    let side = match report.position_side {
676        PositionSideSpecified::Long => OrderSide::Buy,
677        PositionSideSpecified::Short => OrderSide::Sell,
678        PositionSideSpecified::Flat => OrderSide::Buy,
679    };
680
681    VenuePositionSnapshot {
682        side,
683        qty: report.quantity.into(),
684        avg_px: report.avg_px_open.unwrap_or(Decimal::ZERO),
685    }
686}
687
688/// Extract orders and fills for a specific instrument from mass status.
689fn extract_instrument_reports(
690    mass_status: &ExecutionMassStatus,
691    instrument_id: InstrumentId,
692) -> ReconciliationResult {
693    let mut orders = IndexMap::new();
694    let mut fills = IndexMap::new();
695
696    for (id, order) in mass_status.order_reports() {
697        if order.instrument_id == instrument_id {
698            orders.insert(id, order.clone());
699        }
700    }
701
702    for (id, fill_list) in mass_status.fill_reports() {
703        let filtered: Vec<_> = fill_list
704            .iter()
705            .filter(|f| f.instrument_id == instrument_id)
706            .cloned()
707            .collect();
708
709        if !filtered.is_empty() {
710            fills.insert(id, filtered);
711        }
712    }
713
714    ReconciliationResult { orders, fills }
715}
716
717/// Extracted fills and reports for an instrument.
718struct ExtractedFills {
719    snapshots: Vec<FillSnapshot>,
720    orders: IndexMap<VenueOrderId, OrderStatusReport>,
721    fills: IndexMap<VenueOrderId, Vec<FillReport>>,
722}
723
724/// Extract fills for an instrument and convert to snapshots.
725fn extract_fills_for_instrument(
726    mass_status: &ExecutionMassStatus,
727    instrument_id: InstrumentId,
728) -> ExtractedFills {
729    let mut snapshots = Vec::new();
730    let mut order_map = IndexMap::new();
731    let mut fill_map = IndexMap::new();
732
733    // Seed order_map
734    for (id, order) in mass_status.order_reports() {
735        if order.instrument_id == instrument_id {
736            order_map.insert(id, order.clone());
737        }
738    }
739
740    // Extract fills
741    for (venue_order_id, fill_reports) in mass_status.fill_reports() {
742        for fill in fill_reports {
743            if fill.instrument_id == instrument_id {
744                let side = mass_status
745                    .order_reports()
746                    .get(&venue_order_id)
747                    .map_or(fill.order_side, |o| o.order_side);
748
749                snapshots.push(FillSnapshot::new(
750                    fill.ts_event.as_u64(),
751                    side,
752                    fill.last_qty.into(),
753                    fill.last_px.into(),
754                    venue_order_id,
755                ));
756
757                fill_map
758                    .entry(venue_order_id)
759                    .or_insert_with(Vec::new)
760                    .push(fill.clone());
761            }
762        }
763    }
764
765    // Sort chronologically
766    snapshots.sort_by_key(|f| f.ts_event);
767
768    ExtractedFills {
769        snapshots,
770        orders: order_map,
771        fills: fill_map,
772    }
773}
774
775/// Generates the appropriate order events for an external order and order status report.
776///
777/// After creating an external order, we need to transition it to its actual state
778/// based on the order status report from the venue. For terminal states like
779/// Canceled/Expired/Filled, we return multiple events to properly transition
780pub fn check_position_reconciliation(
781    report: &PositionStatusReport,
782    cached_signed_qty: Decimal,
783    size_precision: Option<u8>,
784) -> bool {
785    let venue_signed_qty = report.signed_decimal_qty;
786
787    if venue_signed_qty == Decimal::ZERO && cached_signed_qty == Decimal::ZERO {
788        return true;
789    }
790
791    if let Some(precision) = size_precision
792        && is_within_single_unit_tolerance(cached_signed_qty, venue_signed_qty, precision)
793    {
794        log::debug!(
795            "Position for {} within tolerance: cached={}, venue={}",
796            report.instrument_id,
797            cached_signed_qty,
798            venue_signed_qty
799        );
800        return true;
801    }
802
803    if cached_signed_qty == venue_signed_qty {
804        return true;
805    }
806
807    log::warn!(
808        "Position discrepancy for {}: cached={}, venue={}",
809        report.instrument_id,
810        cached_signed_qty,
811        venue_signed_qty
812    );
813
814    false
815}
816
817/// Checks if two decimal values are within a single unit of tolerance for the given precision.
818///
819/// For integer precision (0), requires exact match.
820/// For fractional precision, allows difference of 1 unit at that precision.
821#[must_use]
822pub fn is_within_single_unit_tolerance(value1: Decimal, value2: Decimal, precision: u8) -> bool {
823    if precision == 0 {
824        return value1 == value2;
825    }
826
827    let tolerance = Decimal::new(1, u32::from(precision));
828    let difference = (value1 - value2).abs();
829    difference <= tolerance
830}