1use std::{cell::RefCell, fmt::Debug, rc::Rc, str::FromStr, sync::LazyLock};
22
23use indexmap::{IndexMap, IndexSet};
24use nautilus_common::{
25 cache::Cache,
26 clients::ExecutionClient,
27 clock::Clock,
28 enums::{LogColor, LogLevel},
29 log_info,
30 messages::{
31 ExecutionReport,
32 execution::{
33 QueryOrder, TradingCommand,
34 report::{GenerateOrderStatusReports, GeneratePositionStatusReports},
35 },
36 },
37};
38use nautilus_core::{
39 UUID4, UnixNanos,
40 datetime::{
41 NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND, mins_to_nanos, mins_to_secs,
42 nanos_to_millis,
43 },
44};
45use nautilus_execution::{
46 engine::ExecutionEngine,
47 reconciliation::{
48 calculate_reconciliation_price, create_inferred_fill_for_qty,
49 create_position_reconciliation_venue_order_id, create_reconciliation_rejected,
50 create_reconciliation_triggered, generate_external_order_status_events,
51 process_mass_status_for_reconciliation, reconcile_order_report,
52 should_reconciliation_update,
53 },
54};
55use nautilus_model::{
56 enums::{OrderSide, OrderStatus, OrderType, TimeInForce},
57 events::{OrderCanceled, OrderEventAny, OrderFilled, OrderInitialized},
58 identifiers::{
59 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
60 VenueOrderId,
61 },
62 instruments::{Instrument, InstrumentAny},
63 orders::{Order, OrderAny, TRIGGERABLE_ORDER_TYPES},
64 position::Position,
65 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
66 types::{Price, Quantity},
67};
68use rust_decimal::{Decimal, prelude::ToPrimitive};
69use ustr::Ustr;
70
71use crate::config::LiveExecEngineConfig;
72
/// Lazily initialised `"VENUE"` tag string.
// NOTE(review): not referenced in the visible portion of this file; presumably attached to
// orders that originate at the venue. Confirm against its call sites.
static TAG_VENUE: LazyLock<Ustr> = LazyLock::new(|| Ustr::from("VENUE"));

