1pub mod config;
24pub mod stubs;
25
26use std::{
27 cell::{RefCell, RefMut},
28 collections::{HashMap, HashSet},
29 fmt::Debug,
30 rc::Rc,
31 time::SystemTime,
32};
33
34use ahash::AHashSet;
35use config::ExecutionEngineConfig;
36use futures::future::join_all;
37use indexmap::{IndexMap, IndexSet};
38use nautilus_common::{
39 cache::Cache,
40 clients::ExecutionClient,
41 clock::Clock,
42 generators::position_id::PositionIdGenerator,
43 logging::{CMD, EVT, RECV, SEND},
44 messages::{
45 ExecutionReport,
46 execution::{
47 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
48 SubmitOrder, SubmitOrderList, TradingCommand,
49 },
50 },
51 msgbus::{
52 self, MessagingSwitchboard, ShareableMessageHandler, TypedIntoHandler, get_message_bus,
53 switchboard::{self},
54 },
55 runner::try_get_trading_cmd_sender,
56 timer::{TimeEvent, TimeEventCallback},
57};
58use nautilus_core::{
59 UUID4, UnixNanos, WeakCell,
60 datetime::{mins_to_nanos, mins_to_secs},
61};
62use nautilus_model::{
63 enums::{
64 ContingencyType, OmsType, OrderStatus, OrderType, PositionSide, TimeInForce,
65 TrailingOffsetType,
66 },
67 events::{
68 OrderAccepted, OrderCanceled, OrderDenied, OrderEvent, OrderEventAny, OrderExpired,
69 OrderFilled, OrderInitialized, PositionChanged, PositionClosed, PositionEvent,
70 PositionOpened,
71 },
72 identifiers::{
73 ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue, VenueOrderId,
74 },
75 instruments::{Instrument, InstrumentAny},
76 orderbook::own::{OwnOrderBook, should_handle_own_book_order},
77 orders::{Order, OrderAny, OrderError},
78 position::Position,
79 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
80 types::{Money, Quantity},
81};
82use rust_decimal::Decimal;
83
84use crate::{
85 client::ExecutionClientAdapter,
86 reconciliation::{
87 check_position_reconciliation, create_incremental_inferred_fill,
88 generate_external_order_status_events, generate_reconciliation_order_events,
89 reconcile_fill_report as reconcile_fill,
90 },
91};
92
93const TIMER_PURGE_CLOSED_ORDERS: &str = "ExecEngine_PURGE_CLOSED_ORDERS";
94const TIMER_PURGE_CLOSED_POSITIONS: &str = "ExecEngine_PURGE_CLOSED_POSITIONS";
95const TIMER_PURGE_ACCOUNT_EVENTS: &str = "ExecEngine_PURGE_ACCOUNT_EVENTS";
96
97pub struct ExecutionEngine {
104 clock: Rc<RefCell<dyn Clock>>,
105 cache: Rc<RefCell<Cache>>,
106 clients: IndexMap<ClientId, ExecutionClientAdapter>,
107 default_client: Option<ExecutionClientAdapter>,
108 routing_map: HashMap<Venue, ClientId>,
109 oms_overrides: HashMap<StrategyId, OmsType>,
110 external_order_claims: HashMap<InstrumentId, StrategyId>,
111 external_clients: HashSet<ClientId>,
112 pos_id_generator: PositionIdGenerator,
113 config: ExecutionEngineConfig,
114}
115
116impl Debug for ExecutionEngine {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 f.debug_struct(stringify!(ExecutionEngine))
119 .field("client_count", &self.clients.len())
120 .finish()
121 }
122}
123
124impl ExecutionEngine {
125 pub fn new(
127 clock: Rc<RefCell<dyn Clock>>,
128 cache: Rc<RefCell<Cache>>,
129 config: Option<ExecutionEngineConfig>,
130 ) -> Self {
131 let trader_id = get_message_bus().borrow().trader_id;
132 Self {
133 clock: clock.clone(),
134 cache,
135 clients: IndexMap::new(),
136 default_client: None,
137 routing_map: HashMap::new(),
138 oms_overrides: HashMap::new(),
139 external_order_claims: HashMap::new(),
140 external_clients: config
141 .as_ref()
142 .and_then(|c| c.external_clients.clone())
143 .unwrap_or_default()
144 .into_iter()
145 .collect(),
146 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
147 config: config.unwrap_or_default(),
148 }
149 }
150
151 pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
153 let weak = WeakCell::from(Rc::downgrade(engine));
154
155 let weak1 = weak.clone();
156 msgbus::register_trading_command_endpoint(
157 MessagingSwitchboard::exec_engine_execute(),
158 TypedIntoHandler::from(move |cmd: TradingCommand| {
159 if let Some(rc) = weak1.upgrade() {
160 rc.borrow().execute(cmd);
161 }
162 }),
163 );
164
165 msgbus::register_trading_command_endpoint(
168 MessagingSwitchboard::exec_engine_queue_execute(),
169 TypedIntoHandler::from(move |cmd: TradingCommand| {
170 if let Some(sender) = try_get_trading_cmd_sender() {
171 sender.execute(cmd);
172 } else {
173 let endpoint = MessagingSwitchboard::exec_engine_execute();
174 msgbus::send_trading_command(endpoint, cmd);
175 }
176 }),
177 );
178
179 let weak2 = weak.clone();
180 msgbus::register_order_event_endpoint(
181 MessagingSwitchboard::exec_engine_process(),
182 TypedIntoHandler::from(move |event: OrderEventAny| {
183 if let Some(rc) = weak2.upgrade() {
184 rc.borrow_mut().process(&event);
185 }
186 }),
187 );
188
189 let weak3 = weak;
190 msgbus::register_execution_report_endpoint(
191 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
192 TypedIntoHandler::from(move |report: ExecutionReport| {
193 if let Some(rc) = weak3.upgrade() {
194 rc.borrow_mut().reconcile_execution_report(&report);
195 }
196 }),
197 );
198 }
199
200 pub fn subscribe_venue_instruments(engine: &Rc<RefCell<Self>>, venue: Venue) {
205 let weak = WeakCell::from(Rc::downgrade(engine));
206 let pattern = switchboard::get_instruments_pattern(venue);
207
208 let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
209 if let Some(rc) = weak.upgrade() {
210 let venue = instrument.id().venue;
211 let client_id = rc.borrow().routing_map.get(&venue).copied();
212 if let Some(client_id) = client_id {
213 let mut engine = rc.borrow_mut();
214 if let Some(adapter) = engine.get_client_adapter_mut(&client_id) {
215 adapter.on_instrument(instrument.clone());
216 }
217 }
218 }
219 });
220
221 msgbus::subscribe_any(pattern, handler, None);
222 log::info!("Subscribed to instrument updates for venue {venue}");
223 }
224
225 #[must_use]
226 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
228 self.pos_id_generator.count(strategy_id)
229 }
230
231 #[must_use]
232 pub fn cache(&self) -> &Rc<RefCell<Cache>> {
234 &self.cache
235 }
236
237 #[must_use]
238 pub const fn config(&self) -> &ExecutionEngineConfig {
240 &self.config
241 }
242
243 #[must_use]
244 pub fn check_integrity(&self) -> bool {
246 self.cache.borrow_mut().check_integrity()
247 }
248
249 #[must_use]
250 pub fn check_connected(&self) -> bool {
252 let clients_connected = self.clients.values().all(|c| c.is_connected());
253 let default_connected = self
254 .default_client
255 .as_ref()
256 .is_none_or(|c| c.is_connected());
257 clients_connected && default_connected
258 }
259
260 #[must_use]
261 pub fn check_disconnected(&self) -> bool {
263 let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
264 let default_disconnected = self
265 .default_client
266 .as_ref()
267 .is_none_or(|c| !c.is_connected());
268 clients_disconnected && default_disconnected
269 }
270
271 #[must_use]
273 pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
274 let mut status: Vec<_> = self
275 .clients
276 .values()
277 .map(|c| (c.client_id(), c.is_connected()))
278 .collect();
279
280 if let Some(default) = &self.default_client {
281 status.push((default.client_id(), default.is_connected()));
282 }
283
284 status
285 }
286
287 #[must_use]
288 pub fn check_residuals(&self) -> bool {
290 self.cache.borrow().check_residuals()
291 }
292
293 #[must_use]
294 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
296 self.external_order_claims.keys().copied().collect()
297 }
298
299 #[must_use]
300 pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
302 self.external_clients.clone()
303 }
304
305 #[must_use]
306 pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
308 self.external_order_claims.get(instrument_id).copied()
309 }
310
311 pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
317 let client_id = client.client_id();
318 let venue = client.venue();
319
320 if self.clients.contains_key(&client_id) {
321 anyhow::bail!("Client already registered with ID {client_id}");
322 }
323
324 let adapter = ExecutionClientAdapter::new(client);
325
326 if let Some(existing_client_id) = self.routing_map.get(&venue) {
327 anyhow::bail!(
328 "Venue {venue} already routed to {existing_client_id}, \
329 cannot register {client_id} for the same venue"
330 );
331 }
332
333 self.routing_map.insert(venue, client_id);
334 log::debug!("Registered client {client_id}");
335 self.clients.insert(client_id, adapter);
336 Ok(())
337 }
338
339 pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
341 let client_id = client.client_id();
342 let adapter = ExecutionClientAdapter::new(client);
343
344 log::debug!("Registered default client {client_id}");
345 self.default_client = Some(adapter);
346 }
347
348 #[must_use]
349 pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
351 self.clients.get(client_id).map(|a| a.client.as_ref())
352 }
353
354 #[must_use]
355 pub fn get_client_adapter_mut(
357 &mut self,
358 client_id: &ClientId,
359 ) -> Option<&mut ExecutionClientAdapter> {
360 if let Some(default) = &self.default_client
361 && &default.client_id == client_id
362 {
363 return self.default_client.as_mut();
364 }
365 self.clients.get_mut(client_id)
366 }
367
368 pub async fn generate_mass_status(
374 &mut self,
375 client_id: &ClientId,
376 lookback_mins: Option<u64>,
377 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
378 if let Some(client) = self.get_client_adapter_mut(client_id) {
379 client.generate_mass_status(lookback_mins).await
380 } else {
381 anyhow::bail!("Client {client_id} not found")
382 }
383 }
384
385 pub fn register_external_order(
390 &self,
391 client_order_id: ClientOrderId,
392 venue_order_id: VenueOrderId,
393 instrument_id: InstrumentId,
394 strategy_id: StrategyId,
395 ts_init: UnixNanos,
396 ) {
397 let venue = instrument_id.venue;
398 if let Some(client_id) = self.routing_map.get(&venue) {
399 if let Some(client) = self.clients.get(client_id) {
400 client.register_external_order(
401 client_order_id,
402 venue_order_id,
403 instrument_id,
404 strategy_id,
405 ts_init,
406 );
407 }
408 } else if let Some(default) = &self.default_client {
409 default.register_external_order(
410 client_order_id,
411 venue_order_id,
412 instrument_id,
413 strategy_id,
414 ts_init,
415 );
416 }
417 }
418
419 #[must_use]
420 pub fn client_ids(&self) -> Vec<ClientId> {
422 let mut ids: Vec<_> = self.clients.keys().copied().collect();
423
424 if let Some(default) = &self.default_client {
425 ids.push(default.client_id);
426 }
427 ids
428 }
429
430 #[must_use]
431 pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
433 let mut adapters: Vec<_> = self.clients.values_mut().collect();
434
435 if let Some(default) = &mut self.default_client {
436 adapters.push(default);
437 }
438 adapters
439 }
440
441 #[must_use]
443 pub fn get_all_clients(&self) -> Vec<&dyn ExecutionClient> {
444 let mut clients: Vec<&dyn ExecutionClient> =
445 self.clients.values().map(|a| a.client.as_ref()).collect();
446
447 if let Some(default) = &self.default_client {
448 clients.push(default.client.as_ref());
449 }
450
451 clients
452 }
453
454 #[must_use]
455 pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
460 let mut client_ids: IndexSet<ClientId> = IndexSet::new();
461 let mut venues: IndexSet<Venue> = IndexSet::new();
462
463 for order in orders {
465 venues.insert(order.instrument_id().venue);
466 if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
467 client_ids.insert(*client_id);
468 }
469 }
470
471 let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
472
473 for client_id in &client_ids {
475 if let Some(adapter) = self.clients.get(client_id)
476 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
477 {
478 clients.push(adapter.client.as_ref());
479 }
480 }
481
482 for venue in &venues {
484 if let Some(client_id) = self.routing_map.get(venue) {
485 if let Some(adapter) = self.clients.get(client_id)
486 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
487 {
488 clients.push(adapter.client.as_ref());
489 }
490 } else if let Some(adapter) = &self.default_client
491 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
492 {
493 clients.push(adapter.client.as_ref());
494 }
495 }
496
497 clients
498 }
499
500 pub fn register_venue_routing(
506 &mut self,
507 client_id: ClientId,
508 venue: Venue,
509 ) -> anyhow::Result<()> {
510 if !self.clients.contains_key(&client_id) {
511 anyhow::bail!("No client registered with ID {client_id}");
512 }
513
514 if let Some(existing_client_id) = self.routing_map.get(&venue)
515 && *existing_client_id != client_id
516 {
517 anyhow::bail!(
518 "Venue {venue} already routed to {existing_client_id}, \
519 cannot re-route to {client_id}"
520 );
521 }
522
523 self.routing_map.insert(venue, client_id);
524 log::info!("Set client {client_id} routing for {venue}");
525 Ok(())
526 }
527
528 pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
532 self.oms_overrides.insert(strategy_id, oms_type);
533 log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
534 }
535
536 pub fn register_external_order_claims(
544 &mut self,
545 strategy_id: StrategyId,
546 instrument_ids: &HashSet<InstrumentId>,
547 ) -> anyhow::Result<()> {
548 for instrument_id in instrument_ids {
550 if let Some(existing) = self.external_order_claims.get(instrument_id) {
551 anyhow::bail!(
552 "External order claim for {instrument_id} already exists for {existing}"
553 );
554 }
555 }
556
557 for instrument_id in instrument_ids {
559 self.external_order_claims
560 .insert(*instrument_id, strategy_id);
561 }
562
563 if !instrument_ids.is_empty() {
564 log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
565 }
566
567 Ok(())
568 }
569
570 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
574 if self.clients.shift_remove(&client_id).is_some() {
575 self.routing_map
577 .retain(|_, mapped_id| mapped_id != &client_id);
578 log::info!("Deregistered client {client_id}");
579 Ok(())
580 } else {
581 anyhow::bail!("No client registered with ID {client_id}")
582 }
583 }
584
585 pub async fn connect(&mut self) {
589 let futures: Vec<_> = self
590 .get_clients_mut()
591 .into_iter()
592 .map(|client| client.connect())
593 .collect();
594
595 let results = join_all(futures).await;
596
597 for error in results.into_iter().filter_map(Result::err) {
598 log::error!("Failed to connect execution client: {error:#}");
599 }
600 }
601
602 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
608 let futures: Vec<_> = self
609 .get_clients_mut()
610 .into_iter()
611 .map(|client| client.disconnect())
612 .collect();
613
614 let results = join_all(futures).await;
615 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
616
617 if errors.is_empty() {
618 Ok(())
619 } else {
620 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
621 anyhow::bail!(
622 "Failed to disconnect execution clients: {}",
623 error_msgs.join("; ")
624 )
625 }
626 }
627
628 pub fn set_manage_own_order_books(&mut self, value: bool) {
630 self.config.manage_own_order_books = value;
631 }
632
633 pub fn start_snapshot_timer(&mut self) {
637 if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
638 log::info!("Starting position snapshots timer at {interval_secs} second intervals");
639 }
640 }
641
642 pub fn stop_snapshot_timer(&mut self) {
644 if self.config.snapshot_positions_interval_secs.is_some() {
645 log::info!("Canceling position snapshots timer");
646 }
647 }
648
649 #[expect(
651 clippy::missing_panics_doc,
652 reason = "timer registration is not expected to fail"
653 )]
654 pub fn start_purge_timers(&mut self) {
655 if let Some(interval_mins) = self
656 .config
657 .purge_closed_orders_interval_mins
658 .filter(|&m| m > 0)
659 && !self
660 .clock
661 .borrow()
662 .timer_names()
663 .contains(&TIMER_PURGE_CLOSED_ORDERS)
664 {
665 let interval_ns = mins_to_nanos(u64::from(interval_mins));
666 let buffer_mins = self.config.purge_closed_orders_buffer_mins.unwrap_or(0);
667 let buffer_secs = mins_to_secs(u64::from(buffer_mins));
668 let cache = self.cache.clone();
669 let clock = self.clock.clone();
670
671 let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
672 let ts_now = clock.borrow().timestamp_ns();
673 cache.borrow_mut().purge_closed_orders(ts_now, buffer_secs);
674 });
675 let callback = TimeEventCallback::from(callback_fn);
676
677 log::info!("Starting purge closed orders timer at {interval_mins} minute intervals");
678 self.clock
679 .borrow_mut()
680 .set_timer_ns(
681 TIMER_PURGE_CLOSED_ORDERS,
682 interval_ns,
683 None,
684 None,
685 Some(callback),
686 None,
687 None,
688 )
689 .expect("Failed to set purge closed orders timer");
690 }
691
692 if let Some(interval_mins) = self
693 .config
694 .purge_closed_positions_interval_mins
695 .filter(|&m| m > 0)
696 && !self
697 .clock
698 .borrow()
699 .timer_names()
700 .contains(&TIMER_PURGE_CLOSED_POSITIONS)
701 {
702 let interval_ns = mins_to_nanos(u64::from(interval_mins));
703 let buffer_mins = self.config.purge_closed_positions_buffer_mins.unwrap_or(0);
704 let buffer_secs = mins_to_secs(u64::from(buffer_mins));
705 let cache = self.cache.clone();
706 let clock = self.clock.clone();
707
708 let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
709 let ts_now = clock.borrow().timestamp_ns();
710 cache
711 .borrow_mut()
712 .purge_closed_positions(ts_now, buffer_secs);
713 });
714 let callback = TimeEventCallback::from(callback_fn);
715
716 log::info!("Starting purge closed positions timer at {interval_mins} minute intervals");
717 self.clock
718 .borrow_mut()
719 .set_timer_ns(
720 TIMER_PURGE_CLOSED_POSITIONS,
721 interval_ns,
722 None,
723 None,
724 Some(callback),
725 None,
726 None,
727 )
728 .expect("Failed to set purge closed positions timer");
729 }
730
731 if let Some(interval_mins) = self
732 .config
733 .purge_account_events_interval_mins
734 .filter(|&m| m > 0)
735 && !self
736 .clock
737 .borrow()
738 .timer_names()
739 .contains(&TIMER_PURGE_ACCOUNT_EVENTS)
740 {
741 let interval_ns = mins_to_nanos(u64::from(interval_mins));
742 let lookback_mins = self.config.purge_account_events_lookback_mins.unwrap_or(0);
743 let lookback_secs = mins_to_secs(u64::from(lookback_mins));
744 let cache = self.cache.clone();
745 let clock = self.clock.clone();
746
747 let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
748 let ts_now = clock.borrow().timestamp_ns();
749 cache
750 .borrow_mut()
751 .purge_account_events(ts_now, lookback_secs);
752 });
753 let callback = TimeEventCallback::from(callback_fn);
754
755 log::info!("Starting purge account events timer at {interval_mins} minute intervals");
756 self.clock
757 .borrow_mut()
758 .set_timer_ns(
759 TIMER_PURGE_ACCOUNT_EVENTS,
760 interval_ns,
761 None,
762 None,
763 Some(callback),
764 None,
765 None,
766 )
767 .expect("Failed to set purge account events timer");
768 }
769 }
770
771 pub fn stop_purge_timers(&mut self) {
773 let timer_names: Vec<String> = self
774 .clock
775 .borrow()
776 .timer_names()
777 .into_iter()
778 .map(String::from)
779 .collect();
780
781 if timer_names.iter().any(|n| n == TIMER_PURGE_CLOSED_ORDERS) {
782 log::info!("Canceling purge closed orders timer");
783 self.clock
784 .borrow_mut()
785 .cancel_timer(TIMER_PURGE_CLOSED_ORDERS);
786 }
787
788 if timer_names
789 .iter()
790 .any(|n| n == TIMER_PURGE_CLOSED_POSITIONS)
791 {
792 log::info!("Canceling purge closed positions timer");
793 self.clock
794 .borrow_mut()
795 .cancel_timer(TIMER_PURGE_CLOSED_POSITIONS);
796 }
797
798 if timer_names.iter().any(|n| n == TIMER_PURGE_ACCOUNT_EVENTS) {
799 log::info!("Canceling purge account events timer");
800 self.clock
801 .borrow_mut()
802 .cancel_timer(TIMER_PURGE_ACCOUNT_EVENTS);
803 }
804 }
805
806 pub fn snapshot_open_position_states(&self) {
808 let positions: Vec<Position> = self
809 .cache
810 .borrow()
811 .positions_open(None, None, None, None, None)
812 .into_iter()
813 .cloned()
814 .collect();
815
816 for position in positions {
817 self.create_position_state_snapshot(&position);
818 }
819 }
820
821 #[expect(clippy::await_holding_refcell_ref)]
822 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
828 let ts = SystemTime::now(); {
831 let mut cache = self.cache.borrow_mut();
832 cache.clear_index();
833 cache.cache_general()?;
834 self.cache.borrow_mut().cache_all().await?;
835 cache.build_index();
836 let _ = cache.check_integrity();
837
838 if self.config.manage_own_order_books {
839 for order in cache.orders(None, None, None, None, None) {
840 if order.is_closed() || !should_handle_own_book_order(order) {
841 continue;
842 }
843 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
844 own_book.add(order.to_own_book_order());
845 }
846 }
847 }
848
849 self.set_position_id_counts();
850
851 log::info!(
852 "Loaded cache in {}ms",
853 SystemTime::now() .duration_since(ts)
855 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
856 .as_millis()
857 );
858
859 Ok(())
860 }
861
862 pub fn flush_db(&self) {
864 self.cache.borrow_mut().flush_db();
865 }
866
867 pub fn reconcile_execution_report(&mut self, report: &ExecutionReport) {
869 match report {
870 ExecutionReport::Order(order_report) => {
871 self.reconcile_order_status_report(order_report);
872 }
873 ExecutionReport::Fill(fill_report) => {
874 self.reconcile_fill_report(fill_report);
875 }
876 ExecutionReport::OrderWithFills(order_report, fills) => {
877 self.reconcile_order_with_fills(order_report, fills);
878 }
879 ExecutionReport::Position(position_report) => {
880 self.reconcile_position_report(position_report);
881 }
882 ExecutionReport::MassStatus(mass_status) => {
883 self.reconcile_execution_mass_status(mass_status);
884 }
885 }
886 }
887
888 pub fn reconcile_order_status_report(&mut self, report: &OrderStatusReport) {
898 let cache = self.cache.borrow();
899
900 let order = report
901 .client_order_id
902 .and_then(|id| cache.order(&id).cloned())
903 .or_else(|| {
904 cache
905 .client_order_id(&report.venue_order_id)
906 .and_then(|cid| cache.order(cid).cloned())
907 });
908
909 let instrument = cache.instrument(&report.instrument_id).cloned();
910
911 drop(cache);
912
913 if let Some(order) = order {
914 let ts_now = self.clock.borrow().timestamp_ns();
915 let events =
916 generate_reconciliation_order_events(&order, report, instrument.as_ref(), ts_now);
917
918 for event in &events {
919 self.handle_event(event);
920 }
921 } else {
922 self.create_external_order(report, instrument.as_ref());
923 }
924 }
925
926 fn create_external_order(
927 &mut self,
928 report: &OrderStatusReport,
929 instrument: Option<&InstrumentAny>,
930 ) {
931 let Some(instrument) = instrument else {
932 log::warn!(
933 "Cannot create external order for venue_order_id={}: instrument {} not found",
934 report.venue_order_id,
935 report.instrument_id
936 );
937 return;
938 };
939
940 let Some(order) = self.materialize_external_order_from_status(report) else {
941 return;
942 };
943
944 let ts_now = self.clock.borrow().timestamp_ns();
945 let events = generate_external_order_status_events(
946 &order,
947 report,
948 &report.account_id,
949 instrument,
950 ts_now,
951 );
952
953 for event in &events {
954 self.handle_event(event);
955 }
956 }
957
958 fn materialize_external_order_from_status(
961 &self,
962 report: &OrderStatusReport,
963 ) -> Option<OrderAny> {
964 let strategy_id = self.resolve_external_strategy(&report.instrument_id);
965
966 let client_order_id = report
967 .client_order_id
968 .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
969
970 let trader_id = get_message_bus().borrow().trader_id;
971 let ts_now = self.clock.borrow().timestamp_ns();
972
973 let initialized = OrderInitialized::new(
974 trader_id,
975 strategy_id,
976 report.instrument_id,
977 client_order_id,
978 report.order_side,
979 report.order_type,
980 report.quantity,
981 report.time_in_force,
982 report.post_only,
983 report.reduce_only,
984 false, true, UUID4::new(),
987 ts_now,
988 ts_now,
989 report.price,
990 report.trigger_price,
991 report.trigger_type,
992 report.limit_offset,
993 report.trailing_offset,
994 Some(report.trailing_offset_type),
995 report.expire_time,
996 report.display_qty,
997 None, None, Some(report.contingency_type),
1000 report.order_list_id,
1001 report.linked_order_ids.clone(),
1002 report.parent_order_id,
1003 None, None, None, None, );
1008
1009 self.materialize_external_order(
1010 initialized,
1011 client_order_id,
1012 report.venue_order_id,
1013 report.instrument_id,
1014 strategy_id,
1015 ts_now,
1016 Some(report.order_status),
1017 )
1018 }
1019
1020 fn materialize_external_order_from_fill(&self, report: &FillReport) -> Option<OrderAny> {
1028 let strategy_id = self.resolve_external_strategy(&report.instrument_id);
1029
1030 let client_order_id = report
1031 .client_order_id
1032 .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
1033
1034 let trader_id = get_message_bus().borrow().trader_id;
1035 let ts_now = self.clock.borrow().timestamp_ns();
1036
1037 let initialized = OrderInitialized::new(
1038 trader_id,
1039 strategy_id,
1040 report.instrument_id,
1041 client_order_id,
1042 report.order_side,
1043 OrderType::Market,
1044 report.last_qty,
1045 TimeInForce::Ioc,
1046 false, true, false, true, UUID4::new(),
1051 ts_now,
1052 ts_now,
1053 None, None, None, None, None, Some(TrailingOffsetType::NoTrailingOffset),
1059 None, None, None, None, Some(ContingencyType::NoContingency),
1064 None, None, None, None, None, None, None, );
1072
1073 self.materialize_external_order(
1074 initialized,
1075 client_order_id,
1076 report.venue_order_id,
1077 report.instrument_id,
1078 strategy_id,
1079 ts_now,
1080 None,
1081 )
1082 }
1083
1084 fn resolve_external_strategy(&self, instrument_id: &InstrumentId) -> StrategyId {
1085 self.external_order_claims
1086 .get(instrument_id)
1087 .copied()
1088 .unwrap_or_else(|| StrategyId::from("EXTERNAL"))
1089 }
1090
1091 #[allow(
1094 clippy::too_many_arguments,
1095 reason = "external order materialisation threads several ids and a timestamp"
1096 )]
1097 fn materialize_external_order(
1098 &self,
1099 initialized: OrderInitialized,
1100 client_order_id: ClientOrderId,
1101 venue_order_id: VenueOrderId,
1102 instrument_id: InstrumentId,
1103 strategy_id: StrategyId,
1104 ts_now: UnixNanos,
1105 order_status: Option<OrderStatus>,
1106 ) -> Option<OrderAny> {
1107 let order = match OrderAny::from_events(vec![OrderEventAny::Initialized(initialized)]) {
1108 Ok(order) => order,
1109 Err(e) => {
1110 log::error!("Failed to create external order from report: {e}");
1111 return None;
1112 }
1113 };
1114
1115 {
1116 let mut cache = self.cache.borrow_mut();
1117 if let Err(e) = cache.add_order(order.clone(), None, None, false) {
1118 log::error!("Failed to add external order to cache: {e}");
1119 return None;
1120 }
1121
1122 if let Err(e) = cache.add_venue_order_id(&client_order_id, &venue_order_id, false) {
1123 log::warn!("Failed to add venue order ID index: {e}");
1124 }
1125 }
1126
1127 match order_status {
1128 Some(status) => log::info!(
1129 "Created external order {client_order_id} ({venue_order_id}) for {instrument_id} [{status}]",
1130 ),
1131 None => log::info!(
1132 "Created external order {client_order_id} ({venue_order_id}) for {instrument_id}",
1133 ),
1134 }
1135
1136 self.register_external_order(
1137 client_order_id,
1138 venue_order_id,
1139 instrument_id,
1140 strategy_id,
1141 ts_now,
1142 );
1143
1144 Some(order)
1145 }
1146
1147 pub fn reconcile_fill_report(&mut self, report: &FillReport) {
1155 let cache = self.cache.borrow();
1156
1157 let order = report
1158 .client_order_id
1159 .and_then(|id| cache.order(&id).cloned())
1160 .or_else(|| {
1161 cache
1162 .client_order_id(&report.venue_order_id)
1163 .and_then(|cid| cache.order(cid).cloned())
1164 });
1165
1166 let instrument = cache.instrument(&report.instrument_id).cloned();
1167
1168 drop(cache);
1169
1170 let Some(instrument) = instrument else {
1171 log::debug!(
1172 "Cannot reconcile fill report for venue_order_id={}: instrument {} not found",
1173 report.venue_order_id,
1174 report.instrument_id
1175 );
1176 return;
1177 };
1178
1179 let order = match order {
1180 Some(order) => order,
1181 None => {
1182 let Some(order) = self.materialize_external_order_from_fill(report) else {
1183 return;
1184 };
1185 let ts_now = self.clock.borrow().timestamp_ns();
1186 let accepted = OrderAccepted::new(
1187 order.trader_id(),
1188 order.strategy_id(),
1189 order.instrument_id(),
1190 order.client_order_id(),
1191 report.venue_order_id,
1192 report.account_id,
1193 UUID4::new(),
1194 report.ts_event,
1195 ts_now,
1196 true, );
1198 self.handle_event(&OrderEventAny::Accepted(accepted));
1199 self.cache
1200 .borrow()
1201 .order(&order.client_order_id())
1202 .cloned()
1203 .unwrap_or(order)
1204 }
1205 };
1206
1207 let ts_now = self.clock.borrow().timestamp_ns();
1208
1209 if let Some(event) = reconcile_fill(
1210 &order,
1211 report,
1212 &instrument,
1213 ts_now,
1214 self.config.allow_overfills,
1215 ) {
1216 self.handle_event(&event);
1217 }
1218 }
1219
1220 pub fn reconcile_order_with_fills(&mut self, report: &OrderStatusReport, fills: &[FillReport]) {
1229 let cache = self.cache.borrow();
1230 let order = report
1231 .client_order_id
1232 .and_then(|id| cache.order(&id).cloned())
1233 .or_else(|| {
1234 cache
1235 .client_order_id(&report.venue_order_id)
1236 .and_then(|cid| cache.order(cid).cloned())
1237 });
1238 let instrument = cache.instrument(&report.instrument_id).cloned();
1239 drop(cache);
1240
1241 let Some(instrument) = instrument else {
1242 log::debug!(
1243 "Cannot reconcile bundled report for venue_order_id={}: instrument {} not found",
1244 report.venue_order_id,
1245 report.instrument_id,
1246 );
1247 return;
1248 };
1249
1250 let mut order = match order {
1253 Some(order) => order,
1254 None => {
1255 let Some(order) = self.materialize_external_order_from_status(report) else {
1256 return;
1257 };
1258 let ts_now = self.clock.borrow().timestamp_ns();
1259 let accepted = OrderAccepted::new(
1260 order.trader_id(),
1261 order.strategy_id(),
1262 order.instrument_id(),
1263 order.client_order_id(),
1264 report.venue_order_id,
1265 report.account_id,
1266 UUID4::new(),
1267 report.ts_accepted,
1268 ts_now,
1269 true, );
1271 self.handle_event(&OrderEventAny::Accepted(accepted));
1272 order
1273 }
1274 };
1275
1276 let client_order_id = order.client_order_id();
1277
1278 for fill in fills {
1279 let ts_now = self.clock.borrow().timestamp_ns();
1280
1281 if let Some(event) = reconcile_fill(
1282 &order,
1283 fill,
1284 &instrument,
1285 ts_now,
1286 self.config.allow_overfills,
1287 ) {
1288 self.handle_event(&event);
1289 }
1290
1291 if let Some(refreshed) = self.cache.borrow().order(&client_order_id).cloned() {
1293 order = refreshed;
1294 }
1295 }
1296
1297 if matches!(
1300 report.order_status,
1301 OrderStatus::PartiallyFilled | OrderStatus::Filled,
1302 ) && report.filled_qty > order.filled_qty()
1303 {
1304 let ts_now = self.clock.borrow().timestamp_ns();
1305
1306 if let Some(event) = create_incremental_inferred_fill(
1307 &order,
1308 report,
1309 &report.account_id,
1310 &instrument,
1311 ts_now,
1312 None,
1313 ) {
1314 self.handle_event(&event);
1315
1316 if let Some(refreshed) = self.cache.borrow().order(&client_order_id).cloned() {
1317 order = refreshed;
1318 }
1319 }
1320 }
1321
1322 match report.order_status {
1324 OrderStatus::Canceled if !order.is_closed() => {
1325 let ts_now = self.clock.borrow().timestamp_ns();
1326 let canceled = OrderCanceled::new(
1327 order.trader_id(),
1328 order.strategy_id(),
1329 order.instrument_id(),
1330 order.client_order_id(),
1331 UUID4::new(),
1332 report.ts_last,
1333 ts_now,
1334 true,
1335 Some(report.venue_order_id),
1336 Some(report.account_id),
1337 );
1338 self.handle_event(&OrderEventAny::Canceled(canceled));
1339 }
1340 OrderStatus::Expired if !order.is_closed() => {
1341 let ts_now = self.clock.borrow().timestamp_ns();
1342 let expired = OrderExpired::new(
1343 order.trader_id(),
1344 order.strategy_id(),
1345 order.instrument_id(),
1346 order.client_order_id(),
1347 UUID4::new(),
1348 report.ts_last,
1349 ts_now,
1350 true,
1351 Some(report.venue_order_id),
1352 Some(report.account_id),
1353 );
1354 self.handle_event(&OrderEventAny::Expired(expired));
1355 }
1356 _ => {}
1357 }
1358 }
1359
1360 pub fn reconcile_position_report(&mut self, report: &PositionStatusReport) {
1365 let cache = self.cache.borrow();
1366
1367 let size_precision = cache
1368 .instrument(&report.instrument_id)
1369 .map(|i| i.size_precision());
1370
1371 if report.venue_position_id.is_some() {
1372 self.reconcile_position_report_hedging(report, &cache);
1373 } else {
1374 self.reconcile_position_report_netting(report, &cache, size_precision);
1375 }
1376 }
1377
1378 fn reconcile_position_report_hedging(&self, report: &PositionStatusReport, cache: &Cache) {
1379 let venue_position_id = report.venue_position_id.as_ref().unwrap();
1380
1381 log::debug!(
1382 "Reconciling HEDGE position for {}, venue_position_id={}",
1383 report.instrument_id,
1384 venue_position_id
1385 );
1386
1387 let Some(position) = cache.position(venue_position_id) else {
1388 log::error!("Cannot reconcile position: {venue_position_id} not found in cache");
1389 return;
1390 };
1391
1392 let cached_signed_qty = match position.side {
1393 PositionSide::Long => position.quantity.as_decimal(),
1394 PositionSide::Short => -position.quantity.as_decimal(),
1395 _ => Decimal::ZERO,
1396 };
1397 let venue_signed_qty = report.signed_decimal_qty;
1398
1399 if cached_signed_qty != venue_signed_qty {
1400 log::error!(
1401 "Position mismatch for {} {}: cached={}, venue={}",
1402 report.instrument_id,
1403 venue_position_id,
1404 cached_signed_qty,
1405 venue_signed_qty
1406 );
1407 }
1408 }
1409
1410 fn reconcile_position_report_netting(
1411 &self,
1412 report: &PositionStatusReport,
1413 cache: &Cache,
1414 size_precision: Option<u8>,
1415 ) {
1416 log::debug!("Reconciling NET position for {}", report.instrument_id);
1417
1418 let positions_open =
1419 cache.positions_open(None, Some(&report.instrument_id), None, None, None);
1420
1421 let cached_signed_qty: Decimal = positions_open
1423 .iter()
1424 .map(|p| match p.side {
1425 PositionSide::Long => p.quantity.as_decimal(),
1426 PositionSide::Short => -p.quantity.as_decimal(),
1427 _ => Decimal::ZERO,
1428 })
1429 .sum();
1430
1431 log::debug!(
1432 "Position report: venue_signed_qty={}, cached_signed_qty={}",
1433 report.signed_decimal_qty,
1434 cached_signed_qty
1435 );
1436
1437 let _ = check_position_reconciliation(report, cached_signed_qty, size_precision);
1438 }
1439
1440 pub fn reconcile_execution_mass_status(&mut self, mass_status: &ExecutionMassStatus) {
1446 log::info!(
1447 "Reconciling mass status for client={}, account={}, venue={}",
1448 mass_status.client_id,
1449 mass_status.account_id,
1450 mass_status.venue
1451 );
1452
1453 let mut external_venue_ids = AHashSet::new();
1454
1455 for order_report in mass_status.order_reports().values() {
1456 let existed = {
1457 let cache = self.cache.borrow();
1458 order_report
1459 .client_order_id
1460 .and_then(|id| cache.order(&id).cloned())
1461 .or_else(|| {
1462 cache
1463 .client_order_id(&order_report.venue_order_id)
1464 .and_then(|cid| cache.order(cid).cloned())
1465 })
1466 .is_some()
1467 };
1468
1469 self.reconcile_order_status_report(order_report);
1470
1471 if !existed {
1472 external_venue_ids.insert(order_report.venue_order_id);
1473 }
1474 }
1475
1476 for fill_reports in mass_status.fill_reports().values() {
1477 for fill_report in fill_reports {
1478 if external_venue_ids.contains(&fill_report.venue_order_id) {
1479 log::debug!(
1480 "Skipping fill report for external order {}: covered by inferred fill",
1481 fill_report.venue_order_id
1482 );
1483 continue;
1484 }
1485
1486 self.reconcile_fill_report(fill_report);
1487 }
1488 }
1489
1490 for position_reports in mass_status.position_reports().values() {
1491 for position_report in position_reports {
1492 self.reconcile_position_report(position_report);
1493 }
1494 }
1495
1496 log::info!(
1497 "Mass status reconciliation complete: {} orders, {} fills, {} positions",
1498 mass_status.order_reports().len(),
1499 mass_status
1500 .fill_reports()
1501 .values()
1502 .map(|v| v.len())
1503 .sum::<usize>(),
1504 mass_status
1505 .position_reports()
1506 .values()
1507 .map(|v| v.len())
1508 .sum::<usize>()
1509 );
1510 }
1511
1512 pub fn execute(&self, command: TradingCommand) {
1514 self.execute_command(command);
1515 }
1516
1517 pub fn process(&mut self, event: &OrderEventAny) {
1519 self.handle_event(event);
1520 }
1521
1522 pub fn start(&mut self) {
1524 self.start_snapshot_timer();
1525 self.start_purge_timers();
1526
1527 log::info!("Started");
1528 }
1529
1530 pub fn stop(&mut self) {
1532 self.stop_snapshot_timer();
1533 self.stop_purge_timers();
1534
1535 log::info!("Stopped");
1536 }
1537
1538 pub fn reset(&mut self) {
1540 self.pos_id_generator.reset();
1541
1542 log::info!("Reset");
1543 }
1544
1545 pub fn dispose(&mut self) {
1547 log::info!("Disposed");
1548 }
1549
1550 fn execute_command(&self, command: TradingCommand) {
1551 if self.config.debug {
1552 log::debug!("{RECV}{CMD} {command:?}");
1553 }
1554
1555 if let Some(cid) = command.client_id()
1556 && self.external_clients.contains(&cid)
1557 {
1558 if self.config.debug {
1559 log::debug!("Skipping execution command for external client {cid}: {command:?}");
1560 }
1561 return;
1562 }
1563
1564 let client = if let Some(adapter) = command
1565 .client_id()
1566 .and_then(|cid| self.clients.get(&cid))
1567 .or_else(|| {
1568 self.routing_map
1569 .get(&command.instrument_id().venue)
1570 .and_then(|client_id| self.clients.get(client_id))
1571 })
1572 .or(self.default_client.as_ref())
1573 {
1574 adapter.client.as_ref()
1575 } else {
1576 log::error!(
1577 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
1578 command.client_id(),
1579 command.instrument_id().venue,
1580 );
1581
1582 let reason = format!(
1583 "No execution client found for client_id={:?}, venue={}",
1584 command.client_id(),
1585 command.instrument_id().venue,
1586 );
1587
1588 match command {
1589 TradingCommand::SubmitOrder(cmd) => {
1590 let cache = self.cache.borrow();
1591 if let Some(order) = cache.order(&cmd.client_order_id) {
1592 let order = order.clone();
1593 drop(cache);
1594 self.deny_order(&order, &reason);
1595 }
1596 }
1597 TradingCommand::SubmitOrderList(cmd) => {
1598 let orders: Vec<OrderAny> = self
1599 .cache
1600 .borrow()
1601 .orders_for_ids(&cmd.order_list.client_order_ids, &cmd);
1602
1603 for order in &orders {
1604 self.deny_order(order, &reason);
1605 }
1606 }
1607 _ => {}
1608 }
1609
1610 return;
1611 };
1612
1613 match command {
1614 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
1615 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
1616 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
1617 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
1618 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
1619 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
1620 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
1621 TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
1622 }
1623 }
1624
1625 fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: SubmitOrder) {
1626 let client_order_id = cmd.client_order_id;
1627
1628 let order = {
1629 let cache = self.cache.borrow();
1630 match cache.order(&client_order_id) {
1631 Some(order) => order.clone(),
1632 None => {
1633 log::error!(
1634 "Cannot handle submit order: order not found in cache for {client_order_id}"
1635 );
1636 return;
1637 }
1638 }
1639 };
1640
1641 let order_venue = order.instrument_id().venue;
1642 let client_venue = client.venue();
1643 if order_venue != client_venue {
1644 self.deny_order(
1645 &order,
1646 &format!("Order venue {order_venue} does not match client venue {client_venue}"),
1647 );
1648 return;
1649 }
1650
1651 let instrument_id = order.instrument_id();
1652
1653 if self.config.snapshot_orders {
1654 self.create_order_state_snapshot(&order);
1655 }
1656
1657 {
1658 let cache = self.cache.borrow();
1659 if cache.instrument(&instrument_id).is_none() {
1660 log::error!(
1661 "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
1662 );
1663 return;
1664 }
1665 }
1666
1667 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
1668 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
1669 own_book.add(order.to_own_book_order());
1670 }
1671
1672 if let Err(e) = client.submit_order(cmd) {
1673 self.deny_order(&order, &format!("failed-to-submit-order-to-client: {e}"));
1674 }
1675 }
1676
1677 fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: SubmitOrderList) {
1678 let orders: Vec<OrderAny> = self
1679 .cache
1680 .borrow()
1681 .orders_for_ids(&cmd.order_list.client_order_ids, &cmd);
1682
1683 if orders.len() != cmd.order_list.client_order_ids.len() {
1684 for order in &orders {
1685 self.deny_order(
1686 order,
1687 &format!("Incomplete order list: missing orders in cache for {cmd}"),
1688 );
1689 }
1690 return;
1691 }
1692
1693 let order_list_venue = cmd.instrument_id.venue;
1694 let client_venue = client.venue();
1695 if order_list_venue != client_venue {
1696 for order in &orders {
1697 self.deny_order(
1698 order,
1699 &format!("Order list venue {order_list_venue} does not match client venue {client_venue}"),
1700 );
1701 }
1702 return;
1703 }
1704
1705 if self.config.snapshot_orders {
1706 for order in &orders {
1707 self.create_order_state_snapshot(order);
1708 }
1709 }
1710
1711 {
1712 let cache = self.cache.borrow();
1713 if cache.instrument(&cmd.instrument_id).is_none() {
1714 log::error!(
1715 "Cannot handle submit order list: no instrument found for {}, {cmd}",
1716 cmd.instrument_id,
1717 );
1718 return;
1719 }
1720 }
1721
1722 if self.config.manage_own_order_books {
1723 let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
1724
1725 for order in &orders {
1726 if should_handle_own_book_order(order) {
1727 own_book.add(order.to_own_book_order());
1728 }
1729 }
1730 }
1731
1732 if let Err(e) = client.submit_order_list(cmd) {
1733 log::error!("Error submitting order list to client: {e}");
1734 for order in &orders {
1735 self.deny_order(
1736 order,
1737 &format!("failed-to-submit-order-list-to-client: {e}"),
1738 );
1739 }
1740 }
1741 }
1742
1743 fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: ModifyOrder) {
1744 if let Err(e) = client.modify_order(cmd) {
1745 log::error!("Error modifying order: {e}");
1746 }
1747 }
1748
1749 fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: CancelOrder) {
1750 if let Err(e) = client.cancel_order(cmd) {
1751 log::error!("Error canceling order: {e}");
1752 }
1753 }
1754
1755 fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: CancelAllOrders) {
1756 if let Err(e) = client.cancel_all_orders(cmd) {
1757 log::error!("Error canceling all orders: {e}");
1758 }
1759 }
1760
1761 fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: BatchCancelOrders) {
1762 if let Err(e) = client.batch_cancel_orders(cmd) {
1763 log::error!("Error batch canceling orders: {e}");
1764 }
1765 }
1766
1767 fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: QueryAccount) {
1768 if let Err(e) = client.query_account(cmd) {
1769 log::error!("Error querying account: {e}");
1770 }
1771 }
1772
1773 fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: QueryOrder) {
1774 if let Err(e) = client.query_order(cmd) {
1775 log::error!("Error querying order: {e}");
1776 }
1777 }
1778
1779 fn create_order_state_snapshot(&self, order: &OrderAny) {
1780 if self.config.debug {
1781 log::debug!("Creating order state snapshot for {order}");
1782 }
1783
1784 if self.cache.borrow().has_backing()
1785 && let Err(e) = self.cache.borrow().snapshot_order_state(order)
1786 {
1787 log::error!("Failed to snapshot order state: {e}");
1788 }
1789 }
1790
1791 fn create_position_state_snapshot(&self, position: &Position) {
1792 if self.config.debug {
1793 log::debug!("Creating position state snapshot for {position}");
1794 }
1795
1796 }
1801
1802 fn handle_event(&mut self, event: &OrderEventAny) {
1803 if self.config.debug {
1804 log::debug!("{RECV}{EVT} {event:?}");
1805 }
1806
1807 let client_order_id = event.client_order_id();
1808 let cache = self.cache.borrow();
1809 let mut order = if let Some(order) = cache.order(&client_order_id) {
1810 order.clone()
1811 } else {
1812 log::warn!(
1813 "Order with {} not found in the cache to apply {}",
1814 event.client_order_id(),
1815 event
1816 );
1817
1818 let venue_order_id = if let Some(id) = event.venue_order_id() {
1820 id
1821 } else {
1822 log::error!(
1823 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
1824 event.client_order_id()
1825 );
1826 return;
1827 };
1828
1829 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
1831 id
1832 } else {
1833 log::error!(
1834 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
1835 event.client_order_id(),
1836 );
1837 return;
1838 };
1839
1840 if let Some(order) = cache.order(client_order_id) {
1842 log::info!("Order with {client_order_id} was found in the cache");
1843 order.clone()
1844 } else {
1845 log::error!(
1846 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
1847 );
1848 return;
1849 }
1850 };
1851
1852 drop(cache);
1853
1854 match event {
1855 OrderEventAny::Filled(fill) => {
1856 let oms_type = self.determine_oms_type(fill);
1857 let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
1858
1859 let mut fill = *fill;
1860 fill.position_id = Some(position_id);
1861
1862 if self.apply_fill_to_order(&mut order, fill).is_ok() {
1863 self.handle_order_fill(&order, fill, oms_type);
1864 }
1865 }
1866 _ => {
1867 let _ = self.apply_event_to_order(&mut order, event);
1868 }
1869 }
1870 }
1871
1872 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
1873 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
1875 return *oms_type;
1876 }
1877
1878 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
1880 && let Some(client) = self.clients.get(client_id)
1881 {
1882 return client.oms_type();
1883 }
1884
1885 if let Some(client) = &self.default_client {
1886 return client.oms_type();
1887 }
1888
1889 OmsType::Netting }
1891
1892 fn determine_position_id(
1893 &mut self,
1894 fill: OrderFilled,
1895 oms_type: OmsType,
1896 order: Option<&OrderAny>,
1897 ) -> PositionId {
1898 let cache = self.cache.borrow();
1899 let cached_position_id = cache.position_id(&fill.client_order_id()).copied();
1900 drop(cache);
1901
1902 if self.config.debug {
1903 log::debug!(
1904 "Determining position ID for {}, position_id={:?}",
1905 fill.client_order_id(),
1906 cached_position_id,
1907 );
1908 }
1909
1910 if let Some(position_id) = cached_position_id {
1911 if let Some(fill_position_id) = fill.position_id
1912 && fill_position_id != position_id
1913 {
1914 log::warn!(
1915 "Incorrect position ID assigned to fill: \
1916 cached={position_id}, assigned={fill_position_id}; \
1917 re-assigning from cache",
1918 );
1919 }
1920
1921 if self.config.debug {
1922 log::debug!("Assigned {position_id} to {}", fill.client_order_id());
1923 }
1924
1925 return position_id;
1926 }
1927
1928 let position_id = match oms_type {
1929 OmsType::Hedging => self.determine_hedging_position_id(fill, order),
1930 OmsType::Netting => self.determine_netting_position_id(fill),
1931 _ => self.determine_netting_position_id(fill),
1932 };
1933
1934 let order = if let Some(o) = order {
1935 o.clone()
1936 } else {
1937 let cache = self.cache.borrow();
1938 cache
1939 .order(&fill.client_order_id())
1940 .cloned()
1941 .unwrap_or_else(|| {
1942 panic!(
1943 "Order for {} not found to determine position ID",
1944 fill.client_order_id()
1945 )
1946 })
1947 };
1948
1949 if order.exec_algorithm_id().is_some()
1950 && let Some(exec_spawn_id) = order.exec_spawn_id()
1951 {
1952 let cache = self.cache.borrow();
1953 let primary = if let Some(p) = cache.order(&exec_spawn_id) {
1954 p.clone()
1955 } else {
1956 log::warn!(
1957 "Primary exec spawn order {exec_spawn_id} not found, \
1958 skipping position ID propagation"
1959 );
1960 return position_id;
1961 };
1962 let primary_already_indexed = cache.position_id(&primary.client_order_id()).is_some();
1963 drop(cache);
1964
1965 if primary.position_id().is_none() && !primary_already_indexed {
1966 let mut cache = self.cache.borrow_mut();
1967 if let Some(primary_mut) = cache.mut_order(&exec_spawn_id) {
1968 primary_mut.set_position_id(Some(position_id));
1969 }
1970 let _ = cache.add_position_id(
1971 &position_id,
1972 &primary.instrument_id().venue,
1973 &primary.client_order_id(),
1974 &primary.strategy_id(),
1975 );
1976 log::debug!("Assigned primary order {position_id}");
1977 }
1978 }
1979
1980 position_id
1981 }
1982
1983 fn determine_hedging_position_id(
1984 &mut self,
1985 fill: OrderFilled,
1986 order: Option<&OrderAny>,
1987 ) -> PositionId {
1988 if let Some(position_id) = fill.position_id {
1990 if self.config.debug {
1991 log::debug!("Already had a position ID of: {position_id}");
1992 }
1993 return position_id;
1994 }
1995
1996 let cache = self.cache.borrow();
1997
1998 let order = if let Some(o) = order {
1999 o
2000 } else {
2001 match cache.order(&fill.client_order_id()) {
2002 Some(o) => o,
2003 None => {
2004 panic!(
2005 "Order for {} not found to determine position ID",
2006 fill.client_order_id()
2007 );
2008 }
2009 }
2010 };
2011
2012 if let Some(spawn_id) = order.exec_spawn_id() {
2014 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
2015 for spawned_order in spawn_orders {
2016 if let Some(pos_id) = spawned_order.position_id() {
2017 if self.config.debug {
2018 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
2019 }
2020 return pos_id;
2021 }
2022 }
2023 }
2024
2025 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
2027
2028 if self.config.debug {
2029 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
2030 }
2031 position_id
2032 }
2033
2034 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
2035 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
2036 }
2037
2038 fn apply_fill_to_order(&self, order: &mut OrderAny, fill: OrderFilled) -> anyhow::Result<()> {
2039 if order.is_duplicate_fill(&fill) {
2040 log::warn!(
2041 "Duplicate fill: {} trade_id={} already applied, skipping",
2042 order.client_order_id(),
2043 fill.trade_id
2044 );
2045 anyhow::bail!("Duplicate fill");
2046 }
2047
2048 self.check_overfill(order, &fill)?;
2049 let event = OrderEventAny::Filled(fill);
2050 self.apply_order_event(order, &event)
2051 }
2052
2053 fn apply_event_to_order(
2054 &self,
2055 order: &mut OrderAny,
2056 event: &OrderEventAny,
2057 ) -> anyhow::Result<()> {
2058 self.apply_order_event(order, event)
2059 }
2060
2061 fn apply_order_event(&self, order: &mut OrderAny, event: &OrderEventAny) -> anyhow::Result<()> {
2062 if let Err(e) = order.apply(event.clone()) {
2063 match e {
2064 OrderError::InvalidStateTransition => {
2065 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
2068 }
2069 OrderError::DuplicateFill(trade_id) => {
2070 log::warn!(
2072 "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
2073 );
2074 anyhow::bail!("{e}");
2075 }
2076 _ => {
2077 log::error!("Error applying event: {e}, did not apply {event}");
2079
2080 if should_handle_own_book_order(order) {
2081 self.cache.borrow_mut().update_own_order_book(order);
2082 }
2083 anyhow::bail!("{e}");
2084 }
2085 }
2086 }
2087
2088 if let Err(e) = self.cache.borrow_mut().update_order(order) {
2089 log::error!("Error updating order in cache: {e}");
2090 }
2091
2092 if self.config.debug {
2093 log::debug!("{SEND}{EVT} {event}");
2094 }
2095
2096 let topic = switchboard::get_event_orders_topic(event.strategy_id());
2097 msgbus::publish_order_event(topic, event);
2098
2099 if self.config.snapshot_orders {
2100 self.create_order_state_snapshot(order);
2101 }
2102
2103 Ok(())
2104 }
2105
2106 fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
2107 let potential_overfill = order.calculate_overfill(fill.last_qty);
2108
2109 if potential_overfill.is_positive() {
2110 if self.config.allow_overfills {
2111 log::warn!(
2112 "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
2113 order.client_order_id(),
2114 potential_overfill,
2115 order.filled_qty(),
2116 fill.last_qty,
2117 order.quantity()
2118 );
2119 } else {
2120 let msg = format!(
2121 "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
2122 Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
2123 order.client_order_id(),
2124 potential_overfill,
2125 order.filled_qty(),
2126 fill.last_qty,
2127 order.quantity()
2128 );
2129 anyhow::bail!("{msg}");
2130 }
2131 }
2132
2133 Ok(())
2134 }
2135
2136 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
2137 let instrument =
2138 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
2139 instrument.clone()
2140 } else {
2141 log::error!(
2142 "Cannot handle order fill: no instrument found for {}, {fill}",
2143 fill.instrument_id,
2144 );
2145 return;
2146 };
2147
2148 if self.cache.borrow().account(&fill.account_id).is_none() {
2149 log::error!(
2150 "Cannot handle order fill: no account found for {}, {fill}",
2151 fill.instrument_id.venue,
2152 );
2153 return;
2154 }
2155
2156 let position = if instrument.is_spread() {
2159 None
2160 } else {
2161 self.handle_position_update(&instrument, fill, oms_type);
2162 let position_id = fill.position_id.unwrap();
2163 self.cache.borrow().position(&position_id).cloned()
2164 };
2165
2166 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
2169 if !instrument.is_spread()
2171 && let Some(ref pos) = position
2172 && pos.is_open()
2173 {
2174 let position_id = pos.id;
2175
2176 for client_order_id in order.linked_order_ids().unwrap_or_default() {
2177 let mut cache = self.cache.borrow_mut();
2178 let contingent_order = cache.mut_order(client_order_id);
2179 if let Some(contingent_order) = contingent_order
2180 && contingent_order.position_id().is_none()
2181 {
2182 contingent_order.set_position_id(Some(position_id));
2183
2184 if let Err(e) = self.cache.borrow_mut().add_position_id(
2185 &position_id,
2186 &contingent_order.instrument_id().venue,
2187 &contingent_order.client_order_id(),
2188 &contingent_order.strategy_id(),
2189 ) {
2190 log::error!("Failed to add position ID: {e}");
2191 }
2192 }
2193 }
2194 }
2195 }
2198 }
2199
2200 fn handle_position_update(
2204 &mut self,
2205 instrument: &InstrumentAny,
2206 fill: OrderFilled,
2207 oms_type: OmsType,
2208 ) {
2209 let position_id = if let Some(position_id) = fill.position_id {
2210 position_id
2211 } else {
2212 log::error!("Cannot handle position update: no position ID found for fill {fill}");
2213 return;
2214 };
2215
2216 let position_opt = self.cache.borrow().position(&position_id).cloned();
2217
2218 match position_opt {
2219 None => {
2220 if self.open_position(instrument, None, fill, oms_type).is_ok() {
2222 }
2224 }
2225 Some(pos) if pos.is_closed() => {
2226 if self
2228 .open_position(instrument, Some(&pos), fill, oms_type)
2229 .is_ok()
2230 {
2231 }
2233 }
2234 Some(mut pos) => {
2235 if self.will_flip_position(&pos, fill) {
2236 self.flip_position(instrument, &mut pos, fill, oms_type);
2238 } else {
2239 self.update_position(&mut pos, fill);
2241 }
2242 }
2243 }
2244 }
2245
2246 fn open_position(
2247 &self,
2248 instrument: &InstrumentAny,
2249 position: Option<&Position>,
2250 fill: OrderFilled,
2251 oms_type: OmsType,
2252 ) -> anyhow::Result<()> {
2253 if let Some(position) = position {
2254 if Self::is_duplicate_closed_fill(position, &fill) {
2255 log::warn!(
2256 "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
2257 fill.trade_id,
2258 position.id,
2259 fill.order_side,
2260 fill.last_qty,
2261 fill.last_px
2262 );
2263 return Ok(());
2264 }
2265 self.reopen_position(position, oms_type)?;
2266 }
2267
2268 let position = Position::new(instrument, fill);
2269 self.cache.borrow_mut().add_position(&position, oms_type)?;
2270
2271 if self.config.snapshot_positions {
2272 self.create_position_state_snapshot(&position);
2273 }
2274
2275 let ts_init = self.clock.borrow().timestamp_ns();
2276 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
2277 let topic = switchboard::get_event_positions_topic(event.strategy_id);
2278 msgbus::publish_position_event(topic, &PositionEvent::PositionOpened(event));
2279
2280 Ok(())
2281 }
2282
2283 fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
2284 position.events.iter().any(|event| {
2285 event.trade_id == fill.trade_id
2286 && event.order_side == fill.order_side
2287 && event.last_px == fill.last_px
2288 && event.last_qty == fill.last_qty
2289 })
2290 }
2291
2292 fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
2293 if oms_type == OmsType::Netting {
2294 if position.is_open() {
2295 anyhow::bail!(
2296 "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
2297 position.id
2298 );
2299 }
2300 self.cache.borrow_mut().snapshot_position(position)?;
2302 } else {
2303 log::warn!(
2305 "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
2306 position.id
2307 );
2308 }
2309 Ok(())
2310 }
2311
2312 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
2313 position.apply(&fill);
2315
2316 let is_closed = position.is_closed();
2318
2319 if let Err(e) = self.cache.borrow_mut().update_position(position) {
2321 log::error!("Failed to update position: {e:?}");
2322 return;
2323 }
2324
2325 let cache = self.cache.borrow();
2327
2328 drop(cache);
2329
2330 if self.config.snapshot_positions {
2332 self.create_position_state_snapshot(position);
2333 }
2334
2335 let topic = switchboard::get_event_positions_topic(position.strategy_id);
2337 let ts_init = self.clock.borrow().timestamp_ns();
2338
2339 if is_closed {
2340 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
2341 msgbus::publish_position_event(topic, &PositionEvent::PositionClosed(event));
2342 } else {
2343 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
2344 msgbus::publish_position_event(topic, &PositionEvent::PositionChanged(event));
2345 }
2346 }
2347
2348 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
2349 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
2350 }
2351
2352 fn flip_position(
2353 &mut self,
2354 instrument: &InstrumentAny,
2355 position: &mut Position,
2356 fill: OrderFilled,
2357 oms_type: OmsType,
2358 ) {
2359 let difference = match position.side {
2360 PositionSide::Long => Quantity::from_raw(
2361 fill.last_qty.raw - position.quantity.raw,
2362 position.size_precision,
2363 ),
2364 PositionSide::Short => Quantity::from_raw(
2365 position.quantity.raw.abs_diff(fill.last_qty.raw), position.size_precision,
2367 ),
2368 _ => fill.last_qty,
2369 };
2370
2371 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
2373 let (commission1, commission2) = if let Some(commission) = fill.commission {
2374 let commission_currency = commission.currency;
2375 let commission1 = Money::new(commission * fill_percent, commission_currency);
2376 let commission2 = commission - commission1;
2377 (Some(commission1), Some(commission2))
2378 } else {
2379 log::error!("Commission is not available");
2380 (None, None)
2381 };
2382
2383 let mut fill_split1: Option<OrderFilled> = None;
2384
2385 if position.is_open() {
2386 fill_split1 = Some(OrderFilled::new(
2387 fill.trader_id,
2388 fill.strategy_id,
2389 fill.instrument_id,
2390 fill.client_order_id,
2391 fill.venue_order_id,
2392 fill.account_id,
2393 fill.trade_id,
2394 fill.order_side,
2395 fill.order_type,
2396 position.quantity,
2397 fill.last_px,
2398 fill.currency,
2399 fill.liquidity_side,
2400 UUID4::new(),
2401 fill.ts_event,
2402 fill.ts_init,
2403 fill.reconciliation,
2404 fill.position_id,
2405 commission1,
2406 ));
2407
2408 self.update_position(position, fill_split1.unwrap());
2409
2410 if oms_type == OmsType::Netting
2412 && let Err(e) = self.cache.borrow_mut().snapshot_position(position)
2413 {
2414 log::error!("Failed to snapshot position during flip: {e:?}");
2415 }
2416 }
2417
2418 if difference.raw == 0 {
2420 log::warn!(
2421 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
2422 );
2423 return;
2424 }
2425
2426 let position_id_flip = if oms_type == OmsType::Hedging
2427 && let Some(position_id) = fill.position_id
2428 && position_id.is_virtual()
2429 {
2430 Some(self.pos_id_generator.generate(fill.strategy_id, true))
2432 } else {
2433 fill.position_id
2435 };
2436
2437 let fill_split2 = OrderFilled::new(
2438 fill.trader_id,
2439 fill.strategy_id,
2440 fill.instrument_id,
2441 fill.client_order_id,
2442 fill.venue_order_id,
2443 fill.account_id,
2444 fill.trade_id,
2445 fill.order_side,
2446 fill.order_type,
2447 difference,
2448 fill.last_px,
2449 fill.currency,
2450 fill.liquidity_side,
2451 UUID4::new(),
2452 fill.ts_event,
2453 fill.ts_init,
2454 fill.reconciliation,
2455 position_id_flip,
2456 commission2,
2457 );
2458
2459 if oms_type == OmsType::Hedging
2460 && let Some(position_id) = fill.position_id
2461 && position_id.is_virtual()
2462 {
2463 log::warn!("Closing position {fill_split1:?}");
2464 log::warn!("Flipping position {fill_split2:?}");
2465 }
2466
2467 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
2469 log::error!("Failed to open flipped position: {e:?}");
2470 }
2471 }
2472
2473 pub fn set_position_id_counts(&mut self) {
2475 let cache = self.cache.borrow();
2476 let positions = cache.positions(None, None, None, None, None);
2477
2478 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
2480
2481 for position in positions {
2482 *counts.entry(position.strategy_id).or_insert(0) += 1;
2483 }
2484
2485 self.pos_id_generator.reset();
2486
2487 for (strategy_id, count) in counts {
2488 self.pos_id_generator.set_count(count, strategy_id);
2489 log::info!("Set PositionId count for {strategy_id} to {count}");
2490 }
2491 }
2492
2493 fn deny_order(&self, order: &OrderAny, reason: &str) {
2494 let denied = OrderDenied::new(
2495 order.trader_id(),
2496 order.strategy_id(),
2497 order.instrument_id(),
2498 order.client_order_id(),
2499 reason.into(),
2500 UUID4::new(),
2501 self.clock.borrow().timestamp_ns(),
2502 self.clock.borrow().timestamp_ns(),
2503 );
2504
2505 let mut order = order.clone();
2506
2507 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
2508 log::error!("Failed to apply denied event to order: {e}");
2509 return;
2510 }
2511
2512 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
2513 log::error!("Failed to update order in cache: {e}");
2514 return;
2515 }
2516
2517 let topic = switchboard::get_event_orders_topic(order.strategy_id());
2518 msgbus::publish_order_event(topic, &OrderEventAny::Denied(denied));
2519
2520 if self.config.snapshot_orders {
2521 self.create_order_state_snapshot(&order);
2522 }
2523 }
2524
2525 fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
2526 let mut cache = self.cache.borrow_mut();
2527 if cache.own_order_book_mut(instrument_id).is_none() {
2528 let own_book = OwnOrderBook::new(*instrument_id);
2529 cache.add_own_order_book(own_book).unwrap();
2530 }
2531
2532 RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
2533 }
2534}