1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use async_trait::async_trait;
21use nautilus_common::{
22 cache::Cache,
23 clients::ExecutionClient,
24 clock::Clock,
25 factories::OrderEventFactory,
26 messages::execution::{
27 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
28 SubmitOrder, SubmitOrderList, TradingCommand,
29 },
30 msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{SharedCell, UnixNanos, WeakCell};
33use nautilus_execution::client::core::ExecutionClientCore;
34use nautilus_model::{
35 accounts::AccountAny,
36 enums::OmsType,
37 events::OrderEventAny,
38 identifiers::{AccountId, ClientId, ClientOrderId, TraderId, Venue},
39 orders::OrderAny,
40 types::{AccountBalance, MarginBalance},
41};
42
43use crate::exchange::SimulatedExchange;
44
45#[derive(Clone)]
52pub struct BacktestExecutionClient {
53 core: ExecutionClientCore,
54 factory: OrderEventFactory,
55 cache: Rc<RefCell<Cache>>,
56 clock: Rc<RefCell<dyn Clock>>,
57 exchange: WeakCell<SimulatedExchange>,
58 queued_events: Rc<RefCell<Vec<OrderEventAny>>>,
65 routing: bool,
66 _frozen_account: bool,
67}
68
69impl Debug for BacktestExecutionClient {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct(stringify!(BacktestExecutionClient))
72 .field("client_id", &self.core.client_id)
73 .field("routing", &self.routing)
74 .finish()
75 }
76}
77
78impl BacktestExecutionClient {
79 #[must_use]
81 pub fn new(
82 trader_id: TraderId,
83 account_id: AccountId,
84 exchange: &Rc<RefCell<SimulatedExchange>>,
85 cache: Rc<RefCell<Cache>>,
86 clock: Rc<RefCell<dyn Clock>>,
87 routing: Option<bool>,
88 frozen_account: Option<bool>,
89 ) -> Self {
90 let routing = routing.unwrap_or(false);
91 let frozen_account = frozen_account.unwrap_or(false);
92 let exchange_shared: SharedCell<SimulatedExchange> = SharedCell::from(exchange.clone());
93 let exchange_id = exchange_shared.borrow().id;
94 let account_type = exchange.borrow().account_type;
95 let base_currency = exchange.borrow().base_currency;
96
97 let core = ExecutionClientCore::new(
98 trader_id,
99 ClientId::from(exchange_id.as_str()),
100 Venue::from(exchange_id.as_str()),
101 exchange.borrow().oms_type,
102 account_id,
103 account_type,
104 base_currency,
105 cache.clone(),
106 );
107
108 let factory = OrderEventFactory::new(trader_id, account_id, account_type, base_currency);
109
110 if !frozen_account {
111 }
113
114 Self {
115 core,
116 factory,
117 exchange: exchange_shared.downgrade(),
118 cache,
119 clock,
120 queued_events: Rc::new(RefCell::new(Vec::new())),
121 routing,
122 _frozen_account: frozen_account,
123 }
124 }
125
126 fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
127 self.cache
128 .borrow()
129 .order(client_order_id)
130 .cloned()
131 .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
132 }
133
134 pub fn drain_queued_events(&self) {
136 let events: Vec<OrderEventAny> = self.queued_events.borrow_mut().drain(..).collect();
137 let endpoint = MessagingSwitchboard::exec_engine_process();
138 for event in events {
139 msgbus::send_order_event(endpoint, event);
140 }
141 }
142}
143
144#[async_trait(?Send)]
145impl ExecutionClient for BacktestExecutionClient {
146 fn is_connected(&self) -> bool {
147 self.core.is_connected()
148 }
149
150 fn client_id(&self) -> ClientId {
151 self.core.client_id
152 }
153
154 fn account_id(&self) -> AccountId {
155 self.core.account_id
156 }
157
158 fn venue(&self) -> Venue {
159 self.core.venue
160 }
161
162 fn oms_type(&self) -> OmsType {
163 self.core.oms_type
164 }
165
166 fn get_account(&self) -> Option<AccountAny> {
167 self.cache.borrow().account(&self.core.account_id).cloned()
168 }
169
170 fn generate_account_state(
171 &self,
172 balances: Vec<AccountBalance>,
173 margins: Vec<MarginBalance>,
174 reported: bool,
175 ts_event: UnixNanos,
176 ) -> anyhow::Result<()> {
177 let ts_init = self.clock.borrow().timestamp_ns();
178 let state = self
179 .factory
180 .generate_account_state(balances, margins, reported, ts_event, ts_init);
181 let endpoint = MessagingSwitchboard::portfolio_update_account();
182 msgbus::send_account_state(endpoint, &state);
183 Ok(())
184 }
185
186 fn start(&mut self) -> anyhow::Result<()> {
187 self.core.set_connected();
188 log::info!("Backtest execution client started");
189 Ok(())
190 }
191
192 fn stop(&mut self) -> anyhow::Result<()> {
193 self.core.set_disconnected();
194 log::info!("Backtest execution client stopped");
195 Ok(())
196 }
197
198 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
199 let order = self.get_order(&cmd.client_order_id)?;
202 let ts_init = self.clock.borrow().timestamp_ns();
203 let event = self.factory.generate_order_submitted(&order, ts_init);
204 self.queued_events.borrow_mut().push(event);
205
206 if let Some(exchange) = self.exchange.upgrade() {
207 exchange.borrow_mut().send(TradingCommand::SubmitOrder(cmd));
208 } else {
209 log::error!("submit_order: SimulatedExchange has been dropped");
210 }
211 Ok(())
212 }
213
214 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
215 let ts_init = self.clock.borrow().timestamp_ns();
216
217 let orders: Vec<OrderAny> = self
218 .cache
219 .borrow()
220 .orders_for_ids(&cmd.order_list.client_order_ids, &cmd);
221
222 let mut queued = self.queued_events.borrow_mut();
224
225 for order in &orders {
226 let event = self.factory.generate_order_submitted(order, ts_init);
227 queued.push(event);
228 }
229 drop(queued);
230
231 if let Some(exchange) = self.exchange.upgrade() {
232 exchange
233 .borrow_mut()
234 .send(TradingCommand::SubmitOrderList(cmd));
235 } else {
236 log::error!("submit_order_list: SimulatedExchange has been dropped");
237 }
238 Ok(())
239 }
240
241 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
242 if let Some(exchange) = self.exchange.upgrade() {
243 exchange.borrow_mut().send(TradingCommand::ModifyOrder(cmd));
244 } else {
245 log::error!("modify_order: SimulatedExchange has been dropped");
246 }
247 Ok(())
248 }
249
250 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
251 if let Some(exchange) = self.exchange.upgrade() {
252 exchange.borrow_mut().send(TradingCommand::CancelOrder(cmd));
253 } else {
254 log::error!("cancel_order: SimulatedExchange has been dropped");
255 }
256 Ok(())
257 }
258
259 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
260 if let Some(exchange) = self.exchange.upgrade() {
261 exchange
262 .borrow_mut()
263 .send(TradingCommand::CancelAllOrders(cmd));
264 } else {
265 log::error!("cancel_all_orders: SimulatedExchange has been dropped");
266 }
267 Ok(())
268 }
269
270 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
271 if let Some(exchange) = self.exchange.upgrade() {
272 exchange
273 .borrow_mut()
274 .send(TradingCommand::BatchCancelOrders(cmd));
275 } else {
276 log::error!("batch_cancel_orders: SimulatedExchange has been dropped");
277 }
278 Ok(())
279 }
280
281 fn query_account(&self, cmd: QueryAccount) -> anyhow::Result<()> {
282 if let Some(exchange) = self.exchange.upgrade() {
283 exchange
284 .borrow_mut()
285 .send(TradingCommand::QueryAccount(cmd));
286 } else {
287 log::error!("query_account: SimulatedExchange has been dropped");
288 }
289 Ok(())
290 }
291
292 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
293 if let Some(exchange) = self.exchange.upgrade() {
294 exchange.borrow_mut().send(TradingCommand::QueryOrder(cmd));
295 } else {
296 log::error!("query_order: SimulatedExchange has been dropped");
297 }
298 Ok(())
299 }
300}