/// Lazily initialised `"RECONCILIATION"` tag string.
///
/// Mass-status reconciliation looks for this tag on closed cached orders and skips
/// any order report that matches such an order.
static TAG_RECONCILIATION: LazyLock<Ustr> = LazyLock::new(|| Ustr::from("RECONCILIATION"));
78
/// Identifying details of an external order produced during reconciliation.
///
/// Instances are collected into [`ReconciliationResult::external_orders`] whenever
/// external-order handling returns metadata alongside its events.
#[derive(Debug, Clone)]
pub struct ExternalOrderMetadata {
    /// The client order ID of the external order.
    pub client_order_id: ClientOrderId,
    /// The venue order ID of the external order.
    pub venue_order_id: VenueOrderId,
    /// The instrument the order is for.
    pub instrument_id: InstrumentId,
    /// The strategy the order is associated with.
    pub strategy_id: StrategyId,
    /// Initialisation timestamp (UNIX nanoseconds).
    pub ts_init: UnixNanos,
}
88
/// Outcome of reconciling an execution mass status.
#[derive(Debug, Default)]
pub struct ReconciliationResult {
    /// Every order event generated during reconciliation (order and position phases).
    pub events: Vec<OrderEventAny>,
    /// Metadata for external orders created while reconciling.
    pub external_orders: Vec<ExternalOrderMetadata>,
}
97
/// Outcome of a single inflight-order check cycle.
#[derive(Debug, Default)]
pub struct InflightCheckResult {
    /// Events generated for orders whose inflight retries were exhausted.
    pub events: Vec<OrderEventAny>,
    /// Order queries to send for orders that are still within their retry budget.
    pub queries: Vec<TradingCommand>,
}
106
/// Configuration for the [`ExecutionManager`].
///
/// Usually derived from a `LiveExecEngineConfig` via `From`, followed by
/// [`ExecutionManagerConfig::with_trader_id`].
#[derive(Debug, Clone)]
pub struct ExecutionManagerConfig {
    /// The trader ID (left at its default by the `From<&LiveExecEngineConfig>` conversion).
    pub trader_id: TraderId,
    /// Whether reconciliation is enabled (not read in the visible part of this file).
    pub reconciliation: bool,
    /// Reconciliation lookback window in minutes, if any.
    pub lookback_mins: Option<u64>,
    /// When non-empty, open-order and position checks only consider these instruments.
    pub reconciliation_instrument_ids: IndexSet<InstrumentId>,
    /// When `true`, order reports that carry a client order ID matching no cached order
    /// are not turned into external orders during mass-status reconciliation.
    pub filter_unclaimed_external: bool,
    /// When `true`, position reports in a mass status are not reconciled.
    pub filter_position_reports: bool,
    /// Client order IDs excluded from inflight checks and orphan-fill reconciliation.
    pub filtered_client_order_ids: IndexSet<ClientOrderId>,
    /// Presumably controls generation of orders to align positions; not read in view.
    pub generate_missing_orders: bool,
    /// Interval between inflight checks in milliseconds (not read in view).
    pub inflight_check_interval_ms: u32,
    /// Milliseconds after submission before an inflight order is checked; also the
    /// minimum spacing between repeated queries for the same order.
    pub inflight_threshold_ms: u64,
    /// Number of inflight checks after which the order is resolved locally.
    pub inflight_max_retries: u32,
    /// Interval between open-order checks in seconds (not read in view).
    pub open_check_interval_secs: Option<f64>,
    /// Lookback in minutes used as the start time when requesting order status reports.
    pub open_check_lookback_mins: Option<u64>,
    /// Orders with local activity more recent than this many nanoseconds are deferred
    /// by the open-order check.
    pub open_check_threshold_ns: u64,
    /// Retry budget for orders missing at the venue (not read in view).
    pub open_check_missing_retries: u32,
    /// Passed to the venue report request; when `false`, cached orders absent from the
    /// venue reports are additionally handled as missing.
    pub open_check_open_only: bool,
    /// Presumably caps single-order queries per cycle; not read in view.
    pub max_single_order_queries_per_cycle: u32,
    /// Presumably the delay between single-order queries in milliseconds; not read in view.
    pub single_order_query_delay_ms: u32,
    /// Interval between position checks in seconds (not read in view).
    pub position_check_interval_secs: Option<f64>,
    /// Position check lookback in minutes (not read in view).
    pub position_check_lookback_mins: u64,
    /// Position check activity threshold in nanoseconds (not read in view).
    pub position_check_threshold_ns: u64,
    /// Position check retry budget (not read in view).
    pub position_check_retries: u32,
    /// Buffer in minutes for purging closed orders; `None` disables the purge.
    pub purge_closed_orders_buffer_mins: Option<u32>,
    /// Buffer in minutes for purging closed positions; `None` disables the purge.
    pub purge_closed_positions_buffer_mins: Option<u32>,
    /// Lookback in minutes for purging account events (not read in view).
    pub purge_account_events_lookback_mins: Option<u32>,
    /// Presumably whether purges also apply to the backing database; not read in view.
    pub purge_from_database: bool,
}
163
impl Default for ExecutionManagerConfig {
    /// Returns the default configuration: reconciliation enabled, no filters, and all
    /// periodic checks and purges left unscheduled (`None`).
    fn default() -> Self {
        Self {
            trader_id: TraderId::default(),
            reconciliation: true,
            lookback_mins: Some(60),
            reconciliation_instrument_ids: IndexSet::new(),
            filter_unclaimed_external: false,
            filter_position_reports: false,
            filtered_client_order_ids: IndexSet::new(),
            generate_missing_orders: true,
            inflight_check_interval_ms: 2_000,
            inflight_threshold_ms: 5_000,
            inflight_max_retries: 5,
            open_check_interval_secs: None,
            open_check_lookback_mins: Some(60),
            // 5 seconds expressed in nanoseconds
            open_check_threshold_ns: 5_000_000_000,
            open_check_missing_retries: 5,
            open_check_open_only: true,
            max_single_order_queries_per_cycle: 5,
            single_order_query_delay_ms: 100,
            position_check_interval_secs: None,
            position_check_lookback_mins: 60,
            // 60 seconds expressed in nanoseconds
            position_check_threshold_ns: 60_000_000_000,
            position_check_retries: 3,
            purge_closed_orders_buffer_mins: None,
            purge_closed_positions_buffer_mins: None,
            purge_account_events_lookback_mins: None,
            purge_from_database: false,
        }
    }
}
196
197impl From<&LiveExecEngineConfig> for ExecutionManagerConfig {
198 fn from(config: &LiveExecEngineConfig) -> Self {
199 let filtered_client_order_ids: IndexSet<ClientOrderId> = config
200 .filtered_client_order_ids
201 .clone()
202 .unwrap_or_default()
203 .into_iter()
204 .map(|value| ClientOrderId::from(value.as_str()))
205 .collect();
206
207 let reconciliation_instrument_ids: IndexSet<InstrumentId> = config
208 .reconciliation_instrument_ids
209 .clone()
210 .unwrap_or_default()
211 .into_iter()
212 .map(InstrumentId::from)
213 .collect();
214
215 let open_check_threshold_ns =
216 (config.open_check_threshold_ms as u64) * NANOSECONDS_IN_MILLISECOND;
217 let position_check_threshold_ns =
218 (config.position_check_threshold_ms as u64) * NANOSECONDS_IN_MILLISECOND;
219
220 Self {
221 trader_id: TraderId::default(), reconciliation: config.reconciliation,
223 lookback_mins: config.reconciliation_lookback_mins.map(|m| m as u64),
224 reconciliation_instrument_ids,
225 filter_unclaimed_external: config.filter_unclaimed_external_orders,
226 filter_position_reports: config.filter_position_reports,
227 filtered_client_order_ids,
228 generate_missing_orders: config.generate_missing_orders,
229 inflight_check_interval_ms: config.inflight_check_interval_ms,
230 inflight_threshold_ms: config.inflight_check_threshold_ms as u64,
231 inflight_max_retries: config.inflight_check_retries,
232 open_check_interval_secs: config.open_check_interval_secs,
233 open_check_lookback_mins: config.open_check_lookback_mins.map(|m| m as u64),
234 open_check_threshold_ns,
235 open_check_missing_retries: config.open_check_missing_retries,
236 open_check_open_only: config.open_check_open_only,
237 max_single_order_queries_per_cycle: config.max_single_order_queries_per_cycle,
238 single_order_query_delay_ms: config.single_order_query_delay_ms,
239 position_check_interval_secs: config.position_check_interval_secs,
240 position_check_lookback_mins: config.position_check_lookback_mins as u64,
241 position_check_threshold_ns,
242 position_check_retries: config.position_check_retries,
243 purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
244 purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
245 purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
246 purge_from_database: config.purge_from_database,
247 }
248 }
249}
250
251impl ExecutionManagerConfig {
252 #[must_use]
254 pub fn with_trader_id(mut self, trader_id: TraderId) -> Self {
255 self.trader_id = trader_id;
256 self
257 }
258}
259
/// Per-order bookkeeping for the inflight check.
#[derive(Debug, Clone)]
struct InflightCheck {
    /// The order being tracked (also the key of the owning map, hence unused on its own).
    #[allow(dead_code)]
    pub client_order_id: ClientOrderId,
    /// Clock timestamp taken when the order was registered as inflight.
    pub ts_submitted: UnixNanos,
    /// Number of inflight checks performed for this order so far.
    pub retry_count: u32,
    /// Timestamp of the most recent check, or `None` if it has not been checked yet.
    pub last_query_ts: Option<UnixNanos>,
}
269
/// Tracks reconciliation state for live execution: inflight orders, recent local
/// activity, processed fills and retry counters, backed by the shared clock and cache.
#[derive(Clone)]
pub struct ExecutionManager {
    /// Shared clock used for all timestamps taken by the manager.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache holding orders, positions and instruments.
    cache: Rc<RefCell<Cache>>,
    /// Manager configuration.
    config: ExecutionManagerConfig,
    /// Orders registered as inflight, keyed by client order ID.
    inflight_checks: IndexMap<ClientOrderId, InflightCheck>,
    /// Strategy registered to claim external orders for each instrument.
    external_order_claims: IndexMap<InstrumentId, StrategyId>,
    /// Trade IDs mapped to client order IDs (not accessed in the visible part of this file).
    processed_fills: IndexMap<TradeId, ClientOrderId>,
    /// Retry counters per order, updated by the inflight check.
    recon_check_retries: IndexMap<ClientOrderId, u32>,
    /// Timestamp of the last inflight check per order.
    ts_last_query: IndexMap<ClientOrderId, UnixNanos>,
    /// Timestamp of the most recent locally observed activity per order.
    order_local_activity_ns: IndexMap<ClientOrderId, UnixNanos>,
    /// Timestamp of the most recent observed position activity per instrument.
    position_local_activity_ns: IndexMap<InstrumentId, UnixNanos>,
    /// Retry counters for position reconciliation per instrument.
    position_recon_retries: IndexMap<InstrumentId, u32>,
    /// Recently processed trade IDs with the time they were recorded.
    recent_fills_cache: IndexMap<TradeId, UnixNanos>,
}
307
308impl Debug for ExecutionManager {
309 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310 f.debug_struct(stringify!(ExecutionManager))
311 .field("config", &self.config)
312 .field("inflight_checks", &self.inflight_checks)
313 .field("external_order_claims", &self.external_order_claims)
314 .field("processed_fills", &self.processed_fills)
315 .field("recon_check_retries", &self.recon_check_retries)
316 .finish()
317 }
318}
319
320impl ExecutionManager {
321 pub fn new(
323 clock: Rc<RefCell<dyn Clock>>,
324 cache: Rc<RefCell<Cache>>,
325 config: ExecutionManagerConfig,
326 ) -> Self {
327 Self {
328 clock,
329 cache,
330 config,
331 inflight_checks: IndexMap::new(),
332 external_order_claims: IndexMap::new(),
333 processed_fills: IndexMap::new(),
334 recon_check_retries: IndexMap::new(),
335 ts_last_query: IndexMap::new(),
336 order_local_activity_ns: IndexMap::new(),
337 position_local_activity_ns: IndexMap::new(),
338 position_recon_retries: IndexMap::new(),
339 recent_fills_cache: IndexMap::new(),
340 }
341 }
342
343 #[must_use]
345 pub fn generate_timestamp_ns(&self) -> UnixNanos {
346 self.clock.borrow().timestamp_ns()
347 }
348
    /// Reconciles cached execution state against a venue's mass status.
    ///
    /// The work proceeds in phases:
    /// 1. Order and fill reports are adjusted, duplicate trade IDs are warned about,
    ///    and order reports are deduplicated.
    /// 2. Each order report is matched to a cached order by client order ID, then by
    ///    venue order ID; unmatched reports become external orders (subject to
    ///    `filter_unclaimed_external` when the report carries a client order ID).
    /// 3. Fills whose venue order ID had no order report are applied to the cached
    ///    order they belong to, if one can be found.
    /// 4. The collected events are sorted by `ts_event` and processed by `exec_engine`.
    /// 5. Unless `filter_position_reports` is set, position reports are reconciled and
    ///    any resulting events are processed and appended.
    ///
    /// Returns all generated events together with metadata for created external orders.
    pub async fn reconcile_execution_mass_status(
        &mut self,
        mass_status: ExecutionMassStatus,
        exec_engine: Rc<RefCell<ExecutionEngine>>,
    ) -> ReconciliationResult {
        let venue = mass_status.venue;
        let order_count = mass_status.order_reports().len();
        let fill_count: usize = mass_status.fill_reports().values().map(|v| v.len()).sum();
        let position_count = mass_status.position_reports().len();

        log_info!(
            "Reconciling ExecutionMassStatus for {venue}",
            color = LogColor::Blue
        );
        log_info!(
            "Received {order_count} order(s), {fill_count} fill(s), {position_count} position(s)",
            color = LogColor::Blue
        );

        let (adjusted_order_reports, adjusted_fill_reports) =
            self.adjust_mass_status_fills(&mass_status);

        // Counters below feed the summary log at the end
        let mut events = Vec::new();
        let mut external_orders = Vec::new();
        let mut orders_reconciled = 0usize;
        let mut external_orders_created = 0usize;
        let mut open_orders_initialized = 0usize;
        let mut orders_skipped_no_instrument = 0usize;
        let mut orders_skipped_duplicate = 0usize;
        let mut fills_applied = 0usize;

        let fill_reports = &adjusted_fill_reports;
        let mut seen_trade_ids: IndexSet<TradeId> = IndexSet::new();

        // Duplicate trade IDs are only reported; the fills themselves are kept
        for fills in fill_reports.values() {
            for fill in fills {
                if !seen_trade_ids.insert(fill.trade_id) {
                    log::warn!("Duplicate trade_id {} in mass status", fill.trade_id);
                }
            }
        }

        let order_reports = self.deduplicate_order_reports(adjusted_order_reports.values());
        let mut orders_skipped_filtered = 0usize;

        for report in order_reports.values() {
            if self.should_skip_order_report(report) {
                orders_skipped_filtered += 1;
                continue;
            }

            if let Some(client_order_id) = &report.client_order_id {
                // Cached order already matches the report: only refresh the venue order ID index
                if let Some(cached_order) = self.get_order(client_order_id)
                    && self.is_exact_order_match(&cached_order, report)
                {
                    log::debug!("Skipping order {client_order_id}: already in sync with venue");
                    orders_skipped_duplicate += 1;

                    if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
                        client_order_id,
                        &report.venue_order_id,
                        false,
                    ) {
                        log::warn!("Failed to add venue order ID index: {e}");
                    }

                    continue;
                }

                // Closed orders tagged as reconciliation orders are never re-reconciled
                if let Some(cached_order) = self.get_order(client_order_id)
                    && cached_order.is_closed()
                    && cached_order
                        .tags()
                        .is_some_and(|tags| tags.contains(&*TAG_RECONCILIATION))
                {
                    log::debug!(
                        "Skipping closed reconciliation order {client_order_id}: \
                         synthetic position adjustment from previous session",
                    );
                    orders_skipped_duplicate += 1;
                    continue;
                }

                if let Some(order) = self.get_order(client_order_id) {
                    // Case 1: cached order found by client order ID
                    let instrument = self.get_instrument(&report.instrument_id);
                    log::info!(
                        color = LogColor::Blue as u8;
                        "Reconciling {} {} {} [{}] -> [{}]",
                        client_order_id,
                        report.venue_order_id,
                        report.instrument_id,
                        order.status(),
                        report.order_status,
                    );

                    let order_fills: Vec<&FillReport> = fill_reports
                        .get(&report.venue_order_id)
                        .map(|f| f.iter().collect())
                        .unwrap_or_default();
                    let order_events = self.reconcile_order_with_fills(
                        &order,
                        report,
                        &order_fills,
                        instrument.as_ref(),
                    );

                    if !order_events.is_empty() {
                        orders_reconciled += 1;
                        fills_applied += order_events
                            .iter()
                            .filter(|e| matches!(e, OrderEventAny::Filled(_)))
                            .count();
                        events.extend(order_events);
                    }

                    if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
                        client_order_id,
                        &report.venue_order_id,
                        false,
                    ) {
                        log::warn!("Failed to add venue order ID index: {e}");
                    }
                } else if let Some(order) = self.get_order_by_venue_order_id(&report.venue_order_id)
                {
                    // Case 2: client order ID unknown locally, but the venue order ID is indexed
                    let instrument = self.get_instrument(&report.instrument_id);

                    log::info!(
                        color = LogColor::Blue as u8;
                        "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
                        order.client_order_id(),
                        report.venue_order_id,
                        report.instrument_id,
                        order.status(),
                        report.order_status,
                    );

                    let order_fills: Vec<&FillReport> = fill_reports
                        .get(&report.venue_order_id)
                        .map(|f| f.iter().collect())
                        .unwrap_or_default();
                    let order_events = self.reconcile_order_with_fills(
                        &order,
                        report,
                        &order_fills,
                        instrument.as_ref(),
                    );

                    if !order_events.is_empty() {
                        orders_reconciled += 1;
                        fills_applied += order_events
                            .iter()
                            .filter(|e| matches!(e, OrderEventAny::Filled(_)))
                            .count();
                        events.extend(order_events);
                    }

                    if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
                        &order.client_order_id(),
                        &report.venue_order_id,
                        false,
                    ) {
                        log::warn!("Failed to add venue order ID index: {e}");
                    }
                } else if !self.config.filter_unclaimed_external {
                    // Case 3: no cached order at all, treat the report as an external order
                    if let Some(instrument) = self.get_instrument(&report.instrument_id) {
                        let order_fills: Vec<&FillReport> = fill_reports
                            .get(&report.venue_order_id)
                            .map(|f| f.iter().collect())
                            .unwrap_or_default();
                        let (external_events, metadata) = self.handle_external_order(
                            report,
                            &mass_status.account_id,
                            &instrument,
                            &order_fills,
                            false,
                        );

                        if !external_events.is_empty() {
                            external_orders_created += 1;
                            fills_applied += external_events
                                .iter()
                                .filter(|e| matches!(e, OrderEventAny::Filled(_)))
                                .count();

                            if report.order_status.is_open() {
                                open_orders_initialized += 1;
                            }

                            events.extend(external_events);

                            if let Some(m) = metadata {
                                external_orders.push(m);
                            }
                        }
                    } else {
                        orders_skipped_no_instrument += 1;
                    }
                }
            } else if let Some(order) = self.get_order_by_venue_order_id(&report.venue_order_id) {
                // Report has no client order ID: fall back to the venue order ID index
                let instrument = self.get_instrument(&report.instrument_id);
                log::info!(
                    color = LogColor::Blue as u8;
                    "Reconciling {} (matched by venue_order_id {}) {} [{}] -> [{}]",
                    order.client_order_id(),
                    report.venue_order_id,
                    report.instrument_id,
                    order.status(),
                    report.order_status,
                );

                let order_fills: Vec<&FillReport> = fill_reports
                    .get(&report.venue_order_id)
                    .map(|f| f.iter().collect())
                    .unwrap_or_default();
                let order_events = self.reconcile_order_with_fills(
                    &order,
                    report,
                    &order_fills,
                    instrument.as_ref(),
                );

                if !order_events.is_empty() {
                    orders_reconciled += 1;
                    fills_applied += order_events
                        .iter()
                        .filter(|e| matches!(e, OrderEventAny::Filled(_)))
                        .count();
                    events.extend(order_events);
                }

                if let Err(e) = self.cache.borrow_mut().add_venue_order_id(
                    &order.client_order_id(),
                    &report.venue_order_id,
                    false,
                ) {
                    log::warn!("Failed to add venue order ID index: {e}");
                }
            } else if let Some(instrument) = self.get_instrument(&report.instrument_id) {
                // Venue order IDs starting with "S-" are flagged as synthetic for the handler.
                // Note that `filter_unclaimed_external` is not consulted on this path.
                let is_synthetic = report.venue_order_id.as_str().starts_with("S-");

                let order_fills: Vec<&FillReport> = fill_reports
                    .get(&report.venue_order_id)
                    .map(|f| f.iter().collect())
                    .unwrap_or_default();
                let (external_events, metadata) = self.handle_external_order(
                    report,
                    &mass_status.account_id,
                    &instrument,
                    &order_fills,
                    is_synthetic,
                );

                if !external_events.is_empty() {
                    external_orders_created += 1;
                    fills_applied += external_events
                        .iter()
                        .filter(|e| matches!(e, OrderEventAny::Filled(_)))
                        .count();

                    if report.order_status.is_open() {
                        open_orders_initialized += 1;
                    }

                    events.extend(external_events);

                    if let Some(m) = metadata {
                        external_orders.push(m);
                    }
                }
            } else {
                orders_skipped_no_instrument += 1;
            }
        }

        // Orphan fills: fill reports whose venue order ID had no order report above
        let processed_venue_order_ids: IndexSet<VenueOrderId> =
            order_reports.keys().copied().collect();

        for (venue_order_id, fills) in fill_reports {
            if processed_venue_order_ids.contains(venue_order_id) {
                continue;
            }

            // The first fill stands in for the group's instrument and client order ID
            let Some(first_fill) = fills.first() else {
                continue;
            };

            if !self.should_reconcile_instrument(&first_fill.instrument_id) {
                log::debug!(
                    "Skipping orphan fills for {}: not in reconciliation_instrument_ids",
                    first_fill.instrument_id
                );
                continue;
            }

            if let Some(client_order_id) = &first_fill.client_order_id
                && self
                    .config
                    .filtered_client_order_ids
                    .contains(client_order_id)
            {
                log::debug!(
                    "Skipping orphan fills for {client_order_id}: in filtered_client_order_ids"
                );
                continue;
            }

            let order = first_fill
                .client_order_id
                .as_ref()
                .and_then(|id| self.get_order(id))
                .or_else(|| self.get_order_by_venue_order_id(venue_order_id));

            // Re-check the filter against the resolved order, which may have been found
            // through the venue order ID rather than the fill's own client order ID
            if let Some(ref order) = order
                && self
                    .config
                    .filtered_client_order_ids
                    .contains(&order.client_order_id())
            {
                log::debug!(
                    "Skipping orphan fills for {}: in filtered_client_order_ids",
                    order.client_order_id()
                );
                continue;
            }

            // Fills with no resolvable order, or whose instrument is not cached, are
            // dropped here without a log or counter
            if let Some(order) = order {
                let instrument_id = order.instrument_id();
                if let Some(instrument) = self.get_instrument(&instrument_id) {
                    let mut sorted_fills: Vec<&FillReport> = fills.iter().collect();
                    sorted_fills.sort_by_key(|f| f.ts_event);

                    for fill in sorted_fills {
                        if let Some(event) = self.create_order_fill(&order, fill, &instrument) {
                            fills_applied += 1;
                            events.push(event);
                        }
                    }
                }
            }
        }

        // Apply order-phase events in event-time order before looking at positions
        events.sort_by_key(|e| e.ts_event());

        for event in &events {
            exec_engine.borrow_mut().process(event);
        }

        let mut positions_created = 0usize;

        if !self.config.filter_position_reports {
            // Instruments with fills (or filled orders) that carry no venue position ID
            let instruments_with_unattributed_fills: IndexSet<InstrumentId> = mass_status
                .fill_reports()
                .values()
                .flatten()
                .filter(|f| f.venue_position_id.is_none())
                .map(|f| f.instrument_id)
                .chain(
                    mass_status
                        .order_reports()
                        .values()
                        .filter(|r| !r.filled_qty.is_zero() && r.venue_position_id.is_none())
                        .map(|r| r.instrument_id),
                )
                .collect();

            // Venue position IDs referenced by any fill or filled order
            let positions_with_fills: IndexSet<PositionId> = mass_status
                .fill_reports()
                .values()
                .flatten()
                .filter_map(|f| f.venue_position_id)
                .chain(
                    mass_status
                        .order_reports()
                        .values()
                        .filter(|r| !r.filled_qty.is_zero())
                        .filter_map(|r| r.venue_position_id),
                )
                .collect();

            for (instrument_id, reports) in mass_status.position_reports() {
                if !self.should_reconcile_instrument(&instrument_id) {
                    log::debug!(
                        "Skipping position reports for {instrument_id}: not in reconciliation_instrument_ids"
                    );
                    continue;
                }

                for report in reports {
                    if let Some(position_events) = self.reconcile_position_report(
                        &report,
                        &mass_status.account_id,
                        &instruments_with_unattributed_fills,
                        &positions_with_fills,
                    ) {
                        // Position-phase events are processed immediately and appended unsorted
                        for event in position_events {
                            exec_engine.borrow_mut().process(&event);
                            events.push(event);
                        }
                        positions_created += 1;
                    }
                }
            }
        }

        if orders_skipped_no_instrument > 0 {
            log::warn!("{orders_skipped_no_instrument} orders skipped (instrument not in cache)");
        }

        if orders_skipped_duplicate > 0 {
            log::debug!("{orders_skipped_duplicate} orders skipped (already in sync)");
        }

        if orders_skipped_filtered > 0 {
            log::debug!("{orders_skipped_filtered} orders skipped (filtered by config)");
        }

        log::info!(
            color = LogColor::Blue as u8;
            "Reconciliation complete for {venue}: reconciled={orders_reconciled}, external={external_orders_created}, open={open_orders_initialized}, fills={fills_applied}, positions={positions_created}, skipped={orders_skipped_duplicate}, filtered={orders_skipped_filtered}",
        );

        ReconciliationResult {
            events,
            external_orders,
        }
    }
