nautilus_common/
runner.rs1use std::{cell::RefCell, fmt::Debug, sync::Arc};
23
24use crate::{
25 messages::{data::DataCommand, execution::TradingCommand},
26 msgbus::{self, MessagingSwitchboard},
27 timer::TimeEventHandler,
28};
29
30pub trait DataCommandSender {
32 fn execute(&self, command: DataCommand);
37}
38
39#[derive(Debug)]
44pub struct SyncDataCommandSender;
45
46impl DataCommandSender for SyncDataCommandSender {
47 fn execute(&self, command: DataCommand) {
48 DATA_CMD_QUEUE.with(|q| q.borrow_mut().push(command));
49 }
50}
51
52pub fn drain_data_cmd_queue() {
54 DATA_CMD_QUEUE.with(|q| {
55 let commands: Vec<DataCommand> = q.borrow_mut().drain(..).collect();
56 let endpoint = MessagingSwitchboard::data_engine_execute();
57 for cmd in commands {
58 msgbus::send_data_command(endpoint, cmd);
59 }
60 });
61}
62
63pub fn data_cmd_queue_is_empty() -> bool {
65 DATA_CMD_QUEUE.with(|q| q.borrow().is_empty())
66}
67
68#[must_use]
74pub fn get_data_cmd_sender() -> Arc<dyn DataCommandSender> {
75 DATA_CMD_SENDER.with(|sender| {
76 sender
77 .borrow()
78 .as_ref()
79 .expect("Data command sender should be initialized by runner")
80 .clone()
81 })
82}
83
84pub fn set_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
93 DATA_CMD_SENDER.with(|s| {
94 let mut slot = s.borrow_mut();
95 assert!(slot.is_none(), "Data command sender can only be set once");
96 *slot = Some(sender);
97 });
98}
99
100pub fn replace_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
102 DATA_CMD_SENDER.with(|s| {
103 *s.borrow_mut() = Some(sender);
104 });
105}
106
107pub trait TimeEventSender: Debug + Send + Sync {
109 fn send(&self, handler: TimeEventHandler);
111}
112
113#[must_use]
119pub fn get_time_event_sender() -> Arc<dyn TimeEventSender> {
120 TIME_EVENT_SENDER.with(|sender| {
121 sender
122 .borrow()
123 .as_ref()
124 .expect("Time event sender should be initialized by runner")
125 .clone()
126 })
127}
128
129#[must_use]
133pub fn try_get_time_event_sender() -> Option<Arc<dyn TimeEventSender>> {
134 TIME_EVENT_SENDER.with(|sender| sender.borrow().as_ref().cloned())
135}
136
137pub fn set_time_event_sender(sender: Arc<dyn TimeEventSender>) {
145 TIME_EVENT_SENDER.with(|s| {
146 let mut slot = s.borrow_mut();
147 assert!(slot.is_none(), "Time event sender can only be set once");
148 *slot = Some(sender);
149 });
150}
151
152pub fn replace_time_event_sender(sender: Arc<dyn TimeEventSender>) {
154 TIME_EVENT_SENDER.with(|s| {
155 *s.borrow_mut() = Some(sender);
156 });
157}
158
159pub trait TradingCommandSender {
161 fn execute(&self, command: TradingCommand);
166}
167
168#[derive(Debug)]
173pub struct SyncTradingCommandSender;
174
175impl TradingCommandSender for SyncTradingCommandSender {
176 fn execute(&self, command: TradingCommand) {
177 TRADING_CMD_QUEUE.with(|q| q.borrow_mut().push(command));
178 }
179}
180
181pub fn drain_trading_cmd_queue() {
183 TRADING_CMD_QUEUE.with(|q| {
184 let commands: Vec<TradingCommand> = q.borrow_mut().drain(..).collect();
185 let endpoint = MessagingSwitchboard::exec_engine_execute();
186 for cmd in commands {
187 msgbus::send_trading_command(endpoint, cmd);
188 }
189 });
190}
191
192pub fn trading_cmd_queue_is_empty() -> bool {
194 TRADING_CMD_QUEUE.with(|q| q.borrow().is_empty())
195}
196
197#[must_use]
203pub fn get_trading_cmd_sender() -> Arc<dyn TradingCommandSender> {
204 EXEC_CMD_SENDER.with(|sender| {
205 sender
206 .borrow()
207 .as_ref()
208 .expect("Trading command sender should be initialized by runner")
209 .clone()
210 })
211}
212
213#[must_use]
217pub fn try_get_trading_cmd_sender() -> Option<Arc<dyn TradingCommandSender>> {
218 EXEC_CMD_SENDER.with(|sender| sender.borrow().as_ref().cloned())
219}
220
221pub fn set_exec_cmd_sender(sender: Arc<dyn TradingCommandSender>) {
230 EXEC_CMD_SENDER.with(|s| {
231 let mut slot = s.borrow_mut();
232 assert!(
233 slot.is_none(),
234 "Trading command sender can only be set once"
235 );
236 *slot = Some(sender);
237 });
238}
239
240pub fn replace_exec_cmd_sender(sender: Arc<dyn TradingCommandSender>) {
242 EXEC_CMD_SENDER.with(|s| {
243 *s.borrow_mut() = Some(sender);
244 });
245}
246
247thread_local! {
248 static TIME_EVENT_SENDER: RefCell<Option<Arc<dyn TimeEventSender>>> = const { RefCell::new(None) };
249 static DATA_CMD_SENDER: RefCell<Option<Arc<dyn DataCommandSender>>> = const { RefCell::new(None) };
250 static EXEC_CMD_SENDER: RefCell<Option<Arc<dyn TradingCommandSender>>> = const { RefCell::new(None) };
251 static DATA_CMD_QUEUE: RefCell<Vec<DataCommand>> = const { RefCell::new(Vec::new()) };
252 static TRADING_CMD_QUEUE: RefCell<Vec<TradingCommand>> = const { RefCell::new(Vec::new()) };
253}
254
255#[cfg(test)]
256mod tests {
257 use std::sync::Arc;
258
259 use rstest::rstest;
260
261 use super::*;
262
263 #[derive(Debug)]
264 struct NoopTimeEventSender;
265
266 impl TimeEventSender for NoopTimeEventSender {
267 fn send(&self, _handler: TimeEventHandler) {}
268 }
269
270 #[rstest]
271 fn test_replace_data_cmd_sender_overwrites_previous() {
272 std::thread::spawn(|| {
273 replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
274 replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
275 let _sender = get_data_cmd_sender();
276 })
277 .join()
278 .unwrap();
279 }
280
281 #[rstest]
282 fn test_replace_exec_cmd_sender_overwrites_previous() {
283 std::thread::spawn(|| {
284 replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
285 replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
286 let _sender = get_trading_cmd_sender();
287 })
288 .join()
289 .unwrap();
290 }
291
292 #[rstest]
293 fn test_replace_time_event_sender_overwrites_previous() {
294 std::thread::spawn(|| {
295 replace_time_event_sender(Arc::new(NoopTimeEventSender));
296 replace_time_event_sender(Arc::new(NoopTimeEventSender));
297 let _sender = get_time_event_sender();
298 })
299 .join()
300 .unwrap();
301 }
302
303 #[rstest]
304 fn test_set_data_cmd_sender_panics_on_double_set() {
305 let result = std::thread::spawn(|| {
306 set_data_cmd_sender(Arc::new(SyncDataCommandSender));
307 set_data_cmd_sender(Arc::new(SyncDataCommandSender));
308 })
309 .join();
310 assert!(result.is_err());
311 }
312
313 #[rstest]
314 fn test_set_exec_cmd_sender_panics_on_double_set() {
315 let result = std::thread::spawn(|| {
316 set_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
317 set_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
318 })
319 .join();
320 assert!(result.is_err());
321 }
322
323 #[rstest]
324 fn test_set_time_event_sender_panics_on_double_set() {
325 let result = std::thread::spawn(|| {
326 set_time_event_sender(Arc::new(NoopTimeEventSender));
327 set_time_event_sender(Arc::new(NoopTimeEventSender));
328 })
329 .join();
330 assert!(result.is_err());
331 }
332
333 #[rstest]
334 fn test_try_get_time_event_sender_returns_none_when_unset() {
335 let result = std::thread::spawn(try_get_time_event_sender)
336 .join()
337 .unwrap();
338 assert!(result.is_none());
339 }
340
341 #[rstest]
342 fn test_try_get_trading_cmd_sender_returns_none_when_unset() {
343 let is_none = std::thread::spawn(|| try_get_trading_cmd_sender().is_none())
344 .join()
345 .unwrap();
346 assert!(is_none);
347 }
348}