Skip to main content

nautilus_live/
runner.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Async event loop runner for live and sandbox trading nodes.
17//!
18//! `AsyncRunner` owns five tokio mpsc channel pairs plus a shutdown
19//! signal channel. Construction creates the channels without side
20//! effects. The sender halves are placed into thread-local storage
21//! via [`AsyncRunner::bind_senders`] so that adapters and engine
22//! components can resolve them through the `get_*_sender()` accessors
23//! in `nautilus_common::runner` and `nautilus_common::live::runner`.
24//!
25//! Channel pairs:
26//!
27//! - **Time events**: timer callbacks dispatched by the clock.
28//! - **Data commands**: subscribe/unsubscribe requests to data clients.
29//! - **Data events**: market data from adapters to the data engine.
30//! - **Trading commands**: order actions to execution clients.
31//! - **Execution events**: fills, order updates, and account state from
32//!   execution clients to the execution engine.
33//!
34//! The runner can drive the event loop in two ways:
35//!
36//! - **Standalone**: call [`AsyncRunner::run`], which binds senders and
37//!   enters a `tokio::select!` loop internally.
38//! - **Integrated**: call [`AsyncRunner::take_channels`] to extract the
39//!   receivers and run the `select!` loop directly inside `LiveNode::run`,
40//!   where it is interleaved with startup, reconciliation, and shutdown
41//!   phases.
42//!
43//! # Invariants
44//!
45//! - `bind_senders` must be called before any code that reads from TLS.
46//!   This includes adapter constructors, clock initialization, and
47//!   execution client start methods. Every path from construction to
48//!   the event loop must bind before the first TLS read.
49//! - The event loop and all TLS consumers must execute on the same
50//!   thread. Senders are cloneable and `Send`, but the `RefCell`-backed
51//!   TLS slots are not accessible from other threads.
52//! - Only one runner at a time should own the TLS slots on a given
53//!   thread. `bind_senders` unconditionally replaces the previous
54//!   contents, so the last caller wins.
55
56use std::{fmt::Debug, sync::Arc};
57
58use nautilus_common::{
59    live::runner::{replace_data_event_sender, replace_exec_event_sender},
60    messages::{
61        DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
62    },
63    msgbus::{self, MessagingSwitchboard},
64    runner::{
65        DataCommandSender, TimeEventSender, TradingCommandSender, replace_data_cmd_sender,
66        replace_exec_cmd_sender, replace_time_event_sender,
67    },
68    timer::TimeEventHandler,
69};
70use nautilus_model::events::OrderEventAny;
71
72/// Asynchronous implementation of `DataCommandSender` for live environments.
73#[derive(Debug)]
74pub struct AsyncDataCommandSender {
75    cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
76}
77
78impl AsyncDataCommandSender {
79    #[must_use]
80    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>) -> Self {
81        Self { cmd_tx }
82    }
83}
84
85impl DataCommandSender for AsyncDataCommandSender {
86    fn execute(&self, command: DataCommand) {
87        if let Err(e) = self.cmd_tx.send(command) {
88            log::error!("Failed to send data command: {e}");
89        }
90    }
91}
92
93/// Asynchronous implementation of `TimeEventSender` for live environments.
94#[derive(Debug, Clone)]
95pub struct AsyncTimeEventSender {
96    time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>,
97}
98
99impl AsyncTimeEventSender {
100    #[must_use]
101    pub const fn new(time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>) -> Self {
102        Self { time_tx }
103    }
104
105    /// Gets a clone of the underlying channel sender for async use.
106    ///
107    /// This allows async contexts to get a direct channel sender that
108    /// can be moved into async tasks without `RefCell` borrowing issues.
109    #[must_use]
110    pub fn get_channel_sender(&self) -> tokio::sync::mpsc::UnboundedSender<TimeEventHandler> {
111        self.time_tx.clone()
112    }
113}
114
115impl TimeEventSender for AsyncTimeEventSender {
116    fn send(&self, handler: TimeEventHandler) {
117        if let Err(e) = self.time_tx.send(handler) {
118            log::error!("Failed to send time event handler: {e}");
119        }
120    }
121}
122
123/// Asynchronous implementation of `TradingCommandSender` for live environments.
124#[derive(Debug)]
125pub struct AsyncTradingCommandSender {
126    cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
127}
128
129impl AsyncTradingCommandSender {
130    #[must_use]
131    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>) -> Self {
132        Self { cmd_tx }
133    }
134}
135
136impl TradingCommandSender for AsyncTradingCommandSender {
137    fn execute(&self, command: TradingCommand) {
138        if let Err(e) = self.cmd_tx.send(command) {
139            log::error!("Failed to send trading command: {e}");
140        }
141    }
142}
143
144pub trait Runner {
145    fn run(&mut self);
146}
147
148/// Channel receivers for the async event loop.
149///
150/// These can be extracted from `AsyncRunner` via `take_channels()` to drive
151/// the event loop directly on the same thread as the msgbus endpoints.
152#[derive(Debug)]
153pub struct AsyncRunnerChannels {
154    pub time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
155    pub data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
156    pub data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
157    pub exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
158    pub exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
159}
160
161pub struct AsyncRunner {
162    channels: AsyncRunnerChannels,
163    time_evt_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>,
164    data_cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
165    data_evt_tx: tokio::sync::mpsc::UnboundedSender<DataEvent>,
166    exec_cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
167    exec_evt_tx: tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
168    signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
169    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
170}
171
172/// Handle for stopping the AsyncRunner from another context.
173#[derive(Clone, Debug)]
174pub struct AsyncRunnerHandle {
175    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
176}
177
178impl AsyncRunnerHandle {
179    /// Signals the runner to stop.
180    pub fn stop(&self) {
181        if let Err(e) = self.signal_tx.send(()) {
182            log::error!("Failed to send shutdown signal: {e}");
183        }
184    }
185}
186
187impl Default for AsyncRunner {
188    fn default() -> Self {
189        Self::new()
190    }
191}
192
193impl Debug for AsyncRunner {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        f.debug_struct(stringify!(AsyncRunner)).finish()
196    }
197}
198
199impl AsyncRunner {
200    /// Creates a new [`AsyncRunner`] instance.
201    ///
202    /// Creates channels but does not bind senders to thread-local storage.
203    /// Call [`bind_senders`](Self::bind_senders) before creating clients that
204    /// read from TLS, and again before entering the event loop.
205    #[must_use]
206    pub fn new() -> Self {
207        use tokio::sync::mpsc::unbounded_channel; // tokio-import-ok
208
209        let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandler>();
210        let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
211        let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
212        let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
213        let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
214        let (signal_tx, signal_rx) = unbounded_channel::<()>();
215
216        Self {
217            channels: AsyncRunnerChannels {
218                time_evt_rx,
219                data_evt_rx,
220                data_cmd_rx,
221                exec_evt_rx,
222                exec_cmd_rx,
223            },
224            time_evt_tx,
225            data_cmd_tx,
226            data_evt_tx,
227            exec_cmd_tx,
228            exec_evt_tx,
229            signal_rx,
230            signal_tx,
231        }
232    }
233
234    /// Binds this runner's channel senders to thread-local storage.
235    ///
236    /// Call before creating clients that read from TLS (e.g., in the builder),
237    /// and again before entering the event loop to reclaim ownership if another
238    /// runner was constructed on this thread in the interim.
239    pub fn bind_senders(&self) {
240        replace_time_event_sender(Arc::new(AsyncTimeEventSender::new(
241            self.time_evt_tx.clone(),
242        )));
243        replace_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(
244            self.data_cmd_tx.clone(),
245        )));
246        replace_data_event_sender(self.data_evt_tx.clone());
247        replace_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(
248            self.exec_cmd_tx.clone(),
249        )));
250        replace_exec_event_sender(self.exec_evt_tx.clone());
251    }
252
253    /// Stops the runner with an internal shutdown signal.
254    pub fn stop(&self) {
255        if let Err(e) = self.signal_tx.send(()) {
256            log::error!("Failed to send shutdown signal: {e}");
257        }
258    }
259
260    /// Returns a handle that can be used to stop the runner from another context.
261    #[must_use]
262    pub fn handle(&self) -> AsyncRunnerHandle {
263        AsyncRunnerHandle {
264            signal_tx: self.signal_tx.clone(),
265        }
266    }
267
268    /// Consumes the runner and returns the channel receivers for direct event loop driving.
269    ///
270    /// This is used when the event loop needs to run on the same thread as the msgbus
271    /// endpoints (which use thread-local storage).
272    #[must_use]
273    pub fn take_channels(self) -> AsyncRunnerChannels {
274        self.channels
275    }
276
277    /// Flushes all pending data events and commands from the channels.
278    ///
279    /// Loops until both data channels are empty, processing each item
280    /// into the cache immediately. Used in `start()` where channels are
281    /// not extracted.
282    pub fn flush_pending_data(&mut self) {
283        let mut total = 0;
284
285        loop {
286            let mut progressed = false;
287
288            while let Ok(evt) = self.channels.data_evt_rx.try_recv() {
289                Self::handle_data_event(evt);
290                progressed = true;
291                total += 1;
292            }
293
294            while let Ok(cmd) = self.channels.data_cmd_rx.try_recv() {
295                Self::handle_data_command(cmd);
296                progressed = true;
297                total += 1;
298            }
299
300            if !progressed {
301                break;
302            }
303        }
304
305        if total > 0 {
306            log::debug!("Flushed {total} pending data events/commands");
307        }
308    }
309
310    /// Runs the async runner event loop.
311    ///
312    /// This method processes data events, time events, execution events, and signal events in an async loop.
313    /// It will run until a signal is received or the event streams are closed.
314    pub async fn run(&mut self) {
315        self.bind_senders();
316
317        log::info!("AsyncRunner starting");
318
319        loop {
320            tokio::select! {
321                biased;
322
323                Some(()) = self.signal_rx.recv() => {
324                    log::info!("AsyncRunner received signal, shutting down");
325                    return;
326                },
327                Some(handler) = self.channels.time_evt_rx.recv() => {
328                    Self::handle_time_event(handler);
329                },
330                Some(cmd) = self.channels.data_cmd_rx.recv() => {
331                    Self::handle_data_command(cmd);
332                },
333                Some(evt) = self.channels.data_evt_rx.recv() => {
334                    Self::handle_data_event(evt);
335                },
336                Some(cmd) = self.channels.exec_cmd_rx.recv() => {
337                    Self::handle_exec_command(cmd);
338                },
339                Some(evt) = self.channels.exec_evt_rx.recv() => {
340                    Self::handle_exec_event(evt);
341                },
342                else => {
343                    log::debug!("AsyncRunner all channels closed, exiting");
344                    return;
345                }
346            };
347        }
348    }
349
350    /// Handles a time event by running its callback.
351    #[inline]
352    pub fn handle_time_event(handler: TimeEventHandler) {
353        handler.run();
354    }
355
356    /// Handles a data command by sending to the DataEngine.
357    #[inline]
358    pub fn handle_data_command(cmd: DataCommand) {
359        msgbus::send_data_command(MessagingSwitchboard::data_engine_execute(), cmd);
360    }
361
362    /// Handles a data event by sending to the appropriate DataEngine endpoint.
363    #[inline]
364    pub fn handle_data_event(event: DataEvent) {
365        match event {
366            DataEvent::Data(data) => {
367                msgbus::send_data(MessagingSwitchboard::data_engine_process_data(), data);
368            }
369            DataEvent::Instrument(data) => {
370                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
371            }
372            DataEvent::Response(resp) => {
373                msgbus::send_data_response(MessagingSwitchboard::data_engine_response(), resp);
374            }
375            DataEvent::FundingRate(funding_rate) => {
376                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &funding_rate);
377            }
378            DataEvent::InstrumentStatus(status) => {
379                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &status);
380            }
381            DataEvent::OptionGreeks(greeks) => {
382                msgbus::send_any(MessagingSwitchboard::data_engine_process(), &greeks);
383            }
384            #[cfg(feature = "defi")]
385            DataEvent::DeFi(data) => {
386                msgbus::send_defi_data(MessagingSwitchboard::data_engine_process_defi_data(), data);
387            }
388        }
389    }
390
391    /// Handles an execution command by sending to the ExecEngine.
392    #[inline]
393    pub fn handle_exec_command(cmd: TradingCommand) {
394        msgbus::send_trading_command(MessagingSwitchboard::exec_engine_execute(), cmd);
395    }
396
397    /// Handles an execution event by sending to the appropriate engine endpoint.
398    #[inline]
399    pub fn handle_exec_event(event: ExecutionEvent) {
400        match event {
401            ExecutionEvent::Order(order_event) => {
402                msgbus::send_order_event(MessagingSwitchboard::exec_engine_process(), order_event);
403            }
404            ExecutionEvent::OrderSubmittedBatch(batch) => {
405                for submitted in batch {
406                    msgbus::send_order_event(
407                        MessagingSwitchboard::exec_engine_process(),
408                        OrderEventAny::Submitted(submitted),
409                    );
410                }
411            }
412            ExecutionEvent::OrderAcceptedBatch(batch) => {
413                for accepted in batch {
414                    msgbus::send_order_event(
415                        MessagingSwitchboard::exec_engine_process(),
416                        OrderEventAny::Accepted(accepted),
417                    );
418                }
419            }
420            ExecutionEvent::OrderCanceledBatch(batch) => {
421                for canceled in batch {
422                    msgbus::send_order_event(
423                        MessagingSwitchboard::exec_engine_process(),
424                        OrderEventAny::Canceled(canceled),
425                    );
426                }
427            }
428            ExecutionEvent::Report(report) => {
429                Self::handle_exec_report(report);
430            }
431            ExecutionEvent::Account(ref account) => {
432                msgbus::send_account_state(
433                    MessagingSwitchboard::portfolio_update_account(),
434                    account,
435                );
436            }
437        }
438    }
439
440    #[inline]
441    pub fn handle_exec_report(report: ExecutionReport) {
442        let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
443        msgbus::send_execution_report(endpoint, report);
444    }
445}
446
447#[cfg(test)]
448mod tests {
449    use std::time::Duration;
450
451    use nautilus_common::{
452        live::runner::{get_data_event_sender, get_exec_event_sender},
453        messages::{
454            ExecutionEvent, ExecutionReport,
455            data::{SubscribeCommand, SubscribeCustomData},
456            execution::{CancelAllOrders, TradingCommand},
457        },
458        runner::{
459            get_data_cmd_sender, get_time_event_sender, get_trading_cmd_sender,
460            try_get_time_event_sender, try_get_trading_cmd_sender,
461        },
462        timer::{TimeEvent, TimeEventCallback, TimeEventHandler},
463    };
464    use nautilus_core::{UUID4, UnixNanos};
465    use nautilus_model::{
466        data::{Data, DataType, quote::QuoteTick},
467        enums::{
468            AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
469            TimeInForce,
470        },
471        events::{
472            OrderAccepted, OrderAcceptedBatch, OrderCanceled, OrderCanceledBatch, OrderEvent,
473            OrderEventAny, OrderSubmitted, OrderSubmittedBatch, account::state::AccountState,
474        },
475        identifiers::{
476            AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
477            TraderId, VenueOrderId,
478        },
479        reports::{FillReport, OrderStatusReport, PositionStatusReport},
480        types::{Money, Price, Quantity},
481    };
482    use rstest::rstest;
483    use ustr::Ustr;
484
485    use super::*;
486
487    // Test fixture for creating test quotes
488    fn test_quote() -> QuoteTick {
489        QuoteTick {
490            instrument_id: InstrumentId::from("EUR/USD.SIM"),
491            bid_price: Price::from("1.10000"),
492            ask_price: Price::from("1.10001"),
493            bid_size: Quantity::from(1_000_000),
494            ask_size: Quantity::from(1_000_000),
495            ts_event: UnixNanos::default(),
496            ts_init: UnixNanos::default(),
497        }
498    }
499
500    // Test helper to create AsyncRunner with manual channels.
501    // Sender halves are dummies (not connected to the test receivers) since
502    // these tests exercise the event loop, not TLS binding.
503    fn create_test_runner(
504        time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
505        data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
506        data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
507        exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
508        exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
509        signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
510        signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
511    ) -> AsyncRunner {
512        let (time_evt_tx, _) = tokio::sync::mpsc::unbounded_channel();
513        let (data_cmd_tx, _) = tokio::sync::mpsc::unbounded_channel();
514        let (data_evt_tx, _) = tokio::sync::mpsc::unbounded_channel();
515        let (exec_cmd_tx, _) = tokio::sync::mpsc::unbounded_channel();
516        let (exec_evt_tx, _) = tokio::sync::mpsc::unbounded_channel();
517
518        AsyncRunner {
519            channels: AsyncRunnerChannels {
520                time_evt_rx,
521                data_evt_rx,
522                data_cmd_rx,
523                exec_evt_rx,
524                exec_cmd_rx,
525            },
526            time_evt_tx,
527            data_cmd_tx,
528            data_evt_tx,
529            exec_cmd_tx,
530            exec_evt_tx,
531            signal_rx,
532            signal_tx,
533        }
534    }
535
536    #[rstest]
537    fn test_async_data_command_sender_creation() {
538        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
539        let sender = AsyncDataCommandSender::new(tx);
540        assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
541    }
542
543    #[rstest]
544    fn test_async_time_event_sender_creation() {
545        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
546        let sender = AsyncTimeEventSender::new(tx);
547        assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
548    }
549
550    #[rstest]
551    fn test_async_time_event_sender_get_channel() {
552        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
553        let sender = AsyncTimeEventSender::new(tx);
554        let channel = sender.get_channel_sender();
555
556        // Verify the channel is functional
557        let event = TimeEvent::new(
558            Ustr::from("test"),
559            UUID4::new(),
560            UnixNanos::from(1),
561            UnixNanos::from(2),
562        );
563        let callback = TimeEventCallback::from(|_: TimeEvent| {});
564        let handler = TimeEventHandler::new(event, callback);
565
566        assert!(channel.send(handler).is_ok());
567    }
568
569    #[tokio::test]
570    async fn test_async_data_command_sender_execute() {
571        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
572        let sender = AsyncDataCommandSender::new(tx);
573
574        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
575            client_id: Some(ClientId::from("TEST")),
576            venue: None,
577            data_type: DataType::new("QuoteTick", None, None),
578            command_id: UUID4::new(),
579            ts_init: UnixNanos::default(),
580            correlation_id: None,
581            params: None,
582        }));
583
584        sender.execute(command.clone());
585
586        let received = rx.recv().await.unwrap();
587        match (received, command) {
588            (
589                DataCommand::Subscribe(SubscribeCommand::Data(r)),
590                DataCommand::Subscribe(SubscribeCommand::Data(c)),
591            ) => {
592                assert_eq!(r.client_id, c.client_id);
593                assert_eq!(r.data_type, c.data_type);
594            }
595            _ => panic!("Command mismatch"),
596        }
597    }
598
599    #[tokio::test]
600    async fn test_async_time_event_sender_send() {
601        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
602        let sender = AsyncTimeEventSender::new(tx);
603
604        let event = TimeEvent::new(
605            Ustr::from("test"),
606            UUID4::new(),
607            UnixNanos::from(1),
608            UnixNanos::from(2),
609        );
610        let callback = TimeEventCallback::from(|_: TimeEvent| {});
611        let handler = TimeEventHandler::new(event, callback);
612
613        sender.send(handler);
614
615        assert!(rx.recv().await.is_some());
616    }
617
618    #[tokio::test]
619    async fn test_runner_shutdown_signal() {
620        // Create runner with manual channels to avoid global state
621        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
622        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
623        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
624        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
625        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
626        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
627
628        let mut runner = create_test_runner(
629            time_evt_rx,
630            data_evt_rx,
631            data_cmd_rx,
632            exec_evt_rx,
633            exec_cmd_rx,
634            signal_rx,
635            signal_tx.clone(),
636        );
637
638        // Start runner
639        let runner_handle = tokio::spawn(async move {
640            runner.run().await;
641        });
642
643        // Send shutdown signal
644        signal_tx.send(()).unwrap();
645
646        // Runner should stop quickly
647        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
648        assert!(result.is_ok(), "Runner should stop on signal");
649    }
650
651    #[tokio::test]
652    async fn test_runner_closes_on_channel_drop() {
653        let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
654        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
655        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
656        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
657        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
658        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
659
660        let mut runner = create_test_runner(
661            time_evt_rx,
662            data_evt_rx,
663            data_cmd_rx,
664            exec_evt_rx,
665            exec_cmd_rx,
666            signal_rx,
667            signal_tx.clone(),
668        );
669
670        // Start runner
671        let runner_handle = tokio::spawn(async move {
672            runner.run().await;
673        });
674
675        drop(data_tx);
676
677        // Yield to let runner enter event loop before stop signal
678        tokio::task::yield_now().await;
679        signal_tx.send(()).ok();
680
681        // Runner should stop when channels close or on signal
682        let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
683        assert!(
684            result.is_ok(),
685            "Runner should stop when channels close or on signal"
686        );
687    }
688
689    #[tokio::test]
690    async fn test_concurrent_event_sending() {
691        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
692        let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
693        let (_time_evt_tx, time_evt_rx) =
694            tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
695        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
696        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
697        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
698
699        // Setup runner
700        let mut runner = create_test_runner(
701            time_evt_rx,
702            data_evt_rx,
703            data_cmd_rx,
704            exec_evt_rx,
705            exec_cmd_rx,
706            signal_rx,
707            signal_tx.clone(),
708        );
709
710        // Spawn multiple concurrent senders
711        let mut handles = vec![];
712
713        for _ in 0..5 {
714            let tx_clone = data_evt_tx.clone();
715
716            let handle = tokio::spawn(async move {
717                for _ in 0..20 {
718                    let quote = test_quote();
719                    tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
720                    tokio::task::yield_now().await;
721                }
722            });
723            handles.push(handle);
724        }
725
726        // Start runner in background
727        let runner_handle = tokio::spawn(async move {
728            runner.run().await;
729        });
730
731        // Wait for all senders
732        for handle in handles {
733            handle.await.unwrap();
734        }
735
736        // Yield to let runner enter event loop before stop signal
737        tokio::task::yield_now().await;
738        signal_tx.send(()).unwrap();
739
740        let _ = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
741    }
742
743    #[rstest]
744    #[case(10)]
745    #[case(100)]
746    #[case(1000)]
747    fn test_channel_send_performance(#[case] count: usize) {
748        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
749        let quote = test_quote();
750
751        // Send events
752        for _ in 0..count {
753            tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
754        }
755
756        // Verify all received
757        let mut received = 0;
758        while rx.try_recv().is_ok() {
759            received += 1;
760        }
761
762        assert_eq!(received, count);
763    }
764
765    #[rstest]
766    fn test_async_trading_command_sender_creation() {
767        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
768        let sender = AsyncTradingCommandSender::new(tx);
769        assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
770    }
771
772    #[tokio::test]
773    async fn test_async_trading_command_sender_execute() {
774        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
775        let sender = AsyncTradingCommandSender::new(tx);
776
777        let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
778            TraderId::from("TRADER-001"),
779            None,
780            StrategyId::from("S-001"),
781            InstrumentId::from("EUR/USD.SIM"),
782            OrderSide::Buy,
783            UUID4::new(),
784            UnixNanos::default(),
785            None,
786        ));
787
788        sender.execute(command);
789
790        let received = rx.recv().await;
791        assert!(received.is_some());
792        assert!(matches!(
793            received.unwrap(),
794            TradingCommand::CancelAllOrders(_)
795        ));
796    }
797
798    #[tokio::test]
799    async fn test_runner_processes_trading_commands() {
800        let (_data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
801        let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
802        let (_time_evt_tx, time_evt_rx) =
803            tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
804        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
805        let (exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
806        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
807
808        let mut runner = create_test_runner(
809            time_evt_rx,
810            data_evt_rx,
811            data_cmd_rx,
812            exec_evt_rx,
813            exec_cmd_rx,
814            signal_rx,
815            signal_tx.clone(),
816        );
817
818        let runner_handle = tokio::spawn(async move {
819            runner.run().await;
820        });
821
822        let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
823            TraderId::from("TRADER-001"),
824            None,
825            StrategyId::from("S-001"),
826            InstrumentId::from("EUR/USD.SIM"),
827            OrderSide::Buy,
828            UUID4::new(),
829            UnixNanos::default(),
830            None,
831        ));
832        exec_cmd_tx.send(command).unwrap();
833
834        tokio::task::yield_now().await;
835        signal_tx.send(()).unwrap();
836
837        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
838        assert!(result.is_ok(), "Runner should process command and stop");
839    }
840
841    #[tokio::test]
842    async fn test_runner_processes_multiple_trading_commands() {
843        let (_data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
844        let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
845        let (_time_evt_tx, time_evt_rx) =
846            tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
847        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
848        let (exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
849        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
850
851        let mut runner = create_test_runner(
852            time_evt_rx,
853            data_evt_rx,
854            data_cmd_rx,
855            exec_evt_rx,
856            exec_cmd_rx,
857            signal_rx,
858            signal_tx.clone(),
859        );
860
861        let runner_handle = tokio::spawn(async move {
862            runner.run().await;
863        });
864
865        for i in 0..10 {
866            let strategy_id = format!("S-{i:03}");
867            let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
868                TraderId::from("TRADER-001"),
869                None,
870                StrategyId::from(strategy_id.as_str()),
871                InstrumentId::from("EUR/USD.SIM"),
872                OrderSide::Buy,
873                UUID4::new(),
874                UnixNanos::default(),
875                None,
876            ));
877            exec_cmd_tx.send(command).unwrap();
878        }
879
880        tokio::task::yield_now().await;
881        signal_tx.send(()).unwrap();
882
883        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
884        assert!(
885            result.is_ok(),
886            "Runner should process all commands and stop"
887        );
888    }
889
890    #[tokio::test]
891    async fn test_execution_event_order_channel() {
892        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
893
894        let event = OrderSubmitted::new(
895            TraderId::from("TRADER-001"),
896            StrategyId::from("S-001"),
897            InstrumentId::from("EUR/USD.SIM"),
898            ClientOrderId::from("O-001"),
899            AccountId::from("SIM-001"),
900            UUID4::new(),
901            UnixNanos::from(1),
902            UnixNanos::from(2),
903        );
904
905        tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
906            .unwrap();
907
908        let received = rx.recv().await.unwrap();
909        match received {
910            ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
911                assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
912            }
913            _ => panic!("Expected OrderSubmitted event"),
914        }
915    }
916
917    #[tokio::test]
918    async fn test_execution_report_order_status_channel() {
919        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
920
921        let report = OrderStatusReport::new(
922            AccountId::from("SIM-001"),
923            InstrumentId::from("EUR/USD.SIM"),
924            Some(ClientOrderId::from("O-001")),
925            VenueOrderId::from("V-001"),
926            OrderSide::Buy,
927            OrderType::Market,
928            TimeInForce::Gtc,
929            OrderStatus::Accepted,
930            Quantity::from(100_000),
931            Quantity::from(100_000),
932            UnixNanos::from(1),
933            UnixNanos::from(2),
934            UnixNanos::from(3),
935            None,
936        );
937
938        tx.send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
939            report,
940        ))))
941        .unwrap();
942
943        let received = rx.recv().await.unwrap();
944        match received {
945            ExecutionEvent::Report(ExecutionReport::Order(r)) => {
946                assert_eq!(r.venue_order_id.as_str(), "V-001");
947                assert_eq!(r.order_status, OrderStatus::Accepted);
948            }
949            _ => panic!("Expected OrderStatusReport"),
950        }
951    }
952
953    #[tokio::test]
954    async fn test_execution_report_fill() {
955        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
956
957        let report = FillReport::new(
958            AccountId::from("SIM-001"),
959            InstrumentId::from("EUR/USD.SIM"),
960            VenueOrderId::from("V-001"),
961            TradeId::from("T-001"),
962            OrderSide::Buy,
963            Quantity::from(100_000),
964            Price::from("1.10000"),
965            Money::from("10 USD"),
966            LiquiditySide::Taker,
967            Some(ClientOrderId::from("O-001")),
968            None,
969            UnixNanos::from(1),
970            UnixNanos::from(2),
971            None,
972        );
973
974        tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
975            report,
976        ))))
977        .unwrap();
978
979        let received = rx.recv().await.unwrap();
980        match received {
981            ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
982                assert_eq!(r.venue_order_id.as_str(), "V-001");
983                assert_eq!(r.trade_id.to_string(), "T-001");
984            }
985            _ => panic!("Expected FillReport"),
986        }
987    }
988
989    #[tokio::test]
990    async fn test_execution_report_position() {
991        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
992
993        let report = PositionStatusReport::new(
994            AccountId::from("SIM-001"),
995            InstrumentId::from("EUR/USD.SIM"),
996            PositionSideSpecified::Long,
997            Quantity::from(100_000),
998            UnixNanos::from(1),
999            UnixNanos::from(2),
1000            None,
1001            Some(PositionId::from("P-001")),
1002            None,
1003        );
1004
1005        tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
1006            report,
1007        ))))
1008        .unwrap();
1009
1010        let received = rx.recv().await.unwrap();
1011        match received {
1012            ExecutionEvent::Report(ExecutionReport::Position(r)) => {
1013                assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
1014            }
1015            _ => panic!("Expected PositionStatusReport"),
1016        }
1017    }
1018
1019    #[tokio::test]
1020    async fn test_execution_event_account() {
1021        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1022
1023        let account_state = AccountState::new(
1024            AccountId::from("SIM-001"),
1025            AccountType::Cash,
1026            vec![],
1027            vec![],
1028            true,
1029            UUID4::new(),
1030            UnixNanos::from(1),
1031            UnixNanos::from(2),
1032            None,
1033        );
1034
1035        tx.send(ExecutionEvent::Account(account_state)).unwrap();
1036
1037        let received = rx.recv().await.unwrap();
1038        match received {
1039            ExecutionEvent::Account(r) => {
1040                assert_eq!(r.account_id.as_str(), "SIM-001");
1041            }
1042            _ => panic!("Expected AccountState"),
1043        }
1044    }
1045
1046    #[tokio::test]
1047    async fn test_runner_stop_method() {
1048        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1049        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1050        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1051        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1052        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1053        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1054
1055        let mut runner = create_test_runner(
1056            time_evt_rx,
1057            data_evt_rx,
1058            data_cmd_rx,
1059            exec_evt_rx,
1060            exec_cmd_rx,
1061            signal_rx,
1062            signal_tx.clone(),
1063        );
1064
1065        let runner_handle = tokio::spawn(async move {
1066            runner.run().await;
1067        });
1068
1069        // Use stop via signal_tx directly
1070        signal_tx.send(()).unwrap();
1071
1072        let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
1073        assert!(result.is_ok(), "Runner should stop when stop() is called");
1074    }
1075
1076    #[tokio::test]
1077    async fn test_all_event_types_integration() {
1078        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1079        let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1080        let (time_evt_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1081        let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1082        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1083        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1084
1085        let mut runner = create_test_runner(
1086            time_evt_rx,
1087            data_evt_rx,
1088            data_cmd_rx,
1089            exec_evt_rx,
1090            exec_cmd_rx,
1091            signal_rx,
1092            signal_tx.clone(),
1093        );
1094
1095        let runner_handle = tokio::spawn(async move {
1096            runner.run().await;
1097        });
1098
1099        // Send data event
1100        let quote = test_quote();
1101        data_evt_tx
1102            .send(DataEvent::Data(Data::Quote(quote)))
1103            .unwrap();
1104
1105        // Send data command
1106        let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
1107            client_id: Some(ClientId::from("TEST")),
1108            venue: None,
1109            data_type: DataType::new("QuoteTick", None, None),
1110            command_id: UUID4::new(),
1111            ts_init: UnixNanos::default(),
1112            correlation_id: None,
1113            params: None,
1114        }));
1115        data_cmd_tx.send(command).unwrap();
1116
1117        // Send time event
1118        let event = TimeEvent::new(
1119            Ustr::from("test"),
1120            UUID4::new(),
1121            UnixNanos::from(1),
1122            UnixNanos::from(2),
1123        );
1124        let callback = TimeEventCallback::from(|_: TimeEvent| {});
1125        let handler = TimeEventHandler::new(event, callback);
1126        time_evt_tx.send(handler).unwrap();
1127
1128        // Send execution order event
1129        let order_event = OrderSubmitted::new(
1130            TraderId::from("TRADER-001"),
1131            StrategyId::from("S-001"),
1132            InstrumentId::from("EUR/USD.SIM"),
1133            ClientOrderId::from("O-001"),
1134            AccountId::from("SIM-001"),
1135            UUID4::new(),
1136            UnixNanos::from(1),
1137            UnixNanos::from(2),
1138        );
1139        exec_evt_tx
1140            .send(ExecutionEvent::Order(OrderEventAny::Submitted(order_event)))
1141            .unwrap();
1142
1143        // Send execution report (OrderStatus)
1144        let order_status = OrderStatusReport::new(
1145            AccountId::from("SIM-001"),
1146            InstrumentId::from("EUR/USD.SIM"),
1147            Some(ClientOrderId::from("O-001")),
1148            VenueOrderId::from("V-001"),
1149            OrderSide::Buy,
1150            OrderType::Market,
1151            TimeInForce::Gtc,
1152            OrderStatus::Accepted,
1153            Quantity::from(100_000),
1154            Quantity::from(100_000),
1155            UnixNanos::from(1),
1156            UnixNanos::from(2),
1157            UnixNanos::from(3),
1158            None,
1159        );
1160        exec_evt_tx
1161            .send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
1162                order_status,
1163            ))))
1164            .unwrap();
1165
1166        // Send execution report (Fill)
1167        let fill = FillReport::new(
1168            AccountId::from("SIM-001"),
1169            InstrumentId::from("EUR/USD.SIM"),
1170            VenueOrderId::from("V-001"),
1171            TradeId::from("T-001"),
1172            OrderSide::Buy,
1173            Quantity::from(100_000),
1174            Price::from("1.10000"),
1175            Money::from("10 USD"),
1176            LiquiditySide::Taker,
1177            Some(ClientOrderId::from("O-001")),
1178            None,
1179            UnixNanos::from(1),
1180            UnixNanos::from(2),
1181            None,
1182        );
1183        exec_evt_tx
1184            .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
1185                fill,
1186            ))))
1187            .unwrap();
1188
1189        // Send execution report (Position)
1190        let position = PositionStatusReport::new(
1191            AccountId::from("SIM-001"),
1192            InstrumentId::from("EUR/USD.SIM"),
1193            PositionSideSpecified::Long,
1194            Quantity::from(100_000),
1195            UnixNanos::from(1),
1196            UnixNanos::from(2),
1197            None,
1198            Some(PositionId::from("P-001")),
1199            None,
1200        );
1201        exec_evt_tx
1202            .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
1203                position,
1204            ))))
1205            .unwrap();
1206
1207        // Send account event
1208        let account_state = AccountState::new(
1209            AccountId::from("SIM-001"),
1210            AccountType::Cash,
1211            vec![],
1212            vec![],
1213            true,
1214            UUID4::new(),
1215            UnixNanos::from(1),
1216            UnixNanos::from(2),
1217            None,
1218        );
1219        exec_evt_tx
1220            .send(ExecutionEvent::Account(account_state))
1221            .unwrap();
1222
1223        // Yield to let runner enter event loop before stop signal
1224        tokio::task::yield_now().await;
1225        signal_tx.send(()).unwrap();
1226
1227        let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
1228        assert!(
1229            result.is_ok(),
1230            "Runner should process all event types and stop cleanly"
1231        );
1232    }
1233
1234    #[tokio::test]
1235    async fn test_runner_handle_stops_runner() {
1236        let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1237        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1238        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1239        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1240        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1241        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1242
1243        let mut runner = create_test_runner(
1244            time_evt_rx,
1245            data_evt_rx,
1246            data_cmd_rx,
1247            exec_evt_rx,
1248            exec_cmd_rx,
1249            signal_rx,
1250            signal_tx.clone(),
1251        );
1252
1253        // Get handle before moving runner
1254        let handle = runner.handle();
1255
1256        let runner_task = tokio::spawn(async move {
1257            runner.run().await;
1258        });
1259
1260        // Use handle to stop
1261        handle.stop();
1262
1263        let result = tokio::time::timeout(Duration::from_millis(100), runner_task).await;
1264        assert!(result.is_ok(), "Runner should stop via handle");
1265    }
1266
1267    #[tokio::test]
1268    async fn test_runner_handle_is_cloneable() {
1269        let (signal_tx, _signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1270        let handle = AsyncRunnerHandle { signal_tx };
1271
1272        let handle2 = handle.clone();
1273
1274        // Both handles should be able to send stop signals
1275        assert!(handle.signal_tx.send(()).is_ok());
1276        assert!(handle2.signal_tx.send(()).is_ok());
1277    }
1278
1279    #[tokio::test]
1280    async fn test_runner_processes_events_before_stop() {
1281        let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1282        let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1283        let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1284        let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1285        let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1286        let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1287
1288        let mut runner = create_test_runner(
1289            time_evt_rx,
1290            data_evt_rx,
1291            data_cmd_rx,
1292            exec_evt_rx,
1293            exec_cmd_rx,
1294            signal_rx,
1295            signal_tx.clone(),
1296        );
1297
1298        let handle = runner.handle();
1299
1300        // Send events before starting runner
1301        for _ in 0..10 {
1302            let quote = test_quote();
1303            data_evt_tx
1304                .send(DataEvent::Data(Data::Quote(quote)))
1305                .unwrap();
1306        }
1307
1308        let runner_task = tokio::spawn(async move {
1309            runner.run().await;
1310        });
1311
1312        // Yield to let runner enter event loop before stop signal
1313        tokio::task::yield_now().await;
1314        handle.stop();
1315
1316        let result = tokio::time::timeout(Duration::from_millis(200), runner_task).await;
1317        assert!(result.is_ok(), "Runner should process events and stop");
1318    }
1319
1320    #[rstest]
1321    fn test_new_does_not_bind_tls() {
1322        std::thread::spawn(|| {
1323            let _runner = AsyncRunner::new();
1324            assert!(try_get_time_event_sender().is_none());
1325            assert!(try_get_trading_cmd_sender().is_none());
1326        })
1327        .join()
1328        .unwrap();
1329    }
1330
1331    #[rstest]
1332    fn test_bind_senders_routes_to_runner_channels() {
1333        std::thread::spawn(|| {
1334            let mut runner = AsyncRunner::new();
1335            runner.bind_senders();
1336
1337            get_data_cmd_sender().execute(DataCommand::Subscribe(SubscribeCommand::Data(
1338                SubscribeCustomData {
1339                    client_id: Some(ClientId::from("TEST")),
1340                    venue: None,
1341                    data_type: DataType::new("test", None, None),
1342                    command_id: UUID4::new(),
1343                    ts_init: UnixNanos::default(),
1344                    correlation_id: None,
1345                    params: None,
1346                },
1347            )));
1348            assert!(runner.channels.data_cmd_rx.try_recv().is_ok());
1349
1350            get_trading_cmd_sender().execute(TradingCommand::CancelAllOrders(
1351                CancelAllOrders::new(
1352                    TraderId::from("TRADER-001"),
1353                    None,
1354                    StrategyId::from("S-001"),
1355                    InstrumentId::from("EUR/USD.SIM"),
1356                    OrderSide::Buy,
1357                    UUID4::new(),
1358                    UnixNanos::default(),
1359                    None,
1360                ),
1361            ));
1362            assert!(runner.channels.exec_cmd_rx.try_recv().is_ok());
1363
1364            let event = TimeEvent::new(
1365                Ustr::from("test"),
1366                UUID4::new(),
1367                UnixNanos::from(1),
1368                UnixNanos::from(2),
1369            );
1370            let callback = TimeEventCallback::from(|_: TimeEvent| {});
1371            get_time_event_sender().send(TimeEventHandler::new(event, callback));
1372            assert!(runner.channels.time_evt_rx.try_recv().is_ok());
1373
1374            get_data_event_sender()
1375                .send(DataEvent::Data(Data::Quote(test_quote())))
1376                .unwrap();
1377            assert!(runner.channels.data_evt_rx.try_recv().is_ok());
1378
1379            let account = AccountState::new(
1380                AccountId::from("SIM-001"),
1381                AccountType::Cash,
1382                vec![],
1383                vec![],
1384                true,
1385                UUID4::new(),
1386                UnixNanos::from(1),
1387                UnixNanos::from(2),
1388                None,
1389            );
1390            get_exec_event_sender()
1391                .send(ExecutionEvent::Account(account))
1392                .unwrap();
1393            assert!(runner.channels.exec_evt_rx.try_recv().is_ok());
1394        })
1395        .join()
1396        .unwrap();
1397    }
1398
1399    #[rstest]
1400    fn test_bind_senders_reclaims_tls_from_previous_runner() {
1401        std::thread::spawn(|| {
1402            let mut runner1 = AsyncRunner::new();
1403            runner1.bind_senders();
1404
1405            let mut runner2 = AsyncRunner::new();
1406            runner2.bind_senders();
1407
1408            get_data_cmd_sender().execute(DataCommand::Subscribe(SubscribeCommand::Data(
1409                SubscribeCustomData {
1410                    client_id: Some(ClientId::from("TEST")),
1411                    venue: None,
1412                    data_type: DataType::new("test", None, None),
1413                    command_id: UUID4::new(),
1414                    ts_init: UnixNanos::default(),
1415                    correlation_id: None,
1416                    params: None,
1417                },
1418            )));
1419
1420            assert!(runner2.channels.data_cmd_rx.try_recv().is_ok());
1421            assert!(runner1.channels.data_cmd_rx.try_recv().is_err());
1422        })
1423        .join()
1424        .unwrap();
1425    }
1426
1427    #[tokio::test]
1428    async fn test_execution_event_order_submitted_batch_channel() {
1429        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1430
1431        let events = vec![
1432            OrderSubmitted::new(
1433                TraderId::from("TRADER-001"),
1434                StrategyId::from("S-001"),
1435                InstrumentId::from("EUR/USD.SIM"),
1436                ClientOrderId::from("O-001"),
1437                AccountId::from("SIM-001"),
1438                UUID4::new(),
1439                UnixNanos::from(1),
1440                UnixNanos::from(2),
1441            ),
1442            OrderSubmitted::new(
1443                TraderId::from("TRADER-001"),
1444                StrategyId::from("S-001"),
1445                InstrumentId::from("EUR/USD.SIM"),
1446                ClientOrderId::from("O-002"),
1447                AccountId::from("SIM-001"),
1448                UUID4::new(),
1449                UnixNanos::from(3),
1450                UnixNanos::from(4),
1451            ),
1452        ];
1453
1454        let batch = OrderSubmittedBatch::new(events);
1455        tx.send(ExecutionEvent::OrderSubmittedBatch(batch)).unwrap();
1456
1457        let received = rx.recv().await.unwrap();
1458        match received {
1459            ExecutionEvent::OrderSubmittedBatch(b) => {
1460                assert_eq!(b.len(), 2);
1461                assert_eq!(b.events[0].client_order_id, ClientOrderId::from("O-001"));
1462                assert_eq!(b.events[1].client_order_id, ClientOrderId::from("O-002"));
1463            }
1464            _ => panic!("Expected OrderSubmittedBatch event"),
1465        }
1466    }
1467
1468    #[tokio::test]
1469    async fn test_execution_event_order_accepted_batch_channel() {
1470        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1471
1472        let events = vec![
1473            OrderAccepted::new(
1474                TraderId::from("TRADER-001"),
1475                StrategyId::from("S-001"),
1476                InstrumentId::from("EUR/USD.SIM"),
1477                ClientOrderId::from("O-001"),
1478                VenueOrderId::from("V-001"),
1479                AccountId::from("SIM-001"),
1480                UUID4::new(),
1481                UnixNanos::from(1),
1482                UnixNanos::from(2),
1483                false,
1484            ),
1485            OrderAccepted::new(
1486                TraderId::from("TRADER-001"),
1487                StrategyId::from("S-001"),
1488                InstrumentId::from("EUR/USD.SIM"),
1489                ClientOrderId::from("O-002"),
1490                VenueOrderId::from("V-002"),
1491                AccountId::from("SIM-001"),
1492                UUID4::new(),
1493                UnixNanos::from(3),
1494                UnixNanos::from(4),
1495                false,
1496            ),
1497        ];
1498
1499        let batch = OrderAcceptedBatch::new(events);
1500        tx.send(ExecutionEvent::OrderAcceptedBatch(batch)).unwrap();
1501
1502        let received = rx.recv().await.unwrap();
1503        match received {
1504            ExecutionEvent::OrderAcceptedBatch(b) => {
1505                assert_eq!(b.len(), 2);
1506                assert_eq!(b.events[0].client_order_id, ClientOrderId::from("O-001"));
1507                assert_eq!(b.events[1].client_order_id, ClientOrderId::from("O-002"));
1508            }
1509            _ => panic!("Expected OrderAcceptedBatch event"),
1510        }
1511    }
1512
1513    #[tokio::test]
1514    async fn test_execution_event_order_canceled_batch_channel() {
1515        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1516
1517        let events = vec![
1518            OrderCanceled::new(
1519                TraderId::from("TRADER-001"),
1520                StrategyId::from("S-001"),
1521                InstrumentId::from("EUR/USD.SIM"),
1522                ClientOrderId::from("O-001"),
1523                UUID4::new(),
1524                UnixNanos::from(1),
1525                UnixNanos::from(2),
1526                false,
1527                None,
1528                Some(AccountId::from("SIM-001")),
1529            ),
1530            OrderCanceled::new(
1531                TraderId::from("TRADER-001"),
1532                StrategyId::from("S-001"),
1533                InstrumentId::from("EUR/USD.SIM"),
1534                ClientOrderId::from("O-002"),
1535                UUID4::new(),
1536                UnixNanos::from(3),
1537                UnixNanos::from(4),
1538                false,
1539                None,
1540                Some(AccountId::from("SIM-001")),
1541            ),
1542        ];
1543
1544        let batch = OrderCanceledBatch::new(events);
1545        tx.send(ExecutionEvent::OrderCanceledBatch(batch)).unwrap();
1546
1547        let received = rx.recv().await.unwrap();
1548        match received {
1549            ExecutionEvent::OrderCanceledBatch(b) => {
1550                assert_eq!(b.len(), 2);
1551                assert_eq!(b.events[0].client_order_id, ClientOrderId::from("O-001"));
1552                assert_eq!(b.events[1].client_order_id, ClientOrderId::from("O-002"));
1553            }
1554            _ => panic!("Expected OrderCanceledBatch event"),
1555        }
1556    }
1557}