792
793 pub fn check_inflight_orders(&mut self) -> InflightCheckResult {
799 let mut result = InflightCheckResult::default();
800 let current_time = self.clock.borrow().timestamp_ns();
801 let threshold_ns = self.config.inflight_threshold_ms * NANOSECONDS_IN_MILLISECOND;
802
803 let mut to_check = Vec::new();
804
805 for (client_order_id, check) in &self.inflight_checks {
806 if current_time - check.ts_submitted > threshold_ns {
807 to_check.push(*client_order_id);
808 }
809 }
810
811 for client_order_id in to_check {
812 if self
813 .config
814 .filtered_client_order_ids
815 .contains(&client_order_id)
816 {
817 continue;
818 }
819
820 if let Some(check) = self.inflight_checks.get_mut(&client_order_id) {
821 if let Some(last_query_ts) = check.last_query_ts
822 && current_time - last_query_ts < threshold_ns
823 {
824 continue;
825 }
826
827 check.retry_count += 1;
828 check.last_query_ts = Some(current_time);
829 self.ts_last_query.insert(client_order_id, current_time);
830 self.recon_check_retries
831 .insert(client_order_id, check.retry_count);
832
833 if check.retry_count >= self.config.inflight_max_retries {
834 let ts_now = self.clock.borrow().timestamp_ns();
835
836 if let Some(order) = self.get_order(&client_order_id) {
837 match order.status() {
838 OrderStatus::Submitted => {
839 if let Some(event) = create_reconciliation_rejected(
841 &order,
842 Some("INFLIGHT_TIMEOUT"),
843 ts_now,
844 ) {
845 result.events.push(event);
846 }
847 }
848 OrderStatus::PendingUpdate | OrderStatus::PendingCancel => {
849 let event = OrderEventAny::Canceled(OrderCanceled::new(
851 order.trader_id(),
852 order.strategy_id(),
853 order.instrument_id(),
854 order.client_order_id(),
855 UUID4::new(),
856 ts_now,
857 ts_now,
858 true, order.venue_order_id(),
860 order.account_id(),
861 ));
862 result.events.push(event);
863 }
864 _ => {
865 }
867 }
868 }
869 self.clear_recon_tracking(&client_order_id, true);
871 } else if let Some(order) = self.get_order(&client_order_id) {
872 let client_id = self.cache.borrow().client_id(&client_order_id).copied();
874 let query = TradingCommand::QueryOrder(QueryOrder::new(
875 order.trader_id(),
876 client_id,
877 order.strategy_id(),
878 order.instrument_id(),
879 order.client_order_id(),
880 order.venue_order_id(),
881 UUID4::new(),
882 current_time,
883 None,
884 ));
885 result.queries.push(query);
886 }
887 }
888 }
889
890 result
891 }
892
    /// Compares cached open and inflight orders against order status reports from the venues.
    ///
    /// Each client is asked for order status reports (starting at now minus
    /// `open_check_lookback_mins` when configured). Reports whose client order ID
    /// matches a cached order are reconciled, unless the order had local activity
    /// within `open_check_threshold_ns`, in which case reconciliation is deferred.
    /// When `open_check_open_only` is `false`, cached orders that no venue reported
    /// are additionally handed to the missing-order handler.
    ///
    /// Returns the events generated by reconciliation; they are not applied here.
    // NOTE(review): a failed client query is only logged. With `open_check_open_only`
    // disabled, that client's orders are then absent from `venue_reported_ids` and go
    // through the missing-order path. Confirm this is intended during venue outages.
    pub async fn check_open_orders(
        &mut self,
        clients: &[&dyn ExecutionClient],
    ) -> Vec<OrderEventAny> {
        log::debug!("Checking order consistency between cached-state and venues");

        // Clone the orders out so the cache borrow ends before any `.await`
        let filtered_orders: Vec<OrderAny> = {
            let cache = self.cache.borrow();
            let mut orders = cache.orders_open(None, None, None, None, None);
            orders.extend(cache.orders_inflight(None, None, None, None, None));

            if self.config.reconciliation_instrument_ids.is_empty() {
                orders.iter().map(|o| (*o).clone()).collect()
            } else {
                orders
                    .iter()
                    .filter(|o| {
                        self.config
                            .reconciliation_instrument_ids
                            .contains(&o.instrument_id())
                    })
                    .map(|o| (*o).clone())
                    .collect()
            }
        };

        log::debug!(
            "Found {} order{} open in cache",
            filtered_orders.len(),
            if filtered_orders.len() == 1 { "" } else { "s" }
        );

        let mut all_reports = Vec::new();
        let mut venue_reported_ids = IndexSet::new();

        let ts_now = self.clock.borrow().timestamp_ns();
        // Saturating subtraction keeps the start at zero if the lookback exceeds `ts_now`
        let start = self.config.open_check_lookback_mins.map(|mins| {
            let lookback_ns = mins_to_nanos(mins);
            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
        });

        for client in clients {
            let mut cmd = GenerateOrderStatusReports::new(
                UUID4::new(),
                ts_now,
                self.config.open_check_open_only,
                None,
                start,
                None,
                None,
                None,
            );
            cmd.log_receipt_level = LogLevel::Debug;

            match client.generate_order_status_reports(&cmd).await {
                Ok(reports) => {
                    for report in reports {
                        if let Some(client_order_id) = &report.client_order_id {
                            venue_reported_ids.insert(*client_order_id);
                        }
                        all_reports.push(report);
                    }
                }
                Err(e) => {
                    log::error!(
                        "Failed to query order reports from {}: {e}",
                        client.client_id()
                    );
                }
            }
        }

        // Re-read the clock: time has passed while awaiting the venue queries
        let ts_now = self.clock.borrow().timestamp_ns();
        let mut events = Vec::new();

        // Reports without a client order ID, or for orders not in the cache, are ignored here
        for report in all_reports {
            if let Some(client_order_id) = &report.client_order_id
                && let Some(order) = self.get_order(client_order_id)
            {
                if let Some(&last_activity) = self.order_local_activity_ns.get(client_order_id)
                    && (ts_now - last_activity) < self.config.open_check_threshold_ns
                {
                    let elapsed_ms = nanos_to_millis((ts_now - last_activity).as_u64());
                    let threshold_ms = nanos_to_millis(self.config.open_check_threshold_ns);
                    log::info!(
                        "Deferring reconciliation for {client_order_id}: recent local activity ({elapsed_ms}ms < threshold={threshold_ms}ms)",
                    );
                    continue;
                }

                let instrument = self.get_instrument(&report.instrument_id);

                if let Some(event) =
                    self.reconcile_order_report(&order, &report, instrument.as_ref())
                {
                    events.push(event);
                }
            }
        }

        if !self.config.open_check_open_only {
            // Only orders last updated within the lookback window are candidates;
            // older ones are excluded from the missing-order check
            let candidates: Vec<&OrderAny> = if let Some(cutoff) = start {
                filtered_orders
                    .iter()
                    .filter(|o| o.ts_last() >= cutoff)
                    .collect()
            } else {
                filtered_orders.iter().collect()
            };
            let cached_ids: IndexSet<ClientOrderId> =
                candidates.iter().map(|o| o.client_order_id()).collect();
            let missing_at_venue: IndexSet<ClientOrderId> = cached_ids
                .difference(&venue_reported_ids)
                .copied()
                .collect();

            for client_order_id in missing_at_venue {
                events.extend(self.handle_missing_order(client_order_id));
            }
        }

        events
    }
