nautilus_binance/common/
execution.rs1use std::{
19 future::Future,
20 sync::Mutex,
21 time::{Duration, Instant},
22};
23
24use nautilus_common::live::get_runtime;
25use nautilus_core::MUTEX_POISONED;
26use nautilus_live::ExecutionClientCore;
27use nautilus_model::identifiers::AccountId;
28use tokio::task::JoinHandle;
29
30pub fn spawn_task<F>(pending_tasks: &Mutex<Vec<JoinHandle<()>>>, description: &'static str, fut: F)
38where
39 F: Future<Output = anyhow::Result<()>> + Send + 'static,
40{
41 let runtime = get_runtime();
42 let handle = runtime.spawn(async move {
43 if let Err(e) = fut.await {
44 log::warn!("{description} failed: {e}");
45 }
46 });
47
48 let mut tasks = pending_tasks.lock().expect(MUTEX_POISONED);
49 tasks.retain(|handle| !handle.is_finished());
50 tasks.push(handle);
51}
52
53pub fn abort_pending_tasks(pending_tasks: &Mutex<Vec<JoinHandle<()>>>) {
59 let mut tasks = pending_tasks.lock().expect(MUTEX_POISONED);
60 for handle in tasks.drain(..) {
61 handle.abort();
62 }
63}
64
65pub async fn await_account_registered(
75 core: &ExecutionClientCore,
76 account_id: AccountId,
77 timeout_secs: f64,
78) -> anyhow::Result<()> {
79 if core.cache().account(&account_id).is_some() {
80 log::info!("Account {account_id} registered");
81 return Ok(());
82 }
83
84 let start = Instant::now();
85 let timeout = Duration::from_secs_f64(timeout_secs);
86 let interval = Duration::from_millis(10);
87
88 loop {
89 tokio::time::sleep(interval).await;
90
91 if core.cache().account(&account_id).is_some() {
92 log::info!("Account {account_id} registered");
93 return Ok(());
94 }
95
96 if start.elapsed() >= timeout {
97 anyhow::bail!(
98 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
99 );
100 }
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use std::{cell::RefCell, rc::Rc};
107
108 use nautilus_common::cache::Cache;
109 use nautilus_live::ExecutionClientCore;
110 use nautilus_model::{
111 accounts::{AccountAny, CashAccount},
112 enums::{AccountType, OmsType},
113 events::AccountState,
114 identifiers::{AccountId, ClientId, TraderId, Venue},
115 types::{AccountBalance, Money},
116 };
117 use rstest::rstest;
118
119 use super::*;
120
121 #[rstest]
122 fn test_spawn_task_prunes_finished_handles() {
123 let finished = get_runtime().spawn(async {});
124 get_runtime().block_on(async {
125 tokio::time::timeout(Duration::from_secs(1), async {
126 while !finished.is_finished() {
127 tokio::task::yield_now().await;
128 }
129 })
130 .await
131 .expect("Finished task should complete");
132 });
133
134 let pending_tasks = Mutex::new(vec![finished]);
135
136 spawn_task(&pending_tasks, "test task", async { Ok(()) });
137
138 assert_eq!(
139 pending_tasks.lock().expect(MUTEX_POISONED).len(),
140 1,
141 "spawn_task should drop finished handles before storing the new one",
142 );
143
144 abort_pending_tasks(&pending_tasks);
145 }
146
147 #[rstest]
148 #[tokio::test]
149 async fn test_abort_pending_tasks_aborts_running_tasks() {
150 let (drop_tx, drop_rx) = tokio::sync::oneshot::channel();
151 let guard = AbortDropSignal { tx: Some(drop_tx) };
152
153 let handle = get_runtime().spawn(async move {
154 let _guard = guard;
155 tokio::time::sleep(Duration::from_secs(60)).await;
156 });
157 let pending_tasks = Mutex::new(vec![handle]);
158
159 abort_pending_tasks(&pending_tasks);
160
161 assert!(pending_tasks.lock().expect(MUTEX_POISONED).is_empty());
162 tokio::time::timeout(Duration::from_secs(1), drop_rx)
163 .await
164 .expect("Aborted task should drop its future")
165 .expect("Drop signal should be sent");
166 }
167
168 #[rstest]
169 #[tokio::test]
170 async fn test_await_account_registered_returns_when_account_is_added() {
171 let account_id = AccountId::from("BINANCE-001");
172 let cache = Rc::new(RefCell::new(Cache::default()));
173 let core = create_test_core(cache.clone(), account_id);
174
175 let wait_fut = await_account_registered(&core, account_id, 0.5);
176 let register_fut = async move {
177 tokio::time::sleep(Duration::from_millis(20)).await;
178 add_test_account_to_cache(&cache, account_id);
179 };
180
181 let (result, ()) = tokio::join!(wait_fut, register_fut);
182 assert!(result.is_ok());
183 }
184
185 #[rstest]
186 #[tokio::test]
187 async fn test_await_account_registered_times_out() {
188 let account_id = AccountId::from("BINANCE-001");
189 let cache = Rc::new(RefCell::new(Cache::default()));
190 let core = create_test_core(cache, account_id);
191
192 let error = await_account_registered(&core, account_id, 0.02)
193 .await
194 .expect_err("Missing account should time out");
195
196 assert!(error.to_string().contains("BINANCE-001"));
197 }
198
199 struct AbortDropSignal {
200 tx: Option<tokio::sync::oneshot::Sender<()>>,
201 }
202
203 impl Drop for AbortDropSignal {
204 fn drop(&mut self) {
205 if let Some(tx) = self.tx.take() {
206 let _ = tx.send(());
207 }
208 }
209 }
210
211 fn create_test_core(cache: Rc<RefCell<Cache>>, account_id: AccountId) -> ExecutionClientCore {
212 ExecutionClientCore::new(
213 TraderId::from("TESTER-001"),
214 ClientId::from("BINANCE"),
215 Venue::from("BINANCE"),
216 OmsType::Hedging,
217 account_id,
218 AccountType::Cash,
219 None,
220 cache,
221 )
222 }
223
224 fn add_test_account_to_cache(cache: &Rc<RefCell<Cache>>, account_id: AccountId) {
225 let state = AccountState::new(
226 account_id,
227 AccountType::Cash,
228 vec![AccountBalance::new(
229 Money::from("1.0 BTC"),
230 Money::from("0 BTC"),
231 Money::from("1.0 BTC"),
232 )],
233 vec![],
234 true,
235 nautilus_core::UUID4::new(),
236 nautilus_core::UnixNanos::default(),
237 nautilus_core::UnixNanos::default(),
238 None,
239 );
240
241 let account = AccountAny::Cash(CashAccount::new(state, true, false));
242 cache.borrow_mut().add_account(account).unwrap();
243 }
244}