Skip to main content

nautilus_common/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Global runtime machinery and thread-local storage.
17//!
18//! This module provides global access to shared runtime resources including clocks,
19//! message queues, and time event channels. It manages thread-local storage for
20//! system-wide components that need to be accessible across threads.
21
22use std::{cell::RefCell, fmt::Debug, sync::Arc};
23
24use crate::{
25    messages::{data::DataCommand, execution::TradingCommand},
26    msgbus::{self, MessagingSwitchboard},
27    timer::TimeEventHandler,
28};
29
30/// Trait for data command sending that can be implemented for both sync and async runners.
31pub trait DataCommandSender {
32    /// Executes a data command.
33    ///
34    /// - **Sync runners** send the command to a queue for synchronous execution.
35    /// - **Async runners** send the command to a channel for asynchronous execution.
36    fn execute(&self, command: DataCommand);
37}
38
39/// Synchronous [`DataCommandSender`] for backtest environments.
40///
41/// Buffers commands in a thread-local queue for deferred execution,
42/// avoiding `RefCell` re-entrancy when sent from event handler callbacks.
43#[derive(Debug)]
44pub struct SyncDataCommandSender;
45
46impl DataCommandSender for SyncDataCommandSender {
47    fn execute(&self, command: DataCommand) {
48        DATA_CMD_QUEUE.with(|q| q.borrow_mut().push(command));
49    }
50}
51
52/// Drain all buffered data commands, dispatching each to the data engine.
53pub fn drain_data_cmd_queue() {
54    DATA_CMD_QUEUE.with(|q| {
55        let commands: Vec<DataCommand> = q.borrow_mut().drain(..).collect();
56        let endpoint = MessagingSwitchboard::data_engine_execute();
57        for cmd in commands {
58            msgbus::send_data_command(endpoint, cmd);
59        }
60    });
61}
62
63/// Returns `true` if the data command queue is empty.
64pub fn data_cmd_queue_is_empty() -> bool {
65    DATA_CMD_QUEUE.with(|q| q.borrow().is_empty())
66}
67
68/// Gets the global data command sender.
69///
70/// # Panics
71///
72/// Panics if the sender is uninitialized.
73#[must_use]
74pub fn get_data_cmd_sender() -> Arc<dyn DataCommandSender> {
75    DATA_CMD_SENDER.with(|sender| {
76        sender
77            .borrow()
78            .as_ref()
79            .expect("Data command sender should be initialized by runner")
80            .clone()
81    })
82}
83
84/// Sets the global data command sender.
85///
86/// This should be called by the runner when it initializes.
87/// Can only be called once per thread.
88///
89/// # Panics
90///
91/// Panics if a sender has already been set.
92pub fn set_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
93    DATA_CMD_SENDER.with(|s| {
94        let mut slot = s.borrow_mut();
95        assert!(slot.is_none(), "Data command sender can only be set once");
96        *slot = Some(sender);
97    });
98}
99
100/// Replaces the global data command sender for the current thread.
101pub fn replace_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
102    DATA_CMD_SENDER.with(|s| {
103        *s.borrow_mut() = Some(sender);
104    });
105}
106
107/// Trait for time event sending that can be implemented for both sync and async runners.
108pub trait TimeEventSender: Debug + Send + Sync {
109    /// Sends a time event handler.
110    fn send(&self, handler: TimeEventHandler);
111}
112
113/// Gets the global time event sender.
114///
115/// # Panics
116///
117/// Panics if the sender is uninitialized.
118#[must_use]
119pub fn get_time_event_sender() -> Arc<dyn TimeEventSender> {
120    TIME_EVENT_SENDER.with(|sender| {
121        sender
122            .borrow()
123            .as_ref()
124            .expect("Time event sender should be initialized by runner")
125            .clone()
126    })
127}
128
129/// Attempts to get the global time event sender without panicking.
130///
131/// Returns `None` if the sender is not initialized (e.g., in test environments).
132#[must_use]
133pub fn try_get_time_event_sender() -> Option<Arc<dyn TimeEventSender>> {
134    TIME_EVENT_SENDER.with(|sender| sender.borrow().as_ref().cloned())
135}
136
137/// Sets the global time event sender.
138///
139/// Can only be called once per thread.
140///
141/// # Panics
142///
143/// Panics if a sender has already been set.
144pub fn set_time_event_sender(sender: Arc<dyn TimeEventSender>) {
145    TIME_EVENT_SENDER.with(|s| {
146        let mut slot = s.borrow_mut();
147        assert!(slot.is_none(), "Time event sender can only be set once");
148        *slot = Some(sender);
149    });
150}
151
152/// Replaces the global time event sender for the current thread.
153pub fn replace_time_event_sender(sender: Arc<dyn TimeEventSender>) {
154    TIME_EVENT_SENDER.with(|s| {
155        *s.borrow_mut() = Some(sender);
156    });
157}
158
159/// Trait for trading command sending that can be implemented for both sync and async runners.
160pub trait TradingCommandSender {
161    /// Executes a trading command.
162    ///
163    /// - **Sync runners** send the command to a queue for synchronous execution.
164    /// - **Async runners** send the command to a channel for asynchronous execution.
165    fn execute(&self, command: TradingCommand);
166}
167
168/// Synchronous [`TradingCommandSender`] for backtest environments.
169///
170/// Buffers commands in a thread-local queue for deferred execution,
171/// avoiding `RefCell` re-entrancy when sent from event handler callbacks.
172#[derive(Debug)]
173pub struct SyncTradingCommandSender;
174
175impl TradingCommandSender for SyncTradingCommandSender {
176    fn execute(&self, command: TradingCommand) {
177        TRADING_CMD_QUEUE.with(|q| q.borrow_mut().push(command));
178    }
179}
180
181/// Drain all buffered trading commands, dispatching each to the exec engine.
182pub fn drain_trading_cmd_queue() {
183    TRADING_CMD_QUEUE.with(|q| {
184        let commands: Vec<TradingCommand> = q.borrow_mut().drain(..).collect();
185        let endpoint = MessagingSwitchboard::exec_engine_execute();
186        for cmd in commands {
187            msgbus::send_trading_command(endpoint, cmd);
188        }
189    });
190}
191
192/// Returns `true` if the trading command queue is empty.
193pub fn trading_cmd_queue_is_empty() -> bool {
194    TRADING_CMD_QUEUE.with(|q| q.borrow().is_empty())
195}
196
197/// Gets the global trading command sender.
198///
199/// # Panics
200///
201/// Panics if the sender is uninitialized.
202#[must_use]
203pub fn get_trading_cmd_sender() -> Arc<dyn TradingCommandSender> {
204    EXEC_CMD_SENDER.with(|sender| {
205        sender
206            .borrow()
207            .as_ref()
208            .expect("Trading command sender should be initialized by runner")
209            .clone()
210    })
211}
212
213/// Attempts to get the global trading command sender without panicking.
214///
215/// Returns `None` if the sender is not initialized (e.g., in test environments).
216#[must_use]
217pub fn try_get_trading_cmd_sender() -> Option<Arc<dyn TradingCommandSender>> {
218    EXEC_CMD_SENDER.with(|sender| sender.borrow().as_ref().cloned())
219}
220
221/// Sets the global trading command sender.
222///
223/// This should be called by the runner when it initializes.
224/// Can only be called once per thread.
225///
226/// # Panics
227///
228/// Panics if a sender has already been set.
229pub fn set_exec_cmd_sender(sender: Arc<dyn TradingCommandSender>) {
230    EXEC_CMD_SENDER.with(|s| {
231        let mut slot = s.borrow_mut();
232        assert!(
233            slot.is_none(),
234            "Trading command sender can only be set once"
235        );
236        *slot = Some(sender);
237    });
238}
239
240/// Replaces the global trading command sender for the current thread.
241pub fn replace_exec_cmd_sender(sender: Arc<dyn TradingCommandSender>) {
242    EXEC_CMD_SENDER.with(|s| {
243        *s.borrow_mut() = Some(sender);
244    });
245}
246
247thread_local! {
248    static TIME_EVENT_SENDER: RefCell<Option<Arc<dyn TimeEventSender>>> = const { RefCell::new(None) };
249    static DATA_CMD_SENDER: RefCell<Option<Arc<dyn DataCommandSender>>> = const { RefCell::new(None) };
250    static EXEC_CMD_SENDER: RefCell<Option<Arc<dyn TradingCommandSender>>> = const { RefCell::new(None) };
251    static DATA_CMD_QUEUE: RefCell<Vec<DataCommand>> = const { RefCell::new(Vec::new()) };
252    static TRADING_CMD_QUEUE: RefCell<Vec<TradingCommand>> = const { RefCell::new(Vec::new()) };
253}
254
255#[cfg(test)]
256mod tests {
257    use std::sync::Arc;
258
259    use rstest::rstest;
260
261    use super::*;
262
263    #[derive(Debug)]
264    struct NoopTimeEventSender;
265
266    impl TimeEventSender for NoopTimeEventSender {
267        fn send(&self, _handler: TimeEventHandler) {}
268    }
269
270    #[rstest]
271    fn test_replace_data_cmd_sender_overwrites_previous() {
272        std::thread::spawn(|| {
273            replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
274            replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
275            let _sender = get_data_cmd_sender();
276        })
277        .join()
278        .unwrap();
279    }
280
281    #[rstest]
282    fn test_replace_exec_cmd_sender_overwrites_previous() {
283        std::thread::spawn(|| {
284            replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
285            replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
286            let _sender = get_trading_cmd_sender();
287        })
288        .join()
289        .unwrap();
290    }
291
292    #[rstest]
293    fn test_replace_time_event_sender_overwrites_previous() {
294        std::thread::spawn(|| {
295            replace_time_event_sender(Arc::new(NoopTimeEventSender));
296            replace_time_event_sender(Arc::new(NoopTimeEventSender));
297            let _sender = get_time_event_sender();
298        })
299        .join()
300        .unwrap();
301    }
302
303    #[rstest]
304    fn test_set_data_cmd_sender_panics_on_double_set() {
305        let result = std::thread::spawn(|| {
306            set_data_cmd_sender(Arc::new(SyncDataCommandSender));
307            set_data_cmd_sender(Arc::new(SyncDataCommandSender));
308        })
309        .join();
310        assert!(result.is_err());
311    }
312
313    #[rstest]
314    fn test_set_exec_cmd_sender_panics_on_double_set() {
315        let result = std::thread::spawn(|| {
316            set_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
317            set_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
318        })
319        .join();
320        assert!(result.is_err());
321    }
322
323    #[rstest]
324    fn test_set_time_event_sender_panics_on_double_set() {
325        let result = std::thread::spawn(|| {
326            set_time_event_sender(Arc::new(NoopTimeEventSender));
327            set_time_event_sender(Arc::new(NoopTimeEventSender));
328        })
329        .join();
330        assert!(result.is_err());
331    }
332
333    #[rstest]
334    fn test_try_get_time_event_sender_returns_none_when_unset() {
335        let result = std::thread::spawn(try_get_time_event_sender)
336            .join()
337            .unwrap();
338        assert!(result.is_none());
339    }
340
341    #[rstest]
342    fn test_try_get_trading_cmd_sender_returns_none_when_unset() {
343        let is_none = std::thread::spawn(|| try_get_trading_cmd_sender().is_none())
344            .join()
345            .unwrap();
346        assert!(is_none);
347    }
348}