1031
1032 pub async fn check_positions_consistency(
1042 &mut self,
1043 clients: &[&dyn ExecutionClient],
1044 ) -> Vec<OrderEventAny> {
1045 log::debug!("Checking position consistency between cached-state and venues");
1046
1047 let open_positions = {
1048 let cache = self.cache.borrow();
1049 let positions = cache.positions_open(None, None, None, None, None);
1050
1051 if self.config.reconciliation_instrument_ids.is_empty() {
1052 positions.iter().map(|p| (*p).clone()).collect()
1053 } else {
1054 positions
1055 .iter()
1056 .filter(|p| {
1057 self.config
1058 .reconciliation_instrument_ids
1059 .contains(&p.instrument_id)
1060 })
1061 .map(|p| (*p).clone())
1062 .collect::<Vec<_>>()
1063 }
1064 };
1065
1066 log::debug!(
1067 "Found {} position{} to check",
1068 open_positions.len(),
1069 if open_positions.len() == 1 { "" } else { "s" }
1070 );
1071
1072 let mut venue_positions = IndexMap::new();
1074
1075 for client in clients {
1076 let mut cmd = GeneratePositionStatusReports::new(
1077 UUID4::new(),
1078 self.clock.borrow().timestamp_ns(),
1079 None, None, None, None, None, );
1085 cmd.log_receipt_level = LogLevel::Debug;
1086
1087 match client.generate_position_status_reports(&cmd).await {
1088 Ok(reports) => {
1089 for report in reports {
1090 venue_positions.insert(report.instrument_id, report);
1091 }
1092 }
1093 Err(e) => {
1094 log::error!(
1095 "Failed to query position reports from {}: {e}",
1096 client.client_id()
1097 );
1098 }
1099 }
1100 }
1101
1102 let mut events = Vec::new();
1105 let mut checked_instruments = IndexSet::new();
1106
1107 for position in &open_positions {
1108 if !checked_instruments.insert(position.instrument_id) {
1109 continue;
1110 }
1111
1112 if !self.config.reconciliation_instrument_ids.is_empty()
1114 && !self
1115 .config
1116 .reconciliation_instrument_ids
1117 .contains(&position.instrument_id)
1118 {
1119 continue;
1120 }
1121
1122 let venue_report = venue_positions.get(&position.instrument_id);
1123
1124 if let Some(discrepancy_events) =
1125 self.check_position_discrepancy(position, venue_report)
1126 {
1127 events.extend(discrepancy_events);
1128 }
1129 }
1130
1131 let active_instruments: IndexSet<InstrumentId> = open_positions
1134 .iter()
1135 .map(|p| p.instrument_id)
1136 .chain(
1137 venue_positions
1138 .iter()
1139 .filter(|(_, r)| r.signed_decimal_qty != Decimal::ZERO)
1140 .map(|(id, _)| *id),
1141 )
1142 .collect();
1143 self.position_recon_retries
1144 .retain(|iid, _| active_instruments.contains(iid));
1145
1146 events
1147 }
1148
1149 pub fn register_inflight(&mut self, client_order_id: ClientOrderId) {
1151 let ts_submitted = self.clock.borrow().timestamp_ns();
1152 self.inflight_checks.insert(
1153 client_order_id,
1154 InflightCheck {
1155 client_order_id,
1156 ts_submitted,
1157 retry_count: 0,
1158 last_query_ts: None,
1159 },
1160 );
1161 self.recon_check_retries.insert(client_order_id, 0);
1162 self.ts_last_query.shift_remove(&client_order_id);
1163 self.order_local_activity_ns.shift_remove(&client_order_id);
1164 }
1165
1166 pub fn record_local_activity(&mut self, client_order_id: ClientOrderId) {
1172 let ts_now = self.clock.borrow().timestamp_ns();
1173 self.order_local_activity_ns.insert(client_order_id, ts_now);
1174 }
1175
1176 pub fn clear_recon_tracking(&mut self, client_order_id: &ClientOrderId, drop_last_query: bool) {
1178 self.inflight_checks.shift_remove(client_order_id);
1179 self.recon_check_retries.shift_remove(client_order_id);
1180
1181 if drop_last_query {
1182 self.ts_last_query.shift_remove(client_order_id);
1183 }
1184 self.order_local_activity_ns.shift_remove(client_order_id);
1185 }
1186
1187 pub fn claim_external_orders(&mut self, instrument_id: InstrumentId, strategy_id: StrategyId) {
1189 self.external_order_claims
1190 .insert(instrument_id, strategy_id);
1191 }
1192
1193 pub fn record_position_activity(&mut self, instrument_id: InstrumentId, ts_event: UnixNanos) {
1195 self.position_local_activity_ns
1196 .insert(instrument_id, ts_event);
1197 }
1198
    /// Updates tracking state from an incoming execution report.
    ///
    /// - `Order`: clears reconciliation tracking unless the status is
    ///   `PendingUpdate`/`PendingCancel`, then records local activity for the order.
    /// - `Fill`: records local activity for the owning order (resolved through the
    ///   cache's venue order ID index when the fill has no client order ID) and
    ///   position activity for the instrument.
    /// - `OrderWithFills`: for non-pending statuses, clears tracking and records order
    ///   activity; records position activity for every fill.
    /// - `Position`: records position activity at the report's `ts_last`.
    /// - `MassStatus`: ignored.
    // NOTE(review): the `Order` arm records local activity even for pending statuses,
    // while the `OrderWithFills` arm records none at all in that case. Confirm the
    // asymmetry is intentional.
    pub fn observe_execution_report(&mut self, report: &ExecutionReport) {
        match report {
            ExecutionReport::Order(order_report) => {
                if let Some(client_order_id) = &order_report.client_order_id {
                    // Pending statuses keep their inflight tracking
                    if !matches!(
                        order_report.order_status,
                        OrderStatus::PendingUpdate | OrderStatus::PendingCancel
                    ) {
                        self.clear_recon_tracking(client_order_id, true);
                    }
                    // Must follow the clear above, which also removes the activity entry
                    self.record_local_activity(*client_order_id);
                }
            }
            ExecutionReport::Fill(fill_report) => {
                let client_order_id = fill_report.client_order_id.or_else(|| {
                    self.cache
                        .borrow()
                        .client_order_id(&fill_report.venue_order_id)
                        .copied()
                });

                if let Some(coid) = client_order_id {
                    self.record_local_activity(coid);
                }
                self.record_position_activity(fill_report.instrument_id, fill_report.ts_event);
            }
            ExecutionReport::OrderWithFills(order_report, fills) => {
                if let Some(client_order_id) = &order_report.client_order_id
                    && !matches!(
                        order_report.order_status,
                        OrderStatus::PendingUpdate | OrderStatus::PendingCancel
                    )
                {
                    self.clear_recon_tracking(client_order_id, true);
                    self.record_local_activity(*client_order_id);
                }

                for fill_report in fills {
                    self.record_position_activity(fill_report.instrument_id, fill_report.ts_event);
                }
            }
            ExecutionReport::Position(position_report) => {
                self.record_position_activity(
                    position_report.instrument_id,
                    position_report.ts_last,
                );
            }
            // Mass status reports do not update activity tracking here
            ExecutionReport::MassStatus(_) => {}
        }
    }
