Skip to main content

nautilus_backtest/
execution_client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `BacktestExecutionClient` implementation for backtesting.
17
18use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use async_trait::async_trait;
21use nautilus_common::{
22    cache::Cache,
23    clients::ExecutionClient,
24    clock::Clock,
25    factories::OrderEventFactory,
26    messages::execution::{
27        BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
28        SubmitOrder, SubmitOrderList, TradingCommand,
29    },
30    msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{SharedCell, UnixNanos, WeakCell};
33use nautilus_execution::client::core::ExecutionClientCore;
34use nautilus_model::{
35    accounts::AccountAny,
36    enums::OmsType,
37    events::OrderEventAny,
38    identifiers::{AccountId, ClientId, ClientOrderId, TraderId, Venue},
39    orders::OrderAny,
40    types::{AccountBalance, MarginBalance},
41};
42
43use crate::exchange::SimulatedExchange;
44
45/// Execution client implementation for backtesting trading operations.
46///
47/// The `BacktestExecutionClient` provides an execution client interface for
48/// backtesting environments, handling order management and trade execution
49/// through simulated exchanges. It processes trading commands and coordinates
50/// with the simulation infrastructure to provide realistic execution behavior.
51#[derive(Clone)]
52pub struct BacktestExecutionClient {
53    core: ExecutionClientCore,
54    factory: OrderEventFactory,
55    cache: Rc<RefCell<Cache>>,
56    clock: Rc<RefCell<dyn Clock>>,
57    exchange: WeakCell<SimulatedExchange>,
58    /// Buffered order events for deferred processing.
59    ///
60    /// Events like `OrderSubmitted` cannot be sent synchronously through
61    /// the msgbus during `submit_order` because the exec engine holds a
62    /// borrow via its `execute` handler. Instead, events are buffered here
63    /// and drained by the engine after the execute borrow is released.
64    queued_events: Rc<RefCell<Vec<OrderEventAny>>>,
65    routing: bool,
66    _frozen_account: bool,
67}
68
69impl Debug for BacktestExecutionClient {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct(stringify!(BacktestExecutionClient))
72            .field("client_id", &self.core.client_id)
73            .field("routing", &self.routing)
74            .finish()
75    }
76}
77
78impl BacktestExecutionClient {
79    /// Creates a new [`BacktestExecutionClient`] instance.
80    #[must_use]
81    pub fn new(
82        trader_id: TraderId,
83        account_id: AccountId,
84        exchange: &Rc<RefCell<SimulatedExchange>>,
85        cache: Rc<RefCell<Cache>>,
86        clock: Rc<RefCell<dyn Clock>>,
87        routing: Option<bool>,
88        frozen_account: Option<bool>,
89    ) -> Self {
90        let routing = routing.unwrap_or(false);
91        let frozen_account = frozen_account.unwrap_or(false);
92        let exchange_shared: SharedCell<SimulatedExchange> = SharedCell::from(exchange.clone());
93        let exchange_id = exchange_shared.borrow().id;
94        let account_type = exchange.borrow().account_type;
95        let base_currency = exchange.borrow().base_currency;
96
97        let core = ExecutionClientCore::new(
98            trader_id,
99            ClientId::from(exchange_id.as_str()),
100            Venue::from(exchange_id.as_str()),
101            exchange.borrow().oms_type,
102            account_id,
103            account_type,
104            base_currency,
105            cache.clone(),
106        );
107
108        let factory = OrderEventFactory::new(trader_id, account_id, account_type, base_currency);
109
110        if !frozen_account {
111            // TODO Register calculated account
112        }
113
114        Self {
115            core,
116            factory,
117            exchange: exchange_shared.downgrade(),
118            cache,
119            clock,
120            queued_events: Rc::new(RefCell::new(Vec::new())),
121            routing,
122            _frozen_account: frozen_account,
123        }
124    }
125
126    fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
127        self.cache
128            .borrow()
129            .order(client_order_id)
130            .cloned()
131            .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
132    }
133
134    /// Drain buffered order events, sending each to the exec engine.
135    pub fn drain_queued_events(&self) {
136        let events: Vec<OrderEventAny> = self.queued_events.borrow_mut().drain(..).collect();
137        let endpoint = MessagingSwitchboard::exec_engine_process();
138        for event in events {
139            msgbus::send_order_event(endpoint, event);
140        }
141    }
142}
143
144#[async_trait(?Send)]
145impl ExecutionClient for BacktestExecutionClient {
146    fn is_connected(&self) -> bool {
147        self.core.is_connected()
148    }
149
150    fn client_id(&self) -> ClientId {
151        self.core.client_id
152    }
153
154    fn account_id(&self) -> AccountId {
155        self.core.account_id
156    }
157
158    fn venue(&self) -> Venue {
159        self.core.venue
160    }
161
162    fn oms_type(&self) -> OmsType {
163        self.core.oms_type
164    }
165
166    fn get_account(&self) -> Option<AccountAny> {
167        self.cache.borrow().account(&self.core.account_id).cloned()
168    }
169
170    fn generate_account_state(
171        &self,
172        balances: Vec<AccountBalance>,
173        margins: Vec<MarginBalance>,
174        reported: bool,
175        ts_event: UnixNanos,
176    ) -> anyhow::Result<()> {
177        let ts_init = self.clock.borrow().timestamp_ns();
178        let state = self
179            .factory
180            .generate_account_state(balances, margins, reported, ts_event, ts_init);
181        let endpoint = MessagingSwitchboard::portfolio_update_account();
182        msgbus::send_account_state(endpoint, &state);
183        Ok(())
184    }
185
186    fn start(&mut self) -> anyhow::Result<()> {
187        self.core.set_connected();
188        log::info!("Backtest execution client started");
189        Ok(())
190    }
191
192    fn stop(&mut self) -> anyhow::Result<()> {
193        self.core.set_disconnected();
194        log::info!("Backtest execution client stopped");
195        Ok(())
196    }
197
198    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
199        // Buffer the OrderSubmitted event for deferred processing to avoid
200        // RefCell re-entrancy (exec_engine holds a borrow during execute)
201        let order = self.get_order(&cmd.client_order_id)?;
202        let ts_init = self.clock.borrow().timestamp_ns();
203        let event = self.factory.generate_order_submitted(&order, ts_init);
204        self.queued_events.borrow_mut().push(event);
205
206        if let Some(exchange) = self.exchange.upgrade() {
207            exchange.borrow_mut().send(TradingCommand::SubmitOrder(cmd));
208        } else {
209            log::error!("submit_order: SimulatedExchange has been dropped");
210        }
211        Ok(())
212    }
213
214    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
215        let ts_init = self.clock.borrow().timestamp_ns();
216
217        let orders: Vec<OrderAny> = self
218            .cache
219            .borrow()
220            .orders_for_ids(&cmd.order_list.client_order_ids, &cmd);
221
222        // Buffer events for deferred processing
223        let mut queued = self.queued_events.borrow_mut();
224
225        for order in &orders {
226            let event = self.factory.generate_order_submitted(order, ts_init);
227            queued.push(event);
228        }
229        drop(queued);
230
231        if let Some(exchange) = self.exchange.upgrade() {
232            exchange
233                .borrow_mut()
234                .send(TradingCommand::SubmitOrderList(cmd));
235        } else {
236            log::error!("submit_order_list: SimulatedExchange has been dropped");
237        }
238        Ok(())
239    }
240
241    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
242        if let Some(exchange) = self.exchange.upgrade() {
243            exchange.borrow_mut().send(TradingCommand::ModifyOrder(cmd));
244        } else {
245            log::error!("modify_order: SimulatedExchange has been dropped");
246        }
247        Ok(())
248    }
249
250    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
251        if let Some(exchange) = self.exchange.upgrade() {
252            exchange.borrow_mut().send(TradingCommand::CancelOrder(cmd));
253        } else {
254            log::error!("cancel_order: SimulatedExchange has been dropped");
255        }
256        Ok(())
257    }
258
259    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
260        if let Some(exchange) = self.exchange.upgrade() {
261            exchange
262                .borrow_mut()
263                .send(TradingCommand::CancelAllOrders(cmd));
264        } else {
265            log::error!("cancel_all_orders: SimulatedExchange has been dropped");
266        }
267        Ok(())
268    }
269
270    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
271        if let Some(exchange) = self.exchange.upgrade() {
272            exchange
273                .borrow_mut()
274                .send(TradingCommand::BatchCancelOrders(cmd));
275        } else {
276            log::error!("batch_cancel_orders: SimulatedExchange has been dropped");
277        }
278        Ok(())
279    }
280
281    fn query_account(&self, cmd: QueryAccount) -> anyhow::Result<()> {
282        if let Some(exchange) = self.exchange.upgrade() {
283            exchange
284                .borrow_mut()
285                .send(TradingCommand::QueryAccount(cmd));
286        } else {
287            log::error!("query_account: SimulatedExchange has been dropped");
288        }
289        Ok(())
290    }
291
292    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
293        if let Some(exchange) = self.exchange.upgrade() {
294            exchange.borrow_mut().send(TradingCommand::QueryOrder(cmd));
295        } else {
296            log::error!("query_order: SimulatedExchange has been dropped");
297        }
298        Ok(())
299    }
300}