Skip to main content

nautilus_common/live/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Tokio-based channel senders for live trading runtime.
17//!
18//! This module provides thread-local storage for tokio mpsc channels used in live trading.
19
20use std::cell::RefCell;
21
22use crate::messages::{DataEvent, ExecutionEvent};
23
24/// Gets the global data event sender.
25///
26/// # Panics
27///
28/// Panics if the sender is uninitialized.
29#[must_use]
30pub fn get_data_event_sender() -> tokio::sync::mpsc::UnboundedSender<DataEvent> {
31    DATA_EVENT_SENDER.with(|sender| {
32        sender
33            .borrow()
34            .as_ref()
35            .expect("Data event sender should be initialized by runner")
36            .clone()
37    })
38}
39
40/// Attempts to get the global data event sender without panicking.
41///
42/// Returns `None` if the sender is not initialized (e.g., in Python/v1 bridge environments
43/// before a runner or adapter bridge has registered a sender).
44#[must_use]
45pub fn try_get_data_event_sender() -> Option<tokio::sync::mpsc::UnboundedSender<DataEvent>> {
46    DATA_EVENT_SENDER.with(|sender| sender.borrow().as_ref().cloned())
47}
48
49/// Sets the global data event sender.
50///
51/// Can only be called once per thread.
52///
53/// # Panics
54///
55/// Panics if a sender has already been set.
56pub fn set_data_event_sender(sender: tokio::sync::mpsc::UnboundedSender<DataEvent>) {
57    DATA_EVENT_SENDER.with(|s| {
58        let mut slot = s.borrow_mut();
59        assert!(slot.is_none(), "Data event sender can only be set once");
60        *slot = Some(sender);
61    });
62}
63
64/// Replaces the global data event sender for the current thread.
65pub fn replace_data_event_sender(sender: tokio::sync::mpsc::UnboundedSender<DataEvent>) {
66    DATA_EVENT_SENDER.with(|s| {
67        *s.borrow_mut() = Some(sender);
68    });
69}
70
71/// Gets the global execution event sender.
72///
73/// # Panics
74///
75/// Panics if the sender is uninitialized.
76#[must_use]
77pub fn get_exec_event_sender() -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
78    EXEC_EVENT_SENDER.with(|sender| {
79        sender
80            .borrow()
81            .as_ref()
82            .expect("Execution event sender should be initialized by runner")
83            .clone()
84    })
85}
86
87/// Attempts to get the global execution event sender without panicking.
88///
89/// Returns `None` if the sender is not initialized (e.g., in test environments).
90#[must_use]
91pub fn try_get_exec_event_sender() -> Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>> {
92    EXEC_EVENT_SENDER.with(|sender| sender.borrow().as_ref().cloned())
93}
94
95/// Sets the global execution event sender.
96///
97/// Can only be called once per thread.
98///
99/// # Panics
100///
101/// Panics if a sender has already been set.
102pub fn set_exec_event_sender(sender: tokio::sync::mpsc::UnboundedSender<ExecutionEvent>) {
103    EXEC_EVENT_SENDER.with(|s| {
104        let mut slot = s.borrow_mut();
105        assert!(
106            slot.is_none(),
107            "Execution event sender can only be set once"
108        );
109        *slot = Some(sender);
110    });
111}
112
113/// Replaces the global execution event sender for the current thread.
114pub fn replace_exec_event_sender(sender: tokio::sync::mpsc::UnboundedSender<ExecutionEvent>) {
115    EXEC_EVENT_SENDER.with(|s| {
116        *s.borrow_mut() = Some(sender);
117    });
118}
119
120thread_local! {
121    static DATA_EVENT_SENDER: RefCell<Option<tokio::sync::mpsc::UnboundedSender<DataEvent>>> = const { RefCell::new(None) };
122    static EXEC_EVENT_SENDER: RefCell<Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>> = const { RefCell::new(None) };
123}
124
125#[cfg(test)]
126mod tests {
127    use rstest::rstest;
128
129    use super::*;
130
131    #[rstest]
132    fn test_replace_data_event_sender_overwrites_previous() {
133        std::thread::spawn(|| {
134            let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
135            let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
136            replace_data_event_sender(tx1);
137            replace_data_event_sender(tx2);
138            let _sender = get_data_event_sender();
139        })
140        .join()
141        .unwrap();
142    }
143
144    #[rstest]
145    fn test_replace_exec_event_sender_overwrites_previous() {
146        std::thread::spawn(|| {
147            let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
148            let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
149            replace_exec_event_sender(tx1);
150            replace_exec_event_sender(tx2);
151            let _sender = get_exec_event_sender();
152        })
153        .join()
154        .unwrap();
155    }
156
157    #[rstest]
158    fn test_set_data_event_sender_panics_on_double_set() {
159        let result = std::thread::spawn(|| {
160            let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
161            let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
162            set_data_event_sender(tx1);
163            set_data_event_sender(tx2);
164        })
165        .join();
166        assert!(result.is_err());
167    }
168
169    #[rstest]
170    fn test_set_exec_event_sender_panics_on_double_set() {
171        let result = std::thread::spawn(|| {
172            let (tx1, _rx1) = tokio::sync::mpsc::unbounded_channel();
173            let (tx2, _rx2) = tokio::sync::mpsc::unbounded_channel();
174            set_exec_event_sender(tx1);
175            set_exec_event_sender(tx2);
176        })
177        .join();
178        assert!(result.is_err());
179    }
180
181    #[rstest]
182    fn test_try_get_exec_event_sender_returns_none_when_unset() {
183        let result = std::thread::spawn(try_get_exec_event_sender)
184            .join()
185            .unwrap();
186        assert!(result.is_none());
187    }
188}