1266
1267 pub fn is_fill_recently_processed(&self, trade_id: &TradeId) -> bool {
1269 self.recent_fills_cache.contains_key(trade_id)
1270 }
1271
1272 pub fn mark_fill_processed(&mut self, trade_id: TradeId) {
1274 let ts_now = self.clock.borrow().timestamp_ns();
1275 self.recent_fills_cache.insert(trade_id, ts_now);
1276 }
1277
1278 pub fn prune_recent_fills_cache(&mut self, ttl_secs: f64) {
1282 let ts_now = self.clock.borrow().timestamp_ns();
1283 let ttl_ns = (ttl_secs * NANOSECONDS_IN_SECOND as f64) as u64;
1284
1285 self.recent_fills_cache
1286 .retain(|_, &mut ts_cached| ts_now - ts_cached <= ttl_ns);
1287 }
1288
1289 pub fn purge_closed_orders(&mut self) {
1291 let Some(buffer_mins) = self.config.purge_closed_orders_buffer_mins else {
1292 return;
1293 };
1294
1295 let ts_now = self.clock.borrow().timestamp_ns();
1296 let buffer_secs = mins_to_secs(buffer_mins as u64);
1297
1298 self.cache
1299 .borrow_mut()
1300 .purge_closed_orders(ts_now, buffer_secs);
1301 }
1302
1303 pub fn purge_closed_positions(&mut self) {
1305 let Some(buffer_mins) = self.config.purge_closed_positions_buffer_mins else {
1306 return;
1307 };
1308
1309 let ts_now = self.clock.borrow().timestamp_ns();
1310 let buffer_secs = mins_to_secs(buffer_mins as u64);
1311
1312 self.cache
1313 .borrow_mut()
1314 .purge_closed_positions(ts_now, buffer_secs);
1315 }
1316
1317 pub fn purge_account_events(&mut self) {
1319 let Some(lookback_mins) = self.config.purge_account_events_lookback_mins else {
1320 return;
1321 };
1322
1323 let ts_now = self.clock.borrow().timestamp_ns();
1324 let lookback_secs = mins_to_secs(lookback_mins as u64);
1325
1326 self.cache
1327 .borrow_mut()
1328 .purge_account_events(ts_now, lookback_secs);
1329 }
1330
1331 fn get_order(&self, client_order_id: &ClientOrderId) -> Option<OrderAny> {
1334 self.cache.borrow().order(client_order_id).cloned()
1335 }
1336
1337 fn get_order_by_venue_order_id(&self, venue_order_id: &VenueOrderId) -> Option<OrderAny> {
1338 let cache = self.cache.borrow();
1339 cache
1340 .client_order_id(venue_order_id)
1341 .and_then(|client_order_id| cache.order(client_order_id).cloned())
1342 }
1343
1344 fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
1345 self.cache.borrow().instrument(instrument_id).cloned()
1346 }
1347
1348 fn should_skip_order_report(&self, report: &OrderStatusReport) -> bool {
1349 if let Some(client_order_id) = &report.client_order_id
1350 && self
1351 .config
1352 .filtered_client_order_ids
1353 .contains(client_order_id)
1354 {
1355 log::debug!(
1356 "Skipping order report {client_order_id}: in filtered_client_order_ids list"
1357 );
1358 return true;
1359 }
1360
1361 if !self.should_reconcile_instrument(&report.instrument_id) {
1362 log::debug!(
1363 "Skipping order report for {}: not in reconciliation_instrument_ids",
1364 report.instrument_id
1365 );
1366 return true;
1367 }
1368
1369 false
1370 }
1371
1372 fn should_reconcile_instrument(&self, instrument_id: &InstrumentId) -> bool {
1373 self.config.reconciliation_instrument_ids.is_empty()
1374 || self
1375 .config
1376 .reconciliation_instrument_ids
1377 .contains(instrument_id)
1378 }
1379
1380 fn handle_missing_order(&mut self, client_order_id: ClientOrderId) -> Vec<OrderEventAny> {
1381 let mut events = Vec::new();
1382
1383 let Some(order) = self.get_order(&client_order_id) else {
1384 return events;
1385 };
1386
1387 let ts_now = self.clock.borrow().timestamp_ns();
1388 let ts_last = order.ts_last();
1389
1390 if (ts_now - ts_last) < self.config.open_check_threshold_ns {
1392 return events;
1393 }
1394
1395 if let Some(&last_activity) = self.order_local_activity_ns.get(&client_order_id)
1397 && (ts_now - last_activity) < self.config.open_check_threshold_ns
1398 {
1399 return events;
1400 }
1401
1402 let retries = self.recon_check_retries.entry(client_order_id).or_insert(0);
1404 *retries += 1;
1405
1406 if *retries >= self.config.open_check_missing_retries {
1408 log::warn!(
1409 "Order {client_order_id} not found at venue after {retries} retries, marking as REJECTED"
1410 );
1411
1412 let ts_now = self.clock.borrow().timestamp_ns();
1413
1414 if let Some(rejected) =
1415 create_reconciliation_rejected(&order, Some("NOT_FOUND_AT_VENUE"), ts_now)
1416 {
1417 events.push(rejected);
1418 }
1419
1420 self.clear_recon_tracking(&client_order_id, true);
1421 } else {
1422 log::debug!(
1423 "Order {} not found at venue, retry {}/{}",
1424 client_order_id,
1425 retries,
1426 self.config.open_check_missing_retries
1427 );
1428 }
1429
1430 events
1431 }
1432
1433 fn check_position_discrepancy(
1434 &mut self,
1435 position: &Position,
1436 venue_report: Option<&PositionStatusReport>,
1437 ) -> Option<Vec<OrderEventAny>> {
1438 let cached_signed_qty = position.signed_decimal_qty();
1440 let venue_signed_qty = venue_report.map_or(Decimal::ZERO, |r| r.signed_decimal_qty);
1441
1442 let tolerance = Decimal::from_str("0.00000001").unwrap();
1443 if (cached_signed_qty - venue_signed_qty).abs() <= tolerance {
1444 self.position_recon_retries
1445 .shift_remove(&position.instrument_id);
1446 return None; }
1448
1449 let ts_now = self.clock.borrow().timestamp_ns();
1451
1452 if let Some(&last_activity) = self.position_local_activity_ns.get(&position.instrument_id)
1453 && (ts_now - last_activity) < self.config.position_check_threshold_ns
1454 {
1455 log::debug!(
1456 "Skipping position reconciliation for {}: recent activity within threshold",
1457 position.instrument_id
1458 );
1459 return None;
1460 }
1461
1462 let retries = *self
1463 .position_recon_retries
1464 .get(&position.instrument_id)
1465 .unwrap_or(&0);
1466
1467 if retries >= self.config.position_check_retries {
1468 return None;
1469 }
1470
1471 log::warn!(
1472 "Position discrepancy detected for {}: cached_signed_qty={}, venue_signed_qty={}",
1473 position.instrument_id,
1474 cached_signed_qty,
1475 venue_signed_qty
1476 );
1477
1478 let account_id = position.account_id;
1479 let instrument_id = position.instrument_id;
1480
1481 let Some(instrument) = self.cache.borrow().instrument(&instrument_id).cloned() else {
1482 log::debug!("Cannot reconcile position for {instrument_id}: instrument not in cache");
1483 let new_retries = retries + 1;
1484 self.position_recon_retries
1485 .insert(instrument_id, new_retries);
1486 if new_retries >= self.config.position_check_retries {
1487 log::error!(
1488 "Position discrepancy for {instrument_id} unresolved after {} attempts \
1489 (cached_qty={cached_signed_qty}, venue_qty={venue_signed_qty}); \
1490 no further reconciliation attempts will be made",
1491 self.config.position_check_retries,
1492 );
1493 }
1494 return None;
1495 };
1496
1497 let cached_avg_px = if position.avg_px_open > 0.0 {
1498 Decimal::from_str(&position.avg_px_open.to_string()).ok()
1499 } else {
1500 None
1501 };
1502 let venue_avg_px = venue_report.and_then(|r| r.avg_px_open);
1503
1504 let crosses_zero = (cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO)
1506 || (cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO);
1507
1508 let result = if crosses_zero {
1509 let venue_ts_last = venue_report.map_or(ts_now, |r| r.ts_last);
1511 self.reconcile_cross_zero_position(
1512 &instrument,
1513 account_id,
1514 instrument_id,
1515 cached_signed_qty,
1516 cached_avg_px,
1517 venue_signed_qty,
1518 venue_avg_px,
1519 ts_now,
1520 venue_ts_last,
1521 )
1522 } else {
1523 let qty_diff = venue_signed_qty - cached_signed_qty;
1524 let order_side = if qty_diff > Decimal::ZERO {
1525 OrderSide::Buy
1526 } else {
1527 OrderSide::Sell
1528 };
1529
1530 let reconciliation_px = calculate_reconciliation_price(
1531 cached_signed_qty,
1532 cached_avg_px,
1533 venue_signed_qty,
1534 venue_avg_px,
1535 );
1536
1537 match reconciliation_px.or(venue_avg_px).or(cached_avg_px) {
1538 Some(fill_px) => {
1539 let fill_qty = qty_diff.abs();
1540 let venue_position_id = venue_report.and_then(|r| r.venue_position_id);
1541 let venue_ts_last = venue_report.map_or(ts_now, |r| r.ts_last);
1542
1543 Quantity::from_decimal_dp(fill_qty, instrument.size_precision())
1544 .ok()
1545 .and_then(|order_qty| {
1546 let fill_price =
1547 Price::from_decimal_dp(fill_px, instrument.price_precision()).ok();
1548 let venue_order_id = create_position_reconciliation_venue_order_id(
1549 account_id,
1550 instrument_id,
1551 order_side,
1552 OrderType::Market,
1553 order_qty,
1554 fill_price,
1555 venue_position_id,
1556 None,
1557 venue_ts_last,
1558 );
1559
1560 OrderStatusReport::new(
1561 account_id,
1562 instrument_id,
1563 None,
1564 venue_order_id,
1565 order_side,
1566 OrderType::Market,
1567 TimeInForce::Gtc,
1568 OrderStatus::Filled,
1569 order_qty,
1570 order_qty,
1571 ts_now,
1572 ts_now,
1573 ts_now,
1574 None,
1575 )
1576 .with_avg_px(fill_px.to_f64().unwrap_or(0.0))
1577 .ok()
1578 })
1579 .map(|order_report| {
1580 log::info!(
1581 color = LogColor::Blue as u8;
1582 "Generating synthetic fill for position reconciliation {instrument_id}: side={order_side:?}, qty={}, px={fill_px}", qty_diff.abs(),
1583 );
1584
1585 let (events, _) = self.handle_external_order(
1586 &order_report,
1587 &account_id,
1588 &instrument,
1589 &[],
1590 true,
1591 );
1592 events
1593 })
1594 }
1595 None => None,
1596 }
1597 };
1598
1599 if result.is_none() || result.as_ref().is_some_and(|e| e.is_empty()) {
1601 let new_retries = retries + 1;
1602 self.position_recon_retries
1603 .insert(instrument_id, new_retries);
1604 if new_retries >= self.config.position_check_retries {
1605 log::error!(
1606 "Position discrepancy for {} unresolved after {} attempts \
1607 (cached_qty={}, venue_qty={}); \
1608 no further reconciliation attempts will be made",
1609 instrument_id,
1610 self.config.position_check_retries,
1611 cached_signed_qty,
1612 venue_signed_qty,
1613 );
1614 }
1615 } else {
1616 self.position_recon_retries.shift_remove(&instrument_id);
1617 }
1618
1619 result
1620 }
1621
1622 #[expect(clippy::too_many_arguments)]
1625 fn reconcile_cross_zero_position(
1626 &mut self,
1627 instrument: &InstrumentAny,
1628 account_id: AccountId,
1629 instrument_id: InstrumentId,
1630 cached_signed_qty: Decimal,
1631 cached_avg_px: Option<Decimal>,
1632 venue_signed_qty: Decimal,
1633 venue_avg_px: Option<Decimal>,
1634 ts_now: UnixNanos,
1635 venue_ts_last: UnixNanos,
1636 ) -> Option<Vec<OrderEventAny>> {
1637 log::info!(
1638 color = LogColor::Blue as u8;
1639 "Position crosses zero for {instrument_id}: cached={cached_signed_qty}, venue={venue_signed_qty}. Splitting into two fills",
1640 );
1641
1642 let mut all_events = Vec::new();
1643
1644 let close_qty = cached_signed_qty.abs();
1646 let close_side = if cached_signed_qty < Decimal::ZERO {
1647 OrderSide::Buy } else {
1649 OrderSide::Sell };
1651
1652 if let Some(close_px) = cached_avg_px {
1653 let close_order_qty =
1654 Quantity::from_decimal_dp(close_qty, instrument.size_precision()).ok()?;
1655 let close_fill_price =
1656 Price::from_decimal_dp(close_px, instrument.price_precision()).ok();
1657 let close_venue_order_id = create_position_reconciliation_venue_order_id(
1658 account_id,
1659 instrument_id,
1660 close_side,
1661 OrderType::Market,
1662 close_order_qty,
1663 close_fill_price,
1664 None,
1665 Some("CLOSE"),
1666 venue_ts_last,
1667 );
1668
1669 let close_report = OrderStatusReport::new(
1670 account_id,
1671 instrument_id,
1672 None,
1673 close_venue_order_id,
1674 close_side,
1675 OrderType::Market,
1676 TimeInForce::Gtc,
1677 OrderStatus::Filled,
1678 close_order_qty,
1679 close_order_qty,
1680 ts_now,
1681 ts_now,
1682 ts_now,
1683 None,
1684 )
1685 .with_avg_px(close_px.to_f64().unwrap_or(0.0))
1686 .ok()?;
1687
1688 log::info!(
1689 color = LogColor::Blue as u8;
1690 "Generating close fill for cross-zero {instrument_id}: side={close_side:?}, qty={close_qty}, px={close_px}",
1691 );
1692
1693 let (close_events, _) =
1694 self.handle_external_order(&close_report, &account_id, instrument, &[], true);
1695 all_events.extend(close_events);
1696 } else {
1697 log::warn!("Cannot close position for {instrument_id}: no cached average price");
1698 return None;
1699 }
1700
1701 let open_qty = venue_signed_qty.abs();
1703 let open_side = if venue_signed_qty > Decimal::ZERO {
1704 OrderSide::Buy } else {
1706 OrderSide::Sell };
1708
1709 if let Some(open_px) = venue_avg_px {
1710 let open_order_qty =
1711 Quantity::from_decimal_dp(open_qty, instrument.size_precision()).ok()?;
1712 let open_fill_price =
1713 Price::from_decimal_dp(open_px, instrument.price_precision()).ok();
1714 let open_venue_order_id = create_position_reconciliation_venue_order_id(
1715 account_id,
1716 instrument_id,
1717 open_side,
1718 OrderType::Market,
1719 open_order_qty,
1720 open_fill_price,
1721 None,
1722 Some("OPEN"),
1723 venue_ts_last,
1724 );
1725
1726 let open_report = OrderStatusReport::new(
1727 account_id,
1728 instrument_id,
1729 None,
1730 open_venue_order_id,
1731 open_side,
1732 OrderType::Market,
1733 TimeInForce::Gtc,
1734 OrderStatus::Filled,
1735 open_order_qty,
1736 open_order_qty,
1737 ts_now,
1738 ts_now,
1739 ts_now,
1740 None,
1741 )
1742 .with_avg_px(open_px.to_f64().unwrap_or(0.0))
1743 .ok()?;
1744
1745 log::info!(
1746 color = LogColor::Blue as u8;
1747 "Generating open fill for cross-zero {instrument_id}: side={open_side:?}, qty={open_qty}, px={open_px}",
1748 );
1749
1750 let (open_events, _) =
1751 self.handle_external_order(&open_report, &account_id, instrument, &[], true);
1752 all_events.extend(open_events);
1753 } else {
1754 log::warn!("Cannot open new position for {instrument_id}: no venue average price");
1755 return Some(all_events);
1756 }
1757
1758 Some(all_events)
1759 }
1760
1761 fn create_position_from_report(
1766 &mut self,
1767 report: &PositionStatusReport,
1768 account_id: &AccountId,
1769 instrument: &InstrumentAny,
1770 ) -> Option<Vec<OrderEventAny>> {
1771 let instrument_id = report.instrument_id;
1772 let venue_signed_qty = report.signed_decimal_qty;
1773
1774 if venue_signed_qty == Decimal::ZERO {
1775 return None;
1776 }
1777
1778 let order_side = if venue_signed_qty > Decimal::ZERO {
1779 OrderSide::Buy
1780 } else {
1781 OrderSide::Sell
1782 };
1783
1784 let qty_abs = venue_signed_qty.abs();
1785 let venue_avg_px = report.avg_px_open?;
1786
1787 let ts_now = self.clock.borrow().timestamp_ns();
1788 let order_qty = Quantity::from_decimal_dp(qty_abs, instrument.size_precision()).ok()?;
1789 let fill_price = Price::from_decimal_dp(venue_avg_px, instrument.price_precision()).ok();
1790 let venue_order_id = create_position_reconciliation_venue_order_id(
1791 *account_id,
1792 instrument_id,
1793 order_side,
1794 OrderType::Market,
1795 order_qty,
1796 fill_price,
1797 report.venue_position_id,
1798 None,
1799 report.ts_last,
1800 );
1801
1802 let mut order_report = OrderStatusReport::new(
1803 *account_id,
1804 instrument_id,
1805 None,
1806 venue_order_id,
1807 order_side,
1808 OrderType::Market,
1809 TimeInForce::Gtc,
1810 OrderStatus::Filled,
1811 order_qty,
1812 order_qty,
1813 ts_now,
1814 ts_now,
1815 ts_now,
1816 None,
1817 )
1818 .with_avg_px(venue_avg_px.to_f64().unwrap_or(0.0))
1819 .ok()?;
1820
1821 if let Some(venue_position_id) = report.venue_position_id {
1823 order_report = order_report.with_venue_position_id(venue_position_id);
1824 }
1825
1826 log::info!(
1827 color = LogColor::Blue as u8;
1828 "Creating position from venue report for {instrument_id}: side={order_side:?}, qty={qty_abs}, avg_px={venue_avg_px}",
1829 );
1830
1831 let (events, _) =
1832 self.handle_external_order(&order_report, account_id, instrument, &[], true);
1833 Some(events)
1834 }
1835
1836 fn reconcile_position_report(
1837 &mut self,
1838 report: &PositionStatusReport,
1839 account_id: &AccountId,
1840 instruments_with_unattributed_fills: &IndexSet<InstrumentId>,
1841 positions_with_fills: &IndexSet<PositionId>,
1842 ) -> Option<Vec<OrderEventAny>> {
1843 if report.venue_position_id.is_some() {
1844 self.reconcile_position_report_hedging(
1845 report,
1846 account_id,
1847 instruments_with_unattributed_fills,
1848 positions_with_fills,
1849 )
1850 } else {
1851 self.reconcile_position_report_netting(report, account_id)
1852 }
1853 }
1854
1855 fn reconcile_position_report_hedging(
1856 &mut self,
1857 report: &PositionStatusReport,
1858 account_id: &AccountId,
1859 instruments_with_unattributed_fills: &IndexSet<InstrumentId>,
1860 positions_with_fills: &IndexSet<PositionId>,
1861 ) -> Option<Vec<OrderEventAny>> {
1862 let venue_position_id = report.venue_position_id?;
1863
1864 if positions_with_fills.contains(&venue_position_id) {
1866 log::debug!(
1867 "Skipping hedge position {venue_position_id} reconciliation: fills already in batch"
1868 );
1869 return None;
1870 }
1871
1872 if instruments_with_unattributed_fills.contains(&report.instrument_id) {
1875 log::debug!(
1876 "Skipping hedge position {venue_position_id} reconciliation: unattributed fills in batch"
1877 );
1878 return None;
1879 }
1880
1881 log::debug!(
1882 "Reconciling HEDGE position for {}, venue_position_id={}",
1883 report.instrument_id,
1884 venue_position_id
1885 );
1886
1887 let position = {
1888 let cache = self.cache.borrow();
1889 cache.position(&venue_position_id).cloned()
1890 };
1891
1892 match position {
1893 Some(position) => {
1894 let cached_signed_qty = position.signed_decimal_qty();
1895 let venue_signed_qty = report.signed_decimal_qty;
1896
1897 if cached_signed_qty == venue_signed_qty {
1898 log::debug!(
1899 "Hedge position {venue_position_id} matches venue: qty={cached_signed_qty}"
1900 );
1901 return None;
1902 }
1903
1904 if venue_signed_qty == Decimal::ZERO && cached_signed_qty == Decimal::ZERO {
1905 return None;
1906 }
1907
1908 if !self.config.generate_missing_orders {
1909 log::error!(
1910 "Cannot reconcile {} {}: position net qty {} != reported net qty {} \
1911 and `generate_missing_orders` is disabled",
1912 report.instrument_id,
1913 venue_position_id,
1914 cached_signed_qty,
1915 venue_signed_qty
1916 );
1917 return None;
1918 }
1919
1920 self.reconcile_hedge_position_discrepancy(
1921 report,
1922 account_id,
1923 &position,
1924 cached_signed_qty,
1925 )
1926 }
1927 None => {
1928 if report.signed_decimal_qty == Decimal::ZERO {
1929 return None;
1930 }
1931
1932 if !self.config.generate_missing_orders {
1933 log::error!(
1934 "Cannot reconcile position: {venue_position_id} not found and `generate_missing_orders` is disabled"
1935 );
1936 return None;
1937 }
1938
1939 self.reconcile_missing_hedge_position(report, account_id)
1940 }
1941 }
1942 }
1943
1944 fn reconcile_hedge_position_discrepancy(
1945 &mut self,
1946 report: &PositionStatusReport,
1947 account_id: &AccountId,
1948 position: &Position,
1949 cached_signed_qty: Decimal,
1950 ) -> Option<Vec<OrderEventAny>> {
1951 let instrument = self.get_instrument(&report.instrument_id)?;
1952 let venue_signed_qty = report.signed_decimal_qty;
1953
1954 let diff = (cached_signed_qty - venue_signed_qty).abs();
1955 let diff_qty = Quantity::from_decimal_dp(diff, instrument.size_precision()).ok()?;
1956
1957 if diff_qty.is_zero() {
1958 log::debug!(
1959 "Difference quantity rounds to zero for {}, skipping",
1960 instrument.id()
1961 );
1962 return None;
1963 }
1964
1965 let venue_position_id = report.venue_position_id?;
1966 log::warn!(
1967 "Hedge position discrepancy for {} {}: cached={}, venue={}, generating reconciliation order",
1968 report.instrument_id,
1969 venue_position_id,
1970 cached_signed_qty,
1971 venue_signed_qty
1972 );
1973
1974 let current_avg_px = if position.avg_px_open > 0.0 {
1975 Decimal::from_str(&position.avg_px_open.to_string()).ok()
1976 } else {
1977 None
1978 };
1979
1980 self.create_position_reconciliation_order(
1981 report,
1982 account_id,
1983 &instrument,
1984 cached_signed_qty,
1985 diff_qty,
1986 current_avg_px,
1987 )
1988 }
1989
1990 fn reconcile_missing_hedge_position(
1991 &mut self,
1992 report: &PositionStatusReport,
1993 account_id: &AccountId,
1994 ) -> Option<Vec<OrderEventAny>> {
1995 let instrument = self.get_instrument(&report.instrument_id)?;
1996 let venue_signed_qty = report.signed_decimal_qty;
1997
1998 let qty = venue_signed_qty.abs();
1999 let diff_qty = Quantity::from_decimal_dp(qty, instrument.size_precision()).ok()?;
2000
2001 if diff_qty.is_zero() {
2002 return None;
2003 }
2004
2005 let venue_position_id = report.venue_position_id?;
2006 log::warn!(
2007 "Missing hedge position for {} {}: venue reports {}, generating reconciliation order",
2008 report.instrument_id,
2009 venue_position_id,
2010 venue_signed_qty
2011 );
2012
2013 self.create_position_reconciliation_order(
2014 report,
2015 account_id,
2016 &instrument,
2017 Decimal::ZERO,
2018 diff_qty,
2019 None,
2020 )
2021 }
2022
2023 fn reconcile_position_report_netting(
2024 &mut self,
2025 report: &PositionStatusReport,
2026 account_id: &AccountId,
2027 ) -> Option<Vec<OrderEventAny>> {
2028 let instrument_id = report.instrument_id;
2029
2030 log::debug!("Reconciling NET position for {instrument_id}");
2031
2032 let instrument = self.get_instrument(&instrument_id)?;
2033
2034 let (cached_signed_qty, cached_avg_px) = {
2035 let cache = self.cache.borrow();
2036 let positions =
2037 cache.positions_open(None, Some(&instrument_id), None, Some(account_id), None);
2038
2039 if positions.is_empty() {
2040 (Decimal::ZERO, None)
2041 } else {
2042 let mut total_signed_qty = Decimal::ZERO;
2043 let mut total_value = Decimal::ZERO;
2044 let mut total_qty = Decimal::ZERO;
2045
2046 for pos in positions {
2047 total_signed_qty += pos.signed_decimal_qty();
2048 let qty = pos.signed_decimal_qty().abs();
2049 if pos.avg_px_open > 0.0
2050 && qty > Decimal::ZERO
2051 && let Ok(avg_px) = Decimal::from_str(&pos.avg_px_open.to_string())
2052 {
2053 total_value += avg_px * qty;
2054 total_qty += qty;
2055 }
2056 }
2057
2058 let avg_px = if total_qty > Decimal::ZERO {
2059 Some(total_value / total_qty)
2060 } else {
2061 None
2062 };
2063
2064 (total_signed_qty, avg_px)
2065 }
2066 };
2067
2068 let venue_signed_qty = report.signed_decimal_qty;
2069
2070 log::debug!("venue_signed_qty={venue_signed_qty}, cached_signed_qty={cached_signed_qty}");
2071
2072 let tolerance = Decimal::from_str("0.00000001").unwrap_or(Decimal::ZERO);
2073 if (cached_signed_qty - venue_signed_qty).abs() <= tolerance {
2074 log::debug!("Position quantities match for {instrument_id}, no reconciliation needed");
2075 return None;
2076 }
2077
2078 if !self.config.generate_missing_orders {
2079 log::warn!(
2080 "Discrepancy for {instrument_id} position when `generate_missing_orders` disabled, skipping"
2081 );
2082 return None;
2083 }
2084
2085 let diff = (cached_signed_qty - venue_signed_qty).abs();
2086 let diff_qty = Quantity::from_decimal_dp(diff, instrument.size_precision()).ok()?;
2087
2088 if diff_qty.is_zero() {
2089 log::debug!(
2090 "Difference quantity rounds to zero for {instrument_id}, skipping order generation"
2091 );
2092 return None;
2093 }
2094
2095 let crosses_zero = cached_signed_qty != Decimal::ZERO
2096 && venue_signed_qty != Decimal::ZERO
2097 && ((cached_signed_qty > Decimal::ZERO && venue_signed_qty < Decimal::ZERO)
2098 || (cached_signed_qty < Decimal::ZERO && venue_signed_qty > Decimal::ZERO));
2099
2100 if crosses_zero {
2101 let ts_now = self.clock.borrow().timestamp_ns();
2102 return self.reconcile_cross_zero_position(
2103 &instrument,
2104 *account_id,
2105 instrument_id,
2106 cached_signed_qty,
2107 cached_avg_px,
2108 venue_signed_qty,
2109 report.avg_px_open,
2110 ts_now,
2111 report.ts_last,
2112 );
2113 }
2114
2115 if cached_signed_qty == Decimal::ZERO {
2116 return self.create_position_from_report(report, account_id, &instrument);
2117 }
2118
2119 self.create_position_reconciliation_order(
2120 report,
2121 account_id,
2122 &instrument,
2123 cached_signed_qty,
2124 diff_qty,
2125 cached_avg_px,
2126 )
2127 }
2128
2129 fn create_position_reconciliation_order(
2130 &mut self,
2131 report: &PositionStatusReport,
2132 account_id: &AccountId,
2133 instrument: &InstrumentAny,
2134 cached_signed_qty: Decimal,
2135 diff_qty: Quantity,
2136 current_avg_px: Option<Decimal>,
2137 ) -> Option<Vec<OrderEventAny>> {
2138 let venue_signed_qty = report.signed_decimal_qty;
2139 let instrument_id = report.instrument_id;
2140
2141 let order_side = if venue_signed_qty > cached_signed_qty {
2142 OrderSide::Buy
2143 } else {
2144 OrderSide::Sell
2145 };
2146
2147 let reconciliation_px = calculate_reconciliation_price(
2148 cached_signed_qty,
2149 current_avg_px,
2150 venue_signed_qty,
2151 report.avg_px_open,
2152 );
2153
2154 let fill_px = reconciliation_px
2155 .or(report.avg_px_open)
2156 .or(current_avg_px)?;
2157
2158 let ts_now = self.clock.borrow().timestamp_ns();
2159 let fill_price = Price::from_decimal_dp(fill_px, instrument.price_precision()).ok();
2160 let venue_order_id = create_position_reconciliation_venue_order_id(
2161 *account_id,
2162 instrument_id,
2163 order_side,
2164 OrderType::Market,
2165 diff_qty,
2166 fill_price,
2167 report.venue_position_id,
2168 None,
2169 report.ts_last,
2170 );
2171
2172 let mut order_report = OrderStatusReport::new(
2173 *account_id,
2174 instrument_id,
2175 None,
2176 venue_order_id,
2177 order_side,
2178 OrderType::Market,
2179 TimeInForce::Gtc,
2180 OrderStatus::Filled,
2181 diff_qty,
2182 diff_qty,
2183 ts_now,
2184 ts_now,
2185 ts_now,
2186 None,
2187 )
2188 .with_avg_px(fill_px.to_f64().unwrap_or(0.0))
2189 .ok()?;
2190
2191 if let Some(venue_position_id) = report.venue_position_id {
2192 order_report = order_report.with_venue_position_id(venue_position_id);
2193 }
2194
2195 log::info!(
2196 color = LogColor::Blue as u8;
2197 "Generating reconciliation order for {instrument_id}: side={order_side:?}, qty={diff_qty}, px={fill_px}",
2198 );
2199
2200 let (events, _) =
2201 self.handle_external_order(&order_report, account_id, instrument, &[], true);
2202 Some(events)
2203 }
2204
2205 fn reconcile_order_report(
2206 &self,
2207 order: &OrderAny,
2208 report: &OrderStatusReport,
2209 instrument: Option<&InstrumentAny>,
2210 ) -> Option<OrderEventAny> {
2211 let ts_now = self.clock.borrow().timestamp_ns();
2212 reconcile_order_report(order, report, instrument, ts_now)
2213 }
2214
2215 fn reconcile_order_with_fills(
2220 &mut self,
2221 order: &OrderAny,
2222 report: &OrderStatusReport,
2223 fills: &[&FillReport],
2224 instrument: Option<&InstrumentAny>,
2225 ) -> Vec<OrderEventAny> {
2226 let mut events = Vec::new();
2227 let mut sorted_fills: Vec<&FillReport> = fills.to_vec();
2228 sorted_fills.sort_by_key(|f| f.ts_event);
2229
2230 let ts_now = self.clock.borrow().timestamp_ns();
2231
2232 match report.order_status {
2233 OrderStatus::Canceled => {
2234 if report.ts_triggered.is_some()
2235 && order.status() != OrderStatus::Triggered
2236 && TRIGGERABLE_ORDER_TYPES.contains(&order.order_type())
2237 {
2238 events.push(create_reconciliation_triggered(order, report, ts_now));
2239 }
2240
2241 if let Some(inst) = instrument {
2244 for fill in &sorted_fills {
2245 if let Some(event) = self.create_order_fill(order, fill, inst) {
2246 events.push(event);
2247 }
2248 }
2249 }
2250
2251 if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2252 events.push(event);
2253 }
2254 }
2255 OrderStatus::Expired => {
2256 if report.ts_triggered.is_some()
2257 && order.status() != OrderStatus::Triggered
2258 && TRIGGERABLE_ORDER_TYPES.contains(&order.order_type())
2259 {
2260 events.push(create_reconciliation_triggered(order, report, ts_now));
2261 }
2262
2263 if let Some(inst) = instrument {
2265 for fill in &sorted_fills {
2266 if let Some(event) = self.create_order_fill(order, fill, inst) {
2267 events.push(event);
2268 }
2269 }
2270 }
2271
2272 if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2273 events.push(event);
2274 }
2275 }
2276 _ => {
2277 if let Some(event) = self.reconcile_order_report(order, report, instrument) {
2278 events.push(event);
2279 }
2280
2281 if let Some(inst) = instrument {
2282 for fill in &sorted_fills {
2283 if let Some(event) = self.create_order_fill(order, fill, inst) {
2284 events.push(event);
2285 }
2286 }
2287 }
2288 }
2289 }
2290
2291 events
2292 }
2293
2294 fn handle_external_order(
2295 &mut self,
2296 report: &OrderStatusReport,
2297 account_id: &AccountId,
2298 instrument: &InstrumentAny,
2299 fills: &[&FillReport],
2300 is_synthetic: bool,
2301 ) -> (Vec<OrderEventAny>, Option<ExternalOrderMetadata>) {
2302 let (strategy_id, tags) =
2303 if let Some(claimed_strategy) = self.external_order_claims.get(&report.instrument_id) {
2304 let order_id = report
2305 .client_order_id
2306 .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
2307 log::info!(
2308 color = LogColor::Blue as u8;
2309 "External order {} for {} claimed by strategy {}",
2310 order_id,
2311 report.instrument_id,
2312 claimed_strategy,
2313 );
2314 (*claimed_strategy, None)
2315 } else {
2316 let tag = if is_synthetic {
2318 *TAG_RECONCILIATION
2319 } else {
2320 *TAG_VENUE
2321 };
2322 (StrategyId::from("EXTERNAL"), Some(vec![tag]))
2323 };
2324
2325 if self.config.filter_unclaimed_external && !is_synthetic {
2327 return (Vec::new(), None);
2328 }
2329
2330 let client_order_id = report
2331 .client_order_id
2332 .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
2333
2334 if !report.quantity.is_positive() {
2335 log::error!(
2336 "Skipping external order {} ({}) for {}: non-positive quantity in report {:?}",
2337 client_order_id,
2338 report.venue_order_id,
2339 report.instrument_id,
2340 report,
2341 );
2342 return (Vec::new(), None);
2343 }
2344
2345 let ts_now = self.clock.borrow().timestamp_ns();
2346
2347 let initialized = OrderInitialized::new(
2348 self.config.trader_id,
2349 strategy_id,
2350 report.instrument_id,
2351 client_order_id,
2352 report.order_side,
2353 report.order_type,
2354 report.quantity,
2355 report.time_in_force,
2356 report.post_only,
2357 report.reduce_only,
2358 false, true, UUID4::new(),
2361 ts_now,
2362 ts_now,
2363 report.price,
2364 report.trigger_price,
2365 report.trigger_type,
2366 report.limit_offset,
2367 report.trailing_offset,
2368 Some(report.trailing_offset_type),
2369 report.expire_time,
2370 report.display_qty,
2371 None, None, Some(report.contingency_type),
2374 report.order_list_id,
2375 report.linked_order_ids.clone(),
2376 report.parent_order_id,
2377 None, None, None, tags,
2381 );
2382
2383 let events = vec![OrderEventAny::Initialized(initialized)];
2384 let order = match OrderAny::from_events(events) {
2385 Ok(order) => order,
2386 Err(e) => {
2387 log::error!("Failed to create order from report: {e}");
2388 return (Vec::new(), None);
2389 }
2390 };
2391
2392 {
2393 let mut cache = self.cache.borrow_mut();
2394 if let Err(e) = cache.add_order(order.clone(), None, None, false) {
2395 match cache.order(&client_order_id) {
2399 Some(existing) if is_synthetic && existing.is_closed() => {
2400 log::debug!(
2401 "Skipping synthetic reconciliation order {client_order_id} for {}: \
2402 replay deduped (cached status={:?})",
2403 report.instrument_id,
2404 existing.status(),
2405 );
2406 }
2407 Some(existing) if is_synthetic => {
2408 log::warn!(
2409 "Synthetic reconciliation order {client_order_id} for {} exists in \
2410 cache in non-terminal state {:?}; fill not regenerated",
2411 report.instrument_id,
2412 existing.status(),
2413 );
2414 }
2415 _ => {
2416 log::error!("Failed to add external order to cache: {e}");
2417 }
2418 }
2419 return (Vec::new(), None);
2420 }
2421
2422 if let Err(e) =
2423 cache.add_venue_order_id(&client_order_id, &report.venue_order_id, false)
2424 {
2425 log::warn!("Failed to add venue order ID index: {e}");
2426 }
2427 }
2428
2429 log::info!(
2430 color = LogColor::Blue as u8;
2431 "Created external order {} ({}) for {} [{}]",
2432 client_order_id,
2433 report.venue_order_id,
2434 report.instrument_id,
2435 report.order_status,
2436 );
2437
2438 let ts_now = self.clock.borrow().timestamp_ns();
2439
2440 let mut order_events =
2443 generate_external_order_status_events(&order, report, account_id, instrument, ts_now);
2444
2445 if !fills.is_empty() {
2446 let cached_order = self.get_order(&client_order_id).unwrap();
2447 let mut sorted_fills: Vec<&FillReport> = fills.to_vec();
2448 sorted_fills.sort_by_key(|f| f.ts_event);
2449
2450 match report.order_status {
2451 OrderStatus::Canceled | OrderStatus::Expired => {
2452 let terminal_event = order_events.pop();
2453
2454 for fill in sorted_fills {
2455 if let Some(fill_event) =
2456 self.create_order_fill(&cached_order, fill, instrument)
2457 {
2458 order_events.push(fill_event);
2459 }
2460 }
2461
2462 if let Some(event) = terminal_event {
2463 order_events.push(event);
2464 }
2465 }
2466 OrderStatus::Filled | OrderStatus::PartiallyFilled => {
2467 if order_events
2469 .last()
2470 .is_some_and(|e| matches!(e, OrderEventAny::Filled(_)))
2471 {
2472 order_events.pop();
2473 }
2474
2475 let mut real_fill_total = Decimal::ZERO;
2476
2477 for fill in &sorted_fills {
2478 if let Some(fill_event) =
2479 self.create_order_fill(&cached_order, fill, instrument)
2480 {
2481 real_fill_total += fill.last_qty.as_decimal();
2482 order_events.push(fill_event);
2483 }
2484 }
2485
2486 let report_filled = report.filled_qty.as_decimal();
2487 if real_fill_total < report_filled {
2488 let diff_decimal = report_filled - real_fill_total;
2489
2490 if let Ok(diff) =
2491 Quantity::from_decimal_dp(diff_decimal, instrument.size_precision())
2492 && let Some(inferred_fill) = create_inferred_fill_for_qty(
2493 &cached_order,
2494 report,
2495 account_id,
2496 instrument,
2497 diff,
2498 ts_now,
2499 None,
2500 )
2501 {
2502 order_events.push(inferred_fill);
2503 }
2504 }
2505 }
2506 _ => {}
2507 }
2508 }
2509
2510 let metadata = ExternalOrderMetadata {
2511 client_order_id,
2512 venue_order_id: report.venue_order_id,
2513 instrument_id: report.instrument_id,
2514 strategy_id,
2515 ts_init: ts_now,
2516 };
2517
2518 (order_events, Some(metadata))
2519 }
2520
2521 fn adjust_mass_status_fills(
2526 &self,
2527 mass_status: &ExecutionMassStatus,
2528 ) -> (
2529 IndexMap<VenueOrderId, OrderStatusReport>,
2530 IndexMap<VenueOrderId, Vec<FillReport>>,
2531 ) {
2532 let mut final_orders: IndexMap<VenueOrderId, OrderStatusReport> =
2533 mass_status.order_reports();
2534 let mut final_fills: IndexMap<VenueOrderId, Vec<FillReport>> = mass_status.fill_reports();
2535
2536 let mut instruments_to_adjust = Vec::new();
2537
2538 for (instrument_id, position_reports) in mass_status.position_reports() {
2539 if !self.should_reconcile_instrument(&instrument_id) {
2540 log::debug!(
2541 "Skipping fill adjustment for {instrument_id}: not in reconciliation_instrument_ids"
2542 );
2543 continue;
2544 }
2545
2546 let is_hedge_mode = position_reports
2549 .iter()
2550 .any(|r| r.venue_position_id.is_some());
2551
2552 if is_hedge_mode {
2553 log::debug!(
2554 "Skipping fill adjustment for {instrument_id}: hedge mode (has venue_position_id)"
2555 );
2556 continue;
2557 }
2558
2559 if let Some(instrument) = self.get_instrument(&instrument_id) {
2560 instruments_to_adjust.push(instrument);
2561 } else {
2562 log::debug!(
2563 "Skipping fill adjustment for {instrument_id}: instrument not found in cache"
2564 );
2565 }
2566 }
2567
2568 if instruments_to_adjust.is_empty() {
2569 return (final_orders, final_fills);
2570 }
2571
2572 log_info!(
2573 "Adjusting fills for {} instrument(s) with position reports",
2574 instruments_to_adjust.len(),
2575 color = LogColor::Blue
2576 );
2577
2578 for instrument in &instruments_to_adjust {
2579 let instrument_id = instrument.id();
2580
2581 match process_mass_status_for_reconciliation(mass_status, instrument, None) {
2582 Ok(result) => {
2583 final_orders.retain(|_, order| order.instrument_id != instrument_id);
2584 final_fills.retain(|_, fills| {
2585 fills
2586 .first()
2587 .is_none_or(|f| f.instrument_id != instrument_id)
2588 });
2589
2590 for (venue_order_id, order) in result.orders {
2591 final_orders.insert(venue_order_id, order);
2592 }
2593
2594 for (venue_order_id, fills) in result.fills {
2595 final_fills.insert(venue_order_id, fills);
2596 }
2597 }
2598 Err(e) => {
2599 log::warn!("Failed to adjust fills for {instrument_id}: {e}");
2600 }
2601 }
2602 }
2603
2604 log_info!(
2605 "After adjustment: {} order(s), {} fill group(s)",
2606 final_orders.len(),
2607 final_fills.len(),
2608 color = LogColor::Blue
2609 );
2610
2611 (final_orders, final_fills)
2612 }
2613
2614 fn deduplicate_order_reports<'a>(
2619 &self,
2620 reports: impl Iterator<Item = &'a OrderStatusReport>,
2621 ) -> IndexMap<VenueOrderId, &'a OrderStatusReport> {
2622 let mut best_reports: IndexMap<VenueOrderId, &'a OrderStatusReport> = IndexMap::new();
2623
2624 for report in reports {
2625 let dominated = best_reports
2626 .get(&report.venue_order_id)
2627 .is_some_and(|existing| self.is_more_advanced(existing, report));
2628
2629 if !dominated {
2630 best_reports.insert(report.venue_order_id, report);
2631 }
2632 }
2633
2634 best_reports
2635 }
2636
2637 fn is_more_advanced(&self, a: &OrderStatusReport, b: &OrderStatusReport) -> bool {
2638 if a.filled_qty > b.filled_qty {
2639 return true;
2640 }
2641
2642 if a.filled_qty < b.filled_qty {
2643 return false;
2644 }
2645
2646 Self::status_priority(a.order_status) > Self::status_priority(b.order_status)
2648 }
2649
2650 const fn status_priority(status: OrderStatus) -> u8 {
2651 match status {
2652 OrderStatus::Initialized | OrderStatus::Submitted | OrderStatus::Emulated => 0,
2653 OrderStatus::Released | OrderStatus::Denied => 1,
2654 OrderStatus::Accepted | OrderStatus::PendingUpdate | OrderStatus::PendingCancel => 2,
2655 OrderStatus::Triggered => 3,
2656 OrderStatus::PartiallyFilled => 4,
2657 OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected => 5,
2658 OrderStatus::Filled => 6,
2659 }
2660 }
2661
2662 fn is_exact_order_match(&self, order: &OrderAny, report: &OrderStatusReport) -> bool {
2663 order.status() == report.order_status
2664 && order.filled_qty() == report.filled_qty
2665 && !should_reconciliation_update(order, report)
2666 }
2667
2668 fn create_order_fill(
2669 &mut self,
2670 order: &OrderAny,
2671 fill: &FillReport,
2672 instrument: &InstrumentAny,
2673 ) -> Option<OrderEventAny> {
2674 if self.processed_fills.contains_key(&fill.trade_id) {
2675 return None;
2676 }
2677
2678 self.processed_fills
2679 .insert(fill.trade_id, order.client_order_id());
2680
2681 Some(OrderEventAny::Filled(OrderFilled::new(
2682 order.trader_id(),
2683 order.strategy_id(),
2684 order.instrument_id(),
2685 order.client_order_id(),
2686 fill.venue_order_id,
2687 fill.account_id,
2688 fill.trade_id,
2689 fill.order_side,
2690 order.order_type(),
2691 fill.last_qty,
2692 fill.last_px,
2693 instrument.quote_currency(),
2694 fill.liquidity_side,
2695 fill.report_id,
2696 fill.ts_event,
2697 self.clock.borrow().timestamp_ns(),
2698 false,
2699 fill.venue_position_id,
2700 Some(fill.commission),
2701 )))
2702 }
2703}