Skip to main content

nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `ExecutionEngine` for all environments.
17//!
//! The execution engine's primary responsibility is to orchestrate interactions
19//! between the `ExecutionClient` instances, and the rest of the platform. This
20//! includes sending commands to, and receiving events from, the trading venue
21//! endpoints via its registered execution clients.
22
23pub mod config;
24pub mod stubs;
25
26use std::{
27    cell::{RefCell, RefMut},
28    collections::{HashMap, HashSet},
29    fmt::Debug,
30    rc::Rc,
31    time::SystemTime,
32};
33
34use ahash::AHashSet;
35use config::ExecutionEngineConfig;
36use futures::future::join_all;
37use indexmap::{IndexMap, IndexSet};
38use nautilus_common::{
39    cache::Cache,
40    clients::ExecutionClient,
41    clock::Clock,
42    generators::position_id::PositionIdGenerator,
43    logging::{CMD, EVT, RECV, SEND},
44    messages::{
45        ExecutionReport,
46        execution::{
47            BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
48            SubmitOrder, SubmitOrderList, TradingCommand,
49        },
50    },
51    msgbus::{
52        self, MessagingSwitchboard, ShareableMessageHandler, TypedIntoHandler, get_message_bus,
53        switchboard::{self},
54    },
55    runner::try_get_trading_cmd_sender,
56    timer::{TimeEvent, TimeEventCallback},
57};
58use nautilus_core::{
59    UUID4, UnixNanos, WeakCell,
60    datetime::{mins_to_nanos, mins_to_secs},
61};
62use nautilus_model::{
63    enums::{
64        ContingencyType, OmsType, OrderStatus, OrderType, PositionSide, TimeInForce,
65        TrailingOffsetType,
66    },
67    events::{
68        OrderAccepted, OrderCanceled, OrderDenied, OrderEvent, OrderEventAny, OrderExpired,
69        OrderFilled, OrderInitialized, PositionChanged, PositionClosed, PositionEvent,
70        PositionOpened,
71    },
72    identifiers::{
73        ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue, VenueOrderId,
74    },
75    instruments::{Instrument, InstrumentAny},
76    orderbook::own::{OwnOrderBook, should_handle_own_book_order},
77    orders::{Order, OrderAny, OrderError},
78    position::Position,
79    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
80    types::{Money, Quantity},
81};
82use rust_decimal::Decimal;
83
84use crate::{
85    client::ExecutionClientAdapter,
86    reconciliation::{
87        check_position_reconciliation, create_incremental_inferred_fill,
88        generate_external_order_status_events, generate_reconciliation_order_events,
89        reconcile_fill_report as reconcile_fill,
90    },
91};
92
/// Clock timer name used by the engine for the periodic purge of closed orders from the cache.
const TIMER_PURGE_CLOSED_ORDERS: &str = "ExecEngine_PURGE_CLOSED_ORDERS";
/// Clock timer name used by the engine for the periodic purge of closed positions from the cache.
const TIMER_PURGE_CLOSED_POSITIONS: &str = "ExecEngine_PURGE_CLOSED_POSITIONS";
/// Clock timer name used by the engine for the periodic purge of account events from the cache.
const TIMER_PURGE_ACCOUNT_EVENTS: &str = "ExecEngine_PURGE_ACCOUNT_EVENTS";
96
/// Central execution engine responsible for orchestrating order routing and execution.
///
/// The execution engine manages the entire order lifecycle from submission to completion,
/// handling routing to appropriate execution clients, position management, and event
/// processing. It supports multiple execution venues through registered clients and
/// provides sophisticated order management capabilities.
pub struct ExecutionEngine {
    /// Shared clock; used for the purge timers and handed to the position ID generator.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache of execution state (orders, positions, accounts).
    cache: Rc<RefCell<Cache>>,
    /// Registered execution client adapters keyed by client ID (insertion ordered).
    clients: IndexMap<ClientId, ExecutionClientAdapter>,
    /// Optional fallback client used when a venue has no entry in `routing_map`.
    default_client: Option<ExecutionClientAdapter>,
    /// Venue to client ID routing table.
    routing_map: HashMap<Venue, ClientId>,
    /// Per-strategy OMS types set through `register_oms_type`.
    oms_overrides: HashMap<StrategyId, OmsType>,
    /// Strategy holding the external order claim for each instrument.
    external_order_claims: HashMap<InstrumentId, StrategyId>,
    /// Client IDs listed under `external_clients` in the configuration.
    external_clients: HashSet<ClientId>,
    /// Generator for position IDs; also tracks the per-strategy ID count.
    pos_id_generator: PositionIdGenerator,
    /// Engine configuration (defaults are used when none is supplied to `new`).
    config: ExecutionEngineConfig,
}
115
116impl Debug for ExecutionEngine {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        f.debug_struct(stringify!(ExecutionEngine))
119            .field("client_count", &self.clients.len())
120            .finish()
121    }
122}
123
124impl ExecutionEngine {
125    /// Creates a new [`ExecutionEngine`] instance.
126    pub fn new(
127        clock: Rc<RefCell<dyn Clock>>,
128        cache: Rc<RefCell<Cache>>,
129        config: Option<ExecutionEngineConfig>,
130    ) -> Self {
131        let trader_id = get_message_bus().borrow().trader_id;
132        Self {
133            clock: clock.clone(),
134            cache,
135            clients: IndexMap::new(),
136            default_client: None,
137            routing_map: HashMap::new(),
138            oms_overrides: HashMap::new(),
139            external_order_claims: HashMap::new(),
140            external_clients: config
141                .as_ref()
142                .and_then(|c| c.external_clients.clone())
143                .unwrap_or_default()
144                .into_iter()
145                .collect(),
146            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
147            config: config.unwrap_or_default(),
148        }
149    }
150
151    /// Registers all message bus handlers for the execution engine.
152    pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
153        let weak = WeakCell::from(Rc::downgrade(engine));
154
155        let weak1 = weak.clone();
156        msgbus::register_trading_command_endpoint(
157            MessagingSwitchboard::exec_engine_execute(),
158            TypedIntoHandler::from(move |cmd: TradingCommand| {
159                if let Some(rc) = weak1.upgrade() {
160                    rc.borrow().execute(cmd);
161                }
162            }),
163        );
164
165        // Queued endpoint for deferred command execution (re-entrancy safe),
166        // falls back to direct endpoint if no sender is initialized (e.g., backtest/test).
167        msgbus::register_trading_command_endpoint(
168            MessagingSwitchboard::exec_engine_queue_execute(),
169            TypedIntoHandler::from(move |cmd: TradingCommand| {
170                if let Some(sender) = try_get_trading_cmd_sender() {
171                    sender.execute(cmd);
172                } else {
173                    let endpoint = MessagingSwitchboard::exec_engine_execute();
174                    msgbus::send_trading_command(endpoint, cmd);
175                }
176            }),
177        );
178
179        let weak2 = weak.clone();
180        msgbus::register_order_event_endpoint(
181            MessagingSwitchboard::exec_engine_process(),
182            TypedIntoHandler::from(move |event: OrderEventAny| {
183                if let Some(rc) = weak2.upgrade() {
184                    rc.borrow_mut().process(&event);
185                }
186            }),
187        );
188
189        let weak3 = weak;
190        msgbus::register_execution_report_endpoint(
191            MessagingSwitchboard::exec_engine_reconcile_execution_report(),
192            TypedIntoHandler::from(move |report: ExecutionReport| {
193                if let Some(rc) = weak3.upgrade() {
194                    rc.borrow_mut().reconcile_execution_report(&report);
195                }
196            }),
197        );
198    }
199
200    /// Subscribes to instrument updates for a venue via the message bus.
201    ///
202    /// When instruments are published by the `DataEngine`, the handler routes
203    /// them to the execution client registered for that venue.
204    pub fn subscribe_venue_instruments(engine: &Rc<RefCell<Self>>, venue: Venue) {
205        let weak = WeakCell::from(Rc::downgrade(engine));
206        let pattern = switchboard::get_instruments_pattern(venue);
207
208        let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
209            if let Some(rc) = weak.upgrade() {
210                let venue = instrument.id().venue;
211                let client_id = rc.borrow().routing_map.get(&venue).copied();
212                if let Some(client_id) = client_id {
213                    let mut engine = rc.borrow_mut();
214                    if let Some(adapter) = engine.get_client_adapter_mut(&client_id) {
215                        adapter.on_instrument(instrument.clone());
216                    }
217                }
218            }
219        });
220
221        msgbus::subscribe_any(pattern, handler, None);
222        log::info!("Subscribed to instrument updates for venue {venue}");
223    }
224
225    #[must_use]
226    /// Returns the position ID count for the specified strategy.
227    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
228        self.pos_id_generator.count(strategy_id)
229    }
230
231    #[must_use]
232    /// Returns a reference to the cache.
233    pub fn cache(&self) -> &Rc<RefCell<Cache>> {
234        &self.cache
235    }
236
237    #[must_use]
238    /// Returns a reference to the configuration.
239    pub const fn config(&self) -> &ExecutionEngineConfig {
240        &self.config
241    }
242
243    #[must_use]
244    /// Checks the integrity of cached execution data.
245    pub fn check_integrity(&self) -> bool {
246        self.cache.borrow_mut().check_integrity()
247    }
248
249    #[must_use]
250    /// Returns true if all registered execution clients are connected.
251    pub fn check_connected(&self) -> bool {
252        let clients_connected = self.clients.values().all(|c| c.is_connected());
253        let default_connected = self
254            .default_client
255            .as_ref()
256            .is_none_or(|c| c.is_connected());
257        clients_connected && default_connected
258    }
259
260    #[must_use]
261    /// Returns true if all registered execution clients are disconnected.
262    pub fn check_disconnected(&self) -> bool {
263        let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
264        let default_disconnected = self
265            .default_client
266            .as_ref()
267            .is_none_or(|c| !c.is_connected());
268        clients_disconnected && default_disconnected
269    }
270
271    /// Returns connection status for each registered client.
272    #[must_use]
273    pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
274        let mut status: Vec<_> = self
275            .clients
276            .values()
277            .map(|c| (c.client_id(), c.is_connected()))
278            .collect();
279
280        if let Some(default) = &self.default_client {
281            status.push((default.client_id(), default.is_connected()));
282        }
283
284        status
285    }
286
287    #[must_use]
288    /// Checks for residual positions and orders in the cache.
289    pub fn check_residuals(&self) -> bool {
290        self.cache.borrow().check_residuals()
291    }
292
293    #[must_use]
294    /// Returns the set of instruments that have external order claims.
295    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
296        self.external_order_claims.keys().copied().collect()
297    }
298
299    #[must_use]
300    /// Returns the configured external client IDs.
301    pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
302        self.external_clients.clone()
303    }
304
305    #[must_use]
306    /// Returns any external order claim for the given instrument ID.
307    pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
308        self.external_order_claims.get(instrument_id).copied()
309    }
310
311    /// Registers a new execution client.
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if a client with the same ID is already registered.
316    pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
317        let client_id = client.client_id();
318        let venue = client.venue();
319
320        if self.clients.contains_key(&client_id) {
321            anyhow::bail!("Client already registered with ID {client_id}");
322        }
323
324        let adapter = ExecutionClientAdapter::new(client);
325
326        if let Some(existing_client_id) = self.routing_map.get(&venue) {
327            anyhow::bail!(
328                "Venue {venue} already routed to {existing_client_id}, \
329                 cannot register {client_id} for the same venue"
330            );
331        }
332
333        self.routing_map.insert(venue, client_id);
334        log::debug!("Registered client {client_id}");
335        self.clients.insert(client_id, adapter);
336        Ok(())
337    }
338
339    /// Registers a default execution client for fallback routing.
340    pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
341        let client_id = client.client_id();
342        let adapter = ExecutionClientAdapter::new(client);
343
344        log::debug!("Registered default client {client_id}");
345        self.default_client = Some(adapter);
346    }
347
348    #[must_use]
349    /// Returns a reference to the execution client registered with the given ID.
350    pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
351        self.clients.get(client_id).map(|a| a.client.as_ref())
352    }
353
354    #[must_use]
355    /// Returns a mutable reference to the execution client adapter registered with the given ID.
356    pub fn get_client_adapter_mut(
357        &mut self,
358        client_id: &ClientId,
359    ) -> Option<&mut ExecutionClientAdapter> {
360        if let Some(default) = &self.default_client
361            && &default.client_id == client_id
362        {
363            return self.default_client.as_mut();
364        }
365        self.clients.get_mut(client_id)
366    }
367
368    /// Generates mass status for the given client.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if the client is not found or mass status generation fails.
373    pub async fn generate_mass_status(
374        &mut self,
375        client_id: &ClientId,
376        lookback_mins: Option<u64>,
377    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
378        if let Some(client) = self.get_client_adapter_mut(client_id) {
379            client.generate_mass_status(lookback_mins).await
380        } else {
381            anyhow::bail!("Client {client_id} not found")
382        }
383    }
384
385    /// Registers an external order with the execution client for tracking.
386    ///
387    /// This is called after reconciliation creates an external order, allowing the
388    /// execution client to track it for subsequent events (e.g., cancellations).
389    pub fn register_external_order(
390        &self,
391        client_order_id: ClientOrderId,
392        venue_order_id: VenueOrderId,
393        instrument_id: InstrumentId,
394        strategy_id: StrategyId,
395        ts_init: UnixNanos,
396    ) {
397        let venue = instrument_id.venue;
398        if let Some(client_id) = self.routing_map.get(&venue) {
399            if let Some(client) = self.clients.get(client_id) {
400                client.register_external_order(
401                    client_order_id,
402                    venue_order_id,
403                    instrument_id,
404                    strategy_id,
405                    ts_init,
406                );
407            }
408        } else if let Some(default) = &self.default_client {
409            default.register_external_order(
410                client_order_id,
411                venue_order_id,
412                instrument_id,
413                strategy_id,
414                ts_init,
415            );
416        }
417    }
418
419    #[must_use]
420    /// Returns all registered execution client IDs.
421    pub fn client_ids(&self) -> Vec<ClientId> {
422        let mut ids: Vec<_> = self.clients.keys().copied().collect();
423
424        if let Some(default) = &self.default_client {
425            ids.push(default.client_id);
426        }
427        ids
428    }
429
430    #[must_use]
431    /// Returns mutable access to all registered execution clients.
432    pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
433        let mut adapters: Vec<_> = self.clients.values_mut().collect();
434
435        if let Some(default) = &mut self.default_client {
436            adapters.push(default);
437        }
438        adapters
439    }
440
441    /// Returns all registered execution clients.
442    #[must_use]
443    pub fn get_all_clients(&self) -> Vec<&dyn ExecutionClient> {
444        let mut clients: Vec<&dyn ExecutionClient> =
445            self.clients.values().map(|a| a.client.as_ref()).collect();
446
447        if let Some(default) = &self.default_client {
448            clients.push(default.client.as_ref());
449        }
450
451        clients
452    }
453
454    #[must_use]
455    /// Returns execution clients that would handle the given orders.
456    ///
457    /// This method first attempts to resolve each order's originating client from the cache,
458    /// then falls back to venue routing for any orders without a cached client.
459    pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
460        let mut client_ids: IndexSet<ClientId> = IndexSet::new();
461        let mut venues: IndexSet<Venue> = IndexSet::new();
462
463        // Collect client IDs from cache and venues for fallback
464        for order in orders {
465            venues.insert(order.instrument_id().venue);
466            if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
467                client_ids.insert(*client_id);
468            }
469        }
470
471        let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
472
473        // Add clients for cached client IDs (orders go back to originating client)
474        for client_id in &client_ids {
475            if let Some(adapter) = self.clients.get(client_id)
476                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
477            {
478                clients.push(adapter.client.as_ref());
479            }
480        }
481
482        // Add clients for venue routing (for orders not in cache)
483        for venue in &venues {
484            if let Some(client_id) = self.routing_map.get(venue) {
485                if let Some(adapter) = self.clients.get(client_id)
486                    && !clients.iter().any(|c| c.client_id() == adapter.client_id)
487                {
488                    clients.push(adapter.client.as_ref());
489                }
490            } else if let Some(adapter) = &self.default_client
491                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
492            {
493                clients.push(adapter.client.as_ref());
494            }
495        }
496
497        clients
498    }
499
500    /// Sets routing for a specific venue to a given client ID.
501    ///
502    /// # Errors
503    ///
504    /// Returns an error if the client ID is not registered.
505    pub fn register_venue_routing(
506        &mut self,
507        client_id: ClientId,
508        venue: Venue,
509    ) -> anyhow::Result<()> {
510        if !self.clients.contains_key(&client_id) {
511            anyhow::bail!("No client registered with ID {client_id}");
512        }
513
514        if let Some(existing_client_id) = self.routing_map.get(&venue)
515            && *existing_client_id != client_id
516        {
517            anyhow::bail!(
518                "Venue {venue} already routed to {existing_client_id}, \
519                 cannot re-route to {client_id}"
520            );
521        }
522
523        self.routing_map.insert(venue, client_id);
524        log::info!("Set client {client_id} routing for {venue}");
525        Ok(())
526    }
527
528    /// Registers the OMS (Order Management System) type for a strategy.
529    ///
530    /// If an OMS type is already registered for this strategy, it will be overridden.
531    pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
532        self.oms_overrides.insert(strategy_id, oms_type);
533        log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
534    }
535
536    /// Registers external order claims for a strategy.
537    ///
538    /// This operation is atomic: either all instruments are registered or none are.
539    ///
540    /// # Errors
541    ///
542    /// Returns an error if any instrument already has a registered claim.
543    pub fn register_external_order_claims(
544        &mut self,
545        strategy_id: StrategyId,
546        instrument_ids: &HashSet<InstrumentId>,
547    ) -> anyhow::Result<()> {
548        // Validate all instruments first
549        for instrument_id in instrument_ids {
550            if let Some(existing) = self.external_order_claims.get(instrument_id) {
551                anyhow::bail!(
552                    "External order claim for {instrument_id} already exists for {existing}"
553                );
554            }
555        }
556
557        // If validation passed, insert all claims
558        for instrument_id in instrument_ids {
559            self.external_order_claims
560                .insert(*instrument_id, strategy_id);
561        }
562
563        if !instrument_ids.is_empty() {
564            log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
565        }
566
567        Ok(())
568    }
569
570    /// # Errors
571    ///
572    /// Returns an error if no client is registered with the given ID.
573    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
574        if self.clients.shift_remove(&client_id).is_some() {
575            // Remove from routing map if present
576            self.routing_map
577                .retain(|_, mapped_id| mapped_id != &client_id);
578            log::info!("Deregistered client {client_id}");
579            Ok(())
580        } else {
581            anyhow::bail!("No client registered with ID {client_id}")
582        }
583    }
584
585    /// Connects all registered execution clients concurrently.
586    ///
587    /// Connection failures are logged but do not prevent the node from running.
588    pub async fn connect(&mut self) {
589        let futures: Vec<_> = self
590            .get_clients_mut()
591            .into_iter()
592            .map(|client| client.connect())
593            .collect();
594
595        let results = join_all(futures).await;
596
597        for error in results.into_iter().filter_map(Result::err) {
598            log::error!("Failed to connect execution client: {error:#}");
599        }
600    }
601
602    /// Disconnects all registered execution clients concurrently.
603    ///
604    /// # Errors
605    ///
606    /// Returns an error if any client fails to disconnect.
607    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
608        let futures: Vec<_> = self
609            .get_clients_mut()
610            .into_iter()
611            .map(|client| client.disconnect())
612            .collect();
613
614        let results = join_all(futures).await;
615        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
616
617        if errors.is_empty() {
618            Ok(())
619        } else {
620            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
621            anyhow::bail!(
622                "Failed to disconnect execution clients: {}",
623                error_msgs.join("; ")
624            )
625        }
626    }
627
628    /// Sets the `manage_own_order_books` configuration option.
629    pub fn set_manage_own_order_books(&mut self, value: bool) {
630        self.config.manage_own_order_books = value;
631    }
632
633    /// Starts the position snapshot timer if configured.
634    ///
635    /// Timer functionality requires a live execution context with an active clock.
636    pub fn start_snapshot_timer(&mut self) {
637        if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
638            log::info!("Starting position snapshots timer at {interval_secs} second intervals");
639        }
640    }
641
642    /// Stops the position snapshot timer if running.
643    pub fn stop_snapshot_timer(&mut self) {
644        if self.config.snapshot_positions_interval_secs.is_some() {
645            log::info!("Canceling position snapshots timer");
646        }
647    }
648
649    /// Starts the purge timers if configured.
650    #[expect(
651        clippy::missing_panics_doc,
652        reason = "timer registration is not expected to fail"
653    )]
654    pub fn start_purge_timers(&mut self) {
655        if let Some(interval_mins) = self
656            .config
657            .purge_closed_orders_interval_mins
658            .filter(|&m| m > 0)
659            && !self
660                .clock
661                .borrow()
662                .timer_names()
663                .contains(&TIMER_PURGE_CLOSED_ORDERS)
664        {
665            let interval_ns = mins_to_nanos(u64::from(interval_mins));
666            let buffer_mins = self.config.purge_closed_orders_buffer_mins.unwrap_or(0);
667            let buffer_secs = mins_to_secs(u64::from(buffer_mins));
668            let cache = self.cache.clone();
669            let clock = self.clock.clone();
670
671            let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
672                let ts_now = clock.borrow().timestamp_ns();
673                cache.borrow_mut().purge_closed_orders(ts_now, buffer_secs);
674            });
675            let callback = TimeEventCallback::from(callback_fn);
676
677            log::info!("Starting purge closed orders timer at {interval_mins} minute intervals");
678            self.clock
679                .borrow_mut()
680                .set_timer_ns(
681                    TIMER_PURGE_CLOSED_ORDERS,
682                    interval_ns,
683                    None,
684                    None,
685                    Some(callback),
686                    None,
687                    None,
688                )
689                .expect("Failed to set purge closed orders timer");
690        }
691
692        if let Some(interval_mins) = self
693            .config
694            .purge_closed_positions_interval_mins
695            .filter(|&m| m > 0)
696            && !self
697                .clock
698                .borrow()
699                .timer_names()
700                .contains(&TIMER_PURGE_CLOSED_POSITIONS)
701        {
702            let interval_ns = mins_to_nanos(u64::from(interval_mins));
703            let buffer_mins = self.config.purge_closed_positions_buffer_mins.unwrap_or(0);
704            let buffer_secs = mins_to_secs(u64::from(buffer_mins));
705            let cache = self.cache.clone();
706            let clock = self.clock.clone();
707
708            let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
709                let ts_now = clock.borrow().timestamp_ns();
710                cache
711                    .borrow_mut()
712                    .purge_closed_positions(ts_now, buffer_secs);
713            });
714            let callback = TimeEventCallback::from(callback_fn);
715
716            log::info!("Starting purge closed positions timer at {interval_mins} minute intervals");
717            self.clock
718                .borrow_mut()
719                .set_timer_ns(
720                    TIMER_PURGE_CLOSED_POSITIONS,
721                    interval_ns,
722                    None,
723                    None,
724                    Some(callback),
725                    None,
726                    None,
727                )
728                .expect("Failed to set purge closed positions timer");
729        }
730
731        if let Some(interval_mins) = self
732            .config
733            .purge_account_events_interval_mins
734            .filter(|&m| m > 0)
735            && !self
736                .clock
737                .borrow()
738                .timer_names()
739                .contains(&TIMER_PURGE_ACCOUNT_EVENTS)
740        {
741            let interval_ns = mins_to_nanos(u64::from(interval_mins));
742            let lookback_mins = self.config.purge_account_events_lookback_mins.unwrap_or(0);
743            let lookback_secs = mins_to_secs(u64::from(lookback_mins));
744            let cache = self.cache.clone();
745            let clock = self.clock.clone();
746
747            let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
748                let ts_now = clock.borrow().timestamp_ns();
749                cache
750                    .borrow_mut()
751                    .purge_account_events(ts_now, lookback_secs);
752            });
753            let callback = TimeEventCallback::from(callback_fn);
754
755            log::info!("Starting purge account events timer at {interval_mins} minute intervals");
756            self.clock
757                .borrow_mut()
758                .set_timer_ns(
759                    TIMER_PURGE_ACCOUNT_EVENTS,
760                    interval_ns,
761                    None,
762                    None,
763                    Some(callback),
764                    None,
765                    None,
766                )
767                .expect("Failed to set purge account events timer");
768        }
769    }
770
771    /// Stops the purge timers if running.
772    pub fn stop_purge_timers(&mut self) {
773        let timer_names: Vec<String> = self
774            .clock
775            .borrow()
776            .timer_names()
777            .into_iter()
778            .map(String::from)
779            .collect();
780
781        if timer_names.iter().any(|n| n == TIMER_PURGE_CLOSED_ORDERS) {
782            log::info!("Canceling purge closed orders timer");
783            self.clock
784                .borrow_mut()
785                .cancel_timer(TIMER_PURGE_CLOSED_ORDERS);
786        }
787
788        if timer_names
789            .iter()
790            .any(|n| n == TIMER_PURGE_CLOSED_POSITIONS)
791        {
792            log::info!("Canceling purge closed positions timer");
793            self.clock
794                .borrow_mut()
795                .cancel_timer(TIMER_PURGE_CLOSED_POSITIONS);
796        }
797
798        if timer_names.iter().any(|n| n == TIMER_PURGE_ACCOUNT_EVENTS) {
799            log::info!("Canceling purge account events timer");
800            self.clock
801                .borrow_mut()
802                .cancel_timer(TIMER_PURGE_ACCOUNT_EVENTS);
803        }
804    }
805
806    /// Creates snapshots of all open positions.
807    pub fn snapshot_open_position_states(&self) {
808        let positions: Vec<Position> = self
809            .cache
810            .borrow()
811            .positions_open(None, None, None, None, None)
812            .into_iter()
813            .cloned()
814            .collect();
815
816        for position in positions {
817            self.create_position_state_snapshot(&position);
818        }
819    }
820
821    #[expect(clippy::await_holding_refcell_ref)]
822    /// Loads persistent state into cache and rebuilds indices.
823    ///
824    /// # Errors
825    ///
826    /// Returns an error if any cache operation fails.
827    pub async fn load_cache(&mut self) -> anyhow::Result<()> {
828        let ts = SystemTime::now(); // dst-ok: init-time log timing, not on DST state path
829
830        {
831            let mut cache = self.cache.borrow_mut();
832            cache.clear_index();
833            cache.cache_general()?;
834            self.cache.borrow_mut().cache_all().await?;
835            cache.build_index();
836            let _ = cache.check_integrity();
837
838            if self.config.manage_own_order_books {
839                for order in cache.orders(None, None, None, None, None) {
840                    if order.is_closed() || !should_handle_own_book_order(order) {
841                        continue;
842                    }
843                    let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
844                    own_book.add(order.to_own_book_order());
845                }
846            }
847        }
848
849        self.set_position_id_counts();
850
851        log::info!(
852            "Loaded cache in {}ms",
853            SystemTime::now() // dst-ok: init-time log timing, not on DST state path
854                .duration_since(ts)
855                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
856                .as_millis()
857        );
858
859        Ok(())
860    }
861
    /// Flushes the database to persist all cached data.
    ///
    /// Delegates to the cache's `flush_db` under a mutable borrow of the shared cache.
    pub fn flush_db(&self) {
        self.cache.borrow_mut().flush_db();
    }
866
867    /// Reconciles an execution report.
868    pub fn reconcile_execution_report(&mut self, report: &ExecutionReport) {
869        match report {
870            ExecutionReport::Order(order_report) => {
871                self.reconcile_order_status_report(order_report);
872            }
873            ExecutionReport::Fill(fill_report) => {
874                self.reconcile_fill_report(fill_report);
875            }
876            ExecutionReport::OrderWithFills(order_report, fills) => {
877                self.reconcile_order_with_fills(order_report, fills);
878            }
879            ExecutionReport::Position(position_report) => {
880                self.reconcile_position_report(position_report);
881            }
882            ExecutionReport::MassStatus(mass_status) => {
883                self.reconcile_execution_mass_status(mass_status);
884            }
885        }
886    }
887
888    /// Reconciles an order status report received at runtime.
889    ///
890    /// Handles order status transitions by generating appropriate events when the venue
891    /// reports a different status than our local state. Supports all order states including
892    /// fills with inferred fill generation when instruments are available.
893    ///
894    /// When the order is not found in cache, creates an external order from the report.
895    /// This handles exchange-generated orders (liquidation, ADL, settlement) that were
896    /// not submitted locally.
897    pub fn reconcile_order_status_report(&mut self, report: &OrderStatusReport) {
898        let cache = self.cache.borrow();
899
900        let order = report
901            .client_order_id
902            .and_then(|id| cache.order(&id).cloned())
903            .or_else(|| {
904                cache
905                    .client_order_id(&report.venue_order_id)
906                    .and_then(|cid| cache.order(cid).cloned())
907            });
908
909        let instrument = cache.instrument(&report.instrument_id).cloned();
910
911        drop(cache);
912
913        if let Some(order) = order {
914            let ts_now = self.clock.borrow().timestamp_ns();
915            let events =
916                generate_reconciliation_order_events(&order, report, instrument.as_ref(), ts_now);
917
918            for event in &events {
919                self.handle_event(event);
920            }
921        } else {
922            self.create_external_order(report, instrument.as_ref());
923        }
924    }
925
926    fn create_external_order(
927        &mut self,
928        report: &OrderStatusReport,
929        instrument: Option<&InstrumentAny>,
930    ) {
931        let Some(instrument) = instrument else {
932            log::warn!(
933                "Cannot create external order for venue_order_id={}: instrument {} not found",
934                report.venue_order_id,
935                report.instrument_id
936            );
937            return;
938        };
939
940        let Some(order) = self.materialize_external_order_from_status(report) else {
941            return;
942        };
943
944        let ts_now = self.clock.borrow().timestamp_ns();
945        let events = generate_external_order_status_events(
946            &order,
947            report,
948            &report.account_id,
949            instrument,
950            ts_now,
951        );
952
953        for event in &events {
954            self.handle_event(event);
955        }
956    }
957
958    /// Builds and registers an external order from an [`OrderStatusReport`] without
959    /// emitting status events. Returns the registered order.
960    fn materialize_external_order_from_status(
961        &self,
962        report: &OrderStatusReport,
963    ) -> Option<OrderAny> {
964        let strategy_id = self.resolve_external_strategy(&report.instrument_id);
965
966        let client_order_id = report
967            .client_order_id
968            .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
969
970        let trader_id = get_message_bus().borrow().trader_id;
971        let ts_now = self.clock.borrow().timestamp_ns();
972
973        let initialized = OrderInitialized::new(
974            trader_id,
975            strategy_id,
976            report.instrument_id,
977            client_order_id,
978            report.order_side,
979            report.order_type,
980            report.quantity,
981            report.time_in_force,
982            report.post_only,
983            report.reduce_only,
984            false, // quote_quantity
985            true,  // reconciliation
986            UUID4::new(),
987            ts_now,
988            ts_now,
989            report.price,
990            report.trigger_price,
991            report.trigger_type,
992            report.limit_offset,
993            report.trailing_offset,
994            Some(report.trailing_offset_type),
995            report.expire_time,
996            report.display_qty,
997            None, // emulation_trigger
998            None, // trigger_instrument_id
999            Some(report.contingency_type),
1000            report.order_list_id,
1001            report.linked_order_ids.clone(),
1002            report.parent_order_id,
1003            None, // exec_algorithm_id
1004            None, // exec_algorithm_params
1005            None, // exec_spawn_id
1006            None, // tags
1007        );
1008
1009        self.materialize_external_order(
1010            initialized,
1011            client_order_id,
1012            report.venue_order_id,
1013            report.instrument_id,
1014            strategy_id,
1015            ts_now,
1016            Some(report.order_status),
1017        )
1018    }
1019
1020    /// Builds and registers an external order from a [`FillReport`] when no matching
1021    /// order exists in cache. The order is created with `OrderType::Market` and a
1022    /// quantity equal to the fill's `last_qty`, so the fill consumes the entire
1023    /// order on application.
1024    ///
1025    /// This handles venue-initiated fills (most commonly Hyperliquid liquidations)
1026    /// where the venue does not surface a user-level order on its order channel.
1027    fn materialize_external_order_from_fill(&self, report: &FillReport) -> Option<OrderAny> {
1028        let strategy_id = self.resolve_external_strategy(&report.instrument_id);
1029
1030        let client_order_id = report
1031            .client_order_id
1032            .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
1033
1034        let trader_id = get_message_bus().borrow().trader_id;
1035        let ts_now = self.clock.borrow().timestamp_ns();
1036
1037        let initialized = OrderInitialized::new(
1038            trader_id,
1039            strategy_id,
1040            report.instrument_id,
1041            client_order_id,
1042            report.order_side,
1043            OrderType::Market,
1044            report.last_qty,
1045            TimeInForce::Ioc,
1046            false, // post_only
1047            true,  // reduce_only: venue-initiated closes always reduce
1048            false, // quote_quantity
1049            true,  // reconciliation
1050            UUID4::new(),
1051            ts_now,
1052            ts_now,
1053            None, // price
1054            None, // trigger_price
1055            None, // trigger_type
1056            None, // limit_offset
1057            None, // trailing_offset
1058            Some(TrailingOffsetType::NoTrailingOffset),
1059            None, // expire_time
1060            None, // display_qty
1061            None, // emulation_trigger
1062            None, // trigger_instrument_id
1063            Some(ContingencyType::NoContingency),
1064            None, // order_list_id
1065            None, // linked_order_ids
1066            None, // parent_order_id
1067            None, // exec_algorithm_id
1068            None, // exec_algorithm_params
1069            None, // exec_spawn_id
1070            None, // tags
1071        );
1072
1073        self.materialize_external_order(
1074            initialized,
1075            client_order_id,
1076            report.venue_order_id,
1077            report.instrument_id,
1078            strategy_id,
1079            ts_now,
1080            None,
1081        )
1082    }
1083
1084    fn resolve_external_strategy(&self, instrument_id: &InstrumentId) -> StrategyId {
1085        self.external_order_claims
1086            .get(instrument_id)
1087            .copied()
1088            .unwrap_or_else(|| StrategyId::from("EXTERNAL"))
1089    }
1090
1091    /// Adds an external order to the cache and registers it for adapter routing.
1092    /// Returns the registered order on success.
1093    #[allow(
1094        clippy::too_many_arguments,
1095        reason = "external order materialisation threads several ids and a timestamp"
1096    )]
1097    fn materialize_external_order(
1098        &self,
1099        initialized: OrderInitialized,
1100        client_order_id: ClientOrderId,
1101        venue_order_id: VenueOrderId,
1102        instrument_id: InstrumentId,
1103        strategy_id: StrategyId,
1104        ts_now: UnixNanos,
1105        order_status: Option<OrderStatus>,
1106    ) -> Option<OrderAny> {
1107        let order = match OrderAny::from_events(vec![OrderEventAny::Initialized(initialized)]) {
1108            Ok(order) => order,
1109            Err(e) => {
1110                log::error!("Failed to create external order from report: {e}");
1111                return None;
1112            }
1113        };
1114
1115        {
1116            let mut cache = self.cache.borrow_mut();
1117            if let Err(e) = cache.add_order(order.clone(), None, None, false) {
1118                log::error!("Failed to add external order to cache: {e}");
1119                return None;
1120            }
1121
1122            if let Err(e) = cache.add_venue_order_id(&client_order_id, &venue_order_id, false) {
1123                log::warn!("Failed to add venue order ID index: {e}");
1124            }
1125        }
1126
1127        match order_status {
1128            Some(status) => log::info!(
1129                "Created external order {client_order_id} ({venue_order_id}) for {instrument_id} [{status}]",
1130            ),
1131            None => log::info!(
1132                "Created external order {client_order_id} ({venue_order_id}) for {instrument_id}",
1133            ),
1134        }
1135
1136        self.register_external_order(
1137            client_order_id,
1138            venue_order_id,
1139            instrument_id,
1140            strategy_id,
1141            ts_now,
1142        );
1143
1144        Some(order)
1145    }
1146
1147    /// Reconciles a fill report received at runtime.
1148    ///
1149    /// Finds the associated order, validates the fill, and generates an OrderFilled event
1150    /// if the fill is not a duplicate and won't cause an overfill. When the order is not
1151    /// in cache, an external order is bootstrapped from the fill so that venue-initiated
1152    /// closures (e.g. Hyperliquid liquidations) that arrive without a companion order
1153    /// status report still update the local position.
1154    pub fn reconcile_fill_report(&mut self, report: &FillReport) {
1155        let cache = self.cache.borrow();
1156
1157        let order = report
1158            .client_order_id
1159            .and_then(|id| cache.order(&id).cloned())
1160            .or_else(|| {
1161                cache
1162                    .client_order_id(&report.venue_order_id)
1163                    .and_then(|cid| cache.order(cid).cloned())
1164            });
1165
1166        let instrument = cache.instrument(&report.instrument_id).cloned();
1167
1168        drop(cache);
1169
1170        let Some(instrument) = instrument else {
1171            log::debug!(
1172                "Cannot reconcile fill report for venue_order_id={}: instrument {} not found",
1173                report.venue_order_id,
1174                report.instrument_id
1175            );
1176            return;
1177        };
1178
1179        let order = match order {
1180            Some(order) => order,
1181            None => {
1182                let Some(order) = self.materialize_external_order_from_fill(report) else {
1183                    return;
1184                };
1185                let ts_now = self.clock.borrow().timestamp_ns();
1186                let accepted = OrderAccepted::new(
1187                    order.trader_id(),
1188                    order.strategy_id(),
1189                    order.instrument_id(),
1190                    order.client_order_id(),
1191                    report.venue_order_id,
1192                    report.account_id,
1193                    UUID4::new(),
1194                    report.ts_event,
1195                    ts_now,
1196                    true, // reconciliation
1197                );
1198                self.handle_event(&OrderEventAny::Accepted(accepted));
1199                self.cache
1200                    .borrow()
1201                    .order(&order.client_order_id())
1202                    .cloned()
1203                    .unwrap_or(order)
1204            }
1205        };
1206
1207        let ts_now = self.clock.borrow().timestamp_ns();
1208
1209        if let Some(event) = reconcile_fill(
1210            &order,
1211            report,
1212            &instrument,
1213            ts_now,
1214            self.config.allow_overfills,
1215        ) {
1216            self.handle_event(&event);
1217        }
1218    }
1219
1220    /// Reconciles an [`OrderStatusReport`] paired with companion [`FillReport`]s
1221    /// for the same venue event.
1222    ///
1223    /// Real fills supplied by the adapter are applied first so their `trade_id` and
1224    /// `commission` are preserved; any residual quantity not covered by the fills is
1225    /// then synthesised as an inferred fill from the status report's `avg_px`.
1226    /// Adapters use this to emit ADL / liquidation / settlement events without
1227    /// losing real fill metadata.
1228    pub fn reconcile_order_with_fills(&mut self, report: &OrderStatusReport, fills: &[FillReport]) {
1229        let cache = self.cache.borrow();
1230        let order = report
1231            .client_order_id
1232            .and_then(|id| cache.order(&id).cloned())
1233            .or_else(|| {
1234                cache
1235                    .client_order_id(&report.venue_order_id)
1236                    .and_then(|cid| cache.order(cid).cloned())
1237            });
1238        let instrument = cache.instrument(&report.instrument_id).cloned();
1239        drop(cache);
1240
1241        let Some(instrument) = instrument else {
1242            log::debug!(
1243                "Cannot reconcile bundled report for venue_order_id={}: instrument {} not found",
1244                report.venue_order_id,
1245                report.instrument_id,
1246            );
1247            return;
1248        };
1249
1250        // Bootstrap the external order with only OrderAccepted; defer fill events to
1251        // the per-fill loop so real fill metadata is preserved.
1252        let mut order = match order {
1253            Some(order) => order,
1254            None => {
1255                let Some(order) = self.materialize_external_order_from_status(report) else {
1256                    return;
1257                };
1258                let ts_now = self.clock.borrow().timestamp_ns();
1259                let accepted = OrderAccepted::new(
1260                    order.trader_id(),
1261                    order.strategy_id(),
1262                    order.instrument_id(),
1263                    order.client_order_id(),
1264                    report.venue_order_id,
1265                    report.account_id,
1266                    UUID4::new(),
1267                    report.ts_accepted,
1268                    ts_now,
1269                    true, // reconciliation
1270                );
1271                self.handle_event(&OrderEventAny::Accepted(accepted));
1272                order
1273            }
1274        };
1275
1276        let client_order_id = order.client_order_id();
1277
1278        for fill in fills {
1279            let ts_now = self.clock.borrow().timestamp_ns();
1280
1281            if let Some(event) = reconcile_fill(
1282                &order,
1283                fill,
1284                &instrument,
1285                ts_now,
1286                self.config.allow_overfills,
1287            ) {
1288                self.handle_event(&event);
1289            }
1290
1291            // Refresh order after fill to keep filled_qty accurate for the next iteration.
1292            if let Some(refreshed) = self.cache.borrow().order(&client_order_id).cloned() {
1293                order = refreshed;
1294            }
1295        }
1296
1297        // Cover any quantity gap between the status report and the real fills with
1298        // an inferred fill so the order reaches the venue-reported terminal state.
1299        if matches!(
1300            report.order_status,
1301            OrderStatus::PartiallyFilled | OrderStatus::Filled,
1302        ) && report.filled_qty > order.filled_qty()
1303        {
1304            let ts_now = self.clock.borrow().timestamp_ns();
1305
1306            if let Some(event) = create_incremental_inferred_fill(
1307                &order,
1308                report,
1309                &report.account_id,
1310                &instrument,
1311                ts_now,
1312                None,
1313            ) {
1314                self.handle_event(&event);
1315
1316                if let Some(refreshed) = self.cache.borrow().order(&client_order_id).cloned() {
1317                    order = refreshed;
1318                }
1319            }
1320        }
1321
1322        // Apply terminal events when the venue reports a non-fill closure.
1323        match report.order_status {
1324            OrderStatus::Canceled if !order.is_closed() => {
1325                let ts_now = self.clock.borrow().timestamp_ns();
1326                let canceled = OrderCanceled::new(
1327                    order.trader_id(),
1328                    order.strategy_id(),
1329                    order.instrument_id(),
1330                    order.client_order_id(),
1331                    UUID4::new(),
1332                    report.ts_last,
1333                    ts_now,
1334                    true,
1335                    Some(report.venue_order_id),
1336                    Some(report.account_id),
1337                );
1338                self.handle_event(&OrderEventAny::Canceled(canceled));
1339            }
1340            OrderStatus::Expired if !order.is_closed() => {
1341                let ts_now = self.clock.borrow().timestamp_ns();
1342                let expired = OrderExpired::new(
1343                    order.trader_id(),
1344                    order.strategy_id(),
1345                    order.instrument_id(),
1346                    order.client_order_id(),
1347                    UUID4::new(),
1348                    report.ts_last,
1349                    ts_now,
1350                    true,
1351                    Some(report.venue_order_id),
1352                    Some(report.account_id),
1353                );
1354                self.handle_event(&OrderEventAny::Expired(expired));
1355            }
1356            _ => {}
1357        }
1358    }
1359
1360    /// Reconciles a position status report received at runtime.
1361    ///
1362    /// Compares the venue-reported position with cached positions and logs any discrepancies.
1363    /// Handles both hedging (with venue_position_id) and netting (without) modes.
1364    pub fn reconcile_position_report(&mut self, report: &PositionStatusReport) {
1365        let cache = self.cache.borrow();
1366
1367        let size_precision = cache
1368            .instrument(&report.instrument_id)
1369            .map(|i| i.size_precision());
1370
1371        if report.venue_position_id.is_some() {
1372            self.reconcile_position_report_hedging(report, &cache);
1373        } else {
1374            self.reconcile_position_report_netting(report, &cache, size_precision);
1375        }
1376    }
1377
1378    fn reconcile_position_report_hedging(&self, report: &PositionStatusReport, cache: &Cache) {
1379        let venue_position_id = report.venue_position_id.as_ref().unwrap();
1380
1381        log::debug!(
1382            "Reconciling HEDGE position for {}, venue_position_id={}",
1383            report.instrument_id,
1384            venue_position_id
1385        );
1386
1387        let Some(position) = cache.position(venue_position_id) else {
1388            log::error!("Cannot reconcile position: {venue_position_id} not found in cache");
1389            return;
1390        };
1391
1392        let cached_signed_qty = match position.side {
1393            PositionSide::Long => position.quantity.as_decimal(),
1394            PositionSide::Short => -position.quantity.as_decimal(),
1395            _ => Decimal::ZERO,
1396        };
1397        let venue_signed_qty = report.signed_decimal_qty;
1398
1399        if cached_signed_qty != venue_signed_qty {
1400            log::error!(
1401                "Position mismatch for {} {}: cached={}, venue={}",
1402                report.instrument_id,
1403                venue_position_id,
1404                cached_signed_qty,
1405                venue_signed_qty
1406            );
1407        }
1408    }
1409
1410    fn reconcile_position_report_netting(
1411        &self,
1412        report: &PositionStatusReport,
1413        cache: &Cache,
1414        size_precision: Option<u8>,
1415    ) {
1416        log::debug!("Reconciling NET position for {}", report.instrument_id);
1417
1418        let positions_open =
1419            cache.positions_open(None, Some(&report.instrument_id), None, None, None);
1420
1421        // Sum up cached position quantities using domain types to avoid f64 precision loss
1422        let cached_signed_qty: Decimal = positions_open
1423            .iter()
1424            .map(|p| match p.side {
1425                PositionSide::Long => p.quantity.as_decimal(),
1426                PositionSide::Short => -p.quantity.as_decimal(),
1427                _ => Decimal::ZERO,
1428            })
1429            .sum();
1430
1431        log::debug!(
1432            "Position report: venue_signed_qty={}, cached_signed_qty={}",
1433            report.signed_decimal_qty,
1434            cached_signed_qty
1435        );
1436
1437        let _ = check_position_reconciliation(report, cached_signed_qty, size_precision);
1438    }
1439
1440    /// Reconciles an execution mass status report.
1441    ///
1442    /// Processes all order reports, fill reports, and position reports contained
1443    /// in the mass status. Orders created as external during this pass already receive
1444    /// inferred fills, so their companion fill reports are skipped to avoid double-fills.
1445    pub fn reconcile_execution_mass_status(&mut self, mass_status: &ExecutionMassStatus) {
1446        log::info!(
1447            "Reconciling mass status for client={}, account={}, venue={}",
1448            mass_status.client_id,
1449            mass_status.account_id,
1450            mass_status.venue
1451        );
1452
1453        let mut external_venue_ids = AHashSet::new();
1454
1455        for order_report in mass_status.order_reports().values() {
1456            let existed = {
1457                let cache = self.cache.borrow();
1458                order_report
1459                    .client_order_id
1460                    .and_then(|id| cache.order(&id).cloned())
1461                    .or_else(|| {
1462                        cache
1463                            .client_order_id(&order_report.venue_order_id)
1464                            .and_then(|cid| cache.order(cid).cloned())
1465                    })
1466                    .is_some()
1467            };
1468
1469            self.reconcile_order_status_report(order_report);
1470
1471            if !existed {
1472                external_venue_ids.insert(order_report.venue_order_id);
1473            }
1474        }
1475
1476        for fill_reports in mass_status.fill_reports().values() {
1477            for fill_report in fill_reports {
1478                if external_venue_ids.contains(&fill_report.venue_order_id) {
1479                    log::debug!(
1480                        "Skipping fill report for external order {}: covered by inferred fill",
1481                        fill_report.venue_order_id
1482                    );
1483                    continue;
1484                }
1485
1486                self.reconcile_fill_report(fill_report);
1487            }
1488        }
1489
1490        for position_reports in mass_status.position_reports().values() {
1491            for position_report in position_reports {
1492                self.reconcile_position_report(position_report);
1493            }
1494        }
1495
1496        log::info!(
1497            "Mass status reconciliation complete: {} orders, {} fills, {} positions",
1498            mass_status.order_reports().len(),
1499            mass_status
1500                .fill_reports()
1501                .values()
1502                .map(|v| v.len())
1503                .sum::<usize>(),
1504            mass_status
1505                .position_reports()
1506                .values()
1507                .map(|v| v.len())
1508                .sum::<usize>()
1509        );
1510    }
1511
    /// Executes a trading command by routing it to the appropriate execution client.
    ///
    /// Public entry point; forwards the command unchanged to `execute_command`.
    pub fn execute(&self, command: TradingCommand) {
        self.execute_command(command);
    }
1516
    /// Processes an order event, updating internal state and routing as needed.
    ///
    /// Public entry point; forwards the event unchanged to `handle_event`.
    pub fn process(&mut self, event: &OrderEventAny) {
        self.handle_event(event);
    }
1521
    /// Starts the execution engine.
    ///
    /// Starts the snapshot timer and the purge timers, then logs `Started`.
    pub fn start(&mut self) {
        self.start_snapshot_timer();
        self.start_purge_timers();

        log::info!("Started");
    }
1529
    /// Stops the execution engine.
    ///
    /// Stops the snapshot timer and the purge timers, then logs `Stopped`.
    pub fn stop(&mut self) {
        self.stop_snapshot_timer();
        self.stop_purge_timers();

        log::info!("Stopped");
    }
1537
    /// Resets the execution engine.
    ///
    /// Resets the position ID generator and logs `Reset`; no other engine state is
    /// touched by this method.
    pub fn reset(&mut self) {
        self.pos_id_generator.reset();

        log::info!("Reset");
    }
1544
    /// Disposes of the execution engine.
    ///
    /// Currently this only logs `Disposed`; no resources are released by this method.
    pub fn dispose(&mut self) {
        log::info!("Disposed");
    }
1549
1550    fn execute_command(&self, command: TradingCommand) {
1551        if self.config.debug {
1552            log::debug!("{RECV}{CMD} {command:?}");
1553        }
1554
1555        if let Some(cid) = command.client_id()
1556            && self.external_clients.contains(&cid)
1557        {
1558            if self.config.debug {
1559                log::debug!("Skipping execution command for external client {cid}: {command:?}");
1560            }
1561            return;
1562        }
1563
1564        let client = if let Some(adapter) = command
1565            .client_id()
1566            .and_then(|cid| self.clients.get(&cid))
1567            .or_else(|| {
1568                self.routing_map
1569                    .get(&command.instrument_id().venue)
1570                    .and_then(|client_id| self.clients.get(client_id))
1571            })
1572            .or(self.default_client.as_ref())
1573        {
1574            adapter.client.as_ref()
1575        } else {
1576            log::error!(
1577                "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
1578                command.client_id(),
1579                command.instrument_id().venue,
1580            );
1581
1582            let reason = format!(
1583                "No execution client found for client_id={:?}, venue={}",
1584                command.client_id(),
1585                command.instrument_id().venue,
1586            );
1587
1588            match command {
1589                TradingCommand::SubmitOrder(cmd) => {
1590                    let cache = self.cache.borrow();
1591                    if let Some(order) = cache.order(&cmd.client_order_id) {
1592                        let order = order.clone();
1593                        drop(cache);
1594                        self.deny_order(&order, &reason);
1595                    }
1596                }
1597                TradingCommand::SubmitOrderList(cmd) => {
1598                    let orders: Vec<OrderAny> = self
1599                        .cache
1600                        .borrow()
1601                        .orders_for_ids(&cmd.order_list.client_order_ids, &cmd);
1602
1603                    for order in &orders {
1604                        self.deny_order(order, &reason);
1605                    }
1606                }
1607                _ => {}
1608            }
1609
1610            return;
1611        };
1612
1613        match command {
1614            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
1615            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
1616            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
1617            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
1618            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
1619            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
1620            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
1621            TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
1622        }
1623    }
1624
1625    fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: SubmitOrder) {
1626        let client_order_id = cmd.client_order_id;
1627
1628        let order = {
1629            let cache = self.cache.borrow();
1630            match cache.order(&client_order_id) {
1631                Some(order) => order.clone(),
1632                None => {
1633                    log::error!(
1634                        "Cannot handle submit order: order not found in cache for {client_order_id}"
1635                    );
1636                    return;
1637                }
1638            }
1639        };
1640
1641        let order_venue = order.instrument_id().venue;
1642        let client_venue = client.venue();
1643        if order_venue != client_venue {
1644            self.deny_order(
1645                &order,
1646                &format!("Order venue {order_venue} does not match client venue {client_venue}"),
1647            );
1648            return;
1649        }
1650
1651        let instrument_id = order.instrument_id();
1652
1653        if self.config.snapshot_orders {
1654            self.create_order_state_snapshot(&order);
1655        }
1656
1657        {
1658            let cache = self.cache.borrow();
1659            if cache.instrument(&instrument_id).is_none() {
1660                log::error!(
1661                    "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
1662                );
1663                return;
1664            }
1665        }
1666
1667        if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
1668            let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
1669            own_book.add(order.to_own_book_order());
1670        }
1671
1672        if let Err(e) = client.submit_order(cmd) {
1673            self.deny_order(&order, &format!("failed-to-submit-order-to-client: {e}"));
1674        }
1675    }
1676
1677    fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: SubmitOrderList) {
1678        let orders: Vec<OrderAny> = self
1679            .cache
1680            .borrow()
1681            .orders_for_ids(&cmd.order_list.client_order_ids, &cmd);
1682
1683        if orders.len() != cmd.order_list.client_order_ids.len() {
1684            for order in &orders {
1685                self.deny_order(
1686                    order,
1687                    &format!("Incomplete order list: missing orders in cache for {cmd}"),
1688                );
1689            }
1690            return;
1691        }
1692
1693        let order_list_venue = cmd.instrument_id.venue;
1694        let client_venue = client.venue();
1695        if order_list_venue != client_venue {
1696            for order in &orders {
1697                self.deny_order(
1698                    order,
1699                    &format!("Order list venue {order_list_venue} does not match client venue {client_venue}"),
1700                );
1701            }
1702            return;
1703        }
1704
1705        if self.config.snapshot_orders {
1706            for order in &orders {
1707                self.create_order_state_snapshot(order);
1708            }
1709        }
1710
1711        {
1712            let cache = self.cache.borrow();
1713            if cache.instrument(&cmd.instrument_id).is_none() {
1714                log::error!(
1715                    "Cannot handle submit order list: no instrument found for {}, {cmd}",
1716                    cmd.instrument_id,
1717                );
1718                return;
1719            }
1720        }
1721
1722        if self.config.manage_own_order_books {
1723            let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
1724
1725            for order in &orders {
1726                if should_handle_own_book_order(order) {
1727                    own_book.add(order.to_own_book_order());
1728                }
1729            }
1730        }
1731
1732        if let Err(e) = client.submit_order_list(cmd) {
1733            log::error!("Error submitting order list to client: {e}");
1734            for order in &orders {
1735                self.deny_order(
1736                    order,
1737                    &format!("failed-to-submit-order-list-to-client: {e}"),
1738                );
1739            }
1740        }
1741    }
1742
1743    fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: ModifyOrder) {
1744        if let Err(e) = client.modify_order(cmd) {
1745            log::error!("Error modifying order: {e}");
1746        }
1747    }
1748
1749    fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: CancelOrder) {
1750        if let Err(e) = client.cancel_order(cmd) {
1751            log::error!("Error canceling order: {e}");
1752        }
1753    }
1754
1755    fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: CancelAllOrders) {
1756        if let Err(e) = client.cancel_all_orders(cmd) {
1757            log::error!("Error canceling all orders: {e}");
1758        }
1759    }
1760
1761    fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: BatchCancelOrders) {
1762        if let Err(e) = client.batch_cancel_orders(cmd) {
1763            log::error!("Error batch canceling orders: {e}");
1764        }
1765    }
1766
1767    fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: QueryAccount) {
1768        if let Err(e) = client.query_account(cmd) {
1769            log::error!("Error querying account: {e}");
1770        }
1771    }
1772
1773    fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: QueryOrder) {
1774        if let Err(e) = client.query_order(cmd) {
1775            log::error!("Error querying order: {e}");
1776        }
1777    }
1778
1779    fn create_order_state_snapshot(&self, order: &OrderAny) {
1780        if self.config.debug {
1781            log::debug!("Creating order state snapshot for {order}");
1782        }
1783
1784        if self.cache.borrow().has_backing()
1785            && let Err(e) = self.cache.borrow().snapshot_order_state(order)
1786        {
1787            log::error!("Failed to snapshot order state: {e}");
1788        }
1789    }
1790
1791    fn create_position_state_snapshot(&self, position: &Position) {
1792        if self.config.debug {
1793            log::debug!("Creating position state snapshot for {position}");
1794        }
1795
1796        // let mut position: Position = position.clone();
1797        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
1798        //     position.unrealized_pnl(last)
1799        // }
1800    }
1801
1802    fn handle_event(&mut self, event: &OrderEventAny) {
1803        if self.config.debug {
1804            log::debug!("{RECV}{EVT} {event:?}");
1805        }
1806
1807        let client_order_id = event.client_order_id();
1808        let cache = self.cache.borrow();
1809        let mut order = if let Some(order) = cache.order(&client_order_id) {
1810            order.clone()
1811        } else {
1812            log::warn!(
1813                "Order with {} not found in the cache to apply {}",
1814                event.client_order_id(),
1815                event
1816            );
1817
1818            // Try to find order by venue order ID if available
1819            let venue_order_id = if let Some(id) = event.venue_order_id() {
1820                id
1821            } else {
1822                log::error!(
1823                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
1824                    event.client_order_id()
1825                );
1826                return;
1827            };
1828
1829            // Look up client order ID from venue order ID
1830            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
1831                id
1832            } else {
1833                log::error!(
1834                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
1835                    event.client_order_id(),
1836                );
1837                return;
1838            };
1839
1840            // Get order using found client order ID
1841            if let Some(order) = cache.order(client_order_id) {
1842                log::info!("Order with {client_order_id} was found in the cache");
1843                order.clone()
1844            } else {
1845                log::error!(
1846                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
1847                );
1848                return;
1849            }
1850        };
1851
1852        drop(cache);
1853
1854        match event {
1855            OrderEventAny::Filled(fill) => {
1856                let oms_type = self.determine_oms_type(fill);
1857                let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
1858
1859                let mut fill = *fill;
1860                fill.position_id = Some(position_id);
1861
1862                if self.apply_fill_to_order(&mut order, fill).is_ok() {
1863                    self.handle_order_fill(&order, fill, oms_type);
1864                }
1865            }
1866            _ => {
1867                let _ = self.apply_event_to_order(&mut order, event);
1868            }
1869        }
1870    }
1871
1872    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
1873        // Check for strategy OMS override
1874        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
1875            return *oms_type;
1876        }
1877
1878        // Use native venue OMS
1879        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
1880            && let Some(client) = self.clients.get(client_id)
1881        {
1882            return client.oms_type();
1883        }
1884
1885        if let Some(client) = &self.default_client {
1886            return client.oms_type();
1887        }
1888
1889        OmsType::Netting // Default fallback
1890    }
1891
1892    fn determine_position_id(
1893        &mut self,
1894        fill: OrderFilled,
1895        oms_type: OmsType,
1896        order: Option<&OrderAny>,
1897    ) -> PositionId {
1898        let cache = self.cache.borrow();
1899        let cached_position_id = cache.position_id(&fill.client_order_id()).copied();
1900        drop(cache);
1901
1902        if self.config.debug {
1903            log::debug!(
1904                "Determining position ID for {}, position_id={:?}",
1905                fill.client_order_id(),
1906                cached_position_id,
1907            );
1908        }
1909
1910        if let Some(position_id) = cached_position_id {
1911            if let Some(fill_position_id) = fill.position_id
1912                && fill_position_id != position_id
1913            {
1914                log::warn!(
1915                    "Incorrect position ID assigned to fill: \
1916                     cached={position_id}, assigned={fill_position_id}; \
1917                     re-assigning from cache",
1918                );
1919            }
1920
1921            if self.config.debug {
1922                log::debug!("Assigned {position_id} to {}", fill.client_order_id());
1923            }
1924
1925            return position_id;
1926        }
1927
1928        let position_id = match oms_type {
1929            OmsType::Hedging => self.determine_hedging_position_id(fill, order),
1930            OmsType::Netting => self.determine_netting_position_id(fill),
1931            _ => self.determine_netting_position_id(fill),
1932        };
1933
1934        let order = if let Some(o) = order {
1935            o.clone()
1936        } else {
1937            let cache = self.cache.borrow();
1938            cache
1939                .order(&fill.client_order_id())
1940                .cloned()
1941                .unwrap_or_else(|| {
1942                    panic!(
1943                        "Order for {} not found to determine position ID",
1944                        fill.client_order_id()
1945                    )
1946                })
1947        };
1948
1949        if order.exec_algorithm_id().is_some()
1950            && let Some(exec_spawn_id) = order.exec_spawn_id()
1951        {
1952            let cache = self.cache.borrow();
1953            let primary = if let Some(p) = cache.order(&exec_spawn_id) {
1954                p.clone()
1955            } else {
1956                log::warn!(
1957                    "Primary exec spawn order {exec_spawn_id} not found, \
1958                     skipping position ID propagation"
1959                );
1960                return position_id;
1961            };
1962            let primary_already_indexed = cache.position_id(&primary.client_order_id()).is_some();
1963            drop(cache);
1964
1965            if primary.position_id().is_none() && !primary_already_indexed {
1966                let mut cache = self.cache.borrow_mut();
1967                if let Some(primary_mut) = cache.mut_order(&exec_spawn_id) {
1968                    primary_mut.set_position_id(Some(position_id));
1969                }
1970                let _ = cache.add_position_id(
1971                    &position_id,
1972                    &primary.instrument_id().venue,
1973                    &primary.client_order_id(),
1974                    &primary.strategy_id(),
1975                );
1976                log::debug!("Assigned primary order {position_id}");
1977            }
1978        }
1979
1980        position_id
1981    }
1982
1983    fn determine_hedging_position_id(
1984        &mut self,
1985        fill: OrderFilled,
1986        order: Option<&OrderAny>,
1987    ) -> PositionId {
1988        // Check if position ID already exists
1989        if let Some(position_id) = fill.position_id {
1990            if self.config.debug {
1991                log::debug!("Already had a position ID of: {position_id}");
1992            }
1993            return position_id;
1994        }
1995
1996        let cache = self.cache.borrow();
1997
1998        let order = if let Some(o) = order {
1999            o
2000        } else {
2001            match cache.order(&fill.client_order_id()) {
2002                Some(o) => o,
2003                None => {
2004                    panic!(
2005                        "Order for {} not found to determine position ID",
2006                        fill.client_order_id()
2007                    );
2008                }
2009            }
2010        };
2011
2012        // Check execution spawn orders
2013        if let Some(spawn_id) = order.exec_spawn_id() {
2014            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
2015            for spawned_order in spawn_orders {
2016                if let Some(pos_id) = spawned_order.position_id() {
2017                    if self.config.debug {
2018                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
2019                    }
2020                    return pos_id;
2021                }
2022            }
2023        }
2024
2025        // Generate new position ID
2026        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
2027
2028        if self.config.debug {
2029            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
2030        }
2031        position_id
2032    }
2033
2034    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
2035        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
2036    }
2037
2038    fn apply_fill_to_order(&self, order: &mut OrderAny, fill: OrderFilled) -> anyhow::Result<()> {
2039        if order.is_duplicate_fill(&fill) {
2040            log::warn!(
2041                "Duplicate fill: {} trade_id={} already applied, skipping",
2042                order.client_order_id(),
2043                fill.trade_id
2044            );
2045            anyhow::bail!("Duplicate fill");
2046        }
2047
2048        self.check_overfill(order, &fill)?;
2049        let event = OrderEventAny::Filled(fill);
2050        self.apply_order_event(order, &event)
2051    }
2052
2053    fn apply_event_to_order(
2054        &self,
2055        order: &mut OrderAny,
2056        event: &OrderEventAny,
2057    ) -> anyhow::Result<()> {
2058        self.apply_order_event(order, event)
2059    }
2060
2061    fn apply_order_event(&self, order: &mut OrderAny, event: &OrderEventAny) -> anyhow::Result<()> {
2062        if let Err(e) = order.apply(event.clone()) {
2063            match e {
2064                OrderError::InvalidStateTransition => {
2065                    // Event already applied to order (e.g., from reconciliation or duplicate processing)
2066                    // Log warning and continue with downstream processing (cache update, publishing, etc.)
2067                    log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
2068                }
2069                OrderError::DuplicateFill(trade_id) => {
2070                    // Duplicate fill detected at order level (secondary safety check)
2071                    log::warn!(
2072                        "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
2073                    );
2074                    anyhow::bail!("{e}");
2075                }
2076                _ => {
2077                    // Protection against invalid IDs and other invariants
2078                    log::error!("Error applying event: {e}, did not apply {event}");
2079
2080                    if should_handle_own_book_order(order) {
2081                        self.cache.borrow_mut().update_own_order_book(order);
2082                    }
2083                    anyhow::bail!("{e}");
2084                }
2085            }
2086        }
2087
2088        if let Err(e) = self.cache.borrow_mut().update_order(order) {
2089            log::error!("Error updating order in cache: {e}");
2090        }
2091
2092        if self.config.debug {
2093            log::debug!("{SEND}{EVT} {event}");
2094        }
2095
2096        let topic = switchboard::get_event_orders_topic(event.strategy_id());
2097        msgbus::publish_order_event(topic, event);
2098
2099        if self.config.snapshot_orders {
2100            self.create_order_state_snapshot(order);
2101        }
2102
2103        Ok(())
2104    }
2105
2106    fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
2107        let potential_overfill = order.calculate_overfill(fill.last_qty);
2108
2109        if potential_overfill.is_positive() {
2110            if self.config.allow_overfills {
2111                log::warn!(
2112                    "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
2113                    order.client_order_id(),
2114                    potential_overfill,
2115                    order.filled_qty(),
2116                    fill.last_qty,
2117                    order.quantity()
2118                );
2119            } else {
2120                let msg = format!(
2121                    "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
2122                Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
2123                    order.client_order_id(),
2124                    potential_overfill,
2125                    order.filled_qty(),
2126                    fill.last_qty,
2127                    order.quantity()
2128                );
2129                anyhow::bail!("{msg}");
2130            }
2131        }
2132
2133        Ok(())
2134    }
2135
2136    fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
2137        let instrument =
2138            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
2139                instrument.clone()
2140            } else {
2141                log::error!(
2142                    "Cannot handle order fill: no instrument found for {}, {fill}",
2143                    fill.instrument_id,
2144                );
2145                return;
2146            };
2147
2148        if self.cache.borrow().account(&fill.account_id).is_none() {
2149            log::error!(
2150                "Cannot handle order fill: no account found for {}, {fill}",
2151                fill.instrument_id.venue,
2152            );
2153            return;
2154        }
2155
2156        // Skip portfolio position updates for combo fills (spread instruments)
2157        // Combo fills are only used for order management, not portfolio updates
2158        let position = if instrument.is_spread() {
2159            None
2160        } else {
2161            self.handle_position_update(&instrument, fill, oms_type);
2162            let position_id = fill.position_id.unwrap();
2163            self.cache.borrow().position(&position_id).cloned()
2164        };
2165
2166        // Handle contingent orders for both spread and non-spread instruments
2167        // For spread instruments, contingent orders work without position linkage
2168        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
2169            // For non-spread instruments, link to position if available
2170            if !instrument.is_spread()
2171                && let Some(ref pos) = position
2172                && pos.is_open()
2173            {
2174                let position_id = pos.id;
2175
2176                for client_order_id in order.linked_order_ids().unwrap_or_default() {
2177                    let mut cache = self.cache.borrow_mut();
2178                    let contingent_order = cache.mut_order(client_order_id);
2179                    if let Some(contingent_order) = contingent_order
2180                        && contingent_order.position_id().is_none()
2181                    {
2182                        contingent_order.set_position_id(Some(position_id));
2183
2184                        if let Err(e) = self.cache.borrow_mut().add_position_id(
2185                            &position_id,
2186                            &contingent_order.instrument_id().venue,
2187                            &contingent_order.client_order_id(),
2188                            &contingent_order.strategy_id(),
2189                        ) {
2190                            log::error!("Failed to add position ID: {e}");
2191                        }
2192                    }
2193                }
2194            }
2195            // For spread instruments, contingent orders can still be triggered
2196            // but without position linkage (since no position is created for spreads)
2197        }
2198    }
2199
2200    /// Handle position creation or update for a fill.
2201    ///
2202    /// This function mirrors the Python `_handle_position_update` method.
2203    fn handle_position_update(
2204        &mut self,
2205        instrument: &InstrumentAny,
2206        fill: OrderFilled,
2207        oms_type: OmsType,
2208    ) {
2209        let position_id = if let Some(position_id) = fill.position_id {
2210            position_id
2211        } else {
2212            log::error!("Cannot handle position update: no position ID found for fill {fill}");
2213            return;
2214        };
2215
2216        let position_opt = self.cache.borrow().position(&position_id).cloned();
2217
2218        match position_opt {
2219            None => {
2220                // Position is None - open new position
2221                if self.open_position(instrument, None, fill, oms_type).is_ok() {
2222                    // Position opened successfully
2223                }
2224            }
2225            Some(pos) if pos.is_closed() => {
2226                // Position is closed - open new position
2227                if self
2228                    .open_position(instrument, Some(&pos), fill, oms_type)
2229                    .is_ok()
2230                {
2231                    // Position opened successfully
2232                }
2233            }
2234            Some(mut pos) => {
2235                if self.will_flip_position(&pos, fill) {
2236                    // Position will flip
2237                    self.flip_position(instrument, &mut pos, fill, oms_type);
2238                } else {
2239                    // Update existing position
2240                    self.update_position(&mut pos, fill);
2241                }
2242            }
2243        }
2244    }
2245
2246    fn open_position(
2247        &self,
2248        instrument: &InstrumentAny,
2249        position: Option<&Position>,
2250        fill: OrderFilled,
2251        oms_type: OmsType,
2252    ) -> anyhow::Result<()> {
2253        if let Some(position) = position {
2254            if Self::is_duplicate_closed_fill(position, &fill) {
2255                log::warn!(
2256                    "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
2257                    fill.trade_id,
2258                    position.id,
2259                    fill.order_side,
2260                    fill.last_qty,
2261                    fill.last_px
2262                );
2263                return Ok(());
2264            }
2265            self.reopen_position(position, oms_type)?;
2266        }
2267
2268        let position = Position::new(instrument, fill);
2269        self.cache.borrow_mut().add_position(&position, oms_type)?;
2270
2271        if self.config.snapshot_positions {
2272            self.create_position_state_snapshot(&position);
2273        }
2274
2275        let ts_init = self.clock.borrow().timestamp_ns();
2276        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
2277        let topic = switchboard::get_event_positions_topic(event.strategy_id);
2278        msgbus::publish_position_event(topic, &PositionEvent::PositionOpened(event));
2279
2280        Ok(())
2281    }
2282
2283    fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
2284        position.events.iter().any(|event| {
2285            event.trade_id == fill.trade_id
2286                && event.order_side == fill.order_side
2287                && event.last_px == fill.last_px
2288                && event.last_qty == fill.last_qty
2289        })
2290    }
2291
2292    fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
2293        if oms_type == OmsType::Netting {
2294            if position.is_open() {
2295                anyhow::bail!(
2296                    "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
2297                    position.id
2298                );
2299            }
2300            // Snapshot closed position if reopening (NETTING mode)
2301            self.cache.borrow_mut().snapshot_position(position)?;
2302        } else {
2303            // HEDGING mode
2304            log::warn!(
2305                "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
2306                position.id
2307            );
2308        }
2309        Ok(())
2310    }
2311
2312    fn update_position(&self, position: &mut Position, fill: OrderFilled) {
2313        // Apply the fill to the position
2314        position.apply(&fill);
2315
2316        // Check if position is closed after applying the fill
2317        let is_closed = position.is_closed();
2318
2319        // Update position in cache - this should handle the closed state tracking
2320        if let Err(e) = self.cache.borrow_mut().update_position(position) {
2321            log::error!("Failed to update position: {e:?}");
2322            return;
2323        }
2324
2325        // Verify cache state after update
2326        let cache = self.cache.borrow();
2327
2328        drop(cache);
2329
2330        // Create position state snapshot if enabled
2331        if self.config.snapshot_positions {
2332            self.create_position_state_snapshot(position);
2333        }
2334
2335        // Create and publish appropriate position event
2336        let topic = switchboard::get_event_positions_topic(position.strategy_id);
2337        let ts_init = self.clock.borrow().timestamp_ns();
2338
2339        if is_closed {
2340            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
2341            msgbus::publish_position_event(topic, &PositionEvent::PositionClosed(event));
2342        } else {
2343            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
2344            msgbus::publish_position_event(topic, &PositionEvent::PositionChanged(event));
2345        }
2346    }
2347
2348    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
2349        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
2350    }
2351
2352    fn flip_position(
2353        &mut self,
2354        instrument: &InstrumentAny,
2355        position: &mut Position,
2356        fill: OrderFilled,
2357        oms_type: OmsType,
2358    ) {
2359        let difference = match position.side {
2360            PositionSide::Long => Quantity::from_raw(
2361                fill.last_qty.raw - position.quantity.raw,
2362                position.size_precision,
2363            ),
2364            PositionSide::Short => Quantity::from_raw(
2365                position.quantity.raw.abs_diff(fill.last_qty.raw), // Equivalent to Python's abs(position.quantity - fill.last_qty)
2366                position.size_precision,
2367            ),
2368            _ => fill.last_qty,
2369        };
2370
2371        // Split commission between two positions
2372        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
2373        let (commission1, commission2) = if let Some(commission) = fill.commission {
2374            let commission_currency = commission.currency;
2375            let commission1 = Money::new(commission * fill_percent, commission_currency);
2376            let commission2 = commission - commission1;
2377            (Some(commission1), Some(commission2))
2378        } else {
2379            log::error!("Commission is not available");
2380            (None, None)
2381        };
2382
2383        let mut fill_split1: Option<OrderFilled> = None;
2384
2385        if position.is_open() {
2386            fill_split1 = Some(OrderFilled::new(
2387                fill.trader_id,
2388                fill.strategy_id,
2389                fill.instrument_id,
2390                fill.client_order_id,
2391                fill.venue_order_id,
2392                fill.account_id,
2393                fill.trade_id,
2394                fill.order_side,
2395                fill.order_type,
2396                position.quantity,
2397                fill.last_px,
2398                fill.currency,
2399                fill.liquidity_side,
2400                UUID4::new(),
2401                fill.ts_event,
2402                fill.ts_init,
2403                fill.reconciliation,
2404                fill.position_id,
2405                commission1,
2406            ));
2407
2408            self.update_position(position, fill_split1.unwrap());
2409
2410            // Snapshot closed position before reusing ID (NETTING mode)
2411            if oms_type == OmsType::Netting
2412                && let Err(e) = self.cache.borrow_mut().snapshot_position(position)
2413            {
2414                log::error!("Failed to snapshot position during flip: {e:?}");
2415            }
2416        }
2417
2418        // Guard against flipping a position with a zero fill size
2419        if difference.raw == 0 {
2420            log::warn!(
2421                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
2422            );
2423            return;
2424        }
2425
2426        let position_id_flip = if oms_type == OmsType::Hedging
2427            && let Some(position_id) = fill.position_id
2428            && position_id.is_virtual()
2429        {
2430            // Generate new position ID for flipped virtual position (Hedging OMS only)
2431            Some(self.pos_id_generator.generate(fill.strategy_id, true))
2432        } else {
2433            // Default: use the same position ID as the fill (Python behavior)
2434            fill.position_id
2435        };
2436
2437        let fill_split2 = OrderFilled::new(
2438            fill.trader_id,
2439            fill.strategy_id,
2440            fill.instrument_id,
2441            fill.client_order_id,
2442            fill.venue_order_id,
2443            fill.account_id,
2444            fill.trade_id,
2445            fill.order_side,
2446            fill.order_type,
2447            difference,
2448            fill.last_px,
2449            fill.currency,
2450            fill.liquidity_side,
2451            UUID4::new(),
2452            fill.ts_event,
2453            fill.ts_init,
2454            fill.reconciliation,
2455            position_id_flip,
2456            commission2,
2457        );
2458
2459        if oms_type == OmsType::Hedging
2460            && let Some(position_id) = fill.position_id
2461            && position_id.is_virtual()
2462        {
2463            log::warn!("Closing position {fill_split1:?}");
2464            log::warn!("Flipping position {fill_split2:?}");
2465        }
2466
2467        // Open flipped position
2468        if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
2469            log::error!("Failed to open flipped position: {e:?}");
2470        }
2471    }
2472
2473    /// Sets the internal position ID generator counts based on existing cached positions.
2474    pub fn set_position_id_counts(&mut self) {
2475        let cache = self.cache.borrow();
2476        let positions = cache.positions(None, None, None, None, None);
2477
2478        // Count positions per instrument_id using a HashMap
2479        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
2480
2481        for position in positions {
2482            *counts.entry(position.strategy_id).or_insert(0) += 1;
2483        }
2484
2485        self.pos_id_generator.reset();
2486
2487        for (strategy_id, count) in counts {
2488            self.pos_id_generator.set_count(count, strategy_id);
2489            log::info!("Set PositionId count for {strategy_id} to {count}");
2490        }
2491    }
2492
2493    fn deny_order(&self, order: &OrderAny, reason: &str) {
2494        let denied = OrderDenied::new(
2495            order.trader_id(),
2496            order.strategy_id(),
2497            order.instrument_id(),
2498            order.client_order_id(),
2499            reason.into(),
2500            UUID4::new(),
2501            self.clock.borrow().timestamp_ns(),
2502            self.clock.borrow().timestamp_ns(),
2503        );
2504
2505        let mut order = order.clone();
2506
2507        if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
2508            log::error!("Failed to apply denied event to order: {e}");
2509            return;
2510        }
2511
2512        if let Err(e) = self.cache.borrow_mut().update_order(&order) {
2513            log::error!("Failed to update order in cache: {e}");
2514            return;
2515        }
2516
2517        let topic = switchboard::get_event_orders_topic(order.strategy_id());
2518        msgbus::publish_order_event(topic, &OrderEventAny::Denied(denied));
2519
2520        if self.config.snapshot_orders {
2521            self.create_order_state_snapshot(&order);
2522        }
2523    }
2524
2525    fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
2526        let mut cache = self.cache.borrow_mut();
2527        if cache.own_order_book_mut(instrument_id).is_none() {
2528            let own_book = OwnOrderBook::new(*instrument_id);
2529            cache.add_own_order_book(own_book).unwrap();
2530        }
2531
2532        RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
2533    }
2534}