1use std::{fmt::Debug, sync::Arc};
57
58use nautilus_common::{
59 live::runner::{replace_data_event_sender, replace_exec_event_sender},
60 messages::{
61 DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
62 },
63 msgbus::{self, MessagingSwitchboard},
64 runner::{
65 DataCommandSender, TimeEventSender, TradingCommandSender, replace_data_cmd_sender,
66 replace_exec_cmd_sender, replace_time_event_sender,
67 },
68 timer::TimeEventHandler,
69};
70use nautilus_model::events::OrderEventAny;
71
/// Sends [`DataCommand`] messages over an unbounded tokio channel.
///
/// Implements [`DataCommandSender`] by pushing each command onto the wrapped channel.
#[derive(Debug)]
pub struct AsyncDataCommandSender {
    /// Sending half of the data command channel.
    cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
}
77
impl AsyncDataCommandSender {
    /// Creates a new [`AsyncDataCommandSender`] that sends on `cmd_tx`.
    #[must_use]
    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>) -> Self {
        Self { cmd_tx }
    }
}
84
85impl DataCommandSender for AsyncDataCommandSender {
86 fn execute(&self, command: DataCommand) {
87 if let Err(e) = self.cmd_tx.send(command) {
88 log::error!("Failed to send data command: {e}");
89 }
90 }
91}
92
/// Sends [`TimeEventHandler`] values over an unbounded tokio channel.
///
/// Implements [`TimeEventSender`]; cloning yields another sender for the same channel.
#[derive(Debug, Clone)]
pub struct AsyncTimeEventSender {
    /// Sending half of the time event channel.
    time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>,
}
98
99impl AsyncTimeEventSender {
100 #[must_use]
101 pub const fn new(time_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>) -> Self {
102 Self { time_tx }
103 }
104
105 #[must_use]
110 pub fn get_channel_sender(&self) -> tokio::sync::mpsc::UnboundedSender<TimeEventHandler> {
111 self.time_tx.clone()
112 }
113}
114
115impl TimeEventSender for AsyncTimeEventSender {
116 fn send(&self, handler: TimeEventHandler) {
117 if let Err(e) = self.time_tx.send(handler) {
118 log::error!("Failed to send time event handler: {e}");
119 }
120 }
121}
122
/// Sends [`TradingCommand`] messages over an unbounded tokio channel.
///
/// Implements [`TradingCommandSender`] by pushing each command onto the wrapped channel.
#[derive(Debug)]
pub struct AsyncTradingCommandSender {
    /// Sending half of the trading command channel.
    cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
}
128
impl AsyncTradingCommandSender {
    /// Creates a new [`AsyncTradingCommandSender`] that sends on `cmd_tx`.
    #[must_use]
    pub const fn new(cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>) -> Self {
        Self { cmd_tx }
    }
}
135
136impl TradingCommandSender for AsyncTradingCommandSender {
137 fn execute(&self, command: TradingCommand) {
138 if let Err(e) = self.cmd_tx.send(command) {
139 log::error!("Failed to send trading command: {e}");
140 }
141 }
142}
143
/// A synchronous runner interface with a single blocking-style entry point.
///
/// NOTE(review): no implementor of this trait is visible in this file; `AsyncRunner`
/// exposes an inherent `async fn run` instead — confirm where this trait is used.
pub trait Runner {
    /// Runs the runner with exclusive access to its state.
    fn run(&mut self);
}
147
/// Receiving halves of the channels drained by [`AsyncRunner`].
///
/// Returned by value from [`AsyncRunner::take_channels`] so a caller can drive the
/// receivers itself.
#[derive(Debug)]
pub struct AsyncRunnerChannels {
    /// Receiver for time event handlers.
    pub time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
    /// Receiver for data events.
    pub data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
    /// Receiver for data commands.
    pub data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
    /// Receiver for execution events.
    pub exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
    /// Receiver for trading commands.
    pub exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
}
160
/// Event loop that drains time, data and execution channels and forwards each
/// message to the message bus.
///
/// The runner keeps both halves of every channel: the receivers (grouped in
/// `channels`) and the senders that `bind_senders` clones when registering them.
pub struct AsyncRunner {
    /// Receivers polled by `run` and `flush_pending_data`.
    channels: AsyncRunnerChannels,
    /// Sender side of the time event channel.
    time_evt_tx: tokio::sync::mpsc::UnboundedSender<TimeEventHandler>,
    /// Sender side of the data command channel.
    data_cmd_tx: tokio::sync::mpsc::UnboundedSender<DataCommand>,
    /// Sender side of the data event channel.
    data_evt_tx: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Sender side of the trading command channel.
    exec_cmd_tx: tokio::sync::mpsc::UnboundedSender<TradingCommand>,
    /// Sender side of the execution event channel.
    exec_evt_tx: tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
    /// Receiver for the shutdown signal, checked first in `run`.
    signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
    /// Sender for the shutdown signal, cloned into every `AsyncRunnerHandle`.
    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
}
171
/// Cloneable handle that can request shutdown of an [`AsyncRunner`].
///
/// Holds only a sender for the runner's shutdown signal channel, so it can be used
/// after the runner itself has been moved elsewhere (for example into a spawned task).
#[derive(Clone, Debug)]
pub struct AsyncRunnerHandle {
    /// Sender for the runner's shutdown signal channel.
    signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
}
177
178impl AsyncRunnerHandle {
179 pub fn stop(&self) {
181 if let Err(e) = self.signal_tx.send(()) {
182 log::error!("Failed to send shutdown signal: {e}");
183 }
184 }
185}
186
impl Default for AsyncRunner {
    /// Equivalent to [`AsyncRunner::new`].
    fn default() -> Self {
        Self::new()
    }
}
192
193impl Debug for AsyncRunner {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 f.debug_struct(stringify!(AsyncRunner)).finish()
196 }
197}
198
199impl AsyncRunner {
200 #[must_use]
206 pub fn new() -> Self {
207 use tokio::sync::mpsc::unbounded_channel; let (time_evt_tx, time_evt_rx) = unbounded_channel::<TimeEventHandler>();
210 let (data_cmd_tx, data_cmd_rx) = unbounded_channel::<DataCommand>();
211 let (data_evt_tx, data_evt_rx) = unbounded_channel::<DataEvent>();
212 let (exec_cmd_tx, exec_cmd_rx) = unbounded_channel::<TradingCommand>();
213 let (exec_evt_tx, exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
214 let (signal_tx, signal_rx) = unbounded_channel::<()>();
215
216 Self {
217 channels: AsyncRunnerChannels {
218 time_evt_rx,
219 data_evt_rx,
220 data_cmd_rx,
221 exec_evt_rx,
222 exec_cmd_rx,
223 },
224 time_evt_tx,
225 data_cmd_tx,
226 data_evt_tx,
227 exec_cmd_tx,
228 exec_evt_tx,
229 signal_rx,
230 signal_tx,
231 }
232 }
233
234 pub fn bind_senders(&self) {
240 replace_time_event_sender(Arc::new(AsyncTimeEventSender::new(
241 self.time_evt_tx.clone(),
242 )));
243 replace_data_cmd_sender(Arc::new(AsyncDataCommandSender::new(
244 self.data_cmd_tx.clone(),
245 )));
246 replace_data_event_sender(self.data_evt_tx.clone());
247 replace_exec_cmd_sender(Arc::new(AsyncTradingCommandSender::new(
248 self.exec_cmd_tx.clone(),
249 )));
250 replace_exec_event_sender(self.exec_evt_tx.clone());
251 }
252
253 pub fn stop(&self) {
255 if let Err(e) = self.signal_tx.send(()) {
256 log::error!("Failed to send shutdown signal: {e}");
257 }
258 }
259
260 #[must_use]
262 pub fn handle(&self) -> AsyncRunnerHandle {
263 AsyncRunnerHandle {
264 signal_tx: self.signal_tx.clone(),
265 }
266 }
267
268 #[must_use]
273 pub fn take_channels(self) -> AsyncRunnerChannels {
274 self.channels
275 }
276
277 pub fn flush_pending_data(&mut self) {
283 let mut total = 0;
284
285 loop {
286 let mut progressed = false;
287
288 while let Ok(evt) = self.channels.data_evt_rx.try_recv() {
289 Self::handle_data_event(evt);
290 progressed = true;
291 total += 1;
292 }
293
294 while let Ok(cmd) = self.channels.data_cmd_rx.try_recv() {
295 Self::handle_data_command(cmd);
296 progressed = true;
297 total += 1;
298 }
299
300 if !progressed {
301 break;
302 }
303 }
304
305 if total > 0 {
306 log::debug!("Flushed {total} pending data events/commands");
307 }
308 }
309
    /// Runs the event loop until a shutdown signal arrives or every branch is disabled.
    ///
    /// Registers the runner's senders via `bind_senders`, then repeatedly waits on the
    /// signal channel and the five message channels, dispatching each received message
    /// to the matching `handle_*` function.
    ///
    /// NOTE(review): the runner owns `signal_tx`, so the signal branch presumably never
    /// yields `None` while the runner is alive, which would make the `else` arm
    /// unreachable in practice — confirm.
    pub async fn run(&mut self) {
        self.bind_senders();

        log::info!("AsyncRunner starting");

        loop {
            tokio::select! {
                // Poll branches top to bottom so the shutdown signal is always checked
                // first, followed by the channels in the order listed below
                biased;

                Some(()) = self.signal_rx.recv() => {
                    log::info!("AsyncRunner received signal, shutting down");
                    return;
                },
                Some(handler) = self.channels.time_evt_rx.recv() => {
                    Self::handle_time_event(handler);
                },
                Some(cmd) = self.channels.data_cmd_rx.recv() => {
                    Self::handle_data_command(cmd);
                },
                Some(evt) = self.channels.data_evt_rx.recv() => {
                    Self::handle_data_event(evt);
                },
                Some(cmd) = self.channels.exec_cmd_rx.recv() => {
                    Self::handle_exec_command(cmd);
                },
                Some(evt) = self.channels.exec_evt_rx.recv() => {
                    Self::handle_exec_event(evt);
                },
                // Reached only when every `Some(..)` pattern above failed to match,
                // i.e. all receivers returned `None`
                else => {
                    log::debug!("AsyncRunner all channels closed, exiting");
                    return;
                }
            };
        }
    }
349
    /// Handles a time event by invoking the handler's `run` method.
    #[inline]
    pub fn handle_time_event(handler: TimeEventHandler) {
        handler.run();
    }
355
356 #[inline]
358 pub fn handle_data_command(cmd: DataCommand) {
359 msgbus::send_data_command(MessagingSwitchboard::data_engine_execute(), cmd);
360 }
361
362 #[inline]
364 pub fn handle_data_event(event: DataEvent) {
365 match event {
366 DataEvent::Data(data) => {
367 msgbus::send_data(MessagingSwitchboard::data_engine_process_data(), data);
368 }
369 DataEvent::Instrument(data) => {
370 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &data);
371 }
372 DataEvent::Response(resp) => {
373 msgbus::send_data_response(MessagingSwitchboard::data_engine_response(), resp);
374 }
375 DataEvent::FundingRate(funding_rate) => {
376 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &funding_rate);
377 }
378 DataEvent::InstrumentStatus(status) => {
379 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &status);
380 }
381 DataEvent::OptionGreeks(greeks) => {
382 msgbus::send_any(MessagingSwitchboard::data_engine_process(), &greeks);
383 }
384 #[cfg(feature = "defi")]
385 DataEvent::DeFi(data) => {
386 msgbus::send_defi_data(MessagingSwitchboard::data_engine_process_defi_data(), data);
387 }
388 }
389 }
390
391 #[inline]
393 pub fn handle_exec_command(cmd: TradingCommand) {
394 msgbus::send_trading_command(MessagingSwitchboard::exec_engine_execute(), cmd);
395 }
396
397 #[inline]
399 pub fn handle_exec_event(event: ExecutionEvent) {
400 match event {
401 ExecutionEvent::Order(order_event) => {
402 msgbus::send_order_event(MessagingSwitchboard::exec_engine_process(), order_event);
403 }
404 ExecutionEvent::OrderSubmittedBatch(batch) => {
405 for submitted in batch {
406 msgbus::send_order_event(
407 MessagingSwitchboard::exec_engine_process(),
408 OrderEventAny::Submitted(submitted),
409 );
410 }
411 }
412 ExecutionEvent::OrderAcceptedBatch(batch) => {
413 for accepted in batch {
414 msgbus::send_order_event(
415 MessagingSwitchboard::exec_engine_process(),
416 OrderEventAny::Accepted(accepted),
417 );
418 }
419 }
420 ExecutionEvent::OrderCanceledBatch(batch) => {
421 for canceled in batch {
422 msgbus::send_order_event(
423 MessagingSwitchboard::exec_engine_process(),
424 OrderEventAny::Canceled(canceled),
425 );
426 }
427 }
428 ExecutionEvent::Report(report) => {
429 Self::handle_exec_report(report);
430 }
431 ExecutionEvent::Account(ref account) => {
432 msgbus::send_account_state(
433 MessagingSwitchboard::portfolio_update_account(),
434 account,
435 );
436 }
437 }
438 }
439
440 #[inline]
441 pub fn handle_exec_report(report: ExecutionReport) {
442 let endpoint = MessagingSwitchboard::exec_engine_reconcile_execution_report();
443 msgbus::send_execution_report(endpoint, report);
444 }
445}
446
447#[cfg(test)]
448mod tests {
449 use std::time::Duration;
450
451 use nautilus_common::{
452 live::runner::{get_data_event_sender, get_exec_event_sender},
453 messages::{
454 ExecutionEvent, ExecutionReport,
455 data::{SubscribeCommand, SubscribeCustomData},
456 execution::{CancelAllOrders, TradingCommand},
457 },
458 runner::{
459 get_data_cmd_sender, get_time_event_sender, get_trading_cmd_sender,
460 try_get_time_event_sender, try_get_trading_cmd_sender,
461 },
462 timer::{TimeEvent, TimeEventCallback, TimeEventHandler},
463 };
464 use nautilus_core::{UUID4, UnixNanos};
465 use nautilus_model::{
466 data::{Data, DataType, quote::QuoteTick},
467 enums::{
468 AccountType, LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSideSpecified,
469 TimeInForce,
470 },
471 events::{
472 OrderAccepted, OrderAcceptedBatch, OrderCanceled, OrderCanceledBatch, OrderEvent,
473 OrderEventAny, OrderSubmitted, OrderSubmittedBatch, account::state::AccountState,
474 },
475 identifiers::{
476 AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId,
477 TraderId, VenueOrderId,
478 },
479 reports::{FillReport, OrderStatusReport, PositionStatusReport},
480 types::{Money, Price, Quantity},
481 };
482 use rstest::rstest;
483 use ustr::Ustr;
484
485 use super::*;
486
487 fn test_quote() -> QuoteTick {
489 QuoteTick {
490 instrument_id: InstrumentId::from("EUR/USD.SIM"),
491 bid_price: Price::from("1.10000"),
492 ask_price: Price::from("1.10001"),
493 bid_size: Quantity::from(1_000_000),
494 ask_size: Quantity::from(1_000_000),
495 ts_event: UnixNanos::default(),
496 ts_init: UnixNanos::default(),
497 }
498 }
499
500 fn create_test_runner(
504 time_evt_rx: tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
505 data_evt_rx: tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
506 data_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
507 exec_evt_rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
508 exec_cmd_rx: tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
509 signal_rx: tokio::sync::mpsc::UnboundedReceiver<()>,
510 signal_tx: tokio::sync::mpsc::UnboundedSender<()>,
511 ) -> AsyncRunner {
512 let (time_evt_tx, _) = tokio::sync::mpsc::unbounded_channel();
513 let (data_cmd_tx, _) = tokio::sync::mpsc::unbounded_channel();
514 let (data_evt_tx, _) = tokio::sync::mpsc::unbounded_channel();
515 let (exec_cmd_tx, _) = tokio::sync::mpsc::unbounded_channel();
516 let (exec_evt_tx, _) = tokio::sync::mpsc::unbounded_channel();
517
518 AsyncRunner {
519 channels: AsyncRunnerChannels {
520 time_evt_rx,
521 data_evt_rx,
522 data_cmd_rx,
523 exec_evt_rx,
524 exec_cmd_rx,
525 },
526 time_evt_tx,
527 data_cmd_tx,
528 data_evt_tx,
529 exec_cmd_tx,
530 exec_evt_tx,
531 signal_rx,
532 signal_tx,
533 }
534 }
535
536 #[rstest]
537 fn test_async_data_command_sender_creation() {
538 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
539 let sender = AsyncDataCommandSender::new(tx);
540 assert!(format!("{sender:?}").contains("AsyncDataCommandSender"));
541 }
542
543 #[rstest]
544 fn test_async_time_event_sender_creation() {
545 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
546 let sender = AsyncTimeEventSender::new(tx);
547 assert!(format!("{sender:?}").contains("AsyncTimeEventSender"));
548 }
549
550 #[rstest]
551 fn test_async_time_event_sender_get_channel() {
552 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
553 let sender = AsyncTimeEventSender::new(tx);
554 let channel = sender.get_channel_sender();
555
556 let event = TimeEvent::new(
558 Ustr::from("test"),
559 UUID4::new(),
560 UnixNanos::from(1),
561 UnixNanos::from(2),
562 );
563 let callback = TimeEventCallback::from(|_: TimeEvent| {});
564 let handler = TimeEventHandler::new(event, callback);
565
566 assert!(channel.send(handler).is_ok());
567 }
568
569 #[tokio::test]
570 async fn test_async_data_command_sender_execute() {
571 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
572 let sender = AsyncDataCommandSender::new(tx);
573
574 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
575 client_id: Some(ClientId::from("TEST")),
576 venue: None,
577 data_type: DataType::new("QuoteTick", None, None),
578 command_id: UUID4::new(),
579 ts_init: UnixNanos::default(),
580 correlation_id: None,
581 params: None,
582 }));
583
584 sender.execute(command.clone());
585
586 let received = rx.recv().await.unwrap();
587 match (received, command) {
588 (
589 DataCommand::Subscribe(SubscribeCommand::Data(r)),
590 DataCommand::Subscribe(SubscribeCommand::Data(c)),
591 ) => {
592 assert_eq!(r.client_id, c.client_id);
593 assert_eq!(r.data_type, c.data_type);
594 }
595 _ => panic!("Command mismatch"),
596 }
597 }
598
599 #[tokio::test]
600 async fn test_async_time_event_sender_send() {
601 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
602 let sender = AsyncTimeEventSender::new(tx);
603
604 let event = TimeEvent::new(
605 Ustr::from("test"),
606 UUID4::new(),
607 UnixNanos::from(1),
608 UnixNanos::from(2),
609 );
610 let callback = TimeEventCallback::from(|_: TimeEvent| {});
611 let handler = TimeEventHandler::new(event, callback);
612
613 sender.send(handler);
614
615 assert!(rx.recv().await.is_some());
616 }
617
618 #[tokio::test]
619 async fn test_runner_shutdown_signal() {
620 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
622 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
623 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
624 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
625 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
626 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
627
628 let mut runner = create_test_runner(
629 time_evt_rx,
630 data_evt_rx,
631 data_cmd_rx,
632 exec_evt_rx,
633 exec_cmd_rx,
634 signal_rx,
635 signal_tx.clone(),
636 );
637
638 let runner_handle = tokio::spawn(async move {
640 runner.run().await;
641 });
642
643 signal_tx.send(()).unwrap();
645
646 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
648 assert!(result.is_ok(), "Runner should stop on signal");
649 }
650
651 #[tokio::test]
652 async fn test_runner_closes_on_channel_drop() {
653 let (data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
654 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
655 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
656 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
657 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
658 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
659
660 let mut runner = create_test_runner(
661 time_evt_rx,
662 data_evt_rx,
663 data_cmd_rx,
664 exec_evt_rx,
665 exec_cmd_rx,
666 signal_rx,
667 signal_tx.clone(),
668 );
669
670 let runner_handle = tokio::spawn(async move {
672 runner.run().await;
673 });
674
675 drop(data_tx);
676
677 tokio::task::yield_now().await;
679 signal_tx.send(()).ok();
680
681 let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
683 assert!(
684 result.is_ok(),
685 "Runner should stop when channels close or on signal"
686 );
687 }
688
689 #[tokio::test]
690 async fn test_concurrent_event_sending() {
691 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
692 let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
693 let (_time_evt_tx, time_evt_rx) =
694 tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
695 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
696 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
697 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
698
699 let mut runner = create_test_runner(
701 time_evt_rx,
702 data_evt_rx,
703 data_cmd_rx,
704 exec_evt_rx,
705 exec_cmd_rx,
706 signal_rx,
707 signal_tx.clone(),
708 );
709
710 let mut handles = vec![];
712
713 for _ in 0..5 {
714 let tx_clone = data_evt_tx.clone();
715
716 let handle = tokio::spawn(async move {
717 for _ in 0..20 {
718 let quote = test_quote();
719 tx_clone.send(DataEvent::Data(Data::Quote(quote))).unwrap();
720 tokio::task::yield_now().await;
721 }
722 });
723 handles.push(handle);
724 }
725
726 let runner_handle = tokio::spawn(async move {
728 runner.run().await;
729 });
730
731 for handle in handles {
733 handle.await.unwrap();
734 }
735
736 tokio::task::yield_now().await;
738 signal_tx.send(()).unwrap();
739
740 let _ = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
741 }
742
743 #[rstest]
744 #[case(10)]
745 #[case(100)]
746 #[case(1000)]
747 fn test_channel_send_performance(#[case] count: usize) {
748 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
749 let quote = test_quote();
750
751 for _ in 0..count {
753 tx.send(DataEvent::Data(Data::Quote(quote))).unwrap();
754 }
755
756 let mut received = 0;
758 while rx.try_recv().is_ok() {
759 received += 1;
760 }
761
762 assert_eq!(received, count);
763 }
764
765 #[rstest]
766 fn test_async_trading_command_sender_creation() {
767 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
768 let sender = AsyncTradingCommandSender::new(tx);
769 assert!(format!("{sender:?}").contains("AsyncTradingCommandSender"));
770 }
771
772 #[tokio::test]
773 async fn test_async_trading_command_sender_execute() {
774 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
775 let sender = AsyncTradingCommandSender::new(tx);
776
777 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
778 TraderId::from("TRADER-001"),
779 None,
780 StrategyId::from("S-001"),
781 InstrumentId::from("EUR/USD.SIM"),
782 OrderSide::Buy,
783 UUID4::new(),
784 UnixNanos::default(),
785 None,
786 ));
787
788 sender.execute(command);
789
790 let received = rx.recv().await;
791 assert!(received.is_some());
792 assert!(matches!(
793 received.unwrap(),
794 TradingCommand::CancelAllOrders(_)
795 ));
796 }
797
798 #[tokio::test]
799 async fn test_runner_processes_trading_commands() {
800 let (_data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
801 let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
802 let (_time_evt_tx, time_evt_rx) =
803 tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
804 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
805 let (exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
806 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
807
808 let mut runner = create_test_runner(
809 time_evt_rx,
810 data_evt_rx,
811 data_cmd_rx,
812 exec_evt_rx,
813 exec_cmd_rx,
814 signal_rx,
815 signal_tx.clone(),
816 );
817
818 let runner_handle = tokio::spawn(async move {
819 runner.run().await;
820 });
821
822 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
823 TraderId::from("TRADER-001"),
824 None,
825 StrategyId::from("S-001"),
826 InstrumentId::from("EUR/USD.SIM"),
827 OrderSide::Buy,
828 UUID4::new(),
829 UnixNanos::default(),
830 None,
831 ));
832 exec_cmd_tx.send(command).unwrap();
833
834 tokio::task::yield_now().await;
835 signal_tx.send(()).unwrap();
836
837 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
838 assert!(result.is_ok(), "Runner should process command and stop");
839 }
840
841 #[tokio::test]
842 async fn test_runner_processes_multiple_trading_commands() {
843 let (_data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
844 let (_data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
845 let (_time_evt_tx, time_evt_rx) =
846 tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
847 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
848 let (exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
849 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
850
851 let mut runner = create_test_runner(
852 time_evt_rx,
853 data_evt_rx,
854 data_cmd_rx,
855 exec_evt_rx,
856 exec_cmd_rx,
857 signal_rx,
858 signal_tx.clone(),
859 );
860
861 let runner_handle = tokio::spawn(async move {
862 runner.run().await;
863 });
864
865 for i in 0..10 {
866 let strategy_id = format!("S-{i:03}");
867 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
868 TraderId::from("TRADER-001"),
869 None,
870 StrategyId::from(strategy_id.as_str()),
871 InstrumentId::from("EUR/USD.SIM"),
872 OrderSide::Buy,
873 UUID4::new(),
874 UnixNanos::default(),
875 None,
876 ));
877 exec_cmd_tx.send(command).unwrap();
878 }
879
880 tokio::task::yield_now().await;
881 signal_tx.send(()).unwrap();
882
883 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
884 assert!(
885 result.is_ok(),
886 "Runner should process all commands and stop"
887 );
888 }
889
890 #[tokio::test]
891 async fn test_execution_event_order_channel() {
892 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
893
894 let event = OrderSubmitted::new(
895 TraderId::from("TRADER-001"),
896 StrategyId::from("S-001"),
897 InstrumentId::from("EUR/USD.SIM"),
898 ClientOrderId::from("O-001"),
899 AccountId::from("SIM-001"),
900 UUID4::new(),
901 UnixNanos::from(1),
902 UnixNanos::from(2),
903 );
904
905 tx.send(ExecutionEvent::Order(OrderEventAny::Submitted(event)))
906 .unwrap();
907
908 let received = rx.recv().await.unwrap();
909 match received {
910 ExecutionEvent::Order(OrderEventAny::Submitted(e)) => {
911 assert_eq!(e.client_order_id(), ClientOrderId::from("O-001"));
912 }
913 _ => panic!("Expected OrderSubmitted event"),
914 }
915 }
916
917 #[tokio::test]
918 async fn test_execution_report_order_status_channel() {
919 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
920
921 let report = OrderStatusReport::new(
922 AccountId::from("SIM-001"),
923 InstrumentId::from("EUR/USD.SIM"),
924 Some(ClientOrderId::from("O-001")),
925 VenueOrderId::from("V-001"),
926 OrderSide::Buy,
927 OrderType::Market,
928 TimeInForce::Gtc,
929 OrderStatus::Accepted,
930 Quantity::from(100_000),
931 Quantity::from(100_000),
932 UnixNanos::from(1),
933 UnixNanos::from(2),
934 UnixNanos::from(3),
935 None,
936 );
937
938 tx.send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
939 report,
940 ))))
941 .unwrap();
942
943 let received = rx.recv().await.unwrap();
944 match received {
945 ExecutionEvent::Report(ExecutionReport::Order(r)) => {
946 assert_eq!(r.venue_order_id.as_str(), "V-001");
947 assert_eq!(r.order_status, OrderStatus::Accepted);
948 }
949 _ => panic!("Expected OrderStatusReport"),
950 }
951 }
952
953 #[tokio::test]
954 async fn test_execution_report_fill() {
955 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
956
957 let report = FillReport::new(
958 AccountId::from("SIM-001"),
959 InstrumentId::from("EUR/USD.SIM"),
960 VenueOrderId::from("V-001"),
961 TradeId::from("T-001"),
962 OrderSide::Buy,
963 Quantity::from(100_000),
964 Price::from("1.10000"),
965 Money::from("10 USD"),
966 LiquiditySide::Taker,
967 Some(ClientOrderId::from("O-001")),
968 None,
969 UnixNanos::from(1),
970 UnixNanos::from(2),
971 None,
972 );
973
974 tx.send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
975 report,
976 ))))
977 .unwrap();
978
979 let received = rx.recv().await.unwrap();
980 match received {
981 ExecutionEvent::Report(ExecutionReport::Fill(r)) => {
982 assert_eq!(r.venue_order_id.as_str(), "V-001");
983 assert_eq!(r.trade_id.to_string(), "T-001");
984 }
985 _ => panic!("Expected FillReport"),
986 }
987 }
988
989 #[tokio::test]
990 async fn test_execution_report_position() {
991 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
992
993 let report = PositionStatusReport::new(
994 AccountId::from("SIM-001"),
995 InstrumentId::from("EUR/USD.SIM"),
996 PositionSideSpecified::Long,
997 Quantity::from(100_000),
998 UnixNanos::from(1),
999 UnixNanos::from(2),
1000 None,
1001 Some(PositionId::from("P-001")),
1002 None,
1003 );
1004
1005 tx.send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
1006 report,
1007 ))))
1008 .unwrap();
1009
1010 let received = rx.recv().await.unwrap();
1011 match received {
1012 ExecutionEvent::Report(ExecutionReport::Position(r)) => {
1013 assert_eq!(r.venue_position_id.unwrap().as_str(), "P-001");
1014 }
1015 _ => panic!("Expected PositionStatusReport"),
1016 }
1017 }
1018
1019 #[tokio::test]
1020 async fn test_execution_event_account() {
1021 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1022
1023 let account_state = AccountState::new(
1024 AccountId::from("SIM-001"),
1025 AccountType::Cash,
1026 vec![],
1027 vec![],
1028 true,
1029 UUID4::new(),
1030 UnixNanos::from(1),
1031 UnixNanos::from(2),
1032 None,
1033 );
1034
1035 tx.send(ExecutionEvent::Account(account_state)).unwrap();
1036
1037 let received = rx.recv().await.unwrap();
1038 match received {
1039 ExecutionEvent::Account(r) => {
1040 assert_eq!(r.account_id.as_str(), "SIM-001");
1041 }
1042 _ => panic!("Expected AccountState"),
1043 }
1044 }
1045
1046 #[tokio::test]
1047 async fn test_runner_stop_method() {
1048 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1049 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1050 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1051 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1052 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1053 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1054
1055 let mut runner = create_test_runner(
1056 time_evt_rx,
1057 data_evt_rx,
1058 data_cmd_rx,
1059 exec_evt_rx,
1060 exec_cmd_rx,
1061 signal_rx,
1062 signal_tx.clone(),
1063 );
1064
1065 let runner_handle = tokio::spawn(async move {
1066 runner.run().await;
1067 });
1068
1069 signal_tx.send(()).unwrap();
1071
1072 let result = tokio::time::timeout(Duration::from_millis(100), runner_handle).await;
1073 assert!(result.is_ok(), "Runner should stop when stop() is called");
1074 }
1075
1076 #[tokio::test]
1077 async fn test_all_event_types_integration() {
1078 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1079 let (data_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1080 let (time_evt_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1081 let (exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1082 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1083 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1084
1085 let mut runner = create_test_runner(
1086 time_evt_rx,
1087 data_evt_rx,
1088 data_cmd_rx,
1089 exec_evt_rx,
1090 exec_cmd_rx,
1091 signal_rx,
1092 signal_tx.clone(),
1093 );
1094
1095 let runner_handle = tokio::spawn(async move {
1096 runner.run().await;
1097 });
1098
1099 let quote = test_quote();
1101 data_evt_tx
1102 .send(DataEvent::Data(Data::Quote(quote)))
1103 .unwrap();
1104
1105 let command = DataCommand::Subscribe(SubscribeCommand::Data(SubscribeCustomData {
1107 client_id: Some(ClientId::from("TEST")),
1108 venue: None,
1109 data_type: DataType::new("QuoteTick", None, None),
1110 command_id: UUID4::new(),
1111 ts_init: UnixNanos::default(),
1112 correlation_id: None,
1113 params: None,
1114 }));
1115 data_cmd_tx.send(command).unwrap();
1116
1117 let event = TimeEvent::new(
1119 Ustr::from("test"),
1120 UUID4::new(),
1121 UnixNanos::from(1),
1122 UnixNanos::from(2),
1123 );
1124 let callback = TimeEventCallback::from(|_: TimeEvent| {});
1125 let handler = TimeEventHandler::new(event, callback);
1126 time_evt_tx.send(handler).unwrap();
1127
1128 let order_event = OrderSubmitted::new(
1130 TraderId::from("TRADER-001"),
1131 StrategyId::from("S-001"),
1132 InstrumentId::from("EUR/USD.SIM"),
1133 ClientOrderId::from("O-001"),
1134 AccountId::from("SIM-001"),
1135 UUID4::new(),
1136 UnixNanos::from(1),
1137 UnixNanos::from(2),
1138 );
1139 exec_evt_tx
1140 .send(ExecutionEvent::Order(OrderEventAny::Submitted(order_event)))
1141 .unwrap();
1142
1143 let order_status = OrderStatusReport::new(
1145 AccountId::from("SIM-001"),
1146 InstrumentId::from("EUR/USD.SIM"),
1147 Some(ClientOrderId::from("O-001")),
1148 VenueOrderId::from("V-001"),
1149 OrderSide::Buy,
1150 OrderType::Market,
1151 TimeInForce::Gtc,
1152 OrderStatus::Accepted,
1153 Quantity::from(100_000),
1154 Quantity::from(100_000),
1155 UnixNanos::from(1),
1156 UnixNanos::from(2),
1157 UnixNanos::from(3),
1158 None,
1159 );
1160 exec_evt_tx
1161 .send(ExecutionEvent::Report(ExecutionReport::Order(Box::new(
1162 order_status,
1163 ))))
1164 .unwrap();
1165
1166 let fill = FillReport::new(
1168 AccountId::from("SIM-001"),
1169 InstrumentId::from("EUR/USD.SIM"),
1170 VenueOrderId::from("V-001"),
1171 TradeId::from("T-001"),
1172 OrderSide::Buy,
1173 Quantity::from(100_000),
1174 Price::from("1.10000"),
1175 Money::from("10 USD"),
1176 LiquiditySide::Taker,
1177 Some(ClientOrderId::from("O-001")),
1178 None,
1179 UnixNanos::from(1),
1180 UnixNanos::from(2),
1181 None,
1182 );
1183 exec_evt_tx
1184 .send(ExecutionEvent::Report(ExecutionReport::Fill(Box::new(
1185 fill,
1186 ))))
1187 .unwrap();
1188
1189 let position = PositionStatusReport::new(
1191 AccountId::from("SIM-001"),
1192 InstrumentId::from("EUR/USD.SIM"),
1193 PositionSideSpecified::Long,
1194 Quantity::from(100_000),
1195 UnixNanos::from(1),
1196 UnixNanos::from(2),
1197 None,
1198 Some(PositionId::from("P-001")),
1199 None,
1200 );
1201 exec_evt_tx
1202 .send(ExecutionEvent::Report(ExecutionReport::Position(Box::new(
1203 position,
1204 ))))
1205 .unwrap();
1206
1207 let account_state = AccountState::new(
1209 AccountId::from("SIM-001"),
1210 AccountType::Cash,
1211 vec![],
1212 vec![],
1213 true,
1214 UUID4::new(),
1215 UnixNanos::from(1),
1216 UnixNanos::from(2),
1217 None,
1218 );
1219 exec_evt_tx
1220 .send(ExecutionEvent::Account(account_state))
1221 .unwrap();
1222
1223 tokio::task::yield_now().await;
1225 signal_tx.send(()).unwrap();
1226
1227 let result = tokio::time::timeout(Duration::from_millis(200), runner_handle).await;
1228 assert!(
1229 result.is_ok(),
1230 "Runner should process all event types and stop cleanly"
1231 );
1232 }
1233
1234 #[tokio::test]
1235 async fn test_runner_handle_stops_runner() {
1236 let (_data_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1237 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1238 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1239 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1240 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1241 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1242
1243 let mut runner = create_test_runner(
1244 time_evt_rx,
1245 data_evt_rx,
1246 data_cmd_rx,
1247 exec_evt_rx,
1248 exec_cmd_rx,
1249 signal_rx,
1250 signal_tx.clone(),
1251 );
1252
1253 let handle = runner.handle();
1255
1256 let runner_task = tokio::spawn(async move {
1257 runner.run().await;
1258 });
1259
1260 handle.stop();
1262
1263 let result = tokio::time::timeout(Duration::from_millis(100), runner_task).await;
1264 assert!(result.is_ok(), "Runner should stop via handle");
1265 }
1266
1267 #[tokio::test]
1268 async fn test_runner_handle_is_cloneable() {
1269 let (signal_tx, _signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1270 let handle = AsyncRunnerHandle { signal_tx };
1271
1272 let handle2 = handle.clone();
1273
1274 assert!(handle.signal_tx.send(()).is_ok());
1276 assert!(handle2.signal_tx.send(()).is_ok());
1277 }
1278
1279 #[tokio::test]
1280 async fn test_runner_processes_events_before_stop() {
1281 let (data_evt_tx, data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1282 let (_cmd_tx, data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1283 let (_time_tx, time_evt_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
1284 let (_exec_evt_tx, exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1285 let (_exec_cmd_tx, exec_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
1286 let (signal_tx, signal_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
1287
1288 let mut runner = create_test_runner(
1289 time_evt_rx,
1290 data_evt_rx,
1291 data_cmd_rx,
1292 exec_evt_rx,
1293 exec_cmd_rx,
1294 signal_rx,
1295 signal_tx.clone(),
1296 );
1297
1298 let handle = runner.handle();
1299
1300 for _ in 0..10 {
1302 let quote = test_quote();
1303 data_evt_tx
1304 .send(DataEvent::Data(Data::Quote(quote)))
1305 .unwrap();
1306 }
1307
1308 let runner_task = tokio::spawn(async move {
1309 runner.run().await;
1310 });
1311
1312 tokio::task::yield_now().await;
1314 handle.stop();
1315
1316 let result = tokio::time::timeout(Duration::from_millis(200), runner_task).await;
1317 assert!(result.is_ok(), "Runner should process events and stop");
1318 }
1319
1320 #[rstest]
1321 fn test_new_does_not_bind_tls() {
1322 std::thread::spawn(|| {
1323 let _runner = AsyncRunner::new();
1324 assert!(try_get_time_event_sender().is_none());
1325 assert!(try_get_trading_cmd_sender().is_none());
1326 })
1327 .join()
1328 .unwrap();
1329 }
1330
1331 #[rstest]
1332 fn test_bind_senders_routes_to_runner_channels() {
1333 std::thread::spawn(|| {
1334 let mut runner = AsyncRunner::new();
1335 runner.bind_senders();
1336
1337 get_data_cmd_sender().execute(DataCommand::Subscribe(SubscribeCommand::Data(
1338 SubscribeCustomData {
1339 client_id: Some(ClientId::from("TEST")),
1340 venue: None,
1341 data_type: DataType::new("test", None, None),
1342 command_id: UUID4::new(),
1343 ts_init: UnixNanos::default(),
1344 correlation_id: None,
1345 params: None,
1346 },
1347 )));
1348 assert!(runner.channels.data_cmd_rx.try_recv().is_ok());
1349
1350 get_trading_cmd_sender().execute(TradingCommand::CancelAllOrders(
1351 CancelAllOrders::new(
1352 TraderId::from("TRADER-001"),
1353 None,
1354 StrategyId::from("S-001"),
1355 InstrumentId::from("EUR/USD.SIM"),
1356 OrderSide::Buy,
1357 UUID4::new(),
1358 UnixNanos::default(),
1359 None,
1360 ),
1361 ));
1362 assert!(runner.channels.exec_cmd_rx.try_recv().is_ok());
1363
1364 let event = TimeEvent::new(
1365 Ustr::from("test"),
1366 UUID4::new(),
1367 UnixNanos::from(1),
1368 UnixNanos::from(2),
1369 );
1370 let callback = TimeEventCallback::from(|_: TimeEvent| {});
1371 get_time_event_sender().send(TimeEventHandler::new(event, callback));
1372 assert!(runner.channels.time_evt_rx.try_recv().is_ok());
1373
1374 get_data_event_sender()
1375 .send(DataEvent::Data(Data::Quote(test_quote())))
1376 .unwrap();
1377 assert!(runner.channels.data_evt_rx.try_recv().is_ok());
1378
1379 let account = AccountState::new(
1380 AccountId::from("SIM-001"),
1381 AccountType::Cash,
1382 vec![],
1383 vec![],
1384 true,
1385 UUID4::new(),
1386 UnixNanos::from(1),
1387 UnixNanos::from(2),
1388 None,
1389 );
1390 get_exec_event_sender()
1391 .send(ExecutionEvent::Account(account))
1392 .unwrap();
1393 assert!(runner.channels.exec_evt_rx.try_recv().is_ok());
1394 })
1395 .join()
1396 .unwrap();
1397 }
1398
1399 #[rstest]
1400 fn test_bind_senders_reclaims_tls_from_previous_runner() {
1401 std::thread::spawn(|| {
1402 let mut runner1 = AsyncRunner::new();
1403 runner1.bind_senders();
1404
1405 let mut runner2 = AsyncRunner::new();
1406 runner2.bind_senders();
1407
1408 get_data_cmd_sender().execute(DataCommand::Subscribe(SubscribeCommand::Data(
1409 SubscribeCustomData {
1410 client_id: Some(ClientId::from("TEST")),
1411 venue: None,
1412 data_type: DataType::new("test", None, None),
1413 command_id: UUID4::new(),
1414 ts_init: UnixNanos::default(),
1415 correlation_id: None,
1416 params: None,
1417 },
1418 )));
1419
1420 assert!(runner2.channels.data_cmd_rx.try_recv().is_ok());
1421 assert!(runner1.channels.data_cmd_rx.try_recv().is_err());
1422 })
1423 .join()
1424 .unwrap();
1425 }
1426
1427 #[tokio::test]
1428 async fn test_execution_event_order_submitted_batch_channel() {
1429 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1430
1431 let events = vec![
1432 OrderSubmitted::new(
1433 TraderId::from("TRADER-001"),
1434 StrategyId::from("S-001"),
1435 InstrumentId::from("EUR/USD.SIM"),
1436 ClientOrderId::from("O-001"),
1437 AccountId::from("SIM-001"),
1438 UUID4::new(),
1439 UnixNanos::from(1),
1440 UnixNanos::from(2),
1441 ),
1442 OrderSubmitted::new(
1443 TraderId::from("TRADER-001"),
1444 StrategyId::from("S-001"),
1445 InstrumentId::from("EUR/USD.SIM"),
1446 ClientOrderId::from("O-002"),
1447 AccountId::from("SIM-001"),
1448 UUID4::new(),
1449 UnixNanos::from(3),
1450 UnixNanos::from(4),
1451 ),
1452 ];
1453
1454 let batch = OrderSubmittedBatch::new(events);
1455 tx.send(ExecutionEvent::OrderSubmittedBatch(batch)).unwrap();
1456
1457 let received = rx.recv().await.unwrap();
1458 match received {
1459 ExecutionEvent::OrderSubmittedBatch(b) => {
1460 assert_eq!(b.len(), 2);
1461 assert_eq!(b.events[0].client_order_id, ClientOrderId::from("O-001"));
1462 assert_eq!(b.events[1].client_order_id, ClientOrderId::from("O-002"));
1463 }
1464 _ => panic!("Expected OrderSubmittedBatch event"),
1465 }
1466 }
1467
1468 #[tokio::test]
1469 async fn test_execution_event_order_accepted_batch_channel() {
1470 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1471
1472 let events = vec![
1473 OrderAccepted::new(
1474 TraderId::from("TRADER-001"),
1475 StrategyId::from("S-001"),
1476 InstrumentId::from("EUR/USD.SIM"),
1477 ClientOrderId::from("O-001"),
1478 VenueOrderId::from("V-001"),
1479 AccountId::from("SIM-001"),
1480 UUID4::new(),
1481 UnixNanos::from(1),
1482 UnixNanos::from(2),
1483 false,
1484 ),
1485 OrderAccepted::new(
1486 TraderId::from("TRADER-001"),
1487 StrategyId::from("S-001"),
1488 InstrumentId::from("EUR/USD.SIM"),
1489 ClientOrderId::from("O-002"),
1490 VenueOrderId::from("V-002"),
1491 AccountId::from("SIM-001"),
1492 UUID4::new(),
1493 UnixNanos::from(3),
1494 UnixNanos::from(4),
1495 false,
1496 ),
1497 ];
1498
1499 let batch = OrderAcceptedBatch::new(events);
1500 tx.send(ExecutionEvent::OrderAcceptedBatch(batch)).unwrap();
1501
1502 let received = rx.recv().await.unwrap();
1503 match received {
1504 ExecutionEvent::OrderAcceptedBatch(b) => {
1505 assert_eq!(b.len(), 2);
1506 assert_eq!(b.events[0].client_order_id, ClientOrderId::from("O-001"));
1507 assert_eq!(b.events[1].client_order_id, ClientOrderId::from("O-002"));
1508 }
1509 _ => panic!("Expected OrderAcceptedBatch event"),
1510 }
1511 }
1512
1513 #[tokio::test]
1514 async fn test_execution_event_order_canceled_batch_channel() {
1515 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
1516
1517 let events = vec![
1518 OrderCanceled::new(
1519 TraderId::from("TRADER-001"),
1520 StrategyId::from("S-001"),
1521 InstrumentId::from("EUR/USD.SIM"),
1522 ClientOrderId::from("O-001"),
1523 UUID4::new(),
1524 UnixNanos::from(1),
1525 UnixNanos::from(2),
1526 false,
1527 None,
1528 Some(AccountId::from("SIM-001")),
1529 ),
1530 OrderCanceled::new(
1531 TraderId::from("TRADER-001"),
1532 StrategyId::from("S-001"),
1533 InstrumentId::from("EUR/USD.SIM"),
1534 ClientOrderId::from("O-002"),
1535 UUID4::new(),
1536 UnixNanos::from(3),
1537 UnixNanos::from(4),
1538 false,
1539 None,
1540 Some(AccountId::from("SIM-001")),
1541 ),
1542 ];
1543
1544 let batch = OrderCanceledBatch::new(events);
1545 tx.send(ExecutionEvent::OrderCanceledBatch(batch)).unwrap();
1546
1547 let received = rx.recv().await.unwrap();
1548 match received {
1549 ExecutionEvent::OrderCanceledBatch(b) => {
1550 assert_eq!(b.len(), 2);
1551 assert_eq!(b.events[0].client_order_id, ClientOrderId::from("O-001"));
1552 assert_eq!(b.events[1].client_order_id, ClientOrderId::from("O-002"));
1553 }
1554 _ => panic!("Expected OrderCanceledBatch event"),
1555 }
1556 }
1557}