Skip to main content

nautilus_live/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Execution state manager for live trading.
17//!
18//! This module provides the execution manager for reconciling execution state between
19//! the local cache and connected venues, as well as purging old state during live trading.
20
21use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr, sync::LazyLock};
22
23use indexmap::{IndexMap, IndexSet};
24use nautilus_common::{
25    cache::Cache,
26    clients::ExecutionClient,
27    clock::Clock,
28    enums::{LogColor, LogLevel},
29    log_info,
30    messages::{
31        ExecutionReport,
32        execution::{
33            QueryOrder, TradingCommand,
34            report::{GenerateOrderStatusReports, GeneratePositionStatusReports},
35        },
36    },
37};
38use nautilus_core::{
39    UUID4, UnixNanos,
40    datetime::{
41        NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND, mins_to_nanos, mins_to_secs,
42        nanos_to_millis,
43    },
44};
45use nautilus_execution::{
46    engine::ExecutionEngine,
47    reconciliation::{
48        calculate_reconciliation_price, create_inferred_fill_for_qty,
49        create_position_reconciliation_venue_order_id, create_reconciliation_rejected,
50        create_reconciliation_triggered, generate_external_order_status_events,
51        process_mass_status_for_reconciliation, reconcile_order_report,
52        should_reconciliation_update,
53    },
54};
55use nautilus_model::{
56    enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
57    events::{OrderCanceled, OrderEventAny, OrderFilled, OrderInitialized},
58    identifiers::{
59        AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
60        VenueOrderId,
61    },
62    instruments::{Instrument, InstrumentAny},
63    orders::{Order, OrderAny, TRIGGERABLE_ORDER_TYPES},
64    position::Position,
65    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
66    types::{Price, Quantity},
67};
68use rust_decimal::{Decimal, prelude::ToPrimitive};
69use ustr::Ustr;
70
71use crate::config::LiveExecEngineConfig;
72
/// Tag for orders originating from venue (external orders).
///
/// The value `"VENUE"` is created lazily on first access.
static TAG_VENUE: LazyLock<Ustr> = LazyLock::new(|| Ustr::from("VENUE"));
75
/// Tag for orders generated by reconciliation logic (synthetic orders).
///
/// The value `"RECONCILIATION"` is created lazily on first access. During mass-status
/// reconciliation, closed cached orders carrying this tag are skipped so that inferred
/// fills from a previous session are not duplicated.
static TAG_RECONCILIATION: LazyLock<Ustr> = LazyLock::new(|| Ustr::from("RECONCILIATION"));
78
/// Metadata for an external order that needs to be registered with the execution client.
///
/// Instances are handed back to the caller through [`ReconciliationResult::external_orders`].
#[derive(Debug, Clone)]
pub struct ExternalOrderMetadata {
    /// Client order ID of the external order.
    pub client_order_id: ClientOrderId,
    /// Venue order ID of the external order.
    pub venue_order_id: VenueOrderId,
    /// Instrument ID the order relates to.
    pub instrument_id: InstrumentId,
    /// Strategy ID associated with the order.
    // NOTE(review): presumably the strategy that claimed the instrument (or an external
    // default) - confirm against the code that builds this struct.
    pub strategy_id: StrategyId,
    /// Initialization timestamp (UNIX nanoseconds).
    pub ts_init: UnixNanos,
}
88
/// Result of reconciliation containing events and external order metadata.
///
/// Returned by `ExecutionManager::reconcile_execution_mass_status`. The derived
/// `Default` yields a result with both collections empty.
#[derive(Debug, Default)]
pub struct ReconciliationResult {
    /// Order events generated during reconciliation.
    pub events: Vec<OrderEventAny>,
    /// External orders that need to be registered with execution clients.
    pub external_orders: Vec<ExternalOrderMetadata>,
}
97
/// Result of inflight order checks containing terminal events and intermediate queries.
///
/// The derived `Default` yields a result with both collections empty.
#[derive(Debug, Default)]
pub struct InflightCheckResult {
    /// Terminal events (rejection/cancellation) for orders that exceeded max retries.
    pub events: Vec<OrderEventAny>,
    /// Intermediate venue queries for orders still within retry budget.
    pub queries: Vec<TradingCommand>,
}
106
/// Configuration for execution manager.
///
/// Can be built from a [`LiveExecEngineConfig`] via `From`; in that case `trader_id` is
/// left at its default and must be supplied with [`ExecutionManagerConfig::with_trader_id`].
///
/// Field names carry their unit as a suffix (`_ms`, `_ns`, `_secs`, `_mins`).
#[derive(Debug, Clone)]
pub struct ExecutionManagerConfig {
    /// The trader ID for generated orders.
    pub trader_id: TraderId,
    /// If reconciliation is active at start-up.
    pub reconciliation: bool,
    /// Number of minutes to look back during reconciliation.
    pub lookback_mins: Option<u64>,
    /// Instrument IDs to include during reconciliation (empty => all).
    pub reconciliation_instrument_ids: IndexSet<InstrumentId>,
    /// Whether to filter unclaimed external orders.
    pub filter_unclaimed_external: bool,
    /// Whether to filter position status reports during reconciliation.
    pub filter_position_reports: bool,
    /// Client order IDs excluded from reconciliation.
    pub filtered_client_order_ids: IndexSet<ClientOrderId>,
    /// Whether to generate missing orders from reports.
    pub generate_missing_orders: bool,
    /// The interval (milliseconds) between checking whether in-flight orders have exceeded their threshold.
    pub inflight_check_interval_ms: u32,
    /// Threshold in milliseconds for inflight order checks.
    pub inflight_threshold_ms: u64,
    /// Maximum number of retries for inflight checks.
    pub inflight_max_retries: u32,
    /// The interval (seconds) between checks for open orders at the venue.
    // NOTE(review): `None` presumably disables the periodic check - confirm.
    pub open_check_interval_secs: Option<f64>,
    /// The lookback minutes for open order checks.
    pub open_check_lookback_mins: Option<u64>,
    /// Threshold in nanoseconds before acting on venue discrepancies for open orders.
    pub open_check_threshold_ns: u64,
    /// Maximum retries before resolving an open order missing at the venue.
    pub open_check_missing_retries: u32,
    /// Whether open-order polling should only request open orders from the venue.
    pub open_check_open_only: bool,
    /// The maximum number of single-order queries per consistency check cycle.
    pub max_single_order_queries_per_cycle: u32,
    /// The delay (milliseconds) between consecutive single-order queries.
    pub single_order_query_delay_ms: u32,
    /// The interval (seconds) between checks for open positions at the venue.
    // NOTE(review): `None` presumably disables the periodic check - confirm.
    pub position_check_interval_secs: Option<f64>,
    /// The lookback minutes for position consistency checks.
    pub position_check_lookback_mins: u64,
    /// Threshold in nanoseconds before acting on venue discrepancies for positions.
    pub position_check_threshold_ns: u64,
    /// Maximum retries before stopping position discrepancy reconciliation.
    pub position_check_retries: u32,
    /// The time buffer (minutes) before closed orders can be purged.
    pub purge_closed_orders_buffer_mins: Option<u32>,
    /// The time buffer (minutes) before closed positions can be purged.
    pub purge_closed_positions_buffer_mins: Option<u32>,
    /// The time buffer (minutes) before account events can be purged.
    pub purge_account_events_lookback_mins: Option<u32>,
    /// If purge operations should also delete from the backing database.
    pub purge_from_database: bool,
}
163
impl Default for ExecutionManagerConfig {
    /// Returns the default configuration.
    ///
    /// Start-up reconciliation is enabled with a 60-minute lookback and no instrument or
    /// client-order-ID filters. The periodic open-order and position check intervals and
    /// all purge settings are unset (`None`), and the trader ID is `TraderId::default()`.
    fn default() -> Self {
        Self {
            trader_id: TraderId::default(),
            reconciliation: true,
            lookback_mins: Some(60),
            reconciliation_instrument_ids: IndexSet::new(),
            filter_unclaimed_external: false,
            filter_position_reports: false,
            filtered_client_order_ids: IndexSet::new(),
            generate_missing_orders: true,
            inflight_check_interval_ms: 2_000, // 2 seconds
            inflight_threshold_ms: 5_000,      // 5 seconds
            inflight_max_retries: 5,
            open_check_interval_secs: None,
            open_check_lookback_mins: Some(60),
            open_check_threshold_ns: 5_000_000_000, // 5 seconds
            open_check_missing_retries: 5,
            open_check_open_only: true,
            max_single_order_queries_per_cycle: 5,
            single_order_query_delay_ms: 100,
            position_check_interval_secs: None,
            position_check_lookback_mins: 60,
            position_check_threshold_ns: 60_000_000_000, // 60 seconds
            position_check_retries: 3,
            purge_closed_orders_buffer_mins: None,
            purge_closed_positions_buffer_mins: None,
            purge_account_events_lookback_mins: None,
            purge_from_database: false,
        }
    }
}
196
197impl From<&LiveExecEngineConfig> for ExecutionManagerConfig {
198    fn from(config: &LiveExecEngineConfig) -> Self {
199        let filtered_client_order_ids: IndexSet<ClientOrderId> = config
200            .filtered_client_order_ids
201            .clone()
202            .unwrap_or_default()
203            .into_iter()
204            .map(|value| ClientOrderId::from(value.as_str()))
205            .collect();
206
207        let reconciliation_instrument_ids: IndexSet<InstrumentId> = config
208            .reconciliation_instrument_ids
209            .clone()
210            .unwrap_or_default()
211            .into_iter()
212            .map(InstrumentId::from)
213            .collect();
214
215        let open_check_threshold_ns =
216            (config.open_check_threshold_ms as u64) * NANOSECONDS_IN_MILLISECOND;
217        let position_check_threshold_ns =
218            (config.position_check_threshold_ms as u64) * NANOSECONDS_IN_MILLISECOND;
219
220        Self {
221            trader_id: TraderId::default(), // Must be set separately via with_trader_id
222            reconciliation: config.reconciliation,
223            lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
224            reconciliation_instrument_ids,
225            filter_unclaimed_external: config.filter_unclaimed_external_orders,
226            filter_position_reports: config.filter_position_reports,
227            filtered_client_order_ids,
228            generate_missing_orders: config.generate_missing_orders,
229            inflight_check_interval_ms: config.inflight_check_interval_ms,
230            inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
231            inflight_max_retries: config.inflight_check_retries,
232            open_check_interval_secs: config.open_check_interval_secs,
233            open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
234            open_check_threshold_ns,
235            open_check_missing_retries: config.open_check_missing_retries,
236            open_check_open_only: config.open_check_open_only,
237            max_single_order_queries_per_cycle: config.max_single_order_queries_per_cycle,
238            single_order_query_delay_ms: config.single_order_query_delay_ms,
239            position_check_interval_secs: config.position_check_interval_secs,
240            position_check_lookback_mins: config.position_check_lookback_mins as u64,
241            position_check_threshold_ns,
242            position_check_retries: config.position_check_retries,
243            purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
244            purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
245            purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
246            purge_from_database: config.purge_from_database,
247        }
248    }
249}
250
251impl ExecutionManagerConfig {
252    /// Sets the trader ID on the configuration.
253    #[must_use]
254    pub fn with_trader_id(mut self, trader_id: TraderId) -> Self {
255        self.trader_id = trader_id;
256        self
257    }
258}
259
/// Information about an inflight order check.
///
/// Stored per client order ID in `ExecutionManager::inflight_checks`.
#[derive(Debug, Clone)]
struct InflightCheck {
    /// Client order ID this check belongs to.
    // NOTE(review): marked `dead_code` - presumably never read because the map key
    // already carries the ID, and kept only for `Debug` output; confirm.
    #[allow(dead_code)]
    pub client_order_id: ClientOrderId,
    /// Submission timestamp (UNIX nanoseconds).
    pub ts_submitted: UnixNanos,
    /// Number of retries recorded for this order so far.
    pub retry_count: u32,
    /// Timestamp (UNIX nanoseconds) of the last query, if any has been recorded.
    pub last_query_ts: Option<UnixNanos>,
}
269
/// Manager for execution state.
///
/// The `ExecutionManager` handles:
/// - Startup reconciliation to align state on system start.
/// - Continuous reconciliation of inflight orders.
/// - External order discovery and claiming.
/// - Fill report processing and validation.
/// - Purging of old orders, positions, and account events.
///
/// # Cloning
///
/// `Clone` is derived: a clone shares the same `clock` and `cache` (the `Rc` handles are
/// cloned) but gets its own copy of the configuration and of every tracking map.
///
/// # Thread Safety
///
/// This struct is **not thread-safe** and is designed for single-threaded use within
/// an async runtime. Internal state is kept in plain `IndexMap` fields without
/// synchronization, and the `clock` and `cache` are shared through `Rc<RefCell<_>>`,
/// which provides runtime borrow checking but no thread-safety guarantees.
///
/// Because it holds `Rc` fields, the struct as declared here is neither `Send` nor
/// `Sync`, so wrapping it in `Arc<Mutex<_>>` does not make it usable from other threads.
/// Ensure that all methods are called from the same thread/task that owns the shared
/// clock and cache.
///
/// **Warning:** Overlapping borrows of the `RefCell` contents (for example calling a
/// method that mutably borrows the cache while the cache is already borrowed elsewhere)
/// will cause runtime panics.
#[derive(Clone)]
pub struct ExecutionManager {
    /// Shared clock; the source of `generate_timestamp_ns`.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache; venue order IDs are indexed through it during reconciliation.
    cache: Rc<RefCell<Cache>>,
    /// Manager configuration.
    config: ExecutionManagerConfig,
    // NOTE(review): the purposes noted on the tracking maps below are inferred from field
    // names and types only - confirm against the methods that read and write them.
    /// In-flight check state per client order ID.
    inflight_checks: IndexMap<ClientOrderId, InflightCheck>,
    /// Strategy ID per instrument ID (presumably the strategy claiming external orders).
    external_order_claims: IndexMap<InstrumentId, StrategyId>,
    /// Client order ID per trade ID (presumably fills that were already processed).
    processed_fills: IndexMap<TradeId, ClientOrderId>,
    /// Retry counter per client order ID.
    recon_check_retries: IndexMap<ClientOrderId, u32>,
    /// Timestamp (UNIX nanoseconds) per client order ID (presumably of the last venue query).
    ts_last_query: IndexMap<ClientOrderId, UnixNanos>,
    /// Timestamp (UNIX nanoseconds) per client order ID (presumably of the last local activity).
    order_local_activity_ns: IndexMap<ClientOrderId, UnixNanos>,
    /// Timestamp (UNIX nanoseconds) per instrument ID (presumably of the last local position activity).
    position_local_activity_ns: IndexMap<InstrumentId, UnixNanos>,
    /// Retry counter per instrument ID.
    position_recon_retries: IndexMap<InstrumentId, u32>,
    /// Timestamp (UNIX nanoseconds) per trade ID (presumably recently seen fills).
    recent_fills_cache: IndexMap<TradeId, UnixNanos>,
}
307
308impl Debug for ExecutionManager {
309    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310        f.debug_struct(stringify!(ExecutionManager))
311            .field("config", &self.config)
312            .field("inflight_checks", &self.inflight_checks)
313            .field("external_order_claims", &self.external_order_claims)
314            .field("processed_fills", &self.processed_fills)
315            .field("recon_check_retries", &self.recon_check_retries)
316            .finish()
317    }
318}
319
320impl ExecutionManager {
321    /// Creates a new [`ExecutionManager`] instance.
322    pub fn new(
323        clock: Rc<RefCell<dyn Clock>>,
324        cache: Rc<RefCell<Cache>>,
325        config: ExecutionManagerConfig,
326    ) -> Self {
327        Self {
328            clock,
329            cache,
330            config,
331            inflight_checks: IndexMap::new(),
332            external_order_claims: IndexMap::new(),
333            processed_fills: IndexMap::new(),
334            recon_check_retries: IndexMap::new(),
335            ts_last_query: IndexMap::new(),
336            order_local_activity_ns: IndexMap::new(),
337            position_local_activity_ns: IndexMap::new(),
338            position_recon_retries: IndexMap::new(),
339            recent_fills_cache: IndexMap::new(),
340        }
341    }
342
343    /// Returns the current clock timestamp in nanoseconds.
344    #[must_use]
345    pub fn generate_timestamp_ns(&self) -> UnixNanos {
346        self.clock.borrow().timestamp_ns()
347    }
348
349    /// Reconciles orders and fills from a mass status report.
350    ///
351    /// Order events are collected, sorted globally by ts_event, then processed through
352    /// the execution engine to ensure chronological ordering across all orders.
353    /// Position events are processed after all order events to ensure fills are applied first.
354    pub async fn reconcile_execution_mass_status(
355        &mut self,
356        mass_status: ExecutionMassStatus,
357        exec_engine: Rc<RefCell<ExecutionEngine>>,
358    ) -> ReconciliationResult {
359        let venue = mass_status.venue;
360        let order_count = mass_status.order_reports().len();
361        let fill_count: usize = mass_status.fill_reports().values().map(|v| v.len()).sum();
362        let position_count = mass_status.position_reports().len();
363
364        log_info!(
365            "Reconciling ExecutionMassStatus for {venue}",
366            color = LogColor::Blue
367        );
368        log_info!(
369            "Received {order_count} order(s), {fill_count} fill(s), {position_count} position(s)",
370            color = LogColor::Blue
371        );
372
373        let (adjusted_order_reports, adjusted_fill_reports) =
374            self.adjust_mass_status_fills(&mass_status);
375
376        let mut events = Vec::new();
377        let mut external_orders = Vec::new();
378        let mut orders_reconciled = 0usize;
379        let mut external_orders_created = 0usize;
380        let mut open_orders_initialized = 0usize;
381        let mut orders_skipped_no_instrument = 0usize;
382        let mut orders_skipped_duplicate = 0usize;
383        let mut fills_applied = 0usize;
384
385        let fill_reports = &adjusted_fill_reports;
386        let mut seen_trade_ids: IndexSet<TradeId> = IndexSet::new();
387
388        for fills in fill_reports.values() {
389            for fill in fills {
390                if !seen_trade_ids.insert(fill.trade_id) {
391                    log::warn!("Duplicate trade_id {} in mass status", fill.trade_id);
392                }
393            }
394        }
395
396        // Deduplicate reports by venue_order_id, keeping the most advanced state
397        let order_reports = self.deduplicate_order_reports(adjusted_order_reports.values());
398        let mut orders_skipped_filtered = 0usize;
399
400        for report in order_reports.values() {
401            if self.should_skip_order_report(report) {
402                orders_skipped_filtered += 1;
403                continue;
404            }
405
406            if let Some(client_order_id) = &report.client_order_id {
407                if let Some(cached_order) = self.get_order(client_order_id)
408                    && self.is_exact_order_match(&cached_order, report)
409                {
410                    log::debug!("Skipping order {client_order_id}: already in sync with venue");
411                    orders_skipped_duplicate += 1;
412
413                    // Still ensure venue_order_id is indexed even when skipping
414                    if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
415                        client_order_id,
416                        &report.venue_order_id,
417                        false,
418                    ) {
419                        log::warn!("Failed to add venue order ID index: {e}");
420                    }
421
422                    continue;
423                }
424
425                // Skip closed reconciliation orders to prevent duplicate inferred fills on restart
426                if let Some(cached_order) = self.get_order(client_order_id)
427                    && cached_order.is_closed()
428                    && cached_order
429                        .tags()
430                        .is_some_and(|tags| tags.contains(&*TAG_RECONCILIATION))
431                {
432                    log::debug!(
433                        "Skipping closed reconciliation order {client_order_id}: \
434                         synthetic position adjustment from previous session",
435                    );
436                    orders_skipped_duplicate += 1;
437                    continue;
438                }
439
440                if let Some(order) = self.get_order(client_order_id) {
441                    let instrument = self.get_instrument(&report.instrument_id);
442                    log::info!(
443                        color = LogColor::Blue as u8;
444                        "Reconciling {} {} {} [{}] -> [{}]",
445                        client_order_id,
446                        report.venue_order_id,
447                        report.instrument_id,
448                        order.status(),
449                        report.order_status,
450                    );
451
452                    let order_fills: Vec<&FillReport> = fill_reports
453                        .get(&report.venue_order_id)
454                        .map(|f| f.iter().collect())
455                        .unwrap_or_default();
456                    let order_events = self.reconcile_order_with_fills(
457                        &order,
458                        report,
459                        &order_fills,
460                        instrument.as_ref(),
461                    );
462
463                    if !order_events.is_empty() {
464                        orders_reconciled += 1;
465                        fills_applied += order_events
466                            .iter()
467                            .filter(|e| matches!(e, OrderEventAny::Filled(_)))
468                            .count();
469                        events.extend(order_events);
470                    }
471
472                    // Always ensure venue_order_id is indexed after reconciliation
473                    if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
474                        client_order_id,
475                        &report.venue_order_id,
476                        false,
477                    ) {
478                        log::warn!("Failed to add venue order ID index: {e}");
479                    }
480                } else if let Some(order) = self.get_order_by_venue_order_id(&report.venue_order_id)
481                {
482                    // Fallback: match by venue_order_id
483                    let instrument = self.get_instrument(&report.instrument_id);
484
485                    log::info!(
486                        color = LogColor::Blue as u8;
487                        "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
488                        order.client_order_id(),
489                        report.venue_order_id,
490                        report.instrument_id,
491                        order.status(),
492                        report.order_status,
493                    );
494
495                    let order_fills: Vec<&FillReport> = fill_reports
496                        .get(&report.venue_order_id)
497                        .map(|f| f.iter().collect())
498                        .unwrap_or_default();
499                    let order_events = self.reconcile_order_with_fills(
500                        &order,
501                        report,
502                        &order_fills,
503                        instrument.as_ref(),
504                    );
505
506                    if !order_events.is_empty() {
507                        orders_reconciled += 1;
508                        fills_applied += order_events
509                            .iter()
510                            .filter(|e| matches!(e, OrderEventAny::Filled(_)))
511                            .count();
512                        events.extend(order_events);
513                    }
514
515                    if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
516                        &order.client_order_id(),
517                        &report.venue_order_id,
518                        false,
519                    ) {
520                        log::warn!("Failed to add venue order ID index: {e}");
521                    }
522                } else if !self.config.filter_unclaimed_external {
523                    if let Some(instrument) = self.get_instrument(&report.instrument_id) {
524                        let order_fills: Vec<&FillReport> = fill_reports
525                            .get(&report.venue_order_id)
526                            .map(|f| f.iter().collect())
527                            .unwrap_or_default();
528                        let (external_events, metadata) = self.handle_external_order(
529                            report,
530                            &mass_status.account_id,
531                            &instrument,
532                            &order_fills,
533                            false, // Not synthetic (venue order)
534                        );
535
536                        if !external_events.is_empty() {
537                            external_orders_created += 1;
538                            fills_applied += external_events
539                                .iter()
540                                .filter(|e| matches!(e, OrderEventAny::Filled(_)))
541                                .count();
542
543                            if report.order_status.is_open() {
544                                open_orders_initialized += 1;
545                            }
546
547                            events.extend(external_events);
548
549                            if let Some(m) = metadata {
550                                external_orders.push(m);
551                            }
552                        }
553                    } else {
554                        orders_skipped_no_instrument += 1;
555                    }
556                }
557            } else if let Some(order) = self.get_order_by_venue_order_id(&report.venue_order_id) {
558                // Fallback: match by venue_order_id
559                let instrument = self.get_instrument(&report.instrument_id);
560                log::info!(
561                    color = LogColor::Blue as u8;
562                    "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
563                    order.client_order_id(),
564                    report.venue_order_id,
565                    report.instrument_id,
566                    order.status(),
567                    report.order_status,
568                );
569
570                let order_fills: Vec<&FillReport> = fill_reports
571                    .get(&report.venue_order_id)
572                    .map(|f| f.iter().collect())
573                    .unwrap_or_default();
574                let order_events = self.reconcile_order_with_fills(
575                    &order,
576                    report,
577                    &order_fills,
578                    instrument.as_ref(),
579                );
580
581                if !order_events.is_empty() {
582                    orders_reconciled += 1;
583                    fills_applied += order_events
584                        .iter()
585                        .filter(|e| matches!(e, OrderEventAny::Filled(_)))
586                        .count();
587                    events.extend(order_events);
588                }
589
590                if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
591                    &order.client_order_id(),
592                    &report.venue_order_id,
593                    false,
594                ) {
595                    log::warn!("Failed to add venue order ID index: {e}");
596                }
597            } else if let Some(instrument) = self.get_instrument(&report.instrument_id) {
598                // Synthetic orders (S- prefix) are generated by reconciliation logic
599                let is_synthetic = report.venue_order_id.as_str().starts_with("S-");
600
601                let order_fills: Vec<&FillReport> = fill_reports
602                    .get(&report.venue_order_id)
603                    .map(|f| f.iter().collect())
604                    .unwrap_or_default();
605                let (external_events, metadata) = self.handle_external_order(
606                    report,
607                    &mass_status.account_id,
608                    &instrument,
609                    &order_fills,
610                    is_synthetic,
611                );
612
613                if !external_events.is_empty() {
614                    external_orders_created += 1;
615                    fills_applied += external_events
616                        .iter()
617                        .filter(|e| matches!(e, OrderEventAny::Filled(_)))
618                        .count();
619
620                    if report.order_status.is_open() {
621                        open_orders_initialized += 1;
622                    }
623
624                    events.extend(external_events);
625
626                    if let Some(m) = metadata {
627                        external_orders.push(m);
628                    }
629                }
630            } else {
631                orders_skipped_no_instrument += 1;
632            }
633        }
634
635        // Process orphan fills (fills without matching order reports)
636        let processed_venue_order_ids: IndexSet<VenueOrderId> =
637            order_reports.keys().copied().collect();
638
639        for (venue_order_id, fills) in fill_reports {
640            if processed_venue_order_ids.contains(venue_order_id) {
641                continue;
642            }
643
644            let Some(first_fill) = fills.first() else {
645                continue;
646            };
647
648            if !self.should_reconcile_instrument(&first_fill.instrument_id) {
649                log::debug!(
650                    "Skipping orphan fills for {}: not in reconciliation_instrument_ids",
651                    first_fill.instrument_id
652                );
653                continue;
654            }
655
656            // Skip if fill's client_order_id is in filtered list
657            if let Some(client_order_id) = &first_fill.client_order_id
658                && self
659                    .config
660                    .filtered_client_order_ids
661                    .contains(client_order_id)
662            {
663                log::debug!(
664                    "Skipping orphan fills for {client_order_id}: in filtered_client_order_ids"
665                );
666                continue;
667            }
668
669            let order = first_fill
670                .client_order_id
671                .as_ref()
672                .and_then(|id| self.get_order(id))
673                .or_else(|| self.get_order_by_venue_order_id(venue_order_id));
674
675            // Skip if resolved order's client_order_id is filtered (venue_order_id lookup path)
676            if let Some(ref order) = order
677                && self
678                    .config
679                    .filtered_client_order_ids
680                    .contains(&order.client_order_id())
681            {
682                log::debug!(
683                    "Skipping orphan fills for {}: in filtered_client_order_ids",
684                    order.client_order_id()
685                );
686                continue;
687            }
688
689            if let Some(order) = order {
690                let instrument_id = order.instrument_id();
691                if let Some(instrument) = self.get_instrument(&instrument_id) {
692                    let mut sorted_fills: Vec<&FillReport> = fills.iter().collect();
693                    sorted_fills.sort_by_key(|f| f.ts_event);
694
695                    for fill in sorted_fills {
696                        if let Some(event) = self.create_order_fill(&order, fill, &instrument) {
697                            fills_applied += 1;
698                            events.push(event);
699                        }
700                    }
701                }
702            }
703        }
704
705        events.sort_by_key(|e| e.ts_event());
706
707        for event in &events {
708            exec_engine.borrow_mut().process(event);
709        }
710
711        let mut positions_created = 0usize;
712
713        if !self.config.filter_position_reports {
714            // Collect instruments with fills that lack venue_position_id (can't attribute to
715            // specific hedge position, so must skip all hedge reports for that instrument)
716            let instruments_with_unattributed_fills: IndexSet<InstrumentId> = mass_status
717                .fill_reports()
718                .values()
719                .flatten()
720                .filter(|f| f.venue_position_id.is_none())
721                .map(|f| f.instrument_id)
722                .chain(
723                    mass_status
724                        .order_reports()
725                        .values()
726                        .filter(|r| !r.filled_qty.is_zero() && r.venue_position_id.is_none())
727                        .map(|r| r.instrument_id),
728                )
729                .collect();
730
731            let positions_with_fills: IndexSet<PositionId> = mass_status
732                .fill_reports()
733                .values()
734                .flatten()
735                .filter_map(|f| f.venue_position_id)
736                .chain(
737                    mass_status
738                        .order_reports()
739                        .values()
740                        .filter(|r| !r.filled_qty.is_zero())
741                        .filter_map(|r| r.venue_position_id),
742                )
743                .collect();
744
745            for (instrument_id, reports) in mass_status.position_reports() {
746                if !self.should_reconcile_instrument(&instrument_id) {
747                    log::debug!(
748                        "Skipping position reports for {instrument_id}: not in reconciliation_instrument_ids"
749                    );
750                    continue;
751                }
752
753                for report in reports {
754                    if let Some(position_events) = self.reconcile_position_report(
755                        &report,
756                        &mass_status.account_id,
757                        &instruments_with_unattributed_fills,
758                        &positions_with_fills,
759                    ) {
760                        for event in position_events {
761                            exec_engine.borrow_mut().process(&event);
762                            events.push(event);
763                        }
764                        positions_created += 1;
765                    }
766                }
767            }
768        }
769
770        if orders_skipped_no_instrument > 0 {
771            log::warn!("{orders_skipped_no_instrument} orders skipped (instrument not in cache)");
772        }
773
774        if orders_skipped_duplicate > 0 {
775            log::debug!("{orders_skipped_duplicate} orders skipped (already in sync)");
776        }
777
778        if orders_skipped_filtered > 0 {
779            log::debug!("{orders_skipped_filtered} orders skipped (filtered by config)");
780        }
781
782        log::info!(
783            color = LogColor::Blue as u8;
784            "Reconciliation complete for {venue}: reconciled={orders_reconciled}, external={external_orders_created}, open={open_orders_initialized}, fills={fills_applied}, positions={positions_created}, skipped={orders_skipped_duplicate}, filtered={orders_skipped_filtered}",
785        );
786
787        ReconciliationResult {
788            events,
789            external_orders,
790        }
791    }
792
793    /// Checks inflight orders and returns terminal events and intermediate venue queries.
794    ///
795    /// For retries below `inflight_max_retries`, generates `QueryOrder` commands to poll
796    /// the venue for the order's current status. At max retries, generates terminal events
797    /// (rejection or cancellation) based on the order's status.
798    pub fn check_inflight_orders(&mut self) -> InflightCheckResult {
799        let mut result = InflightCheckResult::default();
800        let current_time = self.clock.borrow().timestamp_ns();
801        let threshold_ns = self.config.inflight_threshold_ms * NANOSECONDS_IN_MILLISECOND;
802
803        let mut to_check = Vec::new();
804
805        for (client_order_id, check) in &self.inflight_checks {
806            if current_time - check.ts_submitted > threshold_ns {
807                to_check.push(*client_order_id);
808            }
809        }
810
811        for client_order_id in to_check {
812            if self
813                .config
814                .filtered_client_order_ids
815                .contains(&client_order_id)
816            {
817                continue;
818            }
819
820            if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
821                if let Some(last_query_ts) = check.last_query_ts
822                    && current_time - last_query_ts < threshold_ns
823                {
824                    continue;
825                }
826
827                check.retry_count += 1;
828                check.last_query_ts = Some(current_time);
829                self.ts_last_query.insert(client_order_id, current_time);
830                self.recon_check_retries
831                    .insert(client_order_id, check.retry_count);
832
833                if check.retry_count >= self.config.inflight_max_retries {
834                    let ts_now = self.clock.borrow().timestamp_ns();
835
836                    if let Some(order) = self.get_order(&client_order_id) {
837                        match order.status() {
838                            OrderStatus::Submitted => {
839                                // Generate rejection for submitted orders that never got accepted
840                                if let Some(event) = create_reconciliation_rejected(
841                                    &order,
842                                    Some("INFLIGHT_TIMEOUT"),
843                                    ts_now,
844                                ) {
845                                    result.events.push(event);
846                                }
847                            }
848                            OrderStatus::PendingUpdate | OrderStatus::PendingCancel => {
849                                // Generate cancellation for orders stuck in pending modify/cancel
850                                let event = OrderEventAny::Canceled(OrderCanceled::new(
851                                    order.trader_id(),
852                                    order.strategy_id(),
853                                    order.instrument_id(),
854                                    order.client_order_id(),
855                                    UUID4::new(),
856                                    ts_now,
857                                    ts_now,
858                                    true, // reconciliation
859                                    order.venue_order_id(),
860                                    order.account_id(),
861                                ));
862                                result.events.push(event);
863                            }
864                            _ => {
865                                // Order already resolved, just clear tracking
866                            }
867                        }
868                    }
869                    // Remove from inflight checks regardless of whether order exists
870                    self.clear_recon_tracking(&client_order_id, true);
871                } else if let Some(order) = self.get_order(&client_order_id) {
872                    // Intermediate retry: query the venue for current order status
873                    let client_id = self.cache.borrow().client_id(&client_order_id).copied();
874                    let query = TradingCommand::QueryOrder(QueryOrder::new(
875                        order.trader_id(),
876                        client_id,
877                        order.strategy_id(),
878                        order.instrument_id(),
879                        order.client_order_id(),
880                        order.venue_order_id(),
881                        UUID4::new(),
882                        current_time,
883                        None,
884                    ));
885                    result.queries.push(query);
886                }
887            }
888        }
889
890        result
891    }
892
893    /// Checks open orders consistency between cache and venue.
894    ///
895    /// This method validates that open orders in the cache match the venue's state,
896    /// comparing order status and filled quantities, and generating reconciliation
897    /// events for any discrepancies detected.
898    ///
899    /// # Returns
900    ///
901    /// A vector of order events generated to reconcile discrepancies.
902    pub async fn check_open_orders(
903        &mut self,
904        clients: &[&dyn ExecutionClient],
905    ) -> Vec<OrderEventAny> {
906        log::debug!("Checking order consistency between cached-state and venues");
907
908        let filtered_orders: Vec<OrderAny> = {
909            let cache = self.cache.borrow();
910            let mut orders = cache.orders_open(None, None, None, None, None);
911            orders.extend(cache.orders_inflight(None, None, None, None, None));
912
913            if self.config.reconciliation_instrument_ids.is_empty() {
914                orders.iter().map(|o| (*o).clone()).collect()
915            } else {
916                orders
917                    .iter()
918                    .filter(|o| {
919                        self.config
920                            .reconciliation_instrument_ids
921                            .contains(&o.instrument_id())
922                    })
923                    .map(|o| (*o).clone())
924                    .collect()
925            }
926        };
927
928        log::debug!(
929            "Found {} order{} open in cache",
930            filtered_orders.len(),
931            if filtered_orders.len() == 1 { "" } else { "s" }
932        );
933
934        let mut all_reports = Vec::new();
935        let mut venue_reported_ids = IndexSet::new();
936
937        let ts_now = self.clock.borrow().timestamp_ns();
938        let start = self.config.open_check_lookback_mins.map(|mins| {
939            let lookback_ns = mins_to_nanos(mins);
940            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
941        });
942
943        for client in clients {
944            let mut cmd = GenerateOrderStatusReports::new(
945                UUID4::new(),
946                ts_now,
947                self.config.open_check_open_only,
948                None, // instrument_id - query all
949                start,
950                None, // end
951                None, // params
952                None, // correlation_id
953            );
954            cmd.log_receipt_level = LogLevel::Debug;
955
956            match client.generate_order_status_reports(&cmd).await {
957                Ok(reports) => {
958                    for report in reports {
959                        if let Some(client_order_id) = &report.client_order_id {
960                            venue_reported_ids.insert(*client_order_id);
961                        }
962                        all_reports.push(report);
963                    }
964                }
965                Err(e) => {
966                    log::error!(
967                        "Failed to query order reports from {}: {e}",
968                        client.client_id()
969                    );
970                }
971            }
972        }
973
974        // Reconcile reports against cached orders
975        let ts_now = self.clock.borrow().timestamp_ns();
976        let mut events = Vec::new();
977
978        for report in all_reports {
979            if let Some(client_order_id) = &report.client_order_id
980                && let Some(order) = self.get_order(client_order_id)
981            {
982                // Check for recent local activity to avoid race conditions with in-flight fills
983                if let Some(&last_activity) = self.order_local_activity_ns.get(client_order_id)
984                    && (ts_now - last_activity) < self.config.open_check_threshold_ns
985                {
986                    let elapsed_ms = nanos_to_millis((ts_now - last_activity).as_u64());
987                    let threshold_ms = nanos_to_millis(self.config.open_check_threshold_ns);
988                    log::info!(
989                        "Deferring reconciliation for {client_order_id}: recent local activity ({elapsed_ms}ms < threshold={threshold_ms}ms)",
990                    );
991                    continue;
992                }
993
994                let instrument = self.get_instrument(&report.instrument_id);
995
996                if let Some(event) =
997                    self.reconcile_order_report(&order, &report, instrument.as_ref())
998                {
999                    events.push(event);
1000                }
1001            }
1002        }
1003
1004        // Handle orders missing at venue (skip in open_only mode where the
1005        // venue response may omit recently closed orders). When a lookback
1006        // window is set, only consider orders within that window so older
1007        // GTC orders outside the query range are not falsely marked missing.
1008        if !self.config.open_check_open_only {
1009            let candidates: Vec<&OrderAny> = if let Some(cutoff) = start {
1010                filtered_orders
1011                    .iter()
1012                    .filter(|o| o.ts_last() >= cutoff)
1013                    .collect()
1014            } else {
1015                filtered_orders.iter().collect()
1016            };
1017            let cached_ids: IndexSet<ClientOrderId> =
1018                candidates.iter().map(|o| o.client_order_id()).collect();
1019            let missing_at_venue: IndexSet<ClientOrderId> = cached_ids
1020                .difference(&venue_reported_ids)
1021                .copied()
1022                .collect();
1023
1024            for client_order_id in missing_at_venue {
1025                events.extend(self.handle_missing_order(client_order_id));
1026            }
1027        }
1028
1029        events
1030    }
1031
1032    /// Checks position consistency between cache and venue.
1033    ///
1034    /// This method validates that positions in the cache match the venue's state,
1035    /// detecting position drift and querying for missing fills when discrepancies
1036    /// are found.
1037    ///
1038    /// # Returns
1039    ///
1040    /// A vector of fill events generated to reconcile position discrepancies.
1041    pub async fn check_positions_consistency(
1042        &mut self,
1043        clients: &[&dyn ExecutionClient],
1044    ) -> Vec<OrderEventAny> {
1045        log::debug!("Checking position consistency between cached-state and venues");
1046
1047        let open_positions = {
1048            let cache = self.cache.borrow();
1049            let positions = cache.positions_open(None, None, None, None, None);
1050
1051            if self.config.reconciliation_instrument_ids.is_empty() {
1052                positions.iter().map(|p| (*p).clone()).collect()
1053            } else {
1054                positions
1055                    .iter()
1056                    .filter(|p| {
1057                        self.config
1058                            .reconciliation_instrument_ids
1059                            .contains(&p.instrument_id)
1060                    })
1061                    .map(|p| (*p).clone())
1062                    .collect::<Vec<_>>()
1063            }
1064        };
1065
1066        log::debug!(
1067            "Found {} position{} to check",
1068            open_positions.len(),
1069            if open_positions.len() == 1 { "" } else { "s" }
1070        );
1071
1072        // Query venue for position reports
1073        let mut venue_positions = IndexMap::new();
1074
1075        for client in clients {
1076            let mut cmd = GeneratePositionStatusReports::new(
1077                UUID4::new(),
1078                self.clock.borrow().timestamp_ns(),
1079                None, // instrument_id - query all
1080                None, // start
1081                None, // end
1082                None, // params
1083                None, // correlation_id
1084            );
1085            cmd.log_receipt_level = LogLevel::Debug;
1086
1087            match client.generate_position_status_reports(&cmd).await {
1088                Ok(reports) => {
1089                    for report in reports {
1090                        venue_positions.insert(report.instrument_id, report);
1091                    }
1092                }
1093                Err(e) => {
1094                    log::error!(
1095                        "Failed to query position reports from {}: {e}",
1096                        client.client_id()
1097                    );
1098                }
1099            }
1100        }
1101
1102        // Check for discrepancies (one check per instrument per cycle to avoid
1103        // burning multiple retries for hedging positions on the same instrument)
1104        let mut events = Vec::new();
1105        let mut checked_instruments = IndexSet::new();
1106
1107        for position in &open_positions {
1108            if !checked_instruments.insert(position.instrument_id) {
1109                continue;
1110            }
1111
1112            // Skip if not in filter
1113            if !self.config.reconciliation_instrument_ids.is_empty()
1114                && !self
1115                    .config
1116                    .reconciliation_instrument_ids
1117                    .contains(&position.instrument_id)
1118            {
1119                continue;
1120            }
1121
1122            let venue_report = venue_positions.get(&position.instrument_id);
1123
1124            if let Some(discrepancy_events) =
1125                self.check_position_discrepancy(position, venue_report)
1126            {
1127                events.extend(discrepancy_events);
1128            }
1129        }
1130
1131        // Prune retry counters for instruments no longer actively tracked,
1132        // excluding flat venue reports which shouldn't protect stale counters
1133        let active_instruments: IndexSet<InstrumentId> = open_positions
1134            .iter()
1135            .map(|p| p.instrument_id)
1136            .chain(
1137                venue_positions
1138                    .iter()
1139                    .filter(|(_, r)| r.signed_decimal_qty != Decimal::ZERO)
1140                    .map(|(id, _)| *id),
1141            )
1142            .collect();
1143        self.position_recon_retries
1144            .retain(|iid, _| active_instruments.contains(iid));
1145
1146        events
1147    }
1148
1149    /// Registers an order as inflight for tracking.
1150    pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
1151        let ts_submitted = self.clock.borrow().timestamp_ns();
1152        self.inflight_checks.insert(
1153            client_order_id,
1154            InflightCheck {
1155                client_order_id,
1156                ts_submitted,
1157                retry_count: 0,
1158                last_query_ts: None,
1159            },
1160        );
1161        self.recon_check_retries.insert(client_order_id, 0);
1162        self.ts_last_query.shift_remove(&client_order_id);
1163        self.order_local_activity_ns.shift_remove(&client_order_id);
1164    }
1165
1166    /// Records local activity for the specified order.
1167    ///
1168    /// Uses the current clock time (receipt time) instead of venue time to accurately
1169    /// track when we last processed activity for this order. This avoids race conditions
1170    /// where network/queue latency makes events appear "old" even though they just arrived.
1171    pub fn record_local_activity(&mut self, client_order_id: ClientOrderId) {
1172        let ts_now = self.clock.borrow().timestamp_ns();
1173        self.order_local_activity_ns.insert(client_order_id, ts_now);
1174    }
1175
1176    /// Clears reconciliation tracking state for an order.
1177    pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
1178        self.inflight_checks.shift_remove(client_order_id);
1179        self.recon_check_retries.shift_remove(client_order_id);
1180
1181        if drop_last_query {
1182            self.ts_last_query.shift_remove(client_order_id);
1183        }
1184        self.order_local_activity_ns.shift_remove(client_order_id);
1185    }
1186
1187    /// Claims external orders for a specific strategy and instrument.
1188    pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
1189        self.external_order_claims
1190            .insert(instrument_id, strategy_id);
1191    }
1192
1193    /// Records position activity for reconciliation tracking.
1194    pub fn record_position_activity(&mut self, instrument_id: InstrumentId, ts_event: UnixNanos) {
1195        self.position_local_activity_ns
1196            .insert(instrument_id, ts_event);
1197    }
1198
1199    /// Observes an incoming execution report and updates tracking state.
1200    ///
1201    /// This should be called **before** the report is dispatched to the execution
1202    /// engine, so that the manager's state is current when periodic checks run.
1203    ///
1204    /// Updates performed per report variant:
1205    /// - `Order`: clears inflight tracking and records local activity
1206    /// - `Fill`: marks fill as processed, records order and position activity
1207    /// - `OrderWithFills`: clears inflight tracking, records local activity, records position activity per fill
1208    /// - `Position`: records position activity
1209    /// - `MassStatus`: no-op (handled separately via startup reconciliation)
1210    pub fn observe_execution_report(&mut self, report: &ExecutionReport) {
1211        match report {
1212            ExecutionReport::Order(order_report) => {
1213                if let Some(client_order_id) = &order_report.client_order_id {
1214                    // Only clear inflight tracking for non-pending states.
1215                    // Pending reports (PendingUpdate, PendingCancel) are interim
1216                    // acknowledgements; the order is still inflight until the
1217                    // venue confirms the final state.
1218                    if !matches!(
1219                        order_report.order_status,
1220                        OrderStatus::PendingUpdate | OrderStatus::PendingCancel
1221                    ) {
1222                        self.clear_recon_tracking(client_order_id, true);
1223                    }
1224                    self.record_local_activity(*client_order_id);
1225                }
1226            }
1227            ExecutionReport::Fill(fill_report) => {
1228                let client_order_id = fill_report.client_order_id.or_else(|| {
1229                    self.cache
1230                        .borrow()
1231                        .client_order_id(&fill_report.venue_order_id)
1232                        .copied()
1233                });
1234
1235                if let Some(coid) = client_order_id {
1236                    self.record_local_activity(coid);
1237                }
1238                self.record_position_activity(fill_report.instrument_id, fill_report.ts_event);
1239            }
1240            ExecutionReport::OrderWithFills(order_report, fills) => {
1241                if let Some(client_order_id) = &order_report.client_order_id
1242                    && !matches!(
1243                        order_report.order_status,
1244                        OrderStatus::PendingUpdate | OrderStatus::PendingCancel
1245                    )
1246                {
1247                    self.clear_recon_tracking(client_order_id, true);
1248                    self.record_local_activity(*client_order_id);
1249                }
1250
1251                for fill_report in fills {
1252                    self.record_position_activity(fill_report.instrument_id, fill_report.ts_event);
1253                }
1254            }
1255            ExecutionReport::Position(position_report) => {
1256                self.record_position_activity(
1257                    position_report.instrument_id,
1258                    position_report.ts_last,
1259                );
1260            }
1261            ExecutionReport::MassStatus(_) => {
1262                // Handled separately via reconcile_execution_mass_status
1263            }
1264        }
1265    }
1266
1267    /// Checks if a fill has been recently processed (for deduplication).
1268    pub fn is_fill_recently_processed(&self, trade_id: &TradeId) -> bool {
1269        self.recent_fills_cache.contains_key(trade_id)
1270    }
1271
1272    /// Marks a fill as recently processed with current timestamp.
1273    pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
1274        let ts_now = self.clock.borrow().timestamp_ns();
1275        self.recent_fills_cache.insert(trade_id, ts_now);
1276    }
1277
1278    /// Prunes expired fills from the recent fills cache.
1279    ///
1280    /// Default TTL is 60 seconds.
1281    pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
1282        let ts_now = self.clock.borrow().timestamp_ns();
1283        let ttl_ns = (ttl_secs * NANOSECONDS_IN_SECOND as f64) as u64;
1284
1285        self.recent_fills_cache
1286            .retain(|_, &mut ts_cached| ts_now - ts_cached <= ttl_ns);
1287    }
1288
1289    /// Purges closed orders from the cache that are older than the configured buffer.
1290    pub fn purge_closed_orders(&mut self) {
1291        let Some(buffer_mins) = self.config.purge_closed_orders_buffer_mins else {
1292            return;
1293        };
1294
1295        let ts_now = self.clock.borrow().timestamp_ns();
1296        let buffer_secs = mins_to_secs(buffer_mins as u64);
1297
1298        self.cache
1299            .borrow_mut()
1300            .purge_closed_orders(ts_now, buffer_secs);
1301    }
1302
1303    /// Purges closed positions from the cache that are older than the configured buffer.
1304    pub fn purge_closed_positions(&mut self) {
1305        let Some(buffer_mins) = self.config.purge_closed_positions_buffer_mins else {
1306            return;
1307        };
1308
1309        let ts_now = self.clock.borrow().timestamp_ns();
1310        let buffer_secs = mins_to_secs(buffer_mins as u64);
1311
1312        self.cache
1313            .borrow_mut()
1314            .purge_closed_positions(ts_now, buffer_secs);
1315    }
1316
1317    /// Purges old account events from the cache based on the configured lookback.
1318    pub fn purge_account_events(&mut self) {
1319        let Some(lookback_mins) = self.config.purge_account_events_lookback_mins else {
1320            return;
1321        };
1322
1323        let ts_now = self.clock.borrow().timestamp_ns();
1324        let lookback_secs = mins_to_secs(lookback_mins as u64);
1325
1326        self.cache
1327            .borrow_mut()
1328            .purge_account_events(ts_now, lookback_secs);
1329    }
1330
1331    // Private helper methods
1332
1333    fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
1334        self.cache.borrow().order(client_order_id).cloned()
1335    }
1336
1337    fn get_order_by_venue_order_id(&self, venue_order_id: &VenueOrderId) -> Option<OrderAny> {
1338        let cache = self.cache.borrow();
1339        cache
1340            .client_order_id(venue_order_id)
1341            .and_then(|client_order_id| cache.order(client_order_id).cloned())
1342    }
1343
1344    fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
1345        self.cache.borrow().instrument(instrument_id).cloned()
1346    }
1347
1348    fn should_skip_order_report(&self, report: &OrderStatusReport) -> bool {
1349        if let Some(client_order_id) = &report.client_order_id
1350            && self
1351                .config
1352                .filtered_client_order_ids
1353                .contains(client_order_id)
1354        {
1355            log::debug!(
1356                "Skipping order report {client_order_id}: in filtered_client_order_ids list"
1357            );
1358            return true;
1359        }
1360
1361        if !self.should_reconcile_instrument(&report.instrument_id) {
1362            log::debug!(
1363                "Skipping order report for {}: not in reconciliation_instrument_ids",
1364                report.instrument_id
1365            );
1366            return true;
1367        }
1368
1369        false
1370    }
1371
1372    fn should_reconcile_instrument(&self, instrument_id: &InstrumentId) -> bool {
1373        self.config.reconciliation_instrument_ids.is_empty()
1374            || self
1375                .config
1376                .reconciliation_instrument_ids
1377                .contains(instrument_id)
1378    }
1379
1380    fn handle_missing_order(&mut self, client_order_id: ClientOrderId) -> Vec<OrderEventAny> {
1381        let mut events = Vec::new();
1382
1383        let Some(order) = self.get_order(&client_order_id) else {
1384            return events;
1385        };
1386
1387        let ts_now = self.clock.borrow().timestamp_ns();
1388        let ts_last = order.ts_last();
1389
1390        // Check if order is too recent
1391        if (ts_now - ts_last) < self.config.open_check_threshold_ns {
1392            return events;
1393        }
1394
1395        // Check local activity threshold
1396        if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
1397            && (ts_now - last_activity) < self.config.open_check_threshold_ns
1398        {
1399            return events;
1400        }
1401
1402        // Increment retry counter
1403        let retries = self.recon_check_retries.entry(client_order_id).or_insert(0);
1404        *retries += 1;
1405
1406        // If max retries exceeded, generate rejection event
1407        if *retries >= self.config.open_check_missing_retries {
1408            log::warn!(
1409                "Order {client_order_id} not found at venue after {retries} retries, marking as REJECTED"
1410            );
1411
1412            let ts_now = self.clock.borrow().timestamp_ns();
1413
1414            if let Some(rejected) =
1415                create_reconciliation_rejected(&order, Some("NOT_FOUND_AT_VENUE"), ts_now)
1416            {
1417                events.push(rejected);
1418            }
1419
1420            self.clear_recon_tracking(&client_order_id, true);
1421        } else {
1422            log::debug!(
1423                "Order {} not found at venue, retry {}/{}",
1424                client_order_id,
1425                retries,
1426                self.config.open_check_missing_retries
1427            );
1428        }
1429
1430        events
1431    }
1432
1433    fn check_position_discrepancy(
1434        &mut self,
1435        position: &Position,
1436        venue_report: Option<&PositionStatusReport>,
1437    ) -> Option<Vec<OrderEventAny>> {
1438        // Use signed quantities to detect both magnitude and side discrepancies
1439        let cached_signed_qty = position.signed_decimal_qty();
1440        let venue_signed_qty = venue_report.map_or(Decimal::ZERO, |r| r.signed_decimal_qty);
1441
1442        let tolerance = Decimal::from_str("0.00000001").unwrap();
1443        if (cached_signed_qty - venue_signed_qty).abs() <= tolerance {
1444            self.position_recon_retries
1445                .shift_remove(&position.instrument_id);
1446            return None; // No discrepancy
1447        }
1448
1449        // Check activity threshold
1450        let ts_now = self.clock.borrow().timestamp_ns();
1451
1452        if let Some(&last_activity) = self.position_local_activity_ns.get(&position.instrument_id)
1453            && (ts_now - last_activity) < self.config.position_check_threshold_ns
1454        {
1455            log::debug!(
1456                "Skipping position reconciliation for {}: recent activity within threshold",
1457                position.instrument_id
1458            );
1459            return None;
1460        }
1461
1462        let retries = *self
1463            .position_recon_retries
1464            .get(&position.instrument_id)
1465            .unwrap_or(&0);
1466
1467        if retries >= self.config.position_check_retries {
1468            return None;
1469        }
1470
1471        log::warn!(
1472            "Position discrepancy detected for {}: cached_signed_qty={}, venue_signed_qty={}",
1473            position.instrument_id,
1474            cached_signed_qty,
1475            venue_signed_qty
1476        );
1477
1478        let account_id = position.account_id;
1479        let instrument_id = position.instrument_id;
1480
1481        let Some(instrument) = self.cache.borrow().instrument(&instrument_id).cloned() else {
1482            log::debug!("Cannot reconcile position for {instrument_id}: instrument not in cache");
1483            let new_retries = retries + 1;
1484            self.position_recon_retries
1485                .insert(instrument_id, new_retries);
1486            if new_retries >= self.config.position_check_retries {
1487                log::error!(
1488                    "Position discrepancy for {instrument_id} unresolved after {} attempts \
1489                     (cached_qty={cached_signed_qty}, venue_qty={venue_signed_qty}); \
1490                     no further reconciliation attempts will be made",
1491                    self.config.position_check_retries,
1492                );
1493            }
1494            return None;
1495        };
1496
1497        let cached_avg_px = if position.avg_px_open > 0.0 {
1498            Decimal::from_str(&position.avg_px_open.to_string()).ok()
1499        } else {
1500            None
1501        };
1502        let venue_avg_px = venue_report.and_then(|r| r.avg_px_open);
1503
1504        // Check if position crosses zero (flips side)
1505        let crosses_zero = (cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO)
1506            || (cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO);
1507
1508        let result = if crosses_zero {
1509            // Split into two fills: close existing position, then open new position
1510            let venue_ts_last = venue_report.map_or(ts_now, |r| r.ts_last);
1511            self.reconcile_cross_zero_position(
1512                &instrument,
1513                account_id,
1514                instrument_id,
1515                cached_signed_qty,
1516                cached_avg_px,
1517                venue_signed_qty,
1518                venue_avg_px,
1519                ts_now,
1520                venue_ts_last,
1521            )
1522        } else {
1523            let qty_diff = venue_signed_qty - cached_signed_qty;
1524            let order_side = if qty_diff > Decimal::ZERO {
1525                OrderSide::Buy
1526            } else {
1527                OrderSide::Sell
1528            };
1529
1530            let reconciliation_px = calculate_reconciliation_price(
1531                cached_signed_qty,
1532                cached_avg_px,
1533                venue_signed_qty,
1534                venue_avg_px,
1535            );
1536
1537            match reconciliation_px.or(venue_avg_px).or(cached_avg_px) {
1538                Some(fill_px) => {
1539                    let fill_qty = qty_diff.abs();
1540                    let venue_position_id = venue_report.and_then(|r| r.venue_position_id);
1541                    let venue_ts_last = venue_report.map_or(ts_now, |r| r.ts_last);
1542
1543                    Quantity::from_decimal_dp(fill_qty, instrument.size_precision())
1544                        .ok()
1545                        .and_then(|order_qty| {
1546                            let fill_price =
1547                                Price::from_decimal_dp(fill_px, instrument.price_precision()).ok();
1548                            let venue_order_id = create_position_reconciliation_venue_order_id(
1549                                account_id,
1550                                instrument_id,
1551                                order_side,
1552                                OrderType::Market,
1553                                order_qty,
1554                                fill_price,
1555                                venue_position_id,
1556                                None,
1557                                venue_ts_last,
1558                            );
1559
1560                            OrderStatusReport::new(
1561                                account_id,
1562                                instrument_id,
1563                                None,
1564                                venue_order_id,
1565                                order_side,
1566                                OrderType::Market,
1567                                TimeInForce::Gtc,
1568                                OrderStatus::Filled,
1569                                order_qty,
1570                                order_qty,
1571                                ts_now,
1572                                ts_now,
1573                                ts_now,
1574                                None,
1575                            )
1576                            .with_avg_px(fill_px.to_f64().unwrap_or(0.0))
1577                            .ok()
1578                        })
1579                        .map(|order_report| {
1580                            log::info!(
1581                                color = LogColor::Blue as u8;
1582                                "Generating synthetic fill for position reconciliation {instrument_id}: side={order_side:?}, qty={}, px={fill_px}", qty_diff.abs(),
1583                            );
1584
1585                            let (events, _) = self.handle_external_order(
1586                                &order_report,
1587                                &account_id,
1588                                &instrument,
1589                                &[],
1590                                true,
1591                            );
1592                            events
1593                        })
1594                }
1595                None => None,
1596            }
1597        };
1598
1599        // Track retries when reconciliation didn't produce events
1600        if result.is_none() || result.as_ref().is_some_and(|e| e.is_empty()) {
1601            let new_retries = retries + 1;
1602            self.position_recon_retries
1603                .insert(instrument_id, new_retries);
1604            if new_retries >= self.config.position_check_retries {
1605                log::error!(
1606                    "Position discrepancy for {} unresolved after {} attempts \
1607                     (cached_qty={}, venue_qty={}); \
1608                     no further reconciliation attempts will be made",
1609                    instrument_id,
1610                    self.config.position_check_retries,
1611                    cached_signed_qty,
1612                    venue_signed_qty,
1613                );
1614            }
1615        } else {
1616            self.position_recon_retries.shift_remove(&instrument_id);
1617        }
1618
1619        result
1620    }
1621
1622    /// Handles position reconciliation when position flips sign, splitting into two
1623    /// fills: close existing position then open new position in opposite direction.
1624    #[expect(clippy::too_many_arguments)]
1625    fn reconcile_cross_zero_position(
1626        &mut self,
1627        instrument: &InstrumentAny,
1628        account_id: AccountId,
1629        instrument_id: InstrumentId,
1630        cached_signed_qty: Decimal,
1631        cached_avg_px: Option<Decimal>,
1632        venue_signed_qty: Decimal,
1633        venue_avg_px: Option<Decimal>,
1634        ts_now: UnixNanos,
1635        venue_ts_last: UnixNanos,
1636    ) -> Option<Vec<OrderEventAny>> {
1637        log::info!(
1638            color = LogColor::Blue as u8;
1639            "Position crosses zero for {instrument_id}: cached={cached_signed_qty}, venue={venue_signed_qty}. Splitting into two fills",
1640        );
1641
1642        let mut all_events = Vec::new();
1643
1644        // Close existing position first
1645        let close_qty = cached_signed_qty.abs();
1646        let close_side = if cached_signed_qty < Decimal::ZERO {
1647            OrderSide::Buy // Close short by buying
1648        } else {
1649            OrderSide::Sell // Close long by selling
1650        };
1651
1652        if let Some(close_px) = cached_avg_px {
1653            let close_order_qty =
1654                Quantity::from_decimal_dp(close_qty, instrument.size_precision()).ok()?;
1655            let close_fill_price =
1656                Price::from_decimal_dp(close_px, instrument.price_precision()).ok();
1657            let close_venue_order_id = create_position_reconciliation_venue_order_id(
1658                account_id,
1659                instrument_id,
1660                close_side,
1661                OrderType::Market,
1662                close_order_qty,
1663                close_fill_price,
1664                None,
1665                Some("CLOSE"),
1666                venue_ts_last,
1667            );
1668
1669            let close_report = OrderStatusReport::new(
1670                account_id,
1671                instrument_id,
1672                None,
1673                close_venue_order_id,
1674                close_side,
1675                OrderType::Market,
1676                TimeInForce::Gtc,
1677                OrderStatus::Filled,
1678                close_order_qty,
1679                close_order_qty,
1680                ts_now,
1681                ts_now,
1682                ts_now,
1683                None,
1684            )
1685            .with_avg_px(close_px.to_f64().unwrap_or(0.0))
1686            .ok()?;
1687
1688            log::info!(
1689                color = LogColor::Blue as u8;
1690                "Generating close fill for cross-zero {instrument_id}: side={close_side:?}, qty={close_qty}, px={close_px}",
1691            );
1692
1693            let (close_events, _) =
1694                self.handle_external_order(&close_report, &account_id, instrument, &[], true);
1695            all_events.extend(close_events);
1696        } else {
1697            log::warn!("Cannot close position for {instrument_id}: no cached average price");
1698            return None;
1699        }
1700
1701        // Then open new position in opposite direction
1702        let open_qty = venue_signed_qty.abs();
1703        let open_side = if venue_signed_qty > Decimal::ZERO {
1704            OrderSide::Buy // Open long
1705        } else {
1706            OrderSide::Sell // Open short
1707        };
1708
1709        if let Some(open_px) = venue_avg_px {
1710            let open_order_qty =
1711                Quantity::from_decimal_dp(open_qty, instrument.size_precision()).ok()?;
1712            let open_fill_price =
1713                Price::from_decimal_dp(open_px, instrument.price_precision()).ok();
1714            let open_venue_order_id = create_position_reconciliation_venue_order_id(
1715                account_id,
1716                instrument_id,
1717                open_side,
1718                OrderType::Market,
1719                open_order_qty,
1720                open_fill_price,
1721                None,
1722                Some("OPEN"),
1723                venue_ts_last,
1724            );
1725
1726            let open_report = OrderStatusReport::new(
1727                account_id,
1728                instrument_id,
1729                None,
1730                open_venue_order_id,
1731                open_side,
1732                OrderType::Market,
1733                TimeInForce::Gtc,
1734                OrderStatus::Filled,
1735                open_order_qty,
1736                open_order_qty,
1737                ts_now,
1738                ts_now,
1739                ts_now,
1740                None,
1741            )
1742            .with_avg_px(open_px.to_f64().unwrap_or(0.0))
1743            .ok()?;
1744
1745            log::info!(
1746                color = LogColor::Blue as u8;
1747                "Generating open fill for cross-zero {instrument_id}: side={open_side:?}, qty={open_qty}, px={open_px}",
1748            );
1749
1750            let (open_events, _) =
1751                self.handle_external_order(&open_report, &account_id, instrument, &[], true);
1752            all_events.extend(open_events);
1753        } else {
1754            log::warn!("Cannot open new position for {instrument_id}: no venue average price");
1755            return Some(all_events);
1756        }
1757
1758        Some(all_events)
1759    }
1760
1761    /// Creates a position from a venue position report when no orders/fills exist.
1762    ///
1763    /// This handles the case where the venue reports an open position but there are
1764    /// no order or fill reports to create it from (e.g., orders are already closed).
1765    fn create_position_from_report(
1766        &mut self,
1767        report: &PositionStatusReport,
1768        account_id: &AccountId,
1769        instrument: &InstrumentAny,
1770    ) -> Option<Vec<OrderEventAny>> {
1771        let instrument_id = report.instrument_id;
1772        let venue_signed_qty = report.signed_decimal_qty;
1773
1774        if venue_signed_qty == Decimal::ZERO {
1775            return None;
1776        }
1777
1778        let order_side = if venue_signed_qty > Decimal::ZERO {
1779            OrderSide::Buy
1780        } else {
1781            OrderSide::Sell
1782        };
1783
1784        let qty_abs = venue_signed_qty.abs();
1785        let venue_avg_px = report.avg_px_open?;
1786
1787        let ts_now = self.clock.borrow().timestamp_ns();
1788        let order_qty = Quantity::from_decimal_dp(qty_abs, instrument.size_precision()).ok()?;
1789        let fill_price = Price::from_decimal_dp(venue_avg_px, instrument.price_precision()).ok();
1790        let venue_order_id = create_position_reconciliation_venue_order_id(
1791            *account_id,
1792            instrument_id,
1793            order_side,
1794            OrderType::Market,
1795            order_qty,
1796            fill_price,
1797            report.venue_position_id,
1798            None,
1799            report.ts_last,
1800        );
1801
1802        let mut order_report = OrderStatusReport::new(
1803            *account_id,
1804            instrument_id,
1805            None,
1806            venue_order_id,
1807            order_side,
1808            OrderType::Market,
1809            TimeInForce::Gtc,
1810            OrderStatus::Filled,
1811            order_qty,
1812            order_qty,
1813            ts_now,
1814            ts_now,
1815            ts_now,
1816            None,
1817        )
1818        .with_avg_px(venue_avg_px.to_f64().unwrap_or(0.0))
1819        .ok()?;
1820
1821        // Preserve venue_position_id for hedging mode
1822        if let Some(venue_position_id) = report.venue_position_id {
1823            order_report = order_report.with_venue_position_id(venue_position_id);
1824        }
1825
1826        log::info!(
1827            color = LogColor::Blue as u8;
1828            "Creating position from venue report for {instrument_id}: side={order_side:?}, qty={qty_abs}, avg_px={venue_avg_px}",
1829        );
1830
1831        let (events, _) =
1832            self.handle_external_order(&order_report, account_id, instrument, &[], true);
1833        Some(events)
1834    }
1835
1836    fn reconcile_position_report(
1837        &mut self,
1838        report: &PositionStatusReport,
1839        account_id: &AccountId,
1840        instruments_with_unattributed_fills: &IndexSet<InstrumentId>,
1841        positions_with_fills: &IndexSet<PositionId>,
1842    ) -> Option<Vec<OrderEventAny>> {
1843        if report.venue_position_id.is_some() {
1844            self.reconcile_position_report_hedging(
1845                report,
1846                account_id,
1847                instruments_with_unattributed_fills,
1848                positions_with_fills,
1849            )
1850        } else {
1851            self.reconcile_position_report_netting(report, account_id)
1852        }
1853    }
1854
1855    fn reconcile_position_report_hedging(
1856        &mut self,
1857        report: &PositionStatusReport,
1858        account_id: &AccountId,
1859        instruments_with_unattributed_fills: &IndexSet<InstrumentId>,
1860        positions_with_fills: &IndexSet<PositionId>,
1861    ) -> Option<Vec<OrderEventAny>> {
1862        let venue_position_id = report.venue_position_id?;
1863
1864        // Skip if batch already has fills for this position (will be created from fills)
1865        if positions_with_fills.contains(&venue_position_id) {
1866            log::debug!(
1867                "Skipping hedge position {venue_position_id} reconciliation: fills already in batch"
1868            );
1869            return None;
1870        }
1871
1872        // Skip if fills exist for this instrument but lack venue_position_id
1873        // (can't determine which hedge position they belong to)
1874        if instruments_with_unattributed_fills.contains(&report.instrument_id) {
1875            log::debug!(
1876                "Skipping hedge position {venue_position_id} reconciliation: unattributed fills in batch"
1877            );
1878            return None;
1879        }
1880
1881        log::debug!(
1882            "Reconciling HEDGE position for {}, venue_position_id={}",
1883            report.instrument_id,
1884            venue_position_id
1885        );
1886
1887        let position = {
1888            let cache = self.cache.borrow();
1889            cache.position(&venue_position_id).cloned()
1890        };
1891
1892        match position {
1893            Some(position) => {
1894                let cached_signed_qty = position.signed_decimal_qty();
1895                let venue_signed_qty = report.signed_decimal_qty;
1896
1897                if cached_signed_qty == venue_signed_qty {
1898                    log::debug!(
1899                        "Hedge position {venue_position_id} matches venue: qty={cached_signed_qty}"
1900                    );
1901                    return None;
1902                }
1903
1904                if venue_signed_qty == Decimal::ZERO && cached_signed_qty == Decimal::ZERO {
1905                    return None;
1906                }
1907
1908                if !self.config.generate_missing_orders {
1909                    log::error!(
1910                        "Cannot reconcile {} {}: position net qty {} != reported net qty {} \
1911                         and `generate_missing_orders` is disabled",
1912                        report.instrument_id,
1913                        venue_position_id,
1914                        cached_signed_qty,
1915                        venue_signed_qty
1916                    );
1917                    return None;
1918                }
1919
1920                self.reconcile_hedge_position_discrepancy(
1921                    report,
1922                    account_id,
1923                    &position,
1924                    cached_signed_qty,
1925                )
1926            }
1927            None => {
1928                if report.signed_decimal_qty == Decimal::ZERO {
1929                    return None;
1930                }
1931
1932                if !self.config.generate_missing_orders {
1933                    log::error!(
1934                        "Cannot reconcile position: {venue_position_id} not found and `generate_missing_orders` is disabled"
1935                    );
1936                    return None;
1937                }
1938
1939                self.reconcile_missing_hedge_position(report, account_id)
1940            }
1941        }
1942    }
1943
1944    fn reconcile_hedge_position_discrepancy(
1945        &mut self,
1946        report: &PositionStatusReport,
1947        account_id: &AccountId,
1948        position: &Position,
1949        cached_signed_qty: Decimal,
1950    ) -> Option<Vec<OrderEventAny>> {
1951        let instrument = self.get_instrument(&report.instrument_id)?;
1952        let venue_signed_qty = report.signed_decimal_qty;
1953
1954        let diff = (cached_signed_qty - venue_signed_qty).abs();
1955        let diff_qty = Quantity::from_decimal_dp(diff, instrument.size_precision()).ok()?;
1956
1957        if diff_qty.is_zero() {
1958            log::debug!(
1959                "Difference quantity rounds to zero for {}, skipping",
1960                instrument.id()
1961            );
1962            return None;
1963        }
1964
1965        let venue_position_id = report.venue_position_id?;
1966        log::warn!(
1967            "Hedge position discrepancy for {} {}: cached={}, venue={}, generating reconciliation order",
1968            report.instrument_id,
1969            venue_position_id,
1970            cached_signed_qty,
1971            venue_signed_qty
1972        );
1973
1974        let current_avg_px = if position.avg_px_open > 0.0 {
1975            Decimal::from_str(&position.avg_px_open.to_string()).ok()
1976        } else {
1977            None
1978        };
1979
1980        self.create_position_reconciliation_order(
1981            report,
1982            account_id,
1983            &instrument,
1984            cached_signed_qty,
1985            diff_qty,
1986            current_avg_px,
1987        )
1988    }
1989
1990    fn reconcile_missing_hedge_position(
1991        &mut self,
1992        report: &PositionStatusReport,
1993        account_id: &AccountId,
1994    ) -> Option<Vec<OrderEventAny>> {
1995        let instrument = self.get_instrument(&report.instrument_id)?;
1996        let venue_signed_qty = report.signed_decimal_qty;
1997
1998        let qty = venue_signed_qty.abs();
1999        let diff_qty = Quantity::from_decimal_dp(qty, instrument.size_precision()).ok()?;
2000
2001        if diff_qty.is_zero() {
2002            return None;
2003        }
2004
2005        let venue_position_id = report.venue_position_id?;
2006        log::warn!(
2007            "Missing hedge position for {} {}: venue reports {}, generating reconciliation order",
2008            report.instrument_id,
2009            venue_position_id,
2010            venue_signed_qty
2011        );
2012
2013        self.create_position_reconciliation_order(
2014            report,
2015            account_id,
2016            &instrument,
2017            Decimal::ZERO,
2018            diff_qty,
2019            None,
2020        )
2021    }
2022
2023    fn reconcile_position_report_netting(
2024        &mut self,
2025        report: &PositionStatusReport,
2026        account_id: &AccountId,
2027    ) -> Option<Vec<OrderEventAny>> {
2028        let instrument_id = report.instrument_id;
2029
2030        log::debug!("Reconciling NET position for {instrument_id}");
2031
2032        let instrument = self.get_instrument(&instrument_id)?;
2033
2034        let (cached_signed_qty, cached_avg_px) = {
2035            let cache = self.cache.borrow();
2036            let positions =
2037                cache.positions_open(None, Some(&instrument_id), None, Some(account_id), None);
2038
2039            if positions.is_empty() {
2040                (Decimal::ZERO, None)
2041            } else {
2042                let mut total_signed_qty = Decimal::ZERO;
2043                let mut total_value = Decimal::ZERO;
2044                let mut total_qty = Decimal::ZERO;
2045
2046                for pos in positions {
2047                    total_signed_qty += pos.signed_decimal_qty();
2048                    let qty = pos.signed_decimal_qty().abs();
2049                    if pos.avg_px_open > 0.0
2050                        && qty > Decimal::ZERO
2051                        && let Ok(avg_px) = Decimal::from_str(&pos.avg_px_open.to_string())
2052                    {
2053                        total_value += avg_px * qty;
2054                        total_qty += qty;
2055                    }
2056                }
2057
2058                let avg_px = if total_qty > Decimal::ZERO {
2059                    Some(total_value / total_qty)
2060                } else {
2061                    None
2062                };
2063
2064                (total_signed_qty, avg_px)
2065            }
2066        };
2067
2068        let venue_signed_qty = report.signed_decimal_qty;
2069
2070        log::debug!("venue_signed_qty={venue_signed_qty}, cached_signed_qty={cached_signed_qty}");
2071
2072        let tolerance = Decimal::from_str("0.00000001").unwrap_or(Decimal::ZERO);
2073        if (cached_signed_qty - venue_signed_qty).abs() <= tolerance {
2074            log::debug!("Position quantities match for {instrument_id}, no reconciliation needed");
2075            return None;
2076        }
2077
2078        if !self.config.generate_missing_orders {
2079            log::warn!(
2080                "Discrepancy for {instrument_id} position when `generate_missing_orders` disabled, skipping"
2081            );
2082            return None;
2083        }
2084
2085        let diff = (cached_signed_qty - venue_signed_qty).abs();
2086        let diff_qty = Quantity::from_decimal_dp(diff, instrument.size_precision()).ok()?;
2087
2088        if diff_qty.is_zero() {
2089            log::debug!(
2090                "Difference quantity rounds to zero for {instrument_id}, skipping order generation"
2091            );
2092            return None;
2093        }
2094
2095        let crosses_zero = cached_signed_qty != Decimal::ZERO
2096            && venue_signed_qty != Decimal::ZERO
2097            && ((cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO)
2098                || (cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO));
2099
2100        if crosses_zero {
2101            let ts_now = self.clock.borrow().timestamp_ns();
2102            return self.reconcile_cross_zero_position(
2103                &instrument,
2104                *account_id,
2105                instrument_id,
2106                cached_signed_qty,
2107                cached_avg_px,
2108                venue_signed_qty,
2109                report.avg_px_open,
2110                ts_now,
2111                report.ts_last,
2112            );
2113        }
2114
2115        if cached_signed_qty == Decimal::ZERO {
2116            return self.create_position_from_report(report, account_id, &instrument);
2117        }
2118
2119        self.create_position_reconciliation_order(
2120            report,
2121            account_id,
2122            &instrument,
2123            cached_signed_qty,
2124            diff_qty,
2125            cached_avg_px,
2126        )
2127    }
2128
2129    fn create_position_reconciliation_order(
2130        &mut self,
2131        report: &PositionStatusReport,
2132        account_id: &AccountId,
2133        instrument: &InstrumentAny,
2134        cached_signed_qty: Decimal,
2135        diff_qty: Quantity,
2136        current_avg_px: Option<Decimal>,
2137    ) -> Option<Vec<OrderEventAny>> {
2138        let venue_signed_qty = report.signed_decimal_qty;
2139        let instrument_id = report.instrument_id;
2140
2141        let order_side = if venue_signed_qty > cached_signed_qty {
2142            OrderSide::Buy
2143        } else {
2144            OrderSide::Sell
2145        };
2146
2147        let reconciliation_px = calculate_reconciliation_price(
2148            cached_signed_qty,
2149            current_avg_px,
2150            venue_signed_qty,
2151            report.avg_px_open,
2152        );
2153
2154        let fill_px = reconciliation_px
2155            .or(report.avg_px_open)
2156            .or(current_avg_px)?;
2157
2158        let ts_now = self.clock.borrow().timestamp_ns();
2159        let fill_price = Price::from_decimal_dp(fill_px, instrument.price_precision()).ok();
2160        let venue_order_id = create_position_reconciliation_venue_order_id(
2161            *account_id,
2162            instrument_id,
2163            order_side,
2164            OrderType::Market,
2165            diff_qty,
2166            fill_price,
2167            report.venue_position_id,
2168            None,
2169            report.ts_last,
2170        );
2171
2172        let mut order_report = OrderStatusReport::new(
2173            *account_id,
2174            instrument_id,
2175            None,
2176            venue_order_id,
2177            order_side,
2178            OrderType::Market,
2179            TimeInForce::Gtc,
2180            OrderStatus::Filled,
2181            diff_qty,
2182            diff_qty,
2183            ts_now,
2184            ts_now,
2185            ts_now,
2186            None,
2187        )
2188        .with_avg_px(fill_px.to_f64().unwrap_or(0.0))
2189        .ok()?;
2190
2191        if let Some(venue_position_id) = report.venue_position_id {
2192            order_report = order_report.with_venue_position_id(venue_position_id);
2193        }
2194
2195        log::info!(
2196            color = LogColor::Blue as u8;
2197            "Generating reconciliation order for {instrument_id}: side={order_side:?}, qty={diff_qty}, px={fill_px}",
2198        );
2199
2200        let (events, _) =
2201            self.handle_external_order(&order_report, account_id, instrument, &[], true);
2202        Some(events)
2203    }
2204
2205    fn reconcile_order_report(
2206        &self,
2207        order: &OrderAny,
2208        report: &OrderStatusReport,
2209        instrument: Option<&InstrumentAny>,
2210    ) -> Option<OrderEventAny> {
2211        let ts_now = self.clock.borrow().timestamp_ns();
2212        reconcile_order_report(order, report, instrument, ts_now)
2213    }
2214
2215    /// Reconciles an order with its associated fills atomically.
2216    ///
2217    /// For terminal statuses (Canceled), fills are applied BEFORE the terminal event
2218    /// to ensure correct state transitions (matching Python behavior).
2219    fn reconcile_order_with_fills(
2220        &mut self,
2221        order: &OrderAny,
2222        report: &OrderStatusReport,
2223        fills: &[&FillReport],
2224        instrument: Option<&InstrumentAny>,
2225    ) -> Vec<OrderEventAny> {
2226        let mut events = Vec::new();
2227        let mut sorted_fills: Vec<&FillReport> = fills.to_vec();
2228        sorted_fills.sort_by_key(|f| f.ts_event);
2229
2230        let ts_now = self.clock.borrow().timestamp_ns();
2231
2232        match report.order_status {
2233            OrderStatus::Canceled => {
2234                if report.ts_triggered.is_some()
2235                    && order.status() != OrderStatus::Triggered
2236                    && TRIGGERABLE_ORDER_TYPES.contains(&order.order_type())
2237                {
2238                    events.push(create_reconciliation_triggered(order, report, ts_now));
2239                }
2240
2241                // Apply fills before Canceled event regardless of current state,
2242                // as the order may have partial fills we haven't seen yet
2243                if let Some(inst) = instrument {
2244                    for fill in &sorted_fills {
2245                        if let Some(event) = self.create_order_fill(order, fill, inst) {
2246                            events.push(event);
2247                        }
2248                    }
2249                }
2250
2251                if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2252                    events.push(event);
2253                }
2254            }
2255            OrderStatus::Expired => {
2256                if report.ts_triggered.is_some()
2257                    && order.status() != OrderStatus::Triggered
2258                    && TRIGGERABLE_ORDER_TYPES.contains(&order.order_type())
2259                {
2260                    events.push(create_reconciliation_triggered(order, report, ts_now));
2261                }
2262
2263                // Apply fills before Expired event (same as Canceled)
2264                if let Some(inst) = instrument {
2265                    for fill in &sorted_fills {
2266                        if let Some(event) = self.create_order_fill(order, fill, inst) {
2267                            events.push(event);
2268                        }
2269                    }
2270                }
2271
2272                if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2273                    events.push(event);
2274                }
2275            }
2276            _ => {
2277                if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2278                    events.push(event);
2279                }
2280
2281                if let Some(inst) = instrument {
2282                    for fill in &sorted_fills {
2283                        if let Some(event) = self.create_order_fill(order, fill, inst) {
2284                            events.push(event);
2285                        }
2286                    }
2287                }
2288            }
2289        }
2290
2291        events
2292    }
2293
2294    fn handle_external_order(
2295        &mut self,
2296        report: &OrderStatusReport,
2297        account_id: &AccountId,
2298        instrument: &InstrumentAny,
2299        fills: &[&FillReport],
2300        is_synthetic: bool,
2301    ) -> (Vec<OrderEventAny>, Option<ExternalOrderMetadata>) {
2302        let (strategy_id, tags) =
2303            if let Some(claimed_strategy) = self.external_order_claims.get(&report.instrument_id) {
2304                let order_id = report
2305                    .client_order_id
2306                    .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
2307                log::info!(
2308                    color = LogColor::Blue as u8;
2309                    "External order {} for {} claimed by strategy {}",
2310                    order_id,
2311                    report.instrument_id,
2312                    claimed_strategy,
2313                );
2314                (*claimed_strategy, None)
2315            } else {
2316                // Unclaimed orders use EXTERNAL strategy ID with tag distinguishing source
2317                let tag = if is_synthetic {
2318                    *TAG_RECONCILIATION
2319                } else {
2320                    *TAG_VENUE
2321                };
2322                (StrategyId::from("EXTERNAL"), Some(vec![tag]))
2323            };
2324
2325        // Filter unclaimed venue orders (but not synthetic reconciliation orders)
2326        if self.config.filter_unclaimed_external && !is_synthetic {
2327            return (Vec::new(), None);
2328        }
2329
2330        let client_order_id = report
2331            .client_order_id
2332            .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
2333
2334        if !report.quantity.is_positive() {
2335            log::error!(
2336                "Skipping external order {} ({}) for {}: non-positive quantity in report {:?}",
2337                client_order_id,
2338                report.venue_order_id,
2339                report.instrument_id,
2340                report,
2341            );
2342            return (Vec::new(), None);
2343        }
2344
2345        let ts_now = self.clock.borrow().timestamp_ns();
2346
2347        let initialized = OrderInitialized::new(
2348            self.config.trader_id,
2349            strategy_id,
2350            report.instrument_id,
2351            client_order_id,
2352            report.order_side,
2353            report.order_type,
2354            report.quantity,
2355            report.time_in_force,
2356            report.post_only,
2357            report.reduce_only,
2358            false, // quote_quantity
2359            true,  // reconciliation
2360            UUID4::new(),
2361            ts_now,
2362            ts_now,
2363            report.price,
2364            report.trigger_price,
2365            report.trigger_type,
2366            report.limit_offset,
2367            report.trailing_offset,
2368            Some(report.trailing_offset_type),
2369            report.expire_time,
2370            report.display_qty,
2371            None, // emulation_trigger
2372            None, // trigger_instrument_id
2373            Some(report.contingency_type),
2374            report.order_list_id,
2375            report.linked_order_ids.clone(),
2376            report.parent_order_id,
2377            None, // exec_algorithm_id
2378            None, // exec_algorithm_params
2379            None, // exec_spawn_id
2380            tags,
2381        );
2382
2383        let events = vec![OrderEventAny::Initialized(initialized)];
2384        let order = match OrderAny::from_events(events) {
2385            Ok(order) => order,
2386            Err(e) => {
2387                log::error!("Failed to create order from report: {e}");
2388                return (Vec::new(), None);
2389            }
2390        };
2391
2392        {
2393            let mut cache = self.cache.borrow_mut();
2394            if let Err(e) = cache.add_order(order.clone(), None, None, false) {
2395                // Deterministic synthetic reconciliation IDs hash the same logical event
2396                // to the same client_order_id, so a restart replay can legitimately collide
2397                // with a cached order. Differentiate expected dedup from stuck state.
2398                match cache.order(&client_order_id) {
2399                    Some(existing) if is_synthetic && existing.is_closed() => {
2400                        log::debug!(
2401                            "Skipping synthetic reconciliation order {client_order_id} for {}: \
2402                             replay deduped (cached status={:?})",
2403                            report.instrument_id,
2404                            existing.status(),
2405                        );
2406                    }
2407                    Some(existing) if is_synthetic => {
2408                        log::warn!(
2409                            "Synthetic reconciliation order {client_order_id} for {} exists in \
2410                             cache in non-terminal state {:?}; fill not regenerated",
2411                            report.instrument_id,
2412                            existing.status(),
2413                        );
2414                    }
2415                    _ => {
2416                        log::error!("Failed to add external order to cache: {e}");
2417                    }
2418                }
2419                return (Vec::new(), None);
2420            }
2421
2422            if let Err(e) =
2423                cache.add_venue_order_id(&client_order_id, &report.venue_order_id, false)
2424            {
2425                log::warn!("Failed to add venue order ID index: {e}");
2426            }
2427        }
2428
2429        log::info!(
2430            color = LogColor::Blue as u8;
2431            "Created external order {} ({}) for {} [{}]",
2432            client_order_id,
2433            report.venue_order_id,
2434            report.instrument_id,
2435            report.order_status,
2436        );
2437
2438        let ts_now = self.clock.borrow().timestamp_ns();
2439
2440        // Generate events for external order: Accepted first, then fills (for terminal statuses),
2441        // then terminal status. This matches Python's behavior.
2442        let mut order_events =
2443            generate_external_order_status_events(&order, report, account_id, instrument, ts_now);
2444
2445        if !fills.is_empty() {
2446            let cached_order = self.get_order(&client_order_id).unwrap();
2447            let mut sorted_fills: Vec<&FillReport> = fills.to_vec();
2448            sorted_fills.sort_by_key(|f| f.ts_event);
2449
2450            match report.order_status {
2451                OrderStatus::Canceled | OrderStatus::Expired => {
2452                    let terminal_event = order_events.pop();
2453
2454                    for fill in sorted_fills {
2455                        if let Some(fill_event) =
2456                            self.create_order_fill(&cached_order, fill, instrument)
2457                        {
2458                            order_events.push(fill_event);
2459                        }
2460                    }
2461
2462                    if let Some(event) = terminal_event {
2463                        order_events.push(event);
2464                    }
2465                }
2466                OrderStatus::Filled | OrderStatus::PartiallyFilled => {
2467                    // Only pop if the last event is a Filled event (the inferred fill)
2468                    if order_events
2469                        .last()
2470                        .is_some_and(|e| matches!(e, OrderEventAny::Filled(_)))
2471                    {
2472                        order_events.pop();
2473                    }
2474
2475                    let mut real_fill_total = Decimal::ZERO;
2476
2477                    for fill in &sorted_fills {
2478                        if let Some(fill_event) =
2479                            self.create_order_fill(&cached_order, fill, instrument)
2480                        {
2481                            real_fill_total += fill.last_qty.as_decimal();
2482                            order_events.push(fill_event);
2483                        }
2484                    }
2485
2486                    let report_filled = report.filled_qty.as_decimal();
2487                    if real_fill_total < report_filled {
2488                        let diff_decimal = report_filled - real_fill_total;
2489
2490                        if let Ok(diff) =
2491                            Quantity::from_decimal_dp(diff_decimal, instrument.size_precision())
2492                            && let Some(inferred_fill) = create_inferred_fill_for_qty(
2493                                &cached_order,
2494                                report,
2495                                account_id,
2496                                instrument,
2497                                diff,
2498                                ts_now,
2499                                None,
2500                            )
2501                        {
2502                            order_events.push(inferred_fill);
2503                        }
2504                    }
2505                }
2506                _ => {}
2507            }
2508        }
2509
2510        let metadata = ExternalOrderMetadata {
2511            client_order_id,
2512            venue_order_id: report.venue_order_id,
2513            instrument_id: report.instrument_id,
2514            strategy_id,
2515            ts_init: ts_now,
2516        };
2517
2518        (order_events, Some(metadata))
2519    }
2520
2521    /// Adjusts fills for instruments with incomplete first lifecycle (partial window).
2522    ///
2523    /// When historical fills don't fully explain the current position (e.g., lookback window
2524    /// started mid-position), this creates synthetic fills to align with the venue position.
2525    fn adjust_mass_status_fills(
2526        &self,
2527        mass_status: &ExecutionMassStatus,
2528    ) -> (
2529        IndexMap<VenueOrderId, OrderStatusReport>,
2530        IndexMap<VenueOrderId, Vec<FillReport>>,
2531    ) {
2532        let mut final_orders: IndexMap<VenueOrderId, OrderStatusReport> =
2533            mass_status.order_reports();
2534        let mut final_fills: IndexMap<VenueOrderId, Vec<FillReport>> = mass_status.fill_reports();
2535
2536        let mut instruments_to_adjust = Vec::new();
2537
2538        for (instrument_id, position_reports) in mass_status.position_reports() {
2539            if !self.should_reconcile_instrument(&instrument_id) {
2540                log::debug!(
2541                    "Skipping fill adjustment for {instrument_id}: not in reconciliation_instrument_ids"
2542                );
2543                continue;
2544            }
2545
2546            // Skip hedge mode instruments (have venue_position_id) as partial-window
2547            // adjustment assumes a single net position per instrument
2548            let is_hedge_mode = position_reports
2549                .iter()
2550                .any(|r| r.venue_position_id.is_some());
2551
2552            if is_hedge_mode {
2553                log::debug!(
2554                    "Skipping fill adjustment for {instrument_id}: hedge mode (has venue_position_id)"
2555                );
2556                continue;
2557            }
2558
2559            if let Some(instrument) = self.get_instrument(&instrument_id) {
2560                instruments_to_adjust.push(instrument);
2561            } else {
2562                log::debug!(
2563                    "Skipping fill adjustment for {instrument_id}: instrument not found in cache"
2564                );
2565            }
2566        }
2567
2568        if instruments_to_adjust.is_empty() {
2569            return (final_orders, final_fills);
2570        }
2571
2572        log_info!(
2573            "Adjusting fills for {} instrument(s) with position reports",
2574            instruments_to_adjust.len(),
2575            color = LogColor::Blue
2576        );
2577
2578        for instrument in &instruments_to_adjust {
2579            let instrument_id = instrument.id();
2580
2581            match process_mass_status_for_reconciliation(mass_status, instrument, None) {
2582                Ok(result) => {
2583                    final_orders.retain(|_, order| order.instrument_id != instrument_id);
2584                    final_fills.retain(|_, fills| {
2585                        fills
2586                            .first()
2587                            .is_none_or(|f| f.instrument_id != instrument_id)
2588                    });
2589
2590                    for (venue_order_id, order) in result.orders {
2591                        final_orders.insert(venue_order_id, order);
2592                    }
2593
2594                    for (venue_order_id, fills) in result.fills {
2595                        final_fills.insert(venue_order_id, fills);
2596                    }
2597                }
2598                Err(e) => {
2599                    log::warn!("Failed to adjust fills for {instrument_id}: {e}");
2600                }
2601            }
2602        }
2603
2604        log_info!(
2605            "After adjustment: {} order(s), {} fill group(s)",
2606            final_orders.len(),
2607            final_fills.len(),
2608            color = LogColor::Blue
2609        );
2610
2611        (final_orders, final_fills)
2612    }
2613
2614    /// Deduplicates order reports, keeping the most advanced state per venue_order_id.
2615    ///
2616    /// When a batch contains multiple reports for the same order, we keep the one with
2617    /// the highest filled_qty (most progress), or if equal, the most terminal status.
2618    fn deduplicate_order_reports<'a>(
2619        &self,
2620        reports: impl Iterator<Item = &'a OrderStatusReport>,
2621    ) -> IndexMap<VenueOrderId, &'a OrderStatusReport> {
2622        let mut best_reports: IndexMap<VenueOrderId, &'a OrderStatusReport> = IndexMap::new();
2623
2624        for report in reports {
2625            let dominated = best_reports
2626                .get(&report.venue_order_id)
2627                .is_some_and(|existing| self.is_more_advanced(existing, report));
2628
2629            if !dominated {
2630                best_reports.insert(report.venue_order_id, report);
2631            }
2632        }
2633
2634        best_reports
2635    }
2636
2637    fn is_more_advanced(&self, a: &OrderStatusReport, b: &OrderStatusReport) -> bool {
2638        if a.filled_qty > b.filled_qty {
2639            return true;
2640        }
2641
2642        if a.filled_qty < b.filled_qty {
2643            return false;
2644        }
2645
2646        // Equal filled_qty - compare status (terminal states are more advanced)
2647        Self::status_priority(a.order_status) > Self::status_priority(b.order_status)
2648    }
2649
2650    const fn status_priority(status: OrderStatus) -> u8 {
2651        match status {
2652            OrderStatus::Initialized | OrderStatus::Submitted | OrderStatus::Emulated => 0,
2653            OrderStatus::Released | OrderStatus::Denied => 1,
2654            OrderStatus::Accepted | OrderStatus::PendingUpdate | OrderStatus::PendingCancel => 2,
2655            OrderStatus::Triggered => 3,
2656            OrderStatus::PartiallyFilled => 4,
2657            OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected => 5,
2658            OrderStatus::Filled => 6,
2659        }
2660    }
2661
2662    fn is_exact_order_match(&self, order: &OrderAny, report: &OrderStatusReport) -> bool {
2663        order.status() == report.order_status
2664            && order.filled_qty() == report.filled_qty
2665            && !should_reconciliation_update(order, report)
2666    }
2667
2668    fn create_order_fill(
2669        &mut self,
2670        order: &OrderAny,
2671        fill: &FillReport,
2672        instrument: &InstrumentAny,
2673    ) -> Option<OrderEventAny> {
2674        if self.processed_fills.contains_key(&fill.trade_id) {
2675            return None;
2676        }
2677
2678        self.processed_fills
2679            .insert(fill.trade_id, order.client_order_id());
2680
2681        Some(OrderEventAny::Filled(OrderFilled::new(
2682            order.trader_id(),
2683            order.strategy_id(),
2684            order.instrument_id(),
2685            order.client_order_id(),
2686            fill.venue_order_id,
2687            fill.account_id,
2688            fill.trade_id,
2689            fill.order_side,
2690            order.order_type(),
2691            fill.last_qty,
2692            fill.last_px,
2693            instrument.quote_currency(),
2694            fill.liquidity_side,
2695            fill.report_id,
2696            fill.ts_event,
2697            self.clock.borrow().timestamp_ns(),
2698            false,
2699            fill.venue_position_id,
2700            Some(fill.commission),
2701        )))
2702    }
2703}