Skip to main content

nautilus_binance/common/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Shared execution client utilities for Binance Spot and Futures adapters.
17
18use std::{
19    future::Future,
20    sync::Mutex,
21    time::{Duration, Instant},
22};
23
24use nautilus_common::live::get_runtime;
25use nautilus_core::MUTEX_POISONED;
26use nautilus_live::ExecutionClientCore;
27use nautilus_model::identifiers::AccountId;
28use tokio::task::JoinHandle;
29
30/// Spawns an async task and tracks its handle in `pending_tasks`.
31///
32/// Prunes finished handles before adding the new one to prevent unbounded growth.
33///
34/// # Panics
35///
36/// Panics if the `pending_tasks` mutex is poisoned.
37pub fn spawn_task<F>(pending_tasks: &Mutex<Vec<JoinHandle<()>>>, description: &'static str, fut: F)
38where
39    F: Future<Output = anyhow::Result<()>> + Send + 'static,
40{
41    let runtime = get_runtime();
42    let handle = runtime.spawn(async move {
43        if let Err(e) = fut.await {
44            log::warn!("{description} failed: {e}");
45        }
46    });
47
48    let mut tasks = pending_tasks.lock().expect(MUTEX_POISONED);
49    tasks.retain(|handle| !handle.is_finished());
50    tasks.push(handle);
51}
52
53/// Aborts all pending tasks tracked in the mutex.
54///
55/// # Panics
56///
57/// Panics if the `pending_tasks` mutex is poisoned.
58pub fn abort_pending_tasks(pending_tasks: &Mutex<Vec<JoinHandle<()>>>) {
59    let mut tasks = pending_tasks.lock().expect(MUTEX_POISONED);
60    for handle in tasks.drain(..) {
61        handle.abort();
62    }
63}
64
65/// Polls the cache until the account is registered or timeout is reached.
66///
67/// Each iteration borrows and drops the cache Ref to avoid holding the
68/// RefCell borrow across await points, which would block mutable access
69/// when the account state is registered by another task.
70///
71/// # Errors
72///
73/// Returns an error if the timeout is reached before the account is registered.
74pub async fn await_account_registered(
75    core: &ExecutionClientCore,
76    account_id: AccountId,
77    timeout_secs: f64,
78) -> anyhow::Result<()> {
79    if core.cache().account(&account_id).is_some() {
80        log::info!("Account {account_id} registered");
81        return Ok(());
82    }
83
84    let start = Instant::now();
85    let timeout = Duration::from_secs_f64(timeout_secs);
86    let interval = Duration::from_millis(10);
87
88    loop {
89        tokio::time::sleep(interval).await;
90
91        if core.cache().account(&account_id).is_some() {
92            log::info!("Account {account_id} registered");
93            return Ok(());
94        }
95
96        if start.elapsed() >= timeout {
97            anyhow::bail!(
98                "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
99            );
100        }
101    }
102}
103
104#[cfg(test)]
105mod tests {
106    use std::{cell::RefCell, rc::Rc};
107
108    use nautilus_common::cache::Cache;
109    use nautilus_live::ExecutionClientCore;
110    use nautilus_model::{
111        accounts::{AccountAny, CashAccount},
112        enums::{AccountType, OmsType},
113        events::AccountState,
114        identifiers::{AccountId, ClientId, TraderId, Venue},
115        types::{AccountBalance, Money},
116    };
117    use rstest::rstest;
118
119    use super::*;
120
121    #[rstest]
122    fn test_spawn_task_prunes_finished_handles() {
123        let finished = get_runtime().spawn(async {});
124        get_runtime().block_on(async {
125            tokio::time::timeout(Duration::from_secs(1), async {
126                while !finished.is_finished() {
127                    tokio::task::yield_now().await;
128                }
129            })
130            .await
131            .expect("Finished task should complete");
132        });
133
134        let pending_tasks = Mutex::new(vec![finished]);
135
136        spawn_task(&pending_tasks, "test task", async { Ok(()) });
137
138        assert_eq!(
139            pending_tasks.lock().expect(MUTEX_POISONED).len(),
140            1,
141            "spawn_task should drop finished handles before storing the new one",
142        );
143
144        abort_pending_tasks(&pending_tasks);
145    }
146
147    #[rstest]
148    #[tokio::test]
149    async fn test_abort_pending_tasks_aborts_running_tasks() {
150        let (drop_tx, drop_rx) = tokio::sync::oneshot::channel();
151        let guard = AbortDropSignal { tx: Some(drop_tx) };
152
153        let handle = get_runtime().spawn(async move {
154            let _guard = guard;
155            tokio::time::sleep(Duration::from_secs(60)).await;
156        });
157        let pending_tasks = Mutex::new(vec![handle]);
158
159        abort_pending_tasks(&pending_tasks);
160
161        assert!(pending_tasks.lock().expect(MUTEX_POISONED).is_empty());
162        tokio::time::timeout(Duration::from_secs(1), drop_rx)
163            .await
164            .expect("Aborted task should drop its future")
165            .expect("Drop signal should be sent");
166    }
167
168    #[rstest]
169    #[tokio::test]
170    async fn test_await_account_registered_returns_when_account_is_added() {
171        let account_id = AccountId::from("BINANCE-001");
172        let cache = Rc::new(RefCell::new(Cache::default()));
173        let core = create_test_core(cache.clone(), account_id);
174
175        let wait_fut = await_account_registered(&core, account_id, 0.5);
176        let register_fut = async move {
177            tokio::time::sleep(Duration::from_millis(20)).await;
178            add_test_account_to_cache(&cache, account_id);
179        };
180
181        let (result, ()) = tokio::join!(wait_fut, register_fut);
182        assert!(result.is_ok());
183    }
184
185    #[rstest]
186    #[tokio::test]
187    async fn test_await_account_registered_times_out() {
188        let account_id = AccountId::from("BINANCE-001");
189        let cache = Rc::new(RefCell::new(Cache::default()));
190        let core = create_test_core(cache, account_id);
191
192        let error = await_account_registered(&core, account_id, 0.02)
193            .await
194            .expect_err("Missing account should time out");
195
196        assert!(error.to_string().contains("BINANCE-001"));
197    }
198
199    struct AbortDropSignal {
200        tx: Option<tokio::sync::oneshot::Sender<()>>,
201    }
202
203    impl Drop for AbortDropSignal {
204        fn drop(&mut self) {
205            if let Some(tx) = self.tx.take() {
206                let _ = tx.send(());
207            }
208        }
209    }
210
211    fn create_test_core(cache: Rc<RefCell<Cache>>, account_id: AccountId) -> ExecutionClientCore {
212        ExecutionClientCore::new(
213            TraderId::from("TESTER-001"),
214            ClientId::from("BINANCE"),
215            Venue::from("BINANCE"),
216            OmsType::Hedging,
217            account_id,
218            AccountType::Cash,
219            None,
220            cache,
221        )
222    }
223
224    fn add_test_account_to_cache(cache: &Rc<RefCell<Cache>>, account_id: AccountId) {
225        let state = AccountState::new(
226            account_id,
227            AccountType::Cash,
228            vec![AccountBalance::new(
229                Money::from("1.0 BTC"),
230                Money::from("0 BTC"),
231                Money::from("1.0 BTC"),
232            )],
233            vec![],
234            true,
235            nautilus_core::UUID4::new(),
236            nautilus_core::UnixNanos::default(),
237            nautilus_core::UnixNanos::default(),
238            None,
239        );
240
241        let account = AccountAny::Cash(CashAccount::new(state, true, false));
242        cache.borrow_mut().add_account(account).unwrap();
243    }
244}