1pub mod config;
19
20use std::{cell::RefCell, fmt::Debug, rc::Rc};
21
22use ahash::AHashMap;
23use config::RiskEngineConfig;
24use indexmap::IndexMap;
25use nautilus_common::{
26 cache::Cache,
27 clock::Clock,
28 logging::{CMD, EVT, RECV},
29 messages::{
30 execution::{ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand},
31 system::trading::TradingStateChanged,
32 },
33 msgbus,
34 msgbus::{MessagingSwitchboard, TypedIntoHandler, get_message_bus},
35 runner::try_get_trading_cmd_sender,
36 throttler::{RateLimit, Throttler},
37};
38use nautilus_core::{UUID4, WeakCell};
39use nautilus_execution::trailing::{
40 trailing_stop_calculate_with_bid_ask, trailing_stop_calculate_with_last,
41};
42use nautilus_model::{
43 accounts::{Account, AccountAny},
44 enums::{
45 OrderSide, OrderStatus, PositionSide, TimeInForce, TradingState, TrailingOffsetType,
46 TriggerType,
47 },
48 events::{OrderDenied, OrderEventAny, OrderModifyRejected},
49 identifiers::{AccountId, InstrumentId},
50 instruments::{Instrument, InstrumentAny},
51 orders::{Order, OrderAny},
52 types::{Currency, Money, Price, Quantity, quantity::QuantityRaw},
53};
54use nautilus_portfolio::Portfolio;
55use rust_decimal::{Decimal, prelude::ToPrimitive};
56use ustr::Ustr;
57
58fn format_rate_limit(rate_limit: &RateLimit) -> String {
59 let total_secs = rate_limit.interval_ns / 1_000_000_000;
60 let remainder_ns = rate_limit.interval_ns % 1_000_000_000;
61 let hours = total_secs / 3600;
62 let minutes = (total_secs % 3600) / 60;
63 let seconds = total_secs % 60;
64
65 if remainder_ns == 0 {
66 format!("{}/{hours:02}:{minutes:02}:{seconds:02}", rate_limit.limit)
67 } else {
68 let micros = remainder_ns / 1_000;
69 format!(
70 "{}/{hours:02}:{minutes:02}:{seconds:02}.{micros:06}",
71 rate_limit.limit
72 )
73 }
74}
75
/// Boxed handler type used by the submit throttler; receives a [`TradingCommand`].
type SubmitCommandFn = Box<dyn Fn(TradingCommand)>;
/// Boxed handler type used by the modify-order throttler; receives a [`ModifyOrder`].
type ModifyOrderFn = Box<dyn Fn(ModifyOrder)>;
78
/// Risk engine that validates trading commands (submit, submit-list, modify)
/// before passing them on towards execution.
///
/// Holds the throttlers, per-instrument notional limits and the current
/// trading state consulted by the command handlers.
#[allow(dead_code)]
pub struct RiskEngine {
    /// Clock used to timestamp generated events and to evaluate GTD expiry.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache queried for orders, positions, instruments and accounts.
    cache: Rc<RefCell<Cache>>,
    /// Portfolio queried for net long/short exposure when modifying orders.
    portfolio: Portfolio,
    /// Throttler for submit commands (built by `create_submit_throttler`).
    pub throttled_submit: Throttler<TradingCommand, SubmitCommandFn>,
    /// Throttler for modify-order commands (built by `create_modify_order_throttler`).
    pub throttled_modify_order: Throttler<ModifyOrder, ModifyOrderFn>,
    /// Maximum notional per order, keyed by instrument; seeded from the config.
    max_notional_per_order: AHashMap<InstrumentId, Decimal>,
    /// Current trading state; initialised to `TradingState::Active`.
    trading_state: TradingState,
    /// Configuration the engine was constructed with.
    config: RiskEngineConfig,
}
96
97impl Debug for RiskEngine {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct(stringify!(RiskEngine)).finish()
100 }
101}
102
103impl RiskEngine {
104 pub fn new(
106 config: RiskEngineConfig,
107 portfolio: Portfolio,
108 clock: Rc<RefCell<dyn Clock>>,
109 cache: Rc<RefCell<Cache>>,
110 ) -> Self {
111 let throttled_submit = Self::create_submit_throttler(&config, clock.clone(), cache.clone());
112
113 let throttled_modify_order =
114 Self::create_modify_order_throttler(&config, clock.clone(), cache.clone());
115
116 Self {
117 clock,
118 cache,
119 portfolio,
120 throttled_submit,
121 throttled_modify_order,
122 max_notional_per_order: config.max_notional_per_order.clone(),
123 trading_state: TradingState::Active,
124 config,
125 }
126 }
127
128 pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
130 let weak = WeakCell::from(Rc::downgrade(engine));
131
132 msgbus::register_trading_command_endpoint(
133 MessagingSwitchboard::risk_engine_execute(),
134 TypedIntoHandler::from(move |cmd: TradingCommand| {
135 if let Some(rc) = weak.upgrade() {
136 rc.borrow_mut().execute(cmd);
137 }
138 }),
139 );
140
141 msgbus::register_trading_command_endpoint(
150 MessagingSwitchboard::risk_engine_queue_execute(),
151 TypedIntoHandler::from(move |cmd: TradingCommand| {
152 if let Some(sender) = try_get_trading_cmd_sender() {
153 sender.execute(cmd);
154 } else {
155 let endpoint = MessagingSwitchboard::risk_engine_execute();
156 msgbus::send_trading_command(endpoint, cmd);
157 }
158 }),
159 );
160 }
161
162 fn create_submit_throttler(
163 config: &RiskEngineConfig,
164 clock: Rc<RefCell<dyn Clock>>,
165 cache: Rc<RefCell<Cache>>,
166 ) -> Throttler<TradingCommand, SubmitCommandFn> {
167 let success_handler = {
168 Box::new(move |command: TradingCommand| {
169 let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
170 msgbus::send_trading_command(endpoint, command);
171 }) as Box<dyn Fn(TradingCommand)>
172 };
173
174 let failure_handler = {
175 let cache = cache;
176 let clock = clock.clone();
177 Box::new(move |command: TradingCommand| {
178 let reason = "REJECTED BY THROTTLER";
179
180 match command {
181 TradingCommand::SubmitOrder(submit_order) => {
182 log::warn!(
183 "SubmitOrder for {} DENIED: {reason}",
184 submit_order.client_order_id,
185 );
186
187 Self::handle_submit_order_cache(&cache, &submit_order);
188
189 let denied = Self::create_order_denied(&submit_order, reason, &clock);
190
191 let endpoint = MessagingSwitchboard::exec_engine_process();
192 msgbus::send_order_event(endpoint, denied);
193 }
194 TradingCommand::SubmitOrderList(submit_order_list) => {
195 log::warn!(
196 "SubmitOrderList for {} DENIED: {reason}",
197 submit_order_list.order_list.id,
198 );
199
200 let orders: Vec<OrderAny> = cache.borrow().orders_for_ids(
201 &submit_order_list.order_list.client_order_ids,
202 &submit_order_list,
203 );
204
205 let timestamp = clock.borrow().timestamp_ns();
206
207 for order in &orders {
208 if order.status() == OrderStatus::Initialized {
209 let denied = OrderEventAny::Denied(OrderDenied::new(
210 order.trader_id(),
211 order.strategy_id(),
212 order.instrument_id(),
213 order.client_order_id(),
214 reason.into(),
215 UUID4::new(),
216 timestamp,
217 timestamp,
218 ));
219 let endpoint = MessagingSwitchboard::exec_engine_process();
220 msgbus::send_order_event(endpoint, denied);
221 }
222 }
223 }
224 _ => {
225 log::error!("Unexpected command type in submit throttler: {command}");
226 }
227 }
228 }) as Box<dyn Fn(TradingCommand)>
229 };
230
231 Throttler::new(
232 config.max_order_submit.limit,
233 config.max_order_submit.interval_ns,
234 clock,
235 "ORDER_SUBMIT_THROTTLER",
236 success_handler,
237 Some(failure_handler),
238 Ustr::from(UUID4::new().as_str()),
239 )
240 }
241
242 fn create_modify_order_throttler(
243 config: &RiskEngineConfig,
244 clock: Rc<RefCell<dyn Clock>>,
245 cache: Rc<RefCell<Cache>>,
246 ) -> Throttler<ModifyOrder, ModifyOrderFn> {
247 let success_handler = {
248 Box::new(move |order: ModifyOrder| {
249 let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
250 msgbus::send_trading_command(endpoint, TradingCommand::ModifyOrder(order));
251 }) as Box<dyn Fn(ModifyOrder)>
252 };
253
254 let failure_handler = {
255 let cache = cache;
256 let clock = clock.clone();
257 Box::new(move |order: ModifyOrder| {
258 let reason = "Exceeded MAX_ORDER_MODIFY_RATE";
259 log::warn!(
260 "SubmitOrder for {} DENIED: {}",
261 order.client_order_id,
262 reason
263 );
264
265 let order = match Self::get_existing_order(&cache, &order) {
266 Some(order) => order,
267 None => return,
268 };
269
270 let rejected = Self::create_modify_rejected(&order, reason, &clock);
271
272 let endpoint = MessagingSwitchboard::exec_engine_process();
273 msgbus::send_order_event(endpoint, rejected);
274 }) as Box<dyn Fn(ModifyOrder)>
275 };
276
277 Throttler::new(
278 config.max_order_modify.limit,
279 config.max_order_modify.interval_ns,
280 clock,
281 "ORDER_MODIFY_THROTTLER",
282 success_handler,
283 Some(failure_handler),
284 Ustr::from(UUID4::new().as_str()),
285 )
286 }
287
288 fn handle_submit_order_cache(cache: &Rc<RefCell<Cache>>, submit_order: &SubmitOrder) {
289 let cache = cache.borrow();
290 if !cache.order_exists(&submit_order.client_order_id) {
291 log::error!(
292 "Order not found in cache for client_order_id: {}",
293 submit_order.client_order_id
294 );
295 }
296 }
297
298 fn get_existing_order(cache: &Rc<RefCell<Cache>>, order: &ModifyOrder) -> Option<OrderAny> {
299 let cache = cache.borrow();
300 if let Some(order) = cache.order(&order.client_order_id) {
301 Some(order.clone())
302 } else {
303 log::error!(
304 "Order with command.client_order_id: {} not found",
305 order.client_order_id
306 );
307 None
308 }
309 }
310
311 fn create_order_denied(
312 submit_order: &SubmitOrder,
313 reason: &str,
314 clock: &Rc<RefCell<dyn Clock>>,
315 ) -> OrderEventAny {
316 let timestamp = clock.borrow().timestamp_ns();
317 OrderEventAny::Denied(OrderDenied::new(
318 submit_order.trader_id,
319 submit_order.strategy_id,
320 submit_order.instrument_id,
321 submit_order.client_order_id,
322 reason.into(),
323 UUID4::new(),
324 timestamp,
325 timestamp,
326 ))
327 }
328
329 fn create_modify_rejected(
330 order: &OrderAny,
331 reason: &str,
332 clock: &Rc<RefCell<dyn Clock>>,
333 ) -> OrderEventAny {
334 let timestamp = clock.borrow().timestamp_ns();
335 OrderEventAny::ModifyRejected(OrderModifyRejected::new(
336 order.trader_id(),
337 order.strategy_id(),
338 order.instrument_id(),
339 order.client_order_id(),
340 reason.into(),
341 UUID4::new(),
342 timestamp,
343 timestamp,
344 false,
345 order.venue_order_id(),
346 None,
347 ))
348 }
349
/// Executes the given trading command by passing it to `handle_command`.
pub fn execute(&mut self, command: TradingCommand) {
    self.handle_command(command);
}
356
/// Processes the given order event by passing a reference to it to `handle_event`.
///
/// The event is taken by value even though it is only borrowed internally, hence
/// the `needless_pass_by_value` lint expectation.
#[expect(clippy::needless_pass_by_value)]
pub fn process(&mut self, event: OrderEventAny) {
    self.handle_event(&event);
}
363
364 pub fn set_trading_state(&mut self, state: TradingState) {
366 if state == self.trading_state {
367 log::warn!("No change to trading state: already set to {state:?}");
368 return;
369 }
370
371 self.trading_state = state;
372
373 let ts_now = self.clock.borrow().timestamp_ns();
374 let trader_id = get_message_bus().borrow().trader_id;
375
376 let config = self.config_as_map();
377 let event =
378 TradingStateChanged::new(trader_id, state, config, UUID4::new(), ts_now, ts_now);
379
380 msgbus::publish_any("events.risk".into(), &event);
381
382 log::info!("Trading state set to {state:?}");
383 }
384
385 pub fn set_max_notional_per_order(&mut self, instrument_id: InstrumentId, new_value: Decimal) {
387 self.max_notional_per_order.insert(instrument_id, new_value);
388
389 let new_value_str = new_value.to_string();
390 log::info!("Set MAX_NOTIONAL_PER_ORDER: {instrument_id} {new_value_str}");
391 }
392
/// Starts the engine; currently this only logs `Started`.
pub fn start(&mut self) {
    log::info!("Started");
}
397
/// Stops the engine; currently this only logs `Stopped`.
pub fn stop(&mut self) {
    log::info!("Stopped");
}
402
403 pub fn reset(&mut self) {
405 self.throttled_submit.reset();
406 self.throttled_modify_order.reset();
407 self.max_notional_per_order = self.config.max_notional_per_order.clone();
408 self.trading_state = TradingState::Active;
409
410 log::info!("Reset");
411 }
412
/// Disposes of the engine; currently this only logs `Disposed`.
pub fn dispose(&mut self) {
    log::info!("Disposed");
}
417
/// Returns a reference to the engine's shared clock handle.
#[must_use]
pub fn clock(&self) -> &Rc<RefCell<dyn Clock>> {
    &self.clock
}
423
/// Returns a reference to the engine's shared cache handle.
#[must_use]
pub fn cache(&self) -> &Rc<RefCell<Cache>> {
    &self.cache
}
429
/// Returns a mutable reference to the engine's portfolio.
pub fn portfolio_mut(&mut self) -> &mut Portfolio {
    &mut self.portfolio
}
434
/// Returns a reference to the engine's configuration.
#[must_use]
pub const fn config(&self) -> &RiskEngineConfig {
    &self.config
}
440
/// Returns the current trading state.
#[must_use]
pub const fn trading_state(&self) -> TradingState {
    self.trading_state
}
446
/// Returns the current maximum notional per order settings, keyed by instrument.
#[must_use]
pub const fn max_notional_per_order(&self) -> &AHashMap<InstrumentId, Decimal> {
    &self.max_notional_per_order
}
452
453 fn config_as_map(&self) -> IndexMap<String, String> {
454 let mut map = IndexMap::new();
455 map.insert("bypass".to_string(), self.config.bypass.to_string());
456 map.insert(
457 "max_order_submit_rate".to_string(),
458 format_rate_limit(&self.config.max_order_submit),
459 );
460 map.insert(
461 "max_order_modify_rate".to_string(),
462 format_rate_limit(&self.config.max_order_modify),
463 );
464
465 for (instrument_id, value) in &self.max_notional_per_order {
466 map.insert(
467 format!("max_notional_per_order.{instrument_id}"),
468 value.to_string(),
469 );
470 }
471
472 map.insert("debug".to_string(), self.config.debug.to_string());
473 map
474 }
475
476 fn handle_command(&mut self, command: TradingCommand) {
477 if self.config.debug {
478 log::debug!("{CMD}{RECV} {command:?}");
479 }
480
481 match command {
482 TradingCommand::SubmitOrder(submit_order) => self.handle_submit_order(submit_order),
483 TradingCommand::SubmitOrderList(submit_order_list) => {
484 self.handle_submit_order_list(submit_order_list);
485 }
486 TradingCommand::ModifyOrder(modify_order) => self.handle_modify_order(modify_order),
487 TradingCommand::QueryAccount(query_account) => {
488 self.send_to_execution(TradingCommand::QueryAccount(query_account));
489 }
490 _ => {
491 log::error!("Cannot handle command: {command}");
492 }
493 }
494 }
495
496 fn handle_submit_order(&mut self, command: SubmitOrder) {
497 if self.config.bypass {
498 self.send_to_execution(TradingCommand::SubmitOrder(command));
499 return;
500 }
501
502 let order = {
503 let cache = self.cache.borrow();
504 match cache.order(&command.client_order_id) {
505 Some(order) => order.clone(),
506 None => {
507 log::error!(
508 "Cannot handle submit order: order not found in cache for {}",
509 command.client_order_id
510 );
511 return;
512 }
513 }
514 };
515
516 if let Some(position_id) = command.position_id
517 && order.is_reduce_only()
518 {
519 let position_exists = {
520 let cache = self.cache.borrow();
521 cache
522 .position(&position_id)
523 .map(|pos| (pos.side, pos.quantity))
524 };
525
526 if let Some((pos_side, pos_quantity)) = position_exists {
527 if !order.would_reduce_only(pos_side, pos_quantity) {
528 self.deny_command(
529 TradingCommand::SubmitOrder(command),
530 &format!("Reduce only order would increase position {position_id}"),
531 );
532 return; }
534 } else {
535 self.deny_command(
536 TradingCommand::SubmitOrder(command),
537 &format!("Position {position_id} not found for reduce-only order"),
538 );
539 return;
540 }
541 }
542
543 let instrument_exists = {
544 let cache = self.cache.borrow();
545 cache.instrument(&command.instrument_id).cloned()
546 };
547
548 let instrument = if let Some(instrument) = instrument_exists {
549 instrument
550 } else {
551 self.deny_command(
552 TradingCommand::SubmitOrder(command.clone()),
553 &format!("Instrument for {} not found", command.instrument_id),
554 );
555 return; };
557
558 if !self.check_order(&instrument, &order) {
559 return; }
561
562 if !self.check_orders_risk(&instrument, &[order]) {
563 return; }
565
566 self.execution_gateway(&instrument, TradingCommand::SubmitOrder(command));
568 }
569
570 fn handle_submit_order_list(&mut self, command: SubmitOrderList) {
571 if self.config.bypass {
572 self.send_to_execution(TradingCommand::SubmitOrderList(command));
573 return;
574 }
575
576 let instrument_exists = {
577 let cache = self.cache.borrow();
578 cache.instrument(&command.instrument_id).cloned()
579 };
580
581 let instrument = if let Some(instrument) = instrument_exists {
582 instrument
583 } else {
584 self.deny_command(
585 TradingCommand::SubmitOrderList(command.clone()),
586 &format!("no instrument found for {}", command.instrument_id),
587 );
588 return; };
590
591 let orders: Vec<OrderAny> = self
592 .cache
593 .borrow()
594 .orders_for_ids(&command.order_list.client_order_ids, &command);
595
596 if orders.len() != command.order_list.client_order_ids.len() {
597 self.deny_order_list(
598 &orders,
599 &format!("Incomplete order list: missing orders in cache for {command}"),
600 );
601 return; }
603
604 for order in &orders {
605 if !self.check_order(&instrument, order) {
606 return; }
608 }
609
610 if !self.check_orders_risk(&instrument, &orders) {
611 self.deny_order_list(
612 &orders,
613 &format!("OrderList {} DENIED", command.order_list.id),
614 );
615 return; }
617
618 self.execution_gateway(&instrument, TradingCommand::SubmitOrderList(command));
619 }
620
621 fn handle_modify_order(&mut self, command: ModifyOrder) {
622 let order_exists = {
623 let cache = self.cache.borrow();
624 cache.order(&command.client_order_id).cloned()
625 };
626
627 let order = if let Some(order) = order_exists {
628 order
629 } else {
630 log::error!(
631 "ModifyOrder DENIED: Order with command.client_order_id: {} not found",
632 command.client_order_id
633 );
634 return;
635 };
636
637 if order.is_closed() {
638 self.reject_modify_order(
639 &order,
640 &format!(
641 "Order with command.client_order_id: {} already closed",
642 command.client_order_id
643 ),
644 );
645 return;
646 } else if order.status() == OrderStatus::PendingCancel {
647 self.reject_modify_order(
648 &order,
649 &format!(
650 "Order with command.client_order_id: {} is already pending cancel",
651 command.client_order_id
652 ),
653 );
654 return;
655 }
656
657 let maybe_instrument = {
658 let cache = self.cache.borrow();
659 cache.instrument(&command.instrument_id).cloned()
660 };
661
662 let instrument = if let Some(instrument) = maybe_instrument {
663 instrument
664 } else {
665 self.reject_modify_order(
666 &order,
667 &format!("no instrument found for {:?}", command.instrument_id),
668 );
669 return; };
671
672 let mut risk_msg = self.check_price(&instrument, command.price);
674 if let Some(risk_msg) = risk_msg {
675 self.reject_modify_order(&order, &risk_msg);
676 return; }
678
679 risk_msg = self.check_price(&instrument, command.trigger_price);
681 if let Some(risk_msg) = risk_msg {
682 self.reject_modify_order(&order, &risk_msg);
683 return; }
685
686 risk_msg = self.check_quantity(&instrument, command.quantity, order.is_quote_quantity());
688 if let Some(risk_msg) = risk_msg {
689 self.reject_modify_order(&order, &risk_msg);
690 return; }
692
693 match self.trading_state {
695 TradingState::Halted => {
696 self.reject_modify_order(&order, "TradingState is HALTED: Cannot modify order");
697 return;
698 }
699 TradingState::Reducing => {
700 if let Some(quantity) = command.quantity
701 && quantity > order.quantity()
702 && ((order.is_buy() && self.portfolio.is_net_long(&instrument.id()))
703 || (order.is_sell() && self.portfolio.is_net_short(&instrument.id())))
704 {
705 self.reject_modify_order(
706 &order,
707 &format!(
708 "TradingState is REDUCING and update will increase exposure {}",
709 instrument.id()
710 ),
711 );
712 return;
713 }
714 }
715 _ => {}
716 }
717
718 self.throttled_modify_order.send(command);
719 }
720
721 fn check_order(&self, instrument: &InstrumentAny, order: &OrderAny) -> bool {
722 if !self.check_order_price(instrument, order)
723 || !self.check_order_quantity(instrument, order)
724 {
725 return false; }
727
728 if order.time_in_force() == TimeInForce::Gtd {
729 let expire_time = order.expire_time().expect("GTD has expire time");
730 if expire_time <= self.clock.borrow().timestamp_ns() {
731 self.deny_order(
732 order,
733 &format!("GTD {} already past", expire_time.to_rfc3339()),
734 );
735 return false; }
737 }
738
739 true
740 }
741
742 fn check_order_price(&self, instrument: &InstrumentAny, order: &OrderAny) -> bool {
743 if order.price().is_some() {
744 let risk_msg = self.check_price(instrument, order.price());
745 if let Some(risk_msg) = risk_msg {
746 self.deny_order(order, &risk_msg);
747 return false; }
749 }
750
751 if order.trigger_price().is_some() {
752 let risk_msg = self.check_price(instrument, order.trigger_price());
753 if let Some(risk_msg) = risk_msg {
754 self.deny_order(order, &format!("trigger {risk_msg}"));
755 return false; }
757 }
758
759 true
760 }
761
762 fn check_order_quantity(&self, instrument: &InstrumentAny, order: &OrderAny) -> bool {
763 let risk_msg = self.check_quantity(
764 instrument,
765 Some(order.quantity()),
766 order.is_quote_quantity(),
767 );
768
769 if let Some(risk_msg) = risk_msg {
770 self.deny_order(order, &risk_msg);
771 return false; }
773
774 true
775 }
776
777 fn check_orders_risk(&self, instrument: &InstrumentAny, orders: &[OrderAny]) -> bool {
778 let mut orders_by_account: AHashMap<Option<AccountId>, Vec<&OrderAny>> = AHashMap::new();
779 for order in orders {
780 orders_by_account
781 .entry(order.account_id())
782 .or_default()
783 .push(order);
784 }
785
786 for (account_id, account_orders) in &orders_by_account {
787 if !self.check_orders_risk_for_account(instrument, account_orders, *account_id) {
788 return false;
789 }
790 }
791
792 true
793 }
794
795 fn check_orders_risk_for_account(
796 &self,
797 instrument: &InstrumentAny,
798 orders: &[&OrderAny],
799 account_id: Option<AccountId>,
800 ) -> bool {
801 let mut last_px: Option<Price> = None;
802 let mut max_notional: Option<Money> = None;
803
804 let max_notional_setting = self.max_notional_per_order.get(&instrument.id());
806 if let Some(max_notional_setting_val) = max_notional_setting.copied() {
807 max_notional = Some(Money::new(
808 max_notional_setting_val
809 .to_f64()
810 .expect("Invalid decimal conversion"),
811 instrument.quote_currency(),
812 ));
813 }
814
815 let resolved_account = {
817 let cache = self.cache.borrow();
818
819 if let Some(account_id) = account_id {
820 cache.account(&account_id).cloned()
821 } else {
822 cache.account_for_venue(&instrument.id().venue).cloned()
823 }
824 };
825
826 let mut account = if let Some(account) = resolved_account {
827 account
828 } else {
829 log::debug!(
830 "Cannot find account for venue {} (account_id={account_id:?})",
831 instrument.id().venue
832 );
833 return true;
834 };
835
836 let is_margin = matches!(account, AccountAny::Margin(_));
837 let is_betting = matches!(account, AccountAny::Betting(_));
838 let free = match &account {
839 AccountAny::Margin(margin) => margin.balance_free(Some(instrument.quote_currency())),
840 AccountAny::Cash(cash) => cash.balance_free(Some(instrument.quote_currency())),
841 AccountAny::Betting(betting) => betting.balance_free(Some(instrument.quote_currency())),
842 };
843 let allow_borrowing = match &account {
844 AccountAny::Margin(_) => false,
845 AccountAny::Cash(cash) => cash.allow_borrowing,
846 AccountAny::Betting(_) => false,
847 };
848
849 if self.config.debug {
850 log::debug!("Free balance: {free:?}");
851 }
852
853 let (net_long_qty_raw, pending_sell_qty_raw) = {
856 let cache = self.cache.borrow();
857 let long_qty: QuantityRaw = cache
858 .positions_open(
859 None,
860 Some(&instrument.id()),
861 None,
862 None,
863 Some(PositionSide::Long),
864 )
865 .iter()
866 .map(|pos| pos.quantity.raw)
867 .sum();
868 let pending_sells: QuantityRaw = cache
869 .orders_open(
870 None,
871 Some(&instrument.id()),
872 None,
873 None,
874 Some(OrderSide::Sell),
875 )
876 .iter()
877 .map(|ord| ord.leaves_qty().raw)
878 .sum();
879 (long_qty, pending_sells)
880 };
881
882 let available_long_qty_raw = net_long_qty_raw.saturating_sub(pending_sell_qty_raw);
884
885 if self.config.debug && net_long_qty_raw > 0 {
886 log::debug!(
887 "Net LONG qty (raw): {net_long_qty_raw}, pending sells: {pending_sell_qty_raw}, available: {available_long_qty_raw}"
888 );
889 }
890
891 let available_short_qty_raw = if is_margin || is_betting {
893 let cache = self.cache.borrow();
894 let short_qty: QuantityRaw = cache
895 .positions_open(
896 None,
897 Some(&instrument.id()),
898 None,
899 None,
900 Some(PositionSide::Short),
901 )
902 .iter()
903 .map(|pos| pos.quantity.raw)
904 .sum();
905 let pending_buys: QuantityRaw = cache
906 .orders_open(
907 None,
908 Some(&instrument.id()),
909 None,
910 None,
911 Some(OrderSide::Buy),
912 )
913 .iter()
914 .map(|ord| ord.leaves_qty().raw)
915 .sum();
916
917 if self.config.debug && short_qty > 0 {
918 log::debug!(
919 "Net SHORT qty (raw): {short_qty}, pending buys: {pending_buys}, available: {}",
920 short_qty.saturating_sub(pending_buys)
921 );
922 }
923
924 short_qty.saturating_sub(pending_buys)
925 } else {
926 0
927 };
928
929 let mut cum_sell_qty_raw: QuantityRaw = 0;
931 let mut cum_buy_qty_raw: QuantityRaw = 0;
932
933 let mut cum_notional_buy: Option<Money> = None;
934 let mut cum_notional_sell: Option<Money> = None;
935 let mut cum_margin_required: Option<Money> = None;
936 let mut base_currency: Option<Currency> = None;
937
938 for order in orders {
939 last_px = match order {
941 OrderAny::Market(_) | OrderAny::MarketToLimit(_) => {
942 if last_px.is_none() {
943 let cache = self.cache.borrow();
944 if let Some(last_quote) = cache.quote(&instrument.id()) {
945 match order.order_side() {
946 OrderSide::Buy => Some(last_quote.ask_price),
947 OrderSide::Sell => Some(last_quote.bid_price),
948 _ => panic!("Invalid order side"),
949 }
950 } else {
951 let cache = self.cache.borrow();
952 let last_trade = cache.trade(&instrument.id());
953
954 if let Some(last_trade) = last_trade {
955 Some(last_trade.price)
956 } else {
957 log::warn!(
958 "Cannot check MARKET order risk: no prices for {}",
959 instrument.id()
960 );
961 continue;
962 }
963 }
964 } else {
965 last_px
966 }
967 }
968 OrderAny::StopMarket(_) | OrderAny::MarketIfTouched(_) => order.trigger_price(),
969 OrderAny::TrailingStopMarket(_) | OrderAny::TrailingStopLimit(_) => {
970 if let Some(trigger_price) = order.trigger_price() {
971 Some(trigger_price)
972 } else {
973 let offset_type = order.trailing_offset_type().unwrap();
975
976 if !matches!(
977 offset_type,
978 TrailingOffsetType::Price
979 | TrailingOffsetType::BasisPoints
980 | TrailingOffsetType::Ticks
981 ) {
982 self.deny_order(
983 order,
984 &format!("UNSUPPORTED_TRAILING_OFFSET_TYPE: {offset_type:?}"),
985 );
986 return false;
987 }
988
989 let trigger_type = order.trigger_type().unwrap();
990
991 let calc_result: Result<Option<Price>, String> = {
994 let cache = self.cache.borrow();
995
996 if trigger_type == TriggerType::BidAsk {
997 if let Some(quote) = cache.quote(&instrument.id()) {
998 trailing_stop_calculate_with_bid_ask(
999 instrument.price_increment(),
1000 order.trailing_offset_type().unwrap(),
1001 order.order_side_specified(),
1002 order.trailing_offset().unwrap(),
1003 quote.bid_price,
1004 quote.ask_price,
1005 )
1006 .map(Some)
1007 .map_err(|e| e.to_string())
1008 } else {
1009 log::warn!(
1010 "Cannot check {} order risk: no trigger price set and no bid/ask quotes available for {}",
1011 order.order_type(),
1012 instrument.id()
1013 );
1014 Ok(None)
1015 }
1016 } else if let Some(last_trade) = cache.trade(&instrument.id()) {
1017 trailing_stop_calculate_with_last(
1018 instrument.price_increment(),
1019 order.trailing_offset_type().unwrap(),
1020 order.order_side_specified(),
1021 order.trailing_offset().unwrap(),
1022 last_trade.price,
1023 )
1024 .map(Some)
1025 .map_err(|e| e.to_string())
1026 } else if trigger_type == TriggerType::LastOrBidAsk {
1027 if let Some(quote) = cache.quote(&instrument.id()) {
1028 trailing_stop_calculate_with_bid_ask(
1029 instrument.price_increment(),
1030 order.trailing_offset_type().unwrap(),
1031 order.order_side_specified(),
1032 order.trailing_offset().unwrap(),
1033 quote.bid_price,
1034 quote.ask_price,
1035 )
1036 .map(Some)
1037 .map_err(|e| e.to_string())
1038 } else {
1039 log::warn!(
1040 "Cannot check {} order risk: no trigger price set and no market data available for {}",
1041 order.order_type(),
1042 instrument.id()
1043 );
1044 Ok(None)
1045 }
1046 } else {
1047 log::warn!(
1048 "Cannot check {} order risk: no trigger price set and no market data available for {}",
1049 order.order_type(),
1050 instrument.id()
1051 );
1052 Ok(None)
1053 }
1054 };
1055 match calc_result {
1058 Ok(Some(trigger)) => Some(trigger),
1059 Ok(None) => {
1060 continue;
1061 }
1062 Err(e) => {
1063 self.deny_order(order, &format!("TRAILING_STOP_CALC_FAILED: {e}"));
1064 return false;
1065 }
1066 }
1067 }
1068 }
1069 _ => order.price(),
1070 };
1071
1072 let last_px = if let Some(px) = last_px {
1073 px
1074 } else {
1075 log::error!("Cannot check order risk: no price available");
1076 continue;
1077 };
1078
1079 let effective_price = if order.is_quote_quantity()
1081 && !instrument.is_inverse()
1082 && matches!(order, OrderAny::Limit(_) | OrderAny::StopLimit(_))
1083 {
1084 let cache = self.cache.borrow();
1086 if let Some(quote_tick) = cache.quote(&instrument.id()) {
1087 match order.order_side() {
1088 OrderSide::Buy => last_px.min(quote_tick.ask_price),
1090 OrderSide::Sell => last_px.max(quote_tick.bid_price),
1092 _ => last_px,
1093 }
1094 } else {
1095 last_px }
1097 } else {
1098 last_px
1099 };
1100
1101 let effective_quantity = if order.is_quote_quantity() && !instrument.is_inverse() {
1102 instrument.calculate_base_quantity(order.quantity(), effective_price)
1103 } else {
1104 order.quantity()
1105 };
1106
1107 if !order.is_quote_quantity() {
1113 if let Some(max_quantity) = instrument.max_quantity()
1114 && effective_quantity > max_quantity
1115 {
1116 self.deny_order(
1117 order,
1118 &format!(
1119 "QUANTITY_EXCEEDS_MAXIMUM: effective_quantity={effective_quantity}, max_quantity={max_quantity}"
1120 ),
1121 );
1122 return false; }
1124
1125 if let Some(min_quantity) = instrument.min_quantity()
1126 && effective_quantity < min_quantity
1127 {
1128 self.deny_order(
1129 order,
1130 &format!(
1131 "QUANTITY_BELOW_MINIMUM: effective_quantity={effective_quantity}, min_quantity={min_quantity}"
1132 ),
1133 );
1134 return false; }
1136 }
1137
1138 let notional =
1139 instrument.calculate_notional_value(effective_quantity, last_px, Some(true));
1140
1141 if self.config.debug {
1142 log::debug!("Notional: {notional:?}");
1143 }
1144
1145 if let Some(max_notional_value) = max_notional
1147 && notional > max_notional_value
1148 {
1149 self.deny_order(
1150 order,
1151 &format!(
1152 "NOTIONAL_EXCEEDS_MAX_PER_ORDER: max_notional={max_notional_value:?}, notional={notional:?}"
1153 ),
1154 );
1155 return false; }
1157
1158 if let Some(min_notional) = instrument.min_notional()
1160 && notional.currency == min_notional.currency
1161 && notional < min_notional
1162 {
1163 self.deny_order(
1164 order,
1165 &format!(
1166 "NOTIONAL_LESS_THAN_MIN_FOR_INSTRUMENT: min_notional={min_notional:?}, notional={notional:?}"
1167 ),
1168 );
1169 return false; }
1171
1172 if let Some(max_notional) = instrument.max_notional()
1174 && notional.currency == max_notional.currency
1175 && notional > max_notional
1176 {
1177 self.deny_order(
1178 order,
1179 &format!(
1180 "NOTIONAL_GREATER_THAN_MAX_FOR_INSTRUMENT: max_notional={max_notional:?}, notional={notional:?}"
1181 ),
1182 );
1183 return false; }
1185
1186 if is_margin {
1187 let margin_req = match &mut account {
1189 AccountAny::Margin(margin) => margin
1190 .calculate_initial_margin(instrument, effective_quantity, last_px, None)
1191 .unwrap_or_else(|e| {
1192 log::error!("Failed to calculate initial margin: {e}");
1193 Money::new(0.0, instrument.quote_currency())
1194 }),
1195 _ => unreachable!(),
1196 };
1197
1198 if self.config.debug {
1199 log::debug!("Initial margin required: {margin_req}");
1200 }
1201
1202 let is_reducing = order.is_reduce_only()
1204 || (order.is_sell()
1205 && (cum_sell_qty_raw + effective_quantity.raw) <= available_long_qty_raw)
1206 || (order.is_buy()
1207 && (cum_buy_qty_raw + effective_quantity.raw) <= available_short_qty_raw);
1208
1209 if order.is_sell() {
1210 cum_sell_qty_raw += effective_quantity.raw;
1211 } else if order.is_buy() {
1212 cum_buy_qty_raw += effective_quantity.raw;
1213 }
1214
1215 if is_reducing {
1216 if self.config.debug {
1217 log::debug!("Position-reducing order skips margin check");
1218 }
1219 continue;
1220 }
1221
1222 let margin_free = match &account {
1225 AccountAny::Margin(margin) => margin.balance_free(Some(margin_req.currency)),
1226 _ => unreachable!(),
1227 };
1228
1229 let margin_free_val = match margin_free {
1230 Some(val) => val,
1231 None => {
1232 if self.config.debug {
1233 log::debug!(
1234 "No balance for margin currency {}, skipping margin check",
1235 margin_req.currency
1236 );
1237 }
1238 continue;
1239 }
1240 };
1241
1242 if margin_req > margin_free_val {
1244 self.deny_order(
1245 order,
1246 &format!(
1247 "MARGIN_EXCEEDS_FREE_BALANCE: free={margin_free_val}, margin_required={margin_req}"
1248 ),
1249 );
1250 return false;
1251 }
1252
1253 match cum_margin_required.as_mut() {
1255 Some(cum) => cum.raw += margin_req.raw,
1256 None => cum_margin_required = Some(margin_req),
1257 }
1258
1259 if self.config.debug {
1260 log::debug!("Cumulative margin required: {cum_margin_required:?}");
1261 }
1262
1263 if let Some(cum_margin) = cum_margin_required
1264 && cum_margin > margin_free_val
1265 {
1266 self.deny_order(
1267 order,
1268 &format!(
1269 "CUM_MARGIN_EXCEEDS_FREE_BALANCE: free={margin_free_val}, cum_margin={cum_margin}"
1270 ),
1271 );
1272 return false;
1273 }
1274 } else {
1275 let notional =
1277 instrument.calculate_notional_value(effective_quantity, last_px, None);
1278 let order_balance_impact = if is_betting {
1279 match &mut account {
1280 AccountAny::Betting(betting) => Money::from_raw(
1281 -betting
1282 .calculate_balance_locked(
1283 instrument,
1284 order.order_side(),
1285 effective_quantity,
1286 last_px,
1287 None,
1288 )
1289 .unwrap_or_else(|e| {
1290 log::error!("Failed to calculate betting balance locked: {e}");
1291 Money::new(0.0, instrument.quote_currency())
1292 })
1293 .raw,
1294 instrument.quote_currency(),
1295 ),
1296 _ => unreachable!(),
1297 }
1298 } else {
1299 match order.order_side() {
1300 OrderSide::Buy => Money::from_raw(-notional.raw, notional.currency),
1301 OrderSide::Sell => Money::from_raw(notional.raw, notional.currency),
1302 OrderSide::NoOrderSide => {
1303 panic!("invalid `OrderSide`, was {}", order.order_side());
1304 }
1305 }
1306 };
1307
1308 if self.config.debug {
1309 log::debug!("Balance impact: {order_balance_impact}");
1310 }
1311
1312 let is_position_reducing = if order.is_buy() {
1314 let reducing = order.is_reduce_only()
1315 || (cum_buy_qty_raw + effective_quantity.raw) <= available_short_qty_raw;
1316 cum_buy_qty_raw += effective_quantity.raw;
1317 reducing
1318 } else if order.is_sell() {
1319 let reducing = order.is_reduce_only()
1320 || (cum_sell_qty_raw + effective_quantity.raw) <= available_long_qty_raw;
1321 cum_sell_qty_raw += effective_quantity.raw;
1322 reducing
1323 } else {
1324 false
1325 };
1326
1327 if is_position_reducing {
1328 if self.config.debug {
1329 log::debug!("Position-reducing order skips balance check");
1330 }
1331 continue;
1332 }
1333
1334 if !allow_borrowing
1336 && let Some(free_val) = free
1337 && (free_val.as_decimal() + order_balance_impact.as_decimal()) < Decimal::ZERO
1338 {
1339 self.deny_order(
1340 order,
1341 &format!(
1342 "NOTIONAL_EXCEEDS_FREE_BALANCE: free={free_val:?}, notional={notional:?}"
1343 ),
1344 );
1345 return false;
1346 }
1347
1348 if base_currency.is_none() {
1349 base_currency = instrument.base_currency();
1350 }
1351
1352 if order.is_buy() {
1353 match cum_notional_buy.as_mut() {
1354 Some(cum_notional_buy_val) => {
1355 cum_notional_buy_val.raw += -order_balance_impact.raw;
1356 }
1357 None => {
1358 cum_notional_buy = Some(Money::from_raw(
1359 -order_balance_impact.raw,
1360 order_balance_impact.currency,
1361 ));
1362 }
1363 }
1364
1365 if self.config.debug {
1366 log::debug!("Cumulative notional BUY: {cum_notional_buy:?}");
1367 }
1368
1369 if !allow_borrowing
1370 && let (Some(free), Some(cum_notional_buy)) = (free, cum_notional_buy)
1371 && cum_notional_buy > free
1372 {
1373 self.deny_order(order, &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_buy}"));
1374 return false; }
1376 } else if order.is_sell() {
1377 if is_betting {
1378 match cum_notional_sell.as_mut() {
1379 Some(cum_notional_sell_val) => {
1380 cum_notional_sell_val.raw += -order_balance_impact.raw;
1381 }
1382 None => {
1383 cum_notional_sell = Some(Money::from_raw(
1384 -order_balance_impact.raw,
1385 order_balance_impact.currency,
1386 ));
1387 }
1388 }
1389
1390 if self.config.debug {
1391 log::debug!("Cumulative betting SELL liability: {cum_notional_sell:?}");
1392 }
1393
1394 if !allow_borrowing
1395 && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1396 && cum_notional_sell > free
1397 {
1398 self.deny_order(order, &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1399 return false;
1400 }
1401
1402 continue;
1403 }
1404
1405 let has_base_currency = match &account {
1406 AccountAny::Margin(_) => false,
1407 AccountAny::Cash(cash) => cash.base_currency.is_some(),
1408 AccountAny::Betting(betting) => betting.base_currency.is_some(),
1409 };
1410
1411 if has_base_currency {
1412 match cum_notional_sell.as_mut() {
1413 Some(cum_notional_sell_val) => {
1414 cum_notional_sell_val.raw += order_balance_impact.raw;
1415 }
1416 None => {
1417 cum_notional_sell = Some(Money::from_raw(
1418 order_balance_impact.raw,
1419 order_balance_impact.currency,
1420 ));
1421 }
1422 }
1423
1424 if self.config.debug {
1425 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1426 }
1427
1428 if !allow_borrowing
1429 && let (Some(free), Some(cum_notional_sell)) = (free, cum_notional_sell)
1430 && cum_notional_sell > free
1431 {
1432 self.deny_order(order, &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={free}, cum_notional={cum_notional_sell}"));
1433 return false; }
1435 } else if let Some(base_currency) = base_currency {
1436 let cash_value = Money::from_raw(
1437 effective_quantity
1438 .raw
1439 .try_into()
1440 .map_err(|e| {
1441 log::error!("Unable to convert Quantity to f64: {e}");
1442 })
1443 .unwrap(),
1444 base_currency,
1445 );
1446
1447 let base_free = match &account {
1449 AccountAny::Margin(_) => None,
1450 AccountAny::Cash(cash) => cash.balance_free(Some(base_currency)),
1451 AccountAny::Betting(betting) => {
1452 betting.balance_free(Some(base_currency))
1453 }
1454 };
1455
1456 if self.config.debug
1457 && let AccountAny::Cash(cash) = &account
1458 {
1459 log::debug!("Cash value: {cash_value:?}");
1460 log::debug!("Total: {:?}", cash.balance_total(Some(base_currency)));
1461 log::debug!("Locked: {:?}", cash.balance_locked(Some(base_currency)));
1462 log::debug!("Free: {base_free:?}");
1463 }
1464
1465 if self.config.debug
1466 && let AccountAny::Betting(betting) = &account
1467 {
1468 log::debug!("Cash value: {cash_value:?}");
1469 log::debug!("Total: {:?}", betting.balance_total(Some(base_currency)));
1470 log::debug!(
1471 "Locked: {:?}",
1472 betting.balance_locked(Some(base_currency))
1473 );
1474 log::debug!("Free: {base_free:?}");
1475 }
1476
1477 match cum_notional_sell {
1478 Some(mut value) => {
1479 value.raw += cash_value.raw;
1480 cum_notional_sell = Some(value);
1481 }
1482 None => cum_notional_sell = Some(cash_value),
1483 }
1484
1485 if self.config.debug {
1486 log::debug!("Cumulative notional SELL: {cum_notional_sell:?}");
1487 }
1488
1489 if !allow_borrowing
1490 && let (Some(base_free), Some(cum_notional_sell)) =
1491 (base_free, cum_notional_sell)
1492 && cum_notional_sell.raw > base_free.raw
1493 {
1494 self.deny_order(order, &format!("CUM_NOTIONAL_EXCEEDS_FREE_BALANCE: free={base_free}, cum_notional={cum_notional_sell}"));
1495 return false; }
1497 }
1498 }
1499 }
1500 }
1501
1502 true }
1505
1506 fn check_price(&self, instrument: &InstrumentAny, price: Option<Price>) -> Option<String> {
1507 let price_val = price?;
1508
1509 if price_val.precision > instrument.price_precision() {
1510 return Some(format!(
1511 "price {} invalid (precision {} > {})",
1512 price_val,
1513 price_val.precision,
1514 instrument.price_precision()
1515 ));
1516 }
1517
1518 if !instrument.instrument_class().allows_negative_price() && price_val.raw <= 0 {
1519 return Some(format!("price {price_val} invalid (<= 0)"));
1520 }
1521
1522 None
1523 }
1524
1525 fn check_quantity(
1526 &self,
1527 instrument: &InstrumentAny,
1528 quantity: Option<Quantity>,
1529 is_quote_quantity: bool,
1530 ) -> Option<String> {
1531 let quantity_val = quantity?;
1532
1533 if quantity_val.precision > instrument.size_precision() {
1535 return Some(format!(
1536 "quantity {} invalid (precision {} > {})",
1537 quantity_val,
1538 quantity_val.precision,
1539 instrument.size_precision()
1540 ));
1541 }
1542
1543 if is_quote_quantity {
1545 return None;
1546 }
1547
1548 if let Some(max_quantity) = instrument.max_quantity()
1550 && quantity_val > max_quantity
1551 {
1552 return Some(format!(
1553 "quantity {quantity_val} invalid (> maximum trade size of {max_quantity})"
1554 ));
1555 }
1556
1557 if let Some(min_quantity) = instrument.min_quantity()
1559 && quantity_val < min_quantity
1560 {
1561 return Some(format!(
1562 "quantity {quantity_val} invalid (< minimum trade size of {min_quantity})"
1563 ));
1564 }
1565
1566 None
1567 }
1568
1569 fn deny_command(&self, command: TradingCommand, reason: &str) {
1570 match command {
1571 TradingCommand::SubmitOrder(command) => {
1572 let order = {
1573 let cache = self.cache.borrow();
1574 cache.order(&command.client_order_id).cloned()
1575 };
1576
1577 if let Some(ref order) = order {
1578 self.deny_order(order, reason);
1579 } else {
1580 log::error!(
1581 "Cannot deny order: not found in cache for {}",
1582 command.client_order_id
1583 );
1584 }
1585 }
1586 TradingCommand::SubmitOrderList(command) => {
1587 let orders: Vec<OrderAny> = self
1588 .cache
1589 .borrow()
1590 .orders_for_ids(&command.order_list.client_order_ids, &command);
1591 self.deny_order_list(&orders, reason);
1592 }
1593 _ => {
1594 panic!("Cannot deny command {command}");
1595 }
1596 }
1597 }
1598
1599 fn deny_order(&self, order: &OrderAny, reason: &str) {
1600 log::warn!(
1601 "SubmitOrder for {} DENIED: {}",
1602 order.client_order_id(),
1603 reason
1604 );
1605
1606 if order.status() != OrderStatus::Initialized {
1607 return;
1608 }
1609
1610 {
1612 let mut cache = self.cache.borrow_mut();
1613 if !cache.order_exists(&order.client_order_id()) {
1614 cache
1615 .add_order(order.clone(), None, None, false)
1616 .map_err(|e| {
1617 log::error!("Cannot add order to cache: {e}");
1618 })
1619 .unwrap();
1620 }
1621 }
1622
1623 let denied = OrderEventAny::Denied(OrderDenied::new(
1624 order.trader_id(),
1625 order.strategy_id(),
1626 order.instrument_id(),
1627 order.client_order_id(),
1628 reason.into(),
1629 UUID4::new(),
1630 self.clock.borrow().timestamp_ns(),
1631 self.clock.borrow().timestamp_ns(),
1632 ));
1633
1634 let endpoint = MessagingSwitchboard::exec_engine_process();
1635 msgbus::send_order_event(endpoint, denied);
1636 }
1637
1638 fn deny_order_list(&self, orders: &[OrderAny], reason: &str) {
1639 for order in orders {
1640 if !order.is_closed() {
1641 self.deny_order(order, reason);
1642 }
1643 }
1644 }
1645
1646 fn reject_modify_order(&self, order: &OrderAny, reason: &str) {
1647 let ts_event = self.clock.borrow().timestamp_ns();
1648 let denied = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
1649 order.trader_id(),
1650 order.strategy_id(),
1651 order.instrument_id(),
1652 order.client_order_id(),
1653 reason.into(),
1654 UUID4::new(),
1655 ts_event,
1656 ts_event,
1657 false,
1658 order.venue_order_id(),
1659 order.account_id(),
1660 ));
1661
1662 let endpoint = MessagingSwitchboard::exec_engine_process();
1663 msgbus::send_order_event(endpoint, denied);
1664 }
1665
1666 fn execution_gateway(&mut self, instrument: &InstrumentAny, command: TradingCommand) {
1667 match self.trading_state {
1668 TradingState::Halted => match command {
1669 TradingCommand::SubmitOrder(submit_order) => {
1670 let order = {
1671 let cache = self.cache.borrow();
1672 cache.order(&submit_order.client_order_id).cloned()
1673 };
1674
1675 if let Some(ref order) = order {
1676 self.deny_order(order, "TradingState::HALTED");
1677 }
1678 }
1679 TradingCommand::SubmitOrderList(submit_order_list) => {
1680 let orders: Vec<OrderAny> = self.cache.borrow().orders_for_ids(
1681 &submit_order_list.order_list.client_order_ids,
1682 &submit_order_list,
1683 );
1684 self.deny_order_list(&orders, "TradingState::HALTED");
1685 }
1686 _ => {}
1687 },
1688 TradingState::Reducing => {
1689 match &command {
1690 TradingCommand::SubmitOrder(submit_order) => {
1691 let order = {
1692 let cache = self.cache.borrow();
1693 cache.order(&submit_order.client_order_id).cloned()
1694 };
1695
1696 if let Some(ref order) = order {
1697 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1698 self.deny_order(
1699 order,
1700 &format!(
1701 "BUY when TradingState::REDUCING and LONG {}",
1702 instrument.id()
1703 ),
1704 );
1705 return;
1706 } else if order.is_sell()
1707 && self.portfolio.is_net_short(&instrument.id())
1708 {
1709 self.deny_order(
1710 order,
1711 &format!(
1712 "SELL when TradingState::REDUCING and SHORT {}",
1713 instrument.id()
1714 ),
1715 );
1716 return;
1717 }
1718 }
1719 }
1720 TradingCommand::SubmitOrderList(submit_order_list) => {
1721 let orders: Vec<OrderAny> = self.cache.borrow().orders_for_ids(
1722 &submit_order_list.order_list.client_order_ids,
1723 &submit_order_list,
1724 );
1725
1726 for order in &orders {
1727 if order.is_buy() && self.portfolio.is_net_long(&instrument.id()) {
1728 self.deny_order_list(
1729 &orders,
1730 &format!(
1731 "BUY when TradingState::REDUCING and LONG {}",
1732 instrument.id()
1733 ),
1734 );
1735 return;
1736 } else if order.is_sell()
1737 && self.portfolio.is_net_short(&instrument.id())
1738 {
1739 self.deny_order_list(
1740 &orders,
1741 &format!(
1742 "SELL when TradingState::REDUCING and SHORT {}",
1743 instrument.id()
1744 ),
1745 );
1746 return;
1747 }
1748 }
1749 }
1750 _ => {}
1751 }
1752 self.throttled_submit.send(command);
1754 }
1755 TradingState::Active => match command {
1756 TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
1757 self.throttled_submit.send(command);
1758 }
1759 _ => {}
1760 },
1761 }
1762 }
1763
1764 fn send_to_execution(&self, command: TradingCommand) {
1765 let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
1766 msgbus::send_trading_command(endpoint, command);
1767 }
1768
1769 fn handle_event(&self, event: &OrderEventAny) {
1770 if self.config.debug {
1773 log::debug!("{RECV}{EVT} {event:?}");
1774 }
1775 }
1776}