1use std::{any::Any, cell::RefCell, thread::LocalKey};
27
28use nautilus_core::UUID4;
29#[cfg(feature = "defi")]
30use nautilus_model::defi::{
31 Block, DefiData, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
32};
33use nautilus_model::{
34 data::{
35 Bar, Data, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate,
36 OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
37 option_chain::{OptionChainSlice, OptionGreeks},
38 },
39 events::{AccountState, OrderEventAny, PositionEvent},
40 orderbook::OrderBook,
41 orders::OrderAny,
42 position::Position,
43};
44use smallvec::SmallVec;
45use ustr::Ustr;
46
47use super::{
48 ACCOUNT_STATE_HANDLERS, ANY_HANDLERS, BAR_HANDLERS, BOOK_HANDLERS, DELTAS_HANDLERS,
49 DEPTH10_HANDLERS, FUNDING_RATE_HANDLERS, GREEKS_HANDLERS, HANDLER_BUFFER_CAP,
50 INDEX_PRICE_HANDLERS, MARK_PRICE_HANDLERS, OPTION_CHAIN_HANDLERS, OPTION_GREEKS_HANDLERS,
51 ORDER_EVENT_HANDLERS, POSITION_EVENT_HANDLERS, QUOTE_HANDLERS, TRADE_HANDLERS,
52 core::{MessageBus, Subscription},
53 get_message_bus,
54 matching::is_matching_backtracking,
55 mstr::{Endpoint, MStr, Pattern, Topic},
56 typed_handler::{ShareableMessageHandler, TypedHandler, TypedIntoHandler},
57};
58#[cfg(feature = "defi")]
59use super::{
60 DEFI_BLOCK_HANDLERS, DEFI_COLLECT_HANDLERS, DEFI_FLASH_HANDLERS, DEFI_LIQUIDITY_HANDLERS,
61 DEFI_POOL_HANDLERS, DEFI_SWAP_HANDLERS,
62};
63use crate::messages::{
64 data::{DataCommand, DataResponse},
65 execution::{ExecutionReport, TradingCommand},
66};
67
68pub fn register_any(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
70 log::debug!(
71 "Registering endpoint '{endpoint}' with handler ID {}",
72 handler.0.id(),
73 );
74 get_message_bus()
75 .borrow_mut()
76 .endpoints
77 .insert(endpoint, handler);
78}
79
80pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
82 if let Err(e) = get_message_bus()
83 .borrow_mut()
84 .register_response_handler(correlation_id, handler)
85 {
86 log::error!("Failed to register request handler: {e}");
87 }
88}
89
90pub fn register_quote_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<QuoteTick>) {
92 get_message_bus()
93 .borrow_mut()
94 .endpoints_quotes
95 .register(endpoint, handler);
96}
97
98pub fn register_trade_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<TradeTick>) {
100 get_message_bus()
101 .borrow_mut()
102 .endpoints_trades
103 .register(endpoint, handler);
104}
105
106pub fn register_bar_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<Bar>) {
108 get_message_bus()
109 .borrow_mut()
110 .endpoints_bars
111 .register(endpoint, handler);
112}
113
114pub fn register_order_event_endpoint(
116 endpoint: MStr<Endpoint>,
117 handler: TypedIntoHandler<OrderEventAny>,
118) {
119 get_message_bus()
120 .borrow_mut()
121 .endpoints_order_events
122 .register(endpoint, handler);
123}
124
125pub fn register_account_state_endpoint(
127 endpoint: MStr<Endpoint>,
128 handler: TypedHandler<AccountState>,
129) {
130 get_message_bus()
131 .borrow_mut()
132 .endpoints_account_state
133 .register(endpoint, handler);
134}
135
136pub fn register_trading_command_endpoint(
138 endpoint: MStr<Endpoint>,
139 handler: TypedIntoHandler<TradingCommand>,
140) {
141 get_message_bus()
142 .borrow_mut()
143 .endpoints_trading_commands
144 .register(endpoint, handler);
145}
146
147pub fn register_data_command_endpoint(
149 endpoint: MStr<Endpoint>,
150 handler: TypedIntoHandler<DataCommand>,
151) {
152 get_message_bus()
153 .borrow_mut()
154 .endpoints_data_commands
155 .register(endpoint, handler);
156}
157
158pub fn register_data_response_endpoint(
160 endpoint: MStr<Endpoint>,
161 handler: TypedIntoHandler<DataResponse>,
162) {
163 get_message_bus()
164 .borrow_mut()
165 .endpoints_data_responses
166 .register(endpoint, handler);
167}
168
169pub fn register_execution_report_endpoint(
171 endpoint: MStr<Endpoint>,
172 handler: TypedIntoHandler<ExecutionReport>,
173) {
174 get_message_bus()
175 .borrow_mut()
176 .endpoints_exec_reports
177 .register(endpoint, handler);
178}
179
180pub fn register_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<Data>) {
182 get_message_bus()
183 .borrow_mut()
184 .endpoints_data
185 .register(endpoint, handler);
186}
187
/// Registers a `DefiData` `handler` for `endpoint` via the bus's `endpoints_defi_data` field.
///
/// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
pub fn register_defi_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<DefiData>) {
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .endpoints_defi_data
        .register(endpoint, handler);
}
196
197pub fn deregister_any(endpoint: MStr<Endpoint>) {
199 log::debug!("Deregistering endpoint '{endpoint}'");
200 get_message_bus()
201 .borrow_mut()
202 .endpoints
203 .shift_remove(&endpoint);
204}
205
206#[must_use]
208pub fn has_endpoint(endpoint: &str) -> bool {
209 let key: MStr<Endpoint> = Ustr::from(endpoint).into();
210 get_message_bus().borrow().get_endpoint(key).is_some()
211}
212
213pub fn subscribe_any(
224 pattern: MStr<Pattern>,
225 handler: ShareableMessageHandler,
226 priority: Option<u8>,
227) {
228 let msgbus = get_message_bus();
229 let mut msgbus_ref_mut = msgbus.borrow_mut();
230 let sub = Subscription::new(pattern, handler, priority);
231
232 log::debug!(
233 "Subscribing {:?} for pattern '{}'",
234 sub.handler,
235 sub.pattern
236 );
237
238 if msgbus_ref_mut.subscriptions.contains(&sub) {
239 log::warn!("{sub:?} already exists");
240 return;
241 }
242
243 for (topic, subs) in &mut msgbus_ref_mut.topics {
244 if is_matching_backtracking(*topic, sub.pattern) {
245 subs.push(sub.clone());
246 subs.sort();
247 log::debug!("Added subscription for '{topic}'");
248 }
249 }
250
251 msgbus_ref_mut.subscriptions.insert(sub);
252}
253
254pub fn subscribe_instruments(
256 pattern: MStr<Pattern>,
257 handler: ShareableMessageHandler,
258 priority: Option<u8>,
259) {
260 subscribe_any(pattern, handler, priority);
261}
262
263pub fn subscribe_instrument_close(
265 pattern: MStr<Pattern>,
266 handler: ShareableMessageHandler,
267 priority: Option<u8>,
268) {
269 subscribe_any(pattern, handler, priority);
270}
271
272pub fn subscribe_book_deltas(
274 pattern: MStr<Pattern>,
275 handler: TypedHandler<OrderBookDeltas>,
276 priority: Option<u8>,
277) {
278 get_message_bus()
279 .borrow_mut()
280 .router_deltas
281 .subscribe(pattern, handler, priority.unwrap_or(0));
282}
283
284pub fn subscribe_book_depth10(
286 pattern: MStr<Pattern>,
287 handler: TypedHandler<OrderBookDepth10>,
288 priority: Option<u8>,
289) {
290 get_message_bus().borrow_mut().router_depth10.subscribe(
291 pattern,
292 handler,
293 priority.unwrap_or(0),
294 );
295}
296
297pub fn subscribe_book_snapshots(
299 pattern: MStr<Pattern>,
300 handler: TypedHandler<OrderBook>,
301 priority: Option<u8>,
302) {
303 get_message_bus()
304 .borrow_mut()
305 .router_book_snapshots
306 .subscribe(pattern, handler, priority.unwrap_or(0));
307}
308
309pub fn subscribe_quotes(
311 pattern: MStr<Pattern>,
312 handler: TypedHandler<QuoteTick>,
313 priority: Option<u8>,
314) {
315 get_message_bus()
316 .borrow_mut()
317 .router_quotes
318 .subscribe(pattern, handler, priority.unwrap_or(0));
319}
320
321pub fn subscribe_trades(
323 pattern: MStr<Pattern>,
324 handler: TypedHandler<TradeTick>,
325 priority: Option<u8>,
326) {
327 get_message_bus()
328 .borrow_mut()
329 .router_trades
330 .subscribe(pattern, handler, priority.unwrap_or(0));
331}
332
333pub fn subscribe_bars(pattern: MStr<Pattern>, handler: TypedHandler<Bar>, priority: Option<u8>) {
335 get_message_bus()
336 .borrow_mut()
337 .router_bars
338 .subscribe(pattern, handler, priority.unwrap_or(0));
339}
340
341pub fn subscribe_mark_prices(
343 pattern: MStr<Pattern>,
344 handler: TypedHandler<MarkPriceUpdate>,
345 priority: Option<u8>,
346) {
347 get_message_bus().borrow_mut().router_mark_prices.subscribe(
348 pattern,
349 handler,
350 priority.unwrap_or(0),
351 );
352}
353
354pub fn subscribe_index_prices(
356 pattern: MStr<Pattern>,
357 handler: TypedHandler<IndexPriceUpdate>,
358 priority: Option<u8>,
359) {
360 get_message_bus()
361 .borrow_mut()
362 .router_index_prices
363 .subscribe(pattern, handler, priority.unwrap_or(0));
364}
365
366pub fn subscribe_funding_rates(
368 pattern: MStr<Pattern>,
369 handler: TypedHandler<FundingRateUpdate>,
370 priority: Option<u8>,
371) {
372 get_message_bus()
373 .borrow_mut()
374 .router_funding_rates
375 .subscribe(pattern, handler, priority.unwrap_or(0));
376}
377
378pub fn subscribe_greeks(
380 pattern: MStr<Pattern>,
381 handler: TypedHandler<GreeksData>,
382 priority: Option<u8>,
383) {
384 get_message_bus()
385 .borrow_mut()
386 .router_greeks
387 .subscribe(pattern, handler, priority.unwrap_or(0));
388}
389
390pub fn subscribe_option_greeks(
392 pattern: MStr<Pattern>,
393 handler: TypedHandler<OptionGreeks>,
394 priority: Option<u8>,
395) {
396 get_message_bus()
397 .borrow_mut()
398 .router_option_greeks
399 .subscribe(pattern, handler, priority.unwrap_or(0));
400}
401
402pub fn subscribe_option_chain(
404 pattern: MStr<Pattern>,
405 handler: TypedHandler<OptionChainSlice>,
406 priority: Option<u8>,
407) {
408 get_message_bus()
409 .borrow_mut()
410 .router_option_chain
411 .subscribe(pattern, handler, priority.unwrap_or(0));
412}
413
414pub fn subscribe_order_events(
416 pattern: MStr<Pattern>,
417 handler: TypedHandler<OrderEventAny>,
418 priority: Option<u8>,
419) {
420 get_message_bus()
421 .borrow_mut()
422 .router_order_events
423 .subscribe(pattern, handler, priority.unwrap_or(0));
424}
425
426pub fn subscribe_position_events(
428 pattern: MStr<Pattern>,
429 handler: TypedHandler<PositionEvent>,
430 priority: Option<u8>,
431) {
432 get_message_bus()
433 .borrow_mut()
434 .router_position_events
435 .subscribe(pattern, handler, priority.unwrap_or(0));
436}
437
438pub fn subscribe_account_state(
440 pattern: MStr<Pattern>,
441 handler: TypedHandler<AccountState>,
442 priority: Option<u8>,
443) {
444 get_message_bus()
445 .borrow_mut()
446 .router_account_state
447 .subscribe(pattern, handler, priority.unwrap_or(0));
448}
449
450pub fn subscribe_positions(
452 pattern: MStr<Pattern>,
453 handler: TypedHandler<Position>,
454 priority: Option<u8>,
455) {
456 get_message_bus().borrow_mut().router_positions.subscribe(
457 pattern,
458 handler,
459 priority.unwrap_or(0),
460 );
461}
462
/// Subscribes `handler` to `pattern` on the bus's `router_defi_blocks` router.
///
/// A `None` priority is passed to the router as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_blocks(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Block>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_blocks
        .subscribe(pattern, handler, priority);
}
476
/// Subscribes `handler` to `pattern` on the bus's `router_defi_pools` router.
///
/// A `None` priority is passed to the router as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_pools(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Pool>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_pools
        .subscribe(pattern, handler, priority);
}
490
/// Subscribes `handler` to `pattern` on the bus's `router_defi_swaps` router.
///
/// A `None` priority is passed to the router as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_swaps(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolSwap>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_swaps
        .subscribe(pattern, handler, priority);
}
504
/// Subscribes `handler` to `pattern` on the bus's `router_defi_liquidity` router.
///
/// A `None` priority is passed to the router as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_liquidity(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolLiquidityUpdate>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_liquidity
        .subscribe(pattern, handler, priority);
}
517
/// Subscribes `handler` to `pattern` on the bus's `router_defi_collects` router.
///
/// A `None` priority is passed to the router as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_collects(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolFeeCollect>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_collects
        .subscribe(pattern, handler, priority);
}
530
/// Subscribes `handler` to `pattern` on the bus's `router_defi_flash` router.
///
/// A `None` priority is passed to the router as `0`. Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn subscribe_defi_flash(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolFlash>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or(0);
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_flash
        .subscribe(pattern, handler, priority);
}
544
545pub fn unsubscribe_instruments(pattern: MStr<Pattern>, handler: &ShareableMessageHandler) {
547 unsubscribe_any(pattern, handler);
548}
549
550pub fn unsubscribe_instrument_close(pattern: MStr<Pattern>, handler: &ShareableMessageHandler) {
552 unsubscribe_any(pattern, handler);
553}
554
555pub fn unsubscribe_book_deltas(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDeltas>) {
557 get_message_bus()
558 .borrow_mut()
559 .router_deltas
560 .unsubscribe(pattern, handler);
561}
562
563pub fn unsubscribe_book_depth10(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDepth10>) {
565 get_message_bus()
566 .borrow_mut()
567 .router_depth10
568 .unsubscribe(pattern, handler);
569}
570
571pub fn unsubscribe_book_snapshots(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBook>) {
573 get_message_bus()
574 .borrow_mut()
575 .router_book_snapshots
576 .unsubscribe(pattern, handler);
577}
578
579pub fn unsubscribe_quotes(pattern: MStr<Pattern>, handler: &TypedHandler<QuoteTick>) {
581 get_message_bus()
582 .borrow_mut()
583 .router_quotes
584 .unsubscribe(pattern, handler);
585}
586
587pub fn unsubscribe_trades(pattern: MStr<Pattern>, handler: &TypedHandler<TradeTick>) {
589 get_message_bus()
590 .borrow_mut()
591 .router_trades
592 .unsubscribe(pattern, handler);
593}
594
595pub fn unsubscribe_bars(pattern: MStr<Pattern>, handler: &TypedHandler<Bar>) {
597 get_message_bus()
598 .borrow_mut()
599 .router_bars
600 .unsubscribe(pattern, handler);
601}
602
603pub fn unsubscribe_mark_prices(pattern: MStr<Pattern>, handler: &TypedHandler<MarkPriceUpdate>) {
605 get_message_bus()
606 .borrow_mut()
607 .router_mark_prices
608 .unsubscribe(pattern, handler);
609}
610
611pub fn unsubscribe_index_prices(pattern: MStr<Pattern>, handler: &TypedHandler<IndexPriceUpdate>) {
613 get_message_bus()
614 .borrow_mut()
615 .router_index_prices
616 .unsubscribe(pattern, handler);
617}
618
619pub fn unsubscribe_funding_rates(
621 pattern: MStr<Pattern>,
622 handler: &TypedHandler<FundingRateUpdate>,
623) {
624 get_message_bus()
625 .borrow_mut()
626 .router_funding_rates
627 .unsubscribe(pattern, handler);
628}
629
630pub fn unsubscribe_account_state(pattern: MStr<Pattern>, handler: &TypedHandler<AccountState>) {
632 get_message_bus()
633 .borrow_mut()
634 .router_account_state
635 .unsubscribe(pattern, handler);
636}
637
638pub fn unsubscribe_order_events(pattern: MStr<Pattern>, handler: &TypedHandler<OrderEventAny>) {
640 get_message_bus()
641 .borrow_mut()
642 .router_order_events
643 .unsubscribe(pattern, handler);
644}
645
646pub fn unsubscribe_position_events(pattern: MStr<Pattern>, handler: &TypedHandler<PositionEvent>) {
648 get_message_bus()
649 .borrow_mut()
650 .router_position_events
651 .unsubscribe(pattern, handler);
652}
653
654pub fn remove_order_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
656 get_message_bus()
657 .borrow_mut()
658 .router_order_events
659 .remove_handler(pattern, handler_id);
660}
661
662pub fn remove_position_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
664 get_message_bus()
665 .borrow_mut()
666 .router_position_events
667 .remove_handler(pattern, handler_id);
668}
669
670pub fn unsubscribe_orders(pattern: MStr<Pattern>, handler: &TypedHandler<OrderAny>) {
672 get_message_bus()
673 .borrow_mut()
674 .router_orders
675 .unsubscribe(pattern, handler);
676}
677
678pub fn unsubscribe_positions(pattern: MStr<Pattern>, handler: &TypedHandler<Position>) {
680 get_message_bus()
681 .borrow_mut()
682 .router_positions
683 .unsubscribe(pattern, handler);
684}
685
686pub fn unsubscribe_greeks(pattern: MStr<Pattern>, handler: &TypedHandler<GreeksData>) {
688 get_message_bus()
689 .borrow_mut()
690 .router_greeks
691 .unsubscribe(pattern, handler);
692}
693
694pub fn unsubscribe_option_greeks(pattern: MStr<Pattern>, handler: &TypedHandler<OptionGreeks>) {
696 get_message_bus()
697 .borrow_mut()
698 .router_option_greeks
699 .unsubscribe(pattern, handler);
700}
701
702pub fn unsubscribe_option_chain(pattern: MStr<Pattern>, handler: &TypedHandler<OptionChainSlice>) {
704 get_message_bus()
705 .borrow_mut()
706 .router_option_chain
707 .unsubscribe(pattern, handler);
708}
709
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_blocks` router.
///
/// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_blocks(pattern: MStr<Pattern>, handler: &TypedHandler<Block>) {
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_blocks
        .unsubscribe(pattern, handler);
}
718
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_pools` router.
///
/// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_pools(pattern: MStr<Pattern>, handler: &TypedHandler<Pool>) {
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_pools
        .unsubscribe(pattern, handler);
}
727
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_swaps` router.
///
/// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_swaps(pattern: MStr<Pattern>, handler: &TypedHandler<PoolSwap>) {
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_swaps
        .unsubscribe(pattern, handler);
}
736
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_liquidity` router.
///
/// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_liquidity(
    pattern: MStr<Pattern>,
    handler: &TypedHandler<PoolLiquidityUpdate>,
) {
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_liquidity
        .unsubscribe(pattern, handler);
}
748
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_collects` router.
///
/// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_collects(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFeeCollect>) {
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_collects
        .unsubscribe(pattern, handler);
}
757
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_flash` router.
///
/// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_flash(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFlash>) {
    let bus_rc = get_message_bus();
    bus_rc
        .borrow_mut()
        .router_defi_flash
        .unsubscribe(pattern, handler);
}
766
767pub fn unsubscribe_any(pattern: MStr<Pattern>, handler: &ShareableMessageHandler) {
769 log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
770
771 let handler_id = handler.0.id();
772 let bus_rc = get_message_bus();
773 let mut bus = bus_rc.borrow_mut();
774
775 let count_before = bus.subscriptions.len();
776
777 bus.topics.values_mut().for_each(|subs| {
778 subs.retain(|s| !(s.pattern == pattern && s.handler_id == handler_id));
779 });
780
781 bus.subscriptions
782 .retain(|s| !(s.pattern == pattern && s.handler_id == handler_id));
783
784 let removed = bus.subscriptions.len() < count_before;
785
786 if removed {
787 log::debug!("Handler for pattern '{pattern}' was removed");
788 } else {
789 log::debug!("No matching handler for pattern '{pattern}' was found");
790 }
791}
792
793pub fn is_subscribed_any<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
795 let pattern = MStr::from(pattern.as_ref());
796 let sub = Subscription::new(pattern, handler, None);
797 get_message_bus().borrow().subscriptions.contains(&sub)
798}
799
800pub fn subscriptions_count_any<S: AsRef<str>>(topic: S) -> usize {
802 get_message_bus().borrow().subscriptions_count(topic)
803}
804
805pub fn subscriber_count_deltas(topic: MStr<Topic>) -> usize {
807 get_message_bus()
808 .borrow()
809 .router_deltas
810 .subscriber_count(topic)
811}
812
813pub fn subscriber_count_depth10(topic: MStr<Topic>) -> usize {
815 get_message_bus()
816 .borrow()
817 .router_depth10
818 .subscriber_count(topic)
819}
820
821pub fn subscriber_count_book_snapshots(topic: MStr<Topic>) -> usize {
823 get_message_bus()
824 .borrow()
825 .router_book_snapshots
826 .subscriber_count(topic)
827}
828
829pub fn exact_subscriber_count_bars(topic: MStr<Topic>) -> usize {
832 get_message_bus()
833 .borrow()
834 .router_bars
835 .exact_subscriber_count(topic)
836}
837
838pub fn publish_any(topic: MStr<Topic>, message: &dyn Any) {
840 let mut handlers = ANY_HANDLERS.with_borrow_mut(std::mem::take);
842
843 get_message_bus()
844 .borrow_mut()
845 .fill_matching_any_handlers(topic, &mut handlers);
846
847 for handler in &handlers {
848 handler.0.handle(message);
849 }
850
851 handlers.clear(); ANY_HANDLERS.with_borrow_mut(|buf| *buf = handlers);
853}
854
855pub fn publish_deltas(topic: MStr<Topic>, deltas: &OrderBookDeltas) {
857 publish_typed(
858 &DELTAS_HANDLERS,
859 |bus, h| bus.router_deltas.fill_matching_handlers(topic, h),
860 deltas,
861 );
862}
863
864pub fn publish_depth10(topic: MStr<Topic>, depth: &OrderBookDepth10) {
866 publish_typed(
867 &DEPTH10_HANDLERS,
868 |bus, h| bus.router_depth10.fill_matching_handlers(topic, h),
869 depth,
870 );
871}
872
873pub fn publish_book(topic: MStr<Topic>, book: &OrderBook) {
875 publish_typed(
876 &BOOK_HANDLERS,
877 |bus, h| bus.router_book_snapshots.fill_matching_handlers(topic, h),
878 book,
879 );
880}
881
882pub fn publish_quote(topic: MStr<Topic>, quote: &QuoteTick) {
884 publish_typed(
885 "E_HANDLERS,
886 |bus, h| bus.router_quotes.fill_matching_handlers(topic, h),
887 quote,
888 );
889}
890
891pub fn publish_trade(topic: MStr<Topic>, trade: &TradeTick) {
893 publish_typed(
894 &TRADE_HANDLERS,
895 |bus, h| bus.router_trades.fill_matching_handlers(topic, h),
896 trade,
897 );
898}
899
900pub fn publish_bar(topic: MStr<Topic>, bar: &Bar) {
902 publish_typed(
903 &BAR_HANDLERS,
904 |bus, h| bus.router_bars.fill_matching_handlers(topic, h),
905 bar,
906 );
907}
908
909pub fn publish_mark_price(topic: MStr<Topic>, mark_price: &MarkPriceUpdate) {
911 publish_typed(
912 &MARK_PRICE_HANDLERS,
913 |bus, h| bus.router_mark_prices.fill_matching_handlers(topic, h),
914 mark_price,
915 );
916}
917
918pub fn publish_index_price(topic: MStr<Topic>, index_price: &IndexPriceUpdate) {
920 publish_typed(
921 &INDEX_PRICE_HANDLERS,
922 |bus, h| bus.router_index_prices.fill_matching_handlers(topic, h),
923 index_price,
924 );
925}
926
927pub fn publish_funding_rate(topic: MStr<Topic>, funding_rate: &FundingRateUpdate) {
929 publish_typed(
930 &FUNDING_RATE_HANDLERS,
931 |bus, h| bus.router_funding_rates.fill_matching_handlers(topic, h),
932 funding_rate,
933 );
934}
935
936pub fn publish_greeks(topic: MStr<Topic>, greeks: &GreeksData) {
938 publish_typed(
939 &GREEKS_HANDLERS,
940 |bus, h| bus.router_greeks.fill_matching_handlers(topic, h),
941 greeks,
942 );
943}
944
945pub fn publish_option_greeks(topic: MStr<Topic>, option_greeks: &OptionGreeks) {
947 publish_typed(
948 &OPTION_GREEKS_HANDLERS,
949 |bus, h| bus.router_option_greeks.fill_matching_handlers(topic, h),
950 option_greeks,
951 );
952}
953
954pub fn publish_option_chain(topic: MStr<Topic>, slice: &OptionChainSlice) {
956 publish_typed(
957 &OPTION_CHAIN_HANDLERS,
958 |bus, h| bus.router_option_chain.fill_matching_handlers(topic, h),
959 slice,
960 );
961}
962
963pub fn publish_account_state(topic: MStr<Topic>, state: &AccountState) {
965 publish_typed(
966 &ACCOUNT_STATE_HANDLERS,
967 |bus, h| bus.router_account_state.fill_matching_handlers(topic, h),
968 state,
969 );
970}
971
972pub fn publish_order_event(topic: MStr<Topic>, event: &OrderEventAny) {
974 publish_typed(
975 &ORDER_EVENT_HANDLERS,
976 |bus, h| bus.router_order_events.fill_matching_handlers(topic, h),
977 event,
978 );
979}
980
981pub fn publish_position_event(topic: MStr<Topic>, event: &PositionEvent) {
983 publish_typed(
984 &POSITION_EVENT_HANDLERS,
985 |bus, h| bus.router_position_events.fill_matching_handlers(topic, h),
986 event,
987 );
988}
989
/// Publishes `block` to the handlers `router_defi_blocks` matches for `topic`.
///
/// Delegates to `publish_typed` with the `DEFI_BLOCK_HANDLERS` buffer. Only compiled
/// with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_block(topic: MStr<Topic>, block: &Block) {
    publish_typed(
        &DEFI_BLOCK_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_blocks
                .fill_matching_handlers(topic, matched);
        },
        block,
    );
}
999
/// Publishes `pool` to the handlers `router_defi_pools` matches for `topic`.
///
/// Delegates to `publish_typed` with the `DEFI_POOL_HANDLERS` buffer. Only compiled
/// with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_pool(topic: MStr<Topic>, pool: &Pool) {
    publish_typed(
        &DEFI_POOL_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_pools
                .fill_matching_handlers(topic, matched);
        },
        pool,
    );
}
1009
/// Publishes `swap` to the handlers `router_defi_swaps` matches for `topic`.
///
/// Delegates to `publish_typed` with the `DEFI_SWAP_HANDLERS` buffer. Only compiled
/// with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_swap(topic: MStr<Topic>, swap: &PoolSwap) {
    publish_typed(
        &DEFI_SWAP_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_swaps
                .fill_matching_handlers(topic, matched);
        },
        swap,
    );
}
1019
/// Publishes `update` to the handlers `router_defi_liquidity` matches for `topic`.
///
/// Delegates to `publish_typed` with the `DEFI_LIQUIDITY_HANDLERS` buffer. Only compiled
/// with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_liquidity(topic: MStr<Topic>, update: &PoolLiquidityUpdate) {
    publish_typed(
        &DEFI_LIQUIDITY_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_liquidity
                .fill_matching_handlers(topic, matched);
        },
        update,
    );
}
1029
/// Publishes `collect` to the handlers `router_defi_collects` matches for `topic`.
///
/// Delegates to `publish_typed` with the `DEFI_COLLECT_HANDLERS` buffer. Only compiled
/// with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_collect(topic: MStr<Topic>, collect: &PoolFeeCollect) {
    publish_typed(
        &DEFI_COLLECT_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_collects
                .fill_matching_handlers(topic, matched);
        },
        collect,
    );
}
1039
/// Publishes `flash` to the handlers `router_defi_flash` matches for `topic`.
///
/// Delegates to `publish_typed` with the `DEFI_FLASH_HANDLERS` buffer. Only compiled
/// with the `defi` feature.
#[cfg(feature = "defi")]
pub fn publish_defi_flash(topic: MStr<Topic>, flash: &PoolFlash) {
    publish_typed(
        &DEFI_FLASH_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_flash
                .fill_matching_handlers(topic, matched);
        },
        flash,
    );
}
1049
1050#[inline]
1060fn publish_typed<T: 'static>(
1061 tls: &'static LocalKey<RefCell<SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>>>,
1062 fill_fn: impl FnOnce(&mut MessageBus, &mut SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>),
1063 message: &T,
1064) {
1065 let mut handlers = tls.with_borrow_mut(std::mem::take);
1067
1068 let bus_rc = get_message_bus();
1070 fill_fn(&mut bus_rc.borrow_mut(), &mut handlers);
1071
1072 for handler in &handlers {
1073 handler.handle(message);
1074 }
1075
1076 handlers.clear(); tls.with_borrow_mut(|buf| *buf = handlers);
1078}
1079
1080pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
1082 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
1083
1084 if let Some(handler) = handler {
1085 handler.0.handle(message);
1086 } else {
1087 log::error!("send_any: no registered endpoint '{endpoint}'");
1088 }
1089}
1090
1091pub fn send_any_value<T: 'static>(endpoint: MStr<Endpoint>, message: &T) {
1093 let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
1094
1095 if let Some(handler) = handler {
1096 handler.0.handle(message);
1097 } else {
1098 log::error!("send_any_value: no registered endpoint '{endpoint}'");
1099 }
1100}
1101
1102pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
1104 let handler = get_message_bus()
1105 .borrow()
1106 .get_response_handler(correlation_id)
1107 .cloned();
1108
1109 if let Some(handler) = handler {
1110 match message {
1111 DataResponse::Data(resp) => handler.0.handle(resp),
1112 DataResponse::Instrument(resp) => handler.0.handle(resp.as_ref()),
1113 DataResponse::Instruments(resp) => handler.0.handle(resp),
1114 DataResponse::Book(resp) => handler.0.handle(resp),
1115 DataResponse::Quotes(resp) => handler.0.handle(resp),
1116 DataResponse::Trades(resp) => handler.0.handle(resp),
1117 DataResponse::FundingRates(resp) => handler.0.handle(resp),
1118 DataResponse::ForwardPrices(resp) => handler.0.handle(resp),
1119 DataResponse::Bars(resp) => handler.0.handle(resp),
1120 }
1121 } else {
1122 log::error!("send_response: handler not found for correlation_id '{correlation_id}'");
1123 }
1124}
1125
1126pub fn send_quote(endpoint: MStr<Endpoint>, quote: &QuoteTick) {
1128 send_endpoint_ref(
1129 endpoint,
1130 quote,
1131 |bus| bus.endpoints_quotes.get(endpoint),
1132 "send_quote",
1133 );
1134}
1135
1136pub fn send_trade(endpoint: MStr<Endpoint>, trade: &TradeTick) {
1138 send_endpoint_ref(
1139 endpoint,
1140 trade,
1141 |bus| bus.endpoints_trades.get(endpoint),
1142 "send_trade",
1143 );
1144}
1145
1146pub fn send_bar(endpoint: MStr<Endpoint>, bar: &Bar) {
1148 send_endpoint_ref(
1149 endpoint,
1150 bar,
1151 |bus| bus.endpoints_bars.get(endpoint),
1152 "send_bar",
1153 );
1154}
1155
1156pub fn send_order_event(endpoint: MStr<Endpoint>, event: OrderEventAny) {
1158 send_endpoint_owned(
1159 endpoint,
1160 event,
1161 |bus| bus.endpoints_order_events.get(endpoint),
1162 "send_order_event",
1163 );
1164}
1165
1166pub fn send_account_state(endpoint: MStr<Endpoint>, state: &AccountState) {
1168 send_endpoint_ref(
1169 endpoint,
1170 state,
1171 |bus| bus.endpoints_account_state.get(endpoint),
1172 "send_account_state",
1173 );
1174}
1175
1176pub fn send_trading_command(endpoint: MStr<Endpoint>, command: TradingCommand) {
1178 send_endpoint_owned(
1179 endpoint,
1180 command,
1181 |bus| bus.endpoints_trading_commands.get(endpoint),
1182 "send_trading_command",
1183 );
1184}
1185
1186pub fn send_data_command(endpoint: MStr<Endpoint>, command: DataCommand) {
1188 send_endpoint_owned(
1189 endpoint,
1190 command,
1191 |bus| bus.endpoints_data_commands.get(endpoint),
1192 "send_data_command",
1193 );
1194}
1195
1196pub fn send_data_response(endpoint: MStr<Endpoint>, response: DataResponse) {
1198 send_endpoint_owned(
1199 endpoint,
1200 response,
1201 |bus| bus.endpoints_data_responses.get(endpoint),
1202 "send_data_response",
1203 );
1204}
1205
1206pub fn send_execution_report(endpoint: MStr<Endpoint>, report: ExecutionReport) {
1208 send_endpoint_owned(
1209 endpoint,
1210 report,
1211 |bus| bus.endpoints_exec_reports.get(endpoint),
1212 "send_execution_report",
1213 );
1214}
1215
1216pub fn send_data(endpoint: MStr<Endpoint>, data: Data) {
1218 send_endpoint_owned(
1219 endpoint,
1220 data,
1221 |bus| bus.endpoints_data.get(endpoint),
1222 "send_data",
1223 );
1224}
1225
/// Sends the owned `data` to the handler looked up for `endpoint` in `endpoints_defi_data`.
///
/// Delegates to `send_endpoint_owned`, passing `"send_defi_data"` as the caller name.
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
pub fn send_defi_data(endpoint: MStr<Endpoint>, data: DefiData) {
    const FN_NAME: &str = "send_defi_data";

    send_endpoint_owned(
        endpoint,
        data,
        |msgbus| msgbus.endpoints_defi_data.get(endpoint),
        FN_NAME,
    );
}
1236
1237#[inline]
1238fn send_endpoint_ref<T: 'static, F>(
1239 endpoint: MStr<Endpoint>,
1240 message: &T,
1241 get_handler: F,
1242 fn_name: &str,
1243) where
1244 F: FnOnce(&MessageBus) -> Option<&TypedHandler<T>>,
1245{
1246 let handler = {
1247 let bus = get_message_bus();
1248 get_handler(&bus.borrow()).cloned()
1249 };
1250
1251 if let Some(handler) = handler {
1252 handler.handle(message);
1253 } else {
1254 log::error!("{fn_name}: no registered endpoint '{endpoint}'");
1255 }
1256}
1257
1258#[inline]
1259fn send_endpoint_owned<T: 'static, F>(
1260 endpoint: MStr<Endpoint>,
1261 message: T,
1262 get_handler: F,
1263 fn_name: &str,
1264) where
1265 F: FnOnce(&MessageBus) -> Option<&TypedIntoHandler<T>>,
1266{
1267 let handler = {
1268 let bus = get_message_bus();
1269 get_handler(&bus.borrow()).cloned()
1270 };
1271
1272 if let Some(handler) = handler {
1273 handler.handle(message);
1274 } else {
1275 log::error!("{fn_name}: no registered endpoint '{endpoint}'");
1276 }
1277}
1278
1279#[cfg(test)]
1280mod tests {
1281 use std::{cell::RefCell, rc::Rc};
1289
1290 use nautilus_core::UUID4;
1291 use nautilus_model::{
1292 data::{Bar, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
1293 enums::OrderSide,
1294 events::OrderDenied,
1295 identifiers::{ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId},
1296 };
1297 use rstest::rstest;
1298
1299 use super::*;
1300 use crate::messages::{
1301 data::{DataCommand, SubscribeCommand, SubscribeQuotes},
1302 execution::{CancelAllOrders, TradingCommand},
1303 };
1304
1305 #[rstest]
1306 fn test_typed_quote_publish_subscribe_integration() {
1307 let _msgbus = get_message_bus();
1308 let received = Rc::new(RefCell::new(Vec::new()));
1309 let received_clone = received.clone();
1310
1311 let handler = TypedHandler::from(move |quote: &QuoteTick| {
1312 received_clone.borrow_mut().push(*quote);
1313 });
1314
1315 subscribe_quotes("data.quotes.*".into(), handler, None);
1316
1317 let quote = QuoteTick::default();
1318 publish_quote("data.quotes.TEST".into(), "e);
1319 publish_quote("data.quotes.TEST".into(), "e);
1320
1321 assert_eq!(received.borrow().len(), 2);
1322 }
1323
1324 #[rstest]
1325 fn test_typed_trade_publish_subscribe_integration() {
1326 let _msgbus = get_message_bus();
1327 let received = Rc::new(RefCell::new(Vec::new()));
1328 let received_clone = received.clone();
1329
1330 let handler = TypedHandler::from(move |trade: &TradeTick| {
1331 received_clone.borrow_mut().push(*trade);
1332 });
1333
1334 subscribe_trades("data.trades.*".into(), handler, None);
1335
1336 let trade = TradeTick::default();
1337 publish_trade("data.trades.TEST".into(), &trade);
1338
1339 assert_eq!(received.borrow().len(), 1);
1340 }
1341
1342 #[rstest]
1343 fn test_typed_bar_publish_subscribe_integration() {
1344 let _msgbus = get_message_bus();
1345 let received = Rc::new(RefCell::new(Vec::new()));
1346 let received_clone = received.clone();
1347
1348 let handler = TypedHandler::from(move |bar: &Bar| {
1349 received_clone.borrow_mut().push(*bar);
1350 });
1351
1352 subscribe_bars("data.bars.*".into(), handler, None);
1353
1354 let bar = Bar::default();
1355 publish_bar("data.bars.TEST".into(), &bar);
1356
1357 assert_eq!(received.borrow().len(), 1);
1358 }
1359
1360 #[rstest]
1361 fn test_typed_deltas_publish_subscribe_integration() {
1362 let _msgbus = get_message_bus();
1363 let received = Rc::new(RefCell::new(Vec::new()));
1364 let received_clone = received.clone();
1365
1366 let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1367 received_clone.borrow_mut().push(deltas.clone());
1368 });
1369
1370 subscribe_book_deltas("data.book.deltas.*".into(), handler, None);
1371
1372 let instrument_id = InstrumentId::from("TEST.VENUE");
1373 let delta = OrderBookDelta::clear(instrument_id, 0, 1.into(), 2.into());
1374 let deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1375 publish_deltas("data.book.deltas.TEST".into(), &deltas);
1376
1377 assert_eq!(received.borrow().len(), 1);
1378 }
1379
1380 #[rstest]
1381 fn test_typed_unsubscribe_stops_delivery() {
1382 let _msgbus = get_message_bus();
1383 let received = Rc::new(RefCell::new(Vec::new()));
1384 let received_clone = received.clone();
1385
1386 let handler = TypedHandler::from_with_id("unsub-test", move |quote: &QuoteTick| {
1387 received_clone.borrow_mut().push(*quote);
1388 });
1389
1390 subscribe_quotes("data.quotes.UNSUB".into(), handler.clone(), None);
1391
1392 let quote = QuoteTick::default();
1393 publish_quote("data.quotes.UNSUB".into(), "e);
1394 assert_eq!(received.borrow().len(), 1);
1395
1396 unsubscribe_quotes("data.quotes.UNSUB".into(), &handler);
1397
1398 publish_quote("data.quotes.UNSUB".into(), "e);
1399 assert_eq!(received.borrow().len(), 1);
1400 }
1401
1402 #[rstest]
1403 fn test_typed_wildcard_pattern_matching() {
1404 let _msgbus = get_message_bus();
1405 let received = Rc::new(RefCell::new(Vec::new()));
1406 let received_clone = received.clone();
1407
1408 let handler = TypedHandler::from(move |quote: &QuoteTick| {
1409 received_clone.borrow_mut().push(*quote);
1410 });
1411
1412 subscribe_quotes("data.quotes.WILD.*".into(), handler, None);
1413
1414 let quote = QuoteTick::default();
1415 publish_quote("data.quotes.WILD.AAPL".into(), "e);
1416 publish_quote("data.quotes.WILD.MSFT".into(), "e);
1417 publish_quote("data.quotes.OTHER.AAPL".into(), "e);
1418
1419 assert_eq!(received.borrow().len(), 2);
1420 }
1421
1422 #[rstest]
1423 fn test_typed_priority_ordering() {
1424 let _msgbus = get_message_bus();
1425 let order = Rc::new(RefCell::new(Vec::new()));
1426
1427 let order1 = order.clone();
1428 let handler_low = TypedHandler::from_with_id("low-priority", move |_: &QuoteTick| {
1429 order1.borrow_mut().push("low");
1430 });
1431
1432 let order2 = order.clone();
1433 let handler_high = TypedHandler::from_with_id("high-priority", move |_: &QuoteTick| {
1434 order2.borrow_mut().push("high");
1435 });
1436
1437 subscribe_quotes("data.quotes.PRIO.*".into(), handler_low, Some(1));
1438 subscribe_quotes("data.quotes.PRIO.*".into(), handler_high, Some(10));
1439
1440 let quote = QuoteTick::default();
1441 publish_quote("data.quotes.PRIO.TEST".into(), "e);
1442
1443 assert_eq!(*order.borrow(), vec!["high", "low"]);
1444 }
1445
1446 #[rstest]
1447 fn test_typed_routing_isolation() {
1448 let _msgbus = get_message_bus();
1449 let quote_received = Rc::new(RefCell::new(false));
1450 let trade_received = Rc::new(RefCell::new(false));
1451
1452 let qr = quote_received.clone();
1453 let quote_handler = TypedHandler::from(move |_: &QuoteTick| {
1454 *qr.borrow_mut() = true;
1455 });
1456
1457 let tr = trade_received.clone();
1458 let trade_handler = TypedHandler::from(move |_: &TradeTick| {
1459 *tr.borrow_mut() = true;
1460 });
1461
1462 subscribe_quotes("data.iso.*".into(), quote_handler, None);
1463 subscribe_trades("data.iso.*".into(), trade_handler, None);
1464
1465 let quote = QuoteTick::default();
1466 publish_quote("data.iso.TEST".into(), "e);
1467
1468 assert!(*quote_received.borrow());
1469 assert!(!*trade_received.borrow());
1470 }
1471
1472 #[rstest]
1473 fn test_send_data_allows_reentrant_topic_access() {
1474 use crate::msgbus::switchboard::get_quotes_topic;
1475
1476 let _msgbus = get_message_bus();
1477 let topic_retrieved = Rc::new(RefCell::new(false));
1478 let topic_clone = topic_retrieved.clone();
1479
1480 let handler = TypedIntoHandler::from(move |data: Data| {
1481 let instrument_id = data.instrument_id();
1482 let _topic = get_quotes_topic(instrument_id);
1483 *topic_clone.borrow_mut() = true;
1484 });
1485
1486 let endpoint: MStr<Endpoint> = "ReentrantTest.data".into();
1487 register_data_endpoint(endpoint, handler);
1488
1489 let quote = QuoteTick::default();
1490 send_data(endpoint, Data::Quote(quote));
1491
1492 assert!(*topic_retrieved.borrow());
1493 }
1494
1495 #[rstest]
1496 fn test_send_trading_command_allows_reentrant_topic_access() {
1497 use nautilus_model::{
1498 enums::OrderSide,
1499 identifiers::{StrategyId, TraderId},
1500 };
1501
1502 use crate::{
1503 messages::execution::{TradingCommand, cancel::CancelAllOrders},
1504 msgbus::switchboard::get_trades_topic,
1505 };
1506
1507 let _msgbus = get_message_bus();
1508 let topic_retrieved = Rc::new(RefCell::new(false));
1509 let topic_clone = topic_retrieved.clone();
1510
1511 let handler = TypedIntoHandler::from(move |cmd: TradingCommand| {
1512 let instrument_id = cmd.instrument_id();
1513 let _topic = get_trades_topic(instrument_id);
1514 *topic_clone.borrow_mut() = true;
1515 });
1516
1517 let endpoint: MStr<Endpoint> = "ReentrantTest.tradingCmd".into();
1518 register_trading_command_endpoint(endpoint, handler);
1519
1520 let cmd = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1521 TraderId::new("TESTER-001"),
1522 None,
1523 StrategyId::new("S-001"),
1524 InstrumentId::from("TEST.VENUE"),
1525 OrderSide::NoOrderSide,
1526 UUID4::new(),
1527 0.into(),
1528 None,
1529 ));
1530 send_trading_command(endpoint, cmd);
1531
1532 assert!(*topic_retrieved.borrow());
1533 }
1534
1535 #[rstest]
1536 fn test_send_account_state_allows_reentrant_topic_access() {
1537 use nautilus_model::{enums::AccountType, identifiers::AccountId, types::Currency};
1538
1539 use crate::msgbus::switchboard::get_quotes_topic;
1540
1541 let _msgbus = get_message_bus();
1542 let topic_retrieved = Rc::new(RefCell::new(false));
1543 let topic_clone = topic_retrieved.clone();
1544
1545 let handler = TypedHandler::from(move |_state: &AccountState| {
1546 let instrument_id = InstrumentId::from("TEST.VENUE");
1547 let _topic = get_quotes_topic(instrument_id);
1548 *topic_clone.borrow_mut() = true;
1549 });
1550
1551 let endpoint: MStr<Endpoint> = "ReentrantTest.accountState".into();
1552 register_account_state_endpoint(endpoint, handler);
1553
1554 let state = AccountState::new(
1555 AccountId::new("SIM-001"),
1556 AccountType::Cash,
1557 vec![],
1558 vec![],
1559 true,
1560 UUID4::new(),
1561 0.into(),
1562 0.into(),
1563 Some(Currency::USD()),
1564 );
1565 send_account_state(endpoint, &state);
1566
1567 assert!(*topic_retrieved.borrow());
1568 }
1569
1570 #[rstest]
1571 fn test_send_order_event_allows_reentrant_topic_access() {
1572 use nautilus_model::{
1573 events::OrderDenied,
1574 identifiers::{ClientOrderId, StrategyId, TraderId},
1575 };
1576
1577 use crate::msgbus::switchboard::get_quotes_topic;
1578
1579 let _msgbus = get_message_bus();
1580 let topic_retrieved = Rc::new(RefCell::new(false));
1581 let topic_clone = topic_retrieved.clone();
1582
1583 let handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1584 let instrument_id = InstrumentId::from("TEST.VENUE");
1585 let _topic = get_quotes_topic(instrument_id);
1586 *topic_clone.borrow_mut() = true;
1587 });
1588
1589 let endpoint: MStr<Endpoint> = "ReentrantTest.orderEvent".into();
1590 register_order_event_endpoint(endpoint, handler);
1591
1592 let event = OrderEventAny::Denied(OrderDenied::new(
1593 TraderId::new("TESTER-001"),
1594 StrategyId::new("S-001"),
1595 InstrumentId::from("TEST.VENUE"),
1596 ClientOrderId::new("O-001"),
1597 "test denied".into(),
1598 UUID4::new(),
1599 0.into(),
1600 0.into(),
1601 ));
1602 send_order_event(endpoint, event);
1603
1604 assert!(*topic_retrieved.borrow());
1605 }
1606
1607 #[rstest]
1608 fn test_send_data_command_allows_reentrant_topic_access() {
1609 use nautilus_model::identifiers::ClientId;
1610
1611 use crate::{
1612 messages::data::{DataCommand, SubscribeCommand, SubscribeQuotes},
1613 msgbus::switchboard::get_trades_topic,
1614 };
1615
1616 let _msgbus = get_message_bus();
1617 let topic_retrieved = Rc::new(RefCell::new(false));
1618 let topic_clone = topic_retrieved.clone();
1619
1620 let handler = TypedIntoHandler::from(move |_cmd: DataCommand| {
1621 let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
1622 *topic_clone.borrow_mut() = true;
1623 });
1624
1625 let endpoint: MStr<Endpoint> = "ReentrantTest.dataCmd".into();
1626 register_data_command_endpoint(endpoint, handler);
1627
1628 let cmd = DataCommand::Subscribe(SubscribeCommand::Quotes(SubscribeQuotes::new(
1629 InstrumentId::from("TEST.VENUE"),
1630 Some(ClientId::new("SIM")),
1631 None,
1632 UUID4::new(),
1633 0.into(),
1634 None,
1635 None,
1636 )));
1637 send_data_command(endpoint, cmd);
1638
1639 assert!(*topic_retrieved.borrow());
1640 }
1641
1642 #[rstest]
1643 fn test_send_data_response_allows_reentrant_topic_access() {
1644 use nautilus_model::identifiers::ClientId;
1645
1646 use crate::{
1647 messages::data::{DataResponse, QuotesResponse},
1648 msgbus::switchboard::get_quotes_topic,
1649 };
1650
1651 let _msgbus = get_message_bus();
1652 let topic_retrieved = Rc::new(RefCell::new(false));
1653 let topic_clone = topic_retrieved.clone();
1654
1655 let handler = TypedIntoHandler::from(move |_resp: DataResponse| {
1656 let _topic = get_quotes_topic(InstrumentId::from("TEST.VENUE"));
1657 *topic_clone.borrow_mut() = true;
1658 });
1659
1660 let endpoint: MStr<Endpoint> = "ReentrantTest.dataResp".into();
1661 register_data_response_endpoint(endpoint, handler);
1662
1663 let resp = DataResponse::Quotes(QuotesResponse {
1664 correlation_id: UUID4::new(),
1665 client_id: ClientId::new("SIM"),
1666 instrument_id: InstrumentId::from("TEST.VENUE"),
1667 data: vec![],
1668 start: None,
1669 end: None,
1670 ts_init: 0.into(),
1671 params: None,
1672 });
1673 send_data_response(endpoint, resp);
1674
1675 assert!(*topic_retrieved.borrow());
1676 }
1677
1678 #[rstest]
1679 fn test_send_execution_report_allows_reentrant_topic_access() {
1680 use nautilus_model::{
1681 identifiers::{AccountId, ClientId, Venue},
1682 reports::ExecutionMassStatus,
1683 };
1684
1685 use crate::{messages::execution::ExecutionReport, msgbus::switchboard::get_trades_topic};
1686
1687 let _msgbus = get_message_bus();
1688 let topic_retrieved = Rc::new(RefCell::new(false));
1689 let topic_clone = topic_retrieved.clone();
1690
1691 let handler = TypedIntoHandler::from(move |_report: ExecutionReport| {
1692 let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
1693 *topic_clone.borrow_mut() = true;
1694 });
1695
1696 let endpoint: MStr<Endpoint> = "ReentrantTest.execReport".into();
1697 register_execution_report_endpoint(endpoint, handler);
1698
1699 let report = ExecutionReport::MassStatus(Box::new(ExecutionMassStatus::new(
1700 ClientId::new("SIM"),
1701 AccountId::new("SIM-001"),
1702 Venue::new("TEST"),
1703 0.into(),
1704 None,
1705 )));
1706 send_execution_report(endpoint, report);
1707
1708 assert!(*topic_retrieved.borrow());
1709 }
1710
1711 #[rstest]
1712 fn test_order_event_handler_can_send_trading_command() {
1713 let _msgbus = get_message_bus();
1717 let command_sent = Rc::new(RefCell::new(false));
1718 let command_sent_clone = command_sent.clone();
1719
1720 let cmd_received = Rc::new(RefCell::new(false));
1721 let cmd_received_clone = cmd_received.clone();
1722 let cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1723 *cmd_received_clone.borrow_mut() = true;
1724 });
1725 let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.execCmd".into();
1726 register_trading_command_endpoint(cmd_endpoint, cmd_handler);
1727
1728 let event_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1729 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1731 TraderId::new("TESTER-001"),
1732 None,
1733 StrategyId::new("S-001"),
1734 InstrumentId::from("TEST.VENUE"),
1735 OrderSide::Buy,
1736 UUID4::new(),
1737 0.into(),
1738 None,
1739 ));
1740 send_trading_command(cmd_endpoint, command);
1741 *command_sent_clone.borrow_mut() = true;
1742 });
1743
1744 let event_endpoint: MStr<Endpoint> = "ReentrantTest.orderEvt".into();
1745 register_order_event_endpoint(event_endpoint, event_handler);
1746
1747 let event = OrderEventAny::Denied(OrderDenied::new(
1748 TraderId::new("TESTER-001"),
1749 StrategyId::new("S-001"),
1750 InstrumentId::from("TEST.VENUE"),
1751 ClientOrderId::new("O-001"),
1752 "Test denial".into(),
1753 UUID4::new(),
1754 0.into(),
1755 0.into(),
1756 ));
1757 send_order_event(event_endpoint, event);
1758
1759 assert!(
1760 *command_sent.borrow(),
1761 "Order event handler should have run"
1762 );
1763 assert!(
1764 *cmd_received.borrow(),
1765 "Trading command should have been received"
1766 );
1767 }
1768
1769 #[rstest]
1770 fn test_data_handler_can_send_data_command() {
1771 let _msgbus = get_message_bus();
1774 let command_sent = Rc::new(RefCell::new(false));
1775 let command_sent_clone = command_sent.clone();
1776
1777 let cmd_received = Rc::new(RefCell::new(false));
1778 let cmd_received_clone = cmd_received.clone();
1779 let cmd_handler = TypedIntoHandler::from(move |_cmd: DataCommand| {
1780 *cmd_received_clone.borrow_mut() = true;
1781 });
1782 let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.dataCmd2".into();
1783 register_data_command_endpoint(cmd_endpoint, cmd_handler);
1784
1785 let data_handler = TypedIntoHandler::from(move |_data: Data| {
1786 let command = DataCommand::Subscribe(SubscribeCommand::Quotes(SubscribeQuotes::new(
1787 InstrumentId::from("TEST.VENUE"),
1788 Some(ClientId::new("SIM")),
1789 None,
1790 UUID4::new(),
1791 0.into(),
1792 None,
1793 None,
1794 )));
1795 send_data_command(cmd_endpoint, command);
1796 *command_sent_clone.borrow_mut() = true;
1797 });
1798
1799 let data_endpoint: MStr<Endpoint> = "ReentrantTest.data2".into();
1800 register_data_endpoint(data_endpoint, data_handler);
1801
1802 let quote = QuoteTick::default();
1803 send_data(data_endpoint, Data::Quote(quote));
1804
1805 assert!(*command_sent.borrow(), "Data handler should have run");
1806 assert!(
1807 *cmd_received.borrow(),
1808 "Data command should have been received"
1809 );
1810 }
1811
1812 #[rstest]
1813 fn test_trading_command_handler_can_send_order_event() {
1814 let _msgbus = get_message_bus();
1818 let event_sent = Rc::new(RefCell::new(false));
1819 let event_sent_clone = event_sent.clone();
1820
1821 let evt_received = Rc::new(RefCell::new(false));
1822 let evt_received_clone = evt_received.clone();
1823 let evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1824 *evt_received_clone.borrow_mut() = true;
1825 });
1826 let evt_endpoint: MStr<Endpoint> = "ReentrantTest.orderEvt2".into();
1827 register_order_event_endpoint(evt_endpoint, evt_handler);
1828
1829 let cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1830 let event = OrderEventAny::Denied(OrderDenied::new(
1831 TraderId::new("TESTER-001"),
1832 StrategyId::new("S-001"),
1833 InstrumentId::from("TEST.VENUE"),
1834 ClientOrderId::new("O-001"),
1835 "Test denial".into(),
1836 UUID4::new(),
1837 0.into(),
1838 0.into(),
1839 ));
1840 send_order_event(evt_endpoint, event);
1841 *event_sent_clone.borrow_mut() = true;
1842 });
1843
1844 let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.execCmd2".into();
1845 register_trading_command_endpoint(cmd_endpoint, cmd_handler);
1846
1847 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1848 TraderId::new("TESTER-001"),
1849 None,
1850 StrategyId::new("S-001"),
1851 InstrumentId::from("TEST.VENUE"),
1852 OrderSide::Buy,
1853 UUID4::new(),
1854 0.into(),
1855 None,
1856 ));
1857 send_trading_command(cmd_endpoint, command);
1858
1859 assert!(
1860 *event_sent.borrow(),
1861 "Trading command handler should have run"
1862 );
1863 assert!(
1864 *evt_received.borrow(),
1865 "Order event should have been received"
1866 );
1867 }
1868
1869 #[rstest]
1870 fn test_nested_reentrant_calls() {
1871 let _msgbus = get_message_bus();
1874 let call_depth = Rc::new(RefCell::new(0u32));
1875
1876 let final_received = Rc::new(RefCell::new(false));
1877 let final_received_clone = final_received.clone();
1878 let final_evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1879 *final_received_clone.borrow_mut() = true;
1880 });
1881 let final_evt_endpoint: MStr<Endpoint> = "ReentrantTest.finalEvt".into();
1882 register_order_event_endpoint(final_evt_endpoint, final_evt_handler);
1883
1884 let call_depth_clone2 = call_depth.clone();
1885 let mid_cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1886 *call_depth_clone2.borrow_mut() += 1;
1887 let event = OrderEventAny::Denied(OrderDenied::new(
1888 TraderId::new("TESTER-001"),
1889 StrategyId::new("S-001"),
1890 InstrumentId::from("TEST.VENUE"),
1891 ClientOrderId::new("O-002"),
1892 "Nested denial".into(),
1893 UUID4::new(),
1894 0.into(),
1895 0.into(),
1896 ));
1897 send_order_event(final_evt_endpoint, event);
1898 });
1899 let mid_cmd_endpoint: MStr<Endpoint> = "ReentrantTest.midCmd".into();
1900 register_trading_command_endpoint(mid_cmd_endpoint, mid_cmd_handler);
1901
1902 let call_depth_clone1 = call_depth.clone();
1903 let init_evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1904 *call_depth_clone1.borrow_mut() += 1;
1905 let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1906 TraderId::new("TESTER-001"),
1907 None,
1908 StrategyId::new("S-001"),
1909 InstrumentId::from("TEST.VENUE"),
1910 OrderSide::Buy,
1911 UUID4::new(),
1912 0.into(),
1913 None,
1914 ));
1915 send_trading_command(mid_cmd_endpoint, command);
1916 });
1917 let init_evt_endpoint: MStr<Endpoint> = "ReentrantTest.initEvt".into();
1918 register_order_event_endpoint(init_evt_endpoint, init_evt_handler);
1919
1920 let event = OrderEventAny::Denied(OrderDenied::new(
1921 TraderId::new("TESTER-001"),
1922 StrategyId::new("S-001"),
1923 InstrumentId::from("TEST.VENUE"),
1924 ClientOrderId::new("O-001"),
1925 "Initial denial".into(),
1926 UUID4::new(),
1927 0.into(),
1928 0.into(),
1929 ));
1930 send_order_event(init_evt_endpoint, event);
1931
1932 assert_eq!(
1933 *call_depth.borrow(),
1934 2,
1935 "Both intermediate handlers should have run"
1936 );
1937 assert!(
1938 *final_received.borrow(),
1939 "Final event handler should have received the event"
1940 );
1941 }
1942}