nautilus_common/live/
runner.rs1use std::cell::RefCell;
21
22use crate::messages::{DataEvent, ExecutionEvent};
23
24#[must_use]
30pub fn get_data_event_sender() -> tokio::sync::mpsc::UnboundedSender<DataEvent> {
31 DATA_EVENT_SENDER.with(|sender| {
32 sender
33 .borrow()
34 .as_ref()
35 .expect("Data event sender should be initialized by runner")
36 .clone()
37 })
38}
39
40#[must_use]
45pub fn try_get_data_event_sender() -> Option<tokio::sync::mpsc::UnboundedSender<DataEvent>> {
46 DATA_EVENT_SENDER.with(|sender| sender.borrow().as_ref().cloned())
47}
48
49pub fn set_data_event_sender(sender: tokio::sync::mpsc::UnboundedSender<DataEvent>) {
57 DATA_EVENT_SENDER.with(|s| {
58 let mut slot = s.borrow_mut();
59 assert!(slot.is_none(), "Data event sender can only be set once");
60 *slot = Some(sender);
61 });
62}
63
64pub fn replace_data_event_sender(sender: tokio::sync::mpsc::UnboundedSender<DataEvent>) {
66 DATA_EVENT_SENDER.with(|s| {
67 *s.borrow_mut() = Some(sender);
68 });
69}
70
71#[must_use]
77pub fn get_exec_event_sender() -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
78 EXEC_EVENT_SENDER.with(|sender| {
79 sender
80 .borrow()
81 .as_ref()
82 .expect("Execution event sender should be initialized by runner")
83 .clone()
84 })
85}
86
87#[must_use]
91pub fn try_get_exec_event_sender() -> Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>> {
92 EXEC_EVENT_SENDER.with(|sender| sender.borrow().as_ref().cloned())
93}
94
95pub fn set_exec_event_sender(sender: tokio::sync::mpsc::UnboundedSender<ExecutionEvent>) {
103 EXEC_EVENT_SENDER.with(|s| {
104 let mut slot = s.borrow_mut();
105 assert!(
106 slot.is_none(),
107 "Execution event sender can only be set once"
108 );
109 *slot = Some(sender);
110 });
111}
112
113pub fn replace_exec_event_sender(sender: tokio::sync::mpsc::UnboundedSender<ExecutionEvent>) {
115 EXEC_EVENT_SENDER.with(|s| {
116 *s.borrow_mut() = Some(sender);
117 });
118}
119
120thread_local! {
121 static DATA_EVENT_SENDER: RefCell<Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>> = const { RefCell::new(None) };
122 static EXEC_EVENT_SENDER: RefCell<Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>> = const { RefCell::new(None) };
123}
124
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    // The senders live in thread-local storage, so every test runs its body on a
    // freshly spawned thread. That guarantees the slots start out empty and keeps
    // the tests independent of each other and of the test harness threads.

    /// After two replacements the stored sender must be the second one.
    #[rstest]
    fn test_replace_data_event_sender_overwrites_previous() {
        std::thread::spawn(|| {
            let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
            let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
            replace_data_event_sender(tx1.clone());
            replace_data_event_sender(tx2.clone());

            // Verify channel identity instead of merely checking that a sender exists.
            let sender = get_data_event_sender();
            assert!(sender.same_channel(&tx2));
            assert!(!sender.same_channel(&tx1));
        })
        .join()
        .unwrap();
    }

    /// After two replacements the stored sender must be the second one.
    #[rstest]
    fn test_replace_exec_event_sender_overwrites_previous() {
        std::thread::spawn(|| {
            let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
            let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
            replace_exec_event_sender(tx1.clone());
            replace_exec_event_sender(tx2.clone());

            // Verify channel identity instead of merely checking that a sender exists.
            let sender = get_exec_event_sender();
            assert!(sender.same_channel(&tx2));
            assert!(!sender.same_channel(&tx1));
        })
        .join()
        .unwrap();
    }

    /// Setting the data sender twice on one thread panics, surfacing as a join error.
    #[rstest]
    fn test_set_data_event_sender_panics_on_double_set() {
        let result = std::thread::spawn(|| {
            let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
            let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
            set_data_event_sender(tx1);
            set_data_event_sender(tx2);
        })
        .join();
        assert!(result.is_err());
    }

    /// Setting the execution sender twice on one thread panics, surfacing as a join error.
    #[rstest]
    fn test_set_exec_event_sender_panics_on_double_set() {
        let result = std::thread::spawn(|| {
            let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
            let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
            set_exec_event_sender(tx1);
            set_exec_event_sender(tx2);
        })
        .join();
        assert!(result.is_err());
    }

    /// A thread that never registered a data sender observes `None`.
    #[rstest]
    fn test_try_get_data_event_sender_returns_none_when_unset() {
        // Assert inside the thread so nothing has to be sent back across the join.
        std::thread::spawn(|| {
            assert!(try_get_data_event_sender().is_none());
        })
        .join()
        .unwrap();
    }

    /// A thread that never registered an execution sender observes `None`.
    #[rstest]
    fn test_try_get_exec_event_sender_returns_none_when_unset() {
        let result = std::thread::spawn(try_get_exec_event_sender)
            .join()
            .unwrap();
        assert!(result.is_none());
    }
}