Skip to main content

nautilus_common/msgbus/
api.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Public API functions for interacting with the message bus.
17//!
18//! This module provides free-standing functions that wrap the thread-local
19//! message bus, providing a convenient API for:
20//!
21//! - Registering endpoint handlers (point-to-point messaging).
22//! - Subscribing to topics (pub/sub messaging).
23//! - Publishing messages to subscribers.
24//! - Sending messages to endpoints.
25
26use std::{any::Any, cell::RefCell, thread::LocalKey};
27
28use nautilus_core::UUID4;
29#[cfg(feature = "defi")]
30use nautilus_model::defi::{
31    Block, DefiData, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
32};
33use nautilus_model::{
34    data::{
35        Bar, Data, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate,
36        OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
37        option_chain::{OptionChainSlice, OptionGreeks},
38    },
39    events::{AccountState, OrderEventAny, PositionEvent},
40    orderbook::OrderBook,
41    orders::OrderAny,
42    position::Position,
43};
44use smallvec::SmallVec;
45use ustr::Ustr;
46
47use super::{
48    ACCOUNT_STATE_HANDLERS, ANY_HANDLERS, BAR_HANDLERS, BOOK_HANDLERS, DELTAS_HANDLERS,
49    DEPTH10_HANDLERS, FUNDING_RATE_HANDLERS, GREEKS_HANDLERS, HANDLER_BUFFER_CAP,
50    INDEX_PRICE_HANDLERS, MARK_PRICE_HANDLERS, OPTION_CHAIN_HANDLERS, OPTION_GREEKS_HANDLERS,
51    ORDER_EVENT_HANDLERS, POSITION_EVENT_HANDLERS, QUOTE_HANDLERS, TRADE_HANDLERS,
52    core::{MessageBus, Subscription},
53    get_message_bus,
54    matching::is_matching_backtracking,
55    mstr::{Endpoint, MStr, Pattern, Topic},
56    typed_handler::{ShareableMessageHandler, TypedHandler, TypedIntoHandler},
57};
58#[cfg(feature = "defi")]
59use super::{
60    DEFI_BLOCK_HANDLERS, DEFI_COLLECT_HANDLERS, DEFI_FLASH_HANDLERS, DEFI_LIQUIDITY_HANDLERS,
61    DEFI_POOL_HANDLERS, DEFI_SWAP_HANDLERS,
62};
63use crate::messages::{
64    data::{DataCommand, DataResponse},
65    execution::{ExecutionReport, TradingCommand},
66};
67
68/// Registers a handler for an endpoint using runtime type dispatch (Any).
69pub fn register_any(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
70    log::debug!(
71        "Registering endpoint '{endpoint}' with handler ID {}",
72        handler.0.id(),
73    );
74    get_message_bus()
75        .borrow_mut()
76        .endpoints
77        .insert(endpoint, handler);
78}
79
80/// Registers a response handler for a correlation ID.
81pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
82    if let Err(e) = get_message_bus()
83        .borrow_mut()
84        .register_response_handler(correlation_id, handler)
85    {
86        log::error!("Failed to register request handler: {e}");
87    }
88}
89
90/// Registers a quote tick handler at an endpoint.
91pub fn register_quote_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<QuoteTick>) {
92    get_message_bus()
93        .borrow_mut()
94        .endpoints_quotes
95        .register(endpoint, handler);
96}
97
98/// Registers a trade tick handler at an endpoint.
99pub fn register_trade_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<TradeTick>) {
100    get_message_bus()
101        .borrow_mut()
102        .endpoints_trades
103        .register(endpoint, handler);
104}
105
106/// Registers a bar handler at an endpoint.
107pub fn register_bar_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<Bar>) {
108    get_message_bus()
109        .borrow_mut()
110        .endpoints_bars
111        .register(endpoint, handler);
112}
113
114/// Registers an order event handler at an endpoint (ownership-based).
115pub fn register_order_event_endpoint(
116    endpoint: MStr<Endpoint>,
117    handler: TypedIntoHandler<OrderEventAny>,
118) {
119    get_message_bus()
120        .borrow_mut()
121        .endpoints_order_events
122        .register(endpoint, handler);
123}
124
125/// Registers an account state handler at an endpoint.
126pub fn register_account_state_endpoint(
127    endpoint: MStr<Endpoint>,
128    handler: TypedHandler<AccountState>,
129) {
130    get_message_bus()
131        .borrow_mut()
132        .endpoints_account_state
133        .register(endpoint, handler);
134}
135
136/// Registers a trading command handler at an endpoint (ownership-based).
137pub fn register_trading_command_endpoint(
138    endpoint: MStr<Endpoint>,
139    handler: TypedIntoHandler<TradingCommand>,
140) {
141    get_message_bus()
142        .borrow_mut()
143        .endpoints_trading_commands
144        .register(endpoint, handler);
145}
146
147/// Registers a data command handler at an endpoint (ownership-based).
148pub fn register_data_command_endpoint(
149    endpoint: MStr<Endpoint>,
150    handler: TypedIntoHandler<DataCommand>,
151) {
152    get_message_bus()
153        .borrow_mut()
154        .endpoints_data_commands
155        .register(endpoint, handler);
156}
157
158/// Registers a data response handler at an endpoint (ownership-based).
159pub fn register_data_response_endpoint(
160    endpoint: MStr<Endpoint>,
161    handler: TypedIntoHandler<DataResponse>,
162) {
163    get_message_bus()
164        .borrow_mut()
165        .endpoints_data_responses
166        .register(endpoint, handler);
167}
168
169/// Registers an execution report handler at an endpoint (ownership-based).
170pub fn register_execution_report_endpoint(
171    endpoint: MStr<Endpoint>,
172    handler: TypedIntoHandler<ExecutionReport>,
173) {
174    get_message_bus()
175        .borrow_mut()
176        .endpoints_exec_reports
177        .register(endpoint, handler);
178}
179
180/// Registers a data handler at an endpoint (ownership-based).
181pub fn register_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<Data>) {
182    get_message_bus()
183        .borrow_mut()
184        .endpoints_data
185        .register(endpoint, handler);
186}
187
188/// Registers a DeFi data handler at an endpoint (ownership-based).
189#[cfg(feature = "defi")]
190pub fn register_defi_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<DefiData>) {
191    get_message_bus()
192        .borrow_mut()
193        .endpoints_defi_data
194        .register(endpoint, handler);
195}
196
197/// Deregisters the handler for an endpoint (Any-based).
198pub fn deregister_any(endpoint: MStr<Endpoint>) {
199    log::debug!("Deregistering endpoint '{endpoint}'");
200    get_message_bus()
201        .borrow_mut()
202        .endpoints
203        .shift_remove(&endpoint);
204}
205
206/// Returns whether an endpoint handler is registered for the given endpoint name.
207#[must_use]
208pub fn has_endpoint(endpoint: &str) -> bool {
209    let key: MStr<Endpoint> = Ustr::from(endpoint).into();
210    get_message_bus().borrow().get_endpoint(key).is_some()
211}
212
213/// Subscribes a handler to a pattern using runtime type dispatch (Any).
214///
215/// # Warnings
216///
217/// Assigning priority handling is an advanced feature which *shouldn't
218/// normally be needed by most users*. **Only assign a higher priority to the
219/// subscription if you are certain of what you're doing**. If an inappropriate
220/// priority is assigned then the handler may receive messages before core
221/// system components have been able to process necessary calculations and
222/// produce potential side effects for logically sound behavior.
223pub fn subscribe_any(
224    pattern: MStr<Pattern>,
225    handler: ShareableMessageHandler,
226    priority: Option<u8>,
227) {
228    let msgbus = get_message_bus();
229    let mut msgbus_ref_mut = msgbus.borrow_mut();
230    let sub = Subscription::new(pattern, handler, priority);
231
232    log::debug!(
233        "Subscribing {:?} for pattern '{}'",
234        sub.handler,
235        sub.pattern
236    );
237
238    if msgbus_ref_mut.subscriptions.contains(&sub) {
239        log::warn!("{sub:?} already exists");
240        return;
241    }
242
243    for (topic, subs) in &mut msgbus_ref_mut.topics {
244        if is_matching_backtracking(*topic, sub.pattern) {
245            subs.push(sub.clone());
246            subs.sort();
247            log::debug!("Added subscription for '{topic}'");
248        }
249    }
250
251    msgbus_ref_mut.subscriptions.insert(sub);
252}
253
254/// Subscribes a handler to instrument messages matching a pattern.
255pub fn subscribe_instruments(
256    pattern: MStr<Pattern>,
257    handler: ShareableMessageHandler,
258    priority: Option<u8>,
259) {
260    subscribe_any(pattern, handler, priority);
261}
262
263/// Subscribes a handler to instrument close messages matching a pattern.
264pub fn subscribe_instrument_close(
265    pattern: MStr<Pattern>,
266    handler: ShareableMessageHandler,
267    priority: Option<u8>,
268) {
269    subscribe_any(pattern, handler, priority);
270}
271
272/// Subscribes a handler to order book deltas matching a pattern.
273pub fn subscribe_book_deltas(
274    pattern: MStr<Pattern>,
275    handler: TypedHandler<OrderBookDeltas>,
276    priority: Option<u8>,
277) {
278    get_message_bus()
279        .borrow_mut()
280        .router_deltas
281        .subscribe(pattern, handler, priority.unwrap_or(0));
282}
283
284/// Subscribes a handler to order book depth10 snapshots matching a pattern.
285pub fn subscribe_book_depth10(
286    pattern: MStr<Pattern>,
287    handler: TypedHandler<OrderBookDepth10>,
288    priority: Option<u8>,
289) {
290    get_message_bus().borrow_mut().router_depth10.subscribe(
291        pattern,
292        handler,
293        priority.unwrap_or(0),
294    );
295}
296
297/// Subscribes a handler to order book snapshots matching a pattern.
298pub fn subscribe_book_snapshots(
299    pattern: MStr<Pattern>,
300    handler: TypedHandler<OrderBook>,
301    priority: Option<u8>,
302) {
303    get_message_bus()
304        .borrow_mut()
305        .router_book_snapshots
306        .subscribe(pattern, handler, priority.unwrap_or(0));
307}
308
309/// Subscribes a handler to quote ticks matching a pattern.
310pub fn subscribe_quotes(
311    pattern: MStr<Pattern>,
312    handler: TypedHandler<QuoteTick>,
313    priority: Option<u8>,
314) {
315    get_message_bus()
316        .borrow_mut()
317        .router_quotes
318        .subscribe(pattern, handler, priority.unwrap_or(0));
319}
320
321/// Subscribes a handler to trade ticks matching a pattern.
322pub fn subscribe_trades(
323    pattern: MStr<Pattern>,
324    handler: TypedHandler<TradeTick>,
325    priority: Option<u8>,
326) {
327    get_message_bus()
328        .borrow_mut()
329        .router_trades
330        .subscribe(pattern, handler, priority.unwrap_or(0));
331}
332
333/// Subscribes a handler to bars matching a pattern.
334pub fn subscribe_bars(pattern: MStr<Pattern>, handler: TypedHandler<Bar>, priority: Option<u8>) {
335    get_message_bus()
336        .borrow_mut()
337        .router_bars
338        .subscribe(pattern, handler, priority.unwrap_or(0));
339}
340
341/// Subscribes a handler to mark price updates matching a pattern.
342pub fn subscribe_mark_prices(
343    pattern: MStr<Pattern>,
344    handler: TypedHandler<MarkPriceUpdate>,
345    priority: Option<u8>,
346) {
347    get_message_bus().borrow_mut().router_mark_prices.subscribe(
348        pattern,
349        handler,
350        priority.unwrap_or(0),
351    );
352}
353
354/// Subscribes a handler to index price updates matching a pattern.
355pub fn subscribe_index_prices(
356    pattern: MStr<Pattern>,
357    handler: TypedHandler<IndexPriceUpdate>,
358    priority: Option<u8>,
359) {
360    get_message_bus()
361        .borrow_mut()
362        .router_index_prices
363        .subscribe(pattern, handler, priority.unwrap_or(0));
364}
365
366/// Subscribes a handler to funding rate updates matching a pattern.
367pub fn subscribe_funding_rates(
368    pattern: MStr<Pattern>,
369    handler: TypedHandler<FundingRateUpdate>,
370    priority: Option<u8>,
371) {
372    get_message_bus()
373        .borrow_mut()
374        .router_funding_rates
375        .subscribe(pattern, handler, priority.unwrap_or(0));
376}
377
378/// Subscribes a handler to greeks data matching a pattern.
379pub fn subscribe_greeks(
380    pattern: MStr<Pattern>,
381    handler: TypedHandler<GreeksData>,
382    priority: Option<u8>,
383) {
384    get_message_bus()
385        .borrow_mut()
386        .router_greeks
387        .subscribe(pattern, handler, priority.unwrap_or(0));
388}
389
390/// Subscribes a handler to option greeks updates matching a pattern.
391pub fn subscribe_option_greeks(
392    pattern: MStr<Pattern>,
393    handler: TypedHandler<OptionGreeks>,
394    priority: Option<u8>,
395) {
396    get_message_bus()
397        .borrow_mut()
398        .router_option_greeks
399        .subscribe(pattern, handler, priority.unwrap_or(0));
400}
401
402/// Subscribes a handler to option chain slice updates matching a pattern.
403pub fn subscribe_option_chain(
404    pattern: MStr<Pattern>,
405    handler: TypedHandler<OptionChainSlice>,
406    priority: Option<u8>,
407) {
408    get_message_bus()
409        .borrow_mut()
410        .router_option_chain
411        .subscribe(pattern, handler, priority.unwrap_or(0));
412}
413
414/// Subscribes a handler to order events matching a pattern.
415pub fn subscribe_order_events(
416    pattern: MStr<Pattern>,
417    handler: TypedHandler<OrderEventAny>,
418    priority: Option<u8>,
419) {
420    get_message_bus()
421        .borrow_mut()
422        .router_order_events
423        .subscribe(pattern, handler, priority.unwrap_or(0));
424}
425
426/// Subscribes a handler to position events matching a pattern.
427pub fn subscribe_position_events(
428    pattern: MStr<Pattern>,
429    handler: TypedHandler<PositionEvent>,
430    priority: Option<u8>,
431) {
432    get_message_bus()
433        .borrow_mut()
434        .router_position_events
435        .subscribe(pattern, handler, priority.unwrap_or(0));
436}
437
438/// Subscribes a handler to account state updates matching a pattern.
439pub fn subscribe_account_state(
440    pattern: MStr<Pattern>,
441    handler: TypedHandler<AccountState>,
442    priority: Option<u8>,
443) {
444    get_message_bus()
445        .borrow_mut()
446        .router_account_state
447        .subscribe(pattern, handler, priority.unwrap_or(0));
448}
449
450/// Subscribes a handler to positions matching a pattern.
451pub fn subscribe_positions(
452    pattern: MStr<Pattern>,
453    handler: TypedHandler<Position>,
454    priority: Option<u8>,
455) {
456    get_message_bus().borrow_mut().router_positions.subscribe(
457        pattern,
458        handler,
459        priority.unwrap_or(0),
460    );
461}
462
463/// Subscribes a handler to DeFi blocks matching a pattern.
464#[cfg(feature = "defi")]
465pub fn subscribe_defi_blocks(
466    pattern: MStr<Pattern>,
467    handler: TypedHandler<Block>,
468    priority: Option<u8>,
469) {
470    get_message_bus().borrow_mut().router_defi_blocks.subscribe(
471        pattern,
472        handler,
473        priority.unwrap_or(0),
474    );
475}
476
477/// Subscribes a handler to DeFi pools matching a pattern.
478#[cfg(feature = "defi")]
479pub fn subscribe_defi_pools(
480    pattern: MStr<Pattern>,
481    handler: TypedHandler<Pool>,
482    priority: Option<u8>,
483) {
484    get_message_bus().borrow_mut().router_defi_pools.subscribe(
485        pattern,
486        handler,
487        priority.unwrap_or(0),
488    );
489}
490
491/// Subscribes a handler to DeFi pool swaps matching a pattern.
492#[cfg(feature = "defi")]
493pub fn subscribe_defi_swaps(
494    pattern: MStr<Pattern>,
495    handler: TypedHandler<PoolSwap>,
496    priority: Option<u8>,
497) {
498    get_message_bus().borrow_mut().router_defi_swaps.subscribe(
499        pattern,
500        handler,
501        priority.unwrap_or(0),
502    );
503}
504
505/// Subscribes a handler to DeFi liquidity updates matching a pattern.
506#[cfg(feature = "defi")]
507pub fn subscribe_defi_liquidity(
508    pattern: MStr<Pattern>,
509    handler: TypedHandler<PoolLiquidityUpdate>,
510    priority: Option<u8>,
511) {
512    get_message_bus()
513        .borrow_mut()
514        .router_defi_liquidity
515        .subscribe(pattern, handler, priority.unwrap_or(0));
516}
517
518/// Subscribes a handler to DeFi fee collects matching a pattern.
519#[cfg(feature = "defi")]
520pub fn subscribe_defi_collects(
521    pattern: MStr<Pattern>,
522    handler: TypedHandler<PoolFeeCollect>,
523    priority: Option<u8>,
524) {
525    get_message_bus()
526        .borrow_mut()
527        .router_defi_collects
528        .subscribe(pattern, handler, priority.unwrap_or(0));
529}
530
531/// Subscribes a handler to DeFi flash loans matching a pattern.
532#[cfg(feature = "defi")]
533pub fn subscribe_defi_flash(
534    pattern: MStr<Pattern>,
535    handler: TypedHandler<PoolFlash>,
536    priority: Option<u8>,
537) {
538    get_message_bus().borrow_mut().router_defi_flash.subscribe(
539        pattern,
540        handler,
541        priority.unwrap_or(0),
542    );
543}
544
545/// Unsubscribes a handler from instrument messages.
546pub fn unsubscribe_instruments(pattern: MStr<Pattern>, handler: &ShareableMessageHandler) {
547    unsubscribe_any(pattern, handler);
548}
549
550/// Unsubscribes a handler from instrument close messages.
551pub fn unsubscribe_instrument_close(pattern: MStr<Pattern>, handler: &ShareableMessageHandler) {
552    unsubscribe_any(pattern, handler);
553}
554
555/// Unsubscribes a handler from order book deltas.
556pub fn unsubscribe_book_deltas(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDeltas>) {
557    get_message_bus()
558        .borrow_mut()
559        .router_deltas
560        .unsubscribe(pattern, handler);
561}
562
563/// Unsubscribes a handler from order book depth10 snapshots.
564pub fn unsubscribe_book_depth10(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDepth10>) {
565    get_message_bus()
566        .borrow_mut()
567        .router_depth10
568        .unsubscribe(pattern, handler);
569}
570
571/// Unsubscribes a handler from order book snapshots.
572pub fn unsubscribe_book_snapshots(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBook>) {
573    get_message_bus()
574        .borrow_mut()
575        .router_book_snapshots
576        .unsubscribe(pattern, handler);
577}
578
579/// Unsubscribes a handler from quote ticks.
580pub fn unsubscribe_quotes(pattern: MStr<Pattern>, handler: &TypedHandler<QuoteTick>) {
581    get_message_bus()
582        .borrow_mut()
583        .router_quotes
584        .unsubscribe(pattern, handler);
585}
586
587/// Unsubscribes a handler from trade ticks.
588pub fn unsubscribe_trades(pattern: MStr<Pattern>, handler: &TypedHandler<TradeTick>) {
589    get_message_bus()
590        .borrow_mut()
591        .router_trades
592        .unsubscribe(pattern, handler);
593}
594
595/// Unsubscribes a handler from bars.
596pub fn unsubscribe_bars(pattern: MStr<Pattern>, handler: &TypedHandler<Bar>) {
597    get_message_bus()
598        .borrow_mut()
599        .router_bars
600        .unsubscribe(pattern, handler);
601}
602
603/// Unsubscribes a handler from mark price updates.
604pub fn unsubscribe_mark_prices(pattern: MStr<Pattern>, handler: &TypedHandler<MarkPriceUpdate>) {
605    get_message_bus()
606        .borrow_mut()
607        .router_mark_prices
608        .unsubscribe(pattern, handler);
609}
610
611/// Unsubscribes a handler from index price updates.
612pub fn unsubscribe_index_prices(pattern: MStr<Pattern>, handler: &TypedHandler<IndexPriceUpdate>) {
613    get_message_bus()
614        .borrow_mut()
615        .router_index_prices
616        .unsubscribe(pattern, handler);
617}
618
619/// Unsubscribes a handler from funding rate updates.
620pub fn unsubscribe_funding_rates(
621    pattern: MStr<Pattern>,
622    handler: &TypedHandler<FundingRateUpdate>,
623) {
624    get_message_bus()
625        .borrow_mut()
626        .router_funding_rates
627        .unsubscribe(pattern, handler);
628}
629
630/// Unsubscribes a handler from account state updates.
631pub fn unsubscribe_account_state(pattern: MStr<Pattern>, handler: &TypedHandler<AccountState>) {
632    get_message_bus()
633        .borrow_mut()
634        .router_account_state
635        .unsubscribe(pattern, handler);
636}
637
638/// Unsubscribes a handler from order events.
639pub fn unsubscribe_order_events(pattern: MStr<Pattern>, handler: &TypedHandler<OrderEventAny>) {
640    get_message_bus()
641        .borrow_mut()
642        .router_order_events
643        .unsubscribe(pattern, handler);
644}
645
646/// Unsubscribes a handler from position events.
647pub fn unsubscribe_position_events(pattern: MStr<Pattern>, handler: &TypedHandler<PositionEvent>) {
648    get_message_bus()
649        .borrow_mut()
650        .router_position_events
651        .unsubscribe(pattern, handler);
652}
653
654/// Removes a specific order event handler by pattern and handler ID.
655pub fn remove_order_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
656    get_message_bus()
657        .borrow_mut()
658        .router_order_events
659        .remove_handler(pattern, handler_id);
660}
661
662/// Removes a specific position event handler by pattern and handler ID.
663pub fn remove_position_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
664    get_message_bus()
665        .borrow_mut()
666        .router_position_events
667        .remove_handler(pattern, handler_id);
668}
669
670/// Unsubscribes a handler from orders.
671pub fn unsubscribe_orders(pattern: MStr<Pattern>, handler: &TypedHandler<OrderAny>) {
672    get_message_bus()
673        .borrow_mut()
674        .router_orders
675        .unsubscribe(pattern, handler);
676}
677
678/// Unsubscribes a handler from positions.
679pub fn unsubscribe_positions(pattern: MStr<Pattern>, handler: &TypedHandler<Position>) {
680    get_message_bus()
681        .borrow_mut()
682        .router_positions
683        .unsubscribe(pattern, handler);
684}
685
686/// Unsubscribes a handler from greeks data.
687pub fn unsubscribe_greeks(pattern: MStr<Pattern>, handler: &TypedHandler<GreeksData>) {
688    get_message_bus()
689        .borrow_mut()
690        .router_greeks
691        .unsubscribe(pattern, handler);
692}
693
694/// Unsubscribes a handler from option greeks updates.
695pub fn unsubscribe_option_greeks(pattern: MStr<Pattern>, handler: &TypedHandler<OptionGreeks>) {
696    get_message_bus()
697        .borrow_mut()
698        .router_option_greeks
699        .unsubscribe(pattern, handler);
700}
701
702/// Unsubscribes a handler from option chain slice updates.
703pub fn unsubscribe_option_chain(pattern: MStr<Pattern>, handler: &TypedHandler<OptionChainSlice>) {
704    get_message_bus()
705        .borrow_mut()
706        .router_option_chain
707        .unsubscribe(pattern, handler);
708}
709
710/// Unsubscribes a handler from DeFi blocks.
711#[cfg(feature = "defi")]
712pub fn unsubscribe_defi_blocks(pattern: MStr<Pattern>, handler: &TypedHandler<Block>) {
713    get_message_bus()
714        .borrow_mut()
715        .router_defi_blocks
716        .unsubscribe(pattern, handler);
717}
718
719/// Unsubscribes a handler from DeFi pools.
720#[cfg(feature = "defi")]
721pub fn unsubscribe_defi_pools(pattern: MStr<Pattern>, handler: &TypedHandler<Pool>) {
722    get_message_bus()
723        .borrow_mut()
724        .router_defi_pools
725        .unsubscribe(pattern, handler);
726}
727
728/// Unsubscribes a handler from DeFi pool swaps.
729#[cfg(feature = "defi")]
730pub fn unsubscribe_defi_swaps(pattern: MStr<Pattern>, handler: &TypedHandler<PoolSwap>) {
731    get_message_bus()
732        .borrow_mut()
733        .router_defi_swaps
734        .unsubscribe(pattern, handler);
735}
736
737/// Unsubscribes a handler from DeFi liquidity updates.
738#[cfg(feature = "defi")]
739pub fn unsubscribe_defi_liquidity(
740    pattern: MStr<Pattern>,
741    handler: &TypedHandler<PoolLiquidityUpdate>,
742) {
743    get_message_bus()
744        .borrow_mut()
745        .router_defi_liquidity
746        .unsubscribe(pattern, handler);
747}
748
749/// Unsubscribes a handler from DeFi fee collects.
750#[cfg(feature = "defi")]
751pub fn unsubscribe_defi_collects(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFeeCollect>) {
752    get_message_bus()
753        .borrow_mut()
754        .router_defi_collects
755        .unsubscribe(pattern, handler);
756}
757
758/// Unsubscribes a handler from DeFi flash loans.
759#[cfg(feature = "defi")]
760pub fn unsubscribe_defi_flash(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFlash>) {
761    get_message_bus()
762        .borrow_mut()
763        .router_defi_flash
764        .unsubscribe(pattern, handler);
765}
766
767/// Unsubscribes a handler from a pattern (Any-based).
768pub fn unsubscribe_any(pattern: MStr<Pattern>, handler: &ShareableMessageHandler) {
769    log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
770
771    let handler_id = handler.0.id();
772    let bus_rc = get_message_bus();
773    let mut bus = bus_rc.borrow_mut();
774
775    let count_before = bus.subscriptions.len();
776
777    bus.topics.values_mut().for_each(|subs| {
778        subs.retain(|s| !(s.pattern == pattern && s.handler_id == handler_id));
779    });
780
781    bus.subscriptions
782        .retain(|s| !(s.pattern == pattern && s.handler_id == handler_id));
783
784    let removed = bus.subscriptions.len() < count_before;
785
786    if removed {
787        log::debug!("Handler for pattern '{pattern}' was removed");
788    } else {
789        log::debug!("No matching handler for pattern '{pattern}' was found");
790    }
791}
792
793/// Checks if a handler is subscribed to a pattern (Any-based).
794pub fn is_subscribed_any<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
795    let pattern = MStr::from(pattern.as_ref());
796    let sub = Subscription::new(pattern, handler, None);
797    get_message_bus().borrow().subscriptions.contains(&sub)
798}
799
800/// Returns the count of Any-based subscriptions for a topic.
801pub fn subscriptions_count_any<S: AsRef<str>>(topic: S) -> usize {
802    get_message_bus().borrow().subscriptions_count(topic)
803}
804
805/// Returns the subscriber count for order book deltas on a topic.
806pub fn subscriber_count_deltas(topic: MStr<Topic>) -> usize {
807    get_message_bus()
808        .borrow()
809        .router_deltas
810        .subscriber_count(topic)
811}
812
813/// Returns the subscriber count for order book depth10 on a topic.
814pub fn subscriber_count_depth10(topic: MStr<Topic>) -> usize {
815    get_message_bus()
816        .borrow()
817        .router_depth10
818        .subscriber_count(topic)
819}
820
821/// Returns the subscriber count for order book snapshots on a topic.
822pub fn subscriber_count_book_snapshots(topic: MStr<Topic>) -> usize {
823    get_message_bus()
824        .borrow()
825        .router_book_snapshots
826        .subscriber_count(topic)
827}
828
829/// Returns the exact subscriber count for bars on a topic,
830/// excluding wildcard pattern subscriptions.
831pub fn exact_subscriber_count_bars(topic: MStr<Topic>) -> usize {
832    get_message_bus()
833        .borrow()
834        .router_bars
835        .exact_subscriber_count(topic)
836}
837
838/// Publishes a message to the topic using runtime type dispatch (Any).
839pub fn publish_any(topic: MStr<Topic>, message: &dyn Any) {
840    // Take buffer (re-entrancy safe)
841    let mut handlers = ANY_HANDLERS.with_borrow_mut(std::mem::take);
842
843    get_message_bus()
844        .borrow_mut()
845        .fill_matching_any_handlers(topic, &mut handlers);
846
847    for handler in &handlers {
848        handler.0.handle(message);
849    }
850
851    handlers.clear(); // Release refs before restore
852    ANY_HANDLERS.with_borrow_mut(|buf| *buf = handlers);
853}
854
855/// Publishes order book deltas to subscribers on a topic.
856pub fn publish_deltas(topic: MStr<Topic>, deltas: &OrderBookDeltas) {
857    publish_typed(
858        &DELTAS_HANDLERS,
859        |bus, h| bus.router_deltas.fill_matching_handlers(topic, h),
860        deltas,
861    );
862}
863
864/// Publishes order book depth10 to subscribers on a topic.
865pub fn publish_depth10(topic: MStr<Topic>, depth: &OrderBookDepth10) {
866    publish_typed(
867        &DEPTH10_HANDLERS,
868        |bus, h| bus.router_depth10.fill_matching_handlers(topic, h),
869        depth,
870    );
871}
872
873/// Publishes an order book snapshot to subscribers on a topic.
874pub fn publish_book(topic: MStr<Topic>, book: &OrderBook) {
875    publish_typed(
876        &BOOK_HANDLERS,
877        |bus, h| bus.router_book_snapshots.fill_matching_handlers(topic, h),
878        book,
879    );
880}
881
882/// Publishes a quote tick to subscribers on a topic.
883pub fn publish_quote(topic: MStr<Topic>, quote: &QuoteTick) {
884    publish_typed(
885        &QUOTE_HANDLERS,
886        |bus, h| bus.router_quotes.fill_matching_handlers(topic, h),
887        quote,
888    );
889}
890
891/// Publishes a trade tick to subscribers on a topic.
892pub fn publish_trade(topic: MStr<Topic>, trade: &TradeTick) {
893    publish_typed(
894        &TRADE_HANDLERS,
895        |bus, h| bus.router_trades.fill_matching_handlers(topic, h),
896        trade,
897    );
898}
899
900/// Publishes a bar to subscribers on a topic.
901pub fn publish_bar(topic: MStr<Topic>, bar: &Bar) {
902    publish_typed(
903        &BAR_HANDLERS,
904        |bus, h| bus.router_bars.fill_matching_handlers(topic, h),
905        bar,
906    );
907}
908
909/// Publishes a mark price update to subscribers on a topic.
910pub fn publish_mark_price(topic: MStr<Topic>, mark_price: &MarkPriceUpdate) {
911    publish_typed(
912        &MARK_PRICE_HANDLERS,
913        |bus, h| bus.router_mark_prices.fill_matching_handlers(topic, h),
914        mark_price,
915    );
916}
917
918/// Publishes an index price update to subscribers on a topic.
919pub fn publish_index_price(topic: MStr<Topic>, index_price: &IndexPriceUpdate) {
920    publish_typed(
921        &INDEX_PRICE_HANDLERS,
922        |bus, h| bus.router_index_prices.fill_matching_handlers(topic, h),
923        index_price,
924    );
925}
926
927/// Publishes a funding rate update to subscribers on a topic.
928pub fn publish_funding_rate(topic: MStr<Topic>, funding_rate: &FundingRateUpdate) {
929    publish_typed(
930        &FUNDING_RATE_HANDLERS,
931        |bus, h| bus.router_funding_rates.fill_matching_handlers(topic, h),
932        funding_rate,
933    );
934}
935
936/// Publishes greeks data to subscribers on a topic.
937pub fn publish_greeks(topic: MStr<Topic>, greeks: &GreeksData) {
938    publish_typed(
939        &GREEKS_HANDLERS,
940        |bus, h| bus.router_greeks.fill_matching_handlers(topic, h),
941        greeks,
942    );
943}
944
945/// Publishes option greeks to subscribers on a topic.
946pub fn publish_option_greeks(topic: MStr<Topic>, option_greeks: &OptionGreeks) {
947    publish_typed(
948        &OPTION_GREEKS_HANDLERS,
949        |bus, h| bus.router_option_greeks.fill_matching_handlers(topic, h),
950        option_greeks,
951    );
952}
953
954/// Publishes an option chain slice to subscribers on a topic.
955pub fn publish_option_chain(topic: MStr<Topic>, slice: &OptionChainSlice) {
956    publish_typed(
957        &OPTION_CHAIN_HANDLERS,
958        |bus, h| bus.router_option_chain.fill_matching_handlers(topic, h),
959        slice,
960    );
961}
962
963/// Publishes an account state to subscribers on a topic.
964pub fn publish_account_state(topic: MStr<Topic>, state: &AccountState) {
965    publish_typed(
966        &ACCOUNT_STATE_HANDLERS,
967        |bus, h| bus.router_account_state.fill_matching_handlers(topic, h),
968        state,
969    );
970}
971
972/// Publishes an order event to subscribers on a topic.
973pub fn publish_order_event(topic: MStr<Topic>, event: &OrderEventAny) {
974    publish_typed(
975        &ORDER_EVENT_HANDLERS,
976        |bus, h| bus.router_order_events.fill_matching_handlers(topic, h),
977        event,
978    );
979}
980
981/// Publishes a position event to subscribers on a topic.
982pub fn publish_position_event(topic: MStr<Topic>, event: &PositionEvent) {
983    publish_typed(
984        &POSITION_EVENT_HANDLERS,
985        |bus, h| bus.router_position_events.fill_matching_handlers(topic, h),
986        event,
987    );
988}
989
990/// Publishes a DeFi block to subscribers on a topic.
991#[cfg(feature = "defi")]
992pub fn publish_defi_block(topic: MStr<Topic>, block: &Block) {
993    publish_typed(
994        &DEFI_BLOCK_HANDLERS,
995        |bus, h| bus.router_defi_blocks.fill_matching_handlers(topic, h),
996        block,
997    );
998}
999
1000/// Publishes a DeFi pool to subscribers on a topic.
1001#[cfg(feature = "defi")]
1002pub fn publish_defi_pool(topic: MStr<Topic>, pool: &Pool) {
1003    publish_typed(
1004        &DEFI_POOL_HANDLERS,
1005        |bus, h| bus.router_defi_pools.fill_matching_handlers(topic, h),
1006        pool,
1007    );
1008}
1009
1010/// Publishes a DeFi pool swap to subscribers on a topic.
1011#[cfg(feature = "defi")]
1012pub fn publish_defi_swap(topic: MStr<Topic>, swap: &PoolSwap) {
1013    publish_typed(
1014        &DEFI_SWAP_HANDLERS,
1015        |bus, h| bus.router_defi_swaps.fill_matching_handlers(topic, h),
1016        swap,
1017    );
1018}
1019
1020/// Publishes a DeFi liquidity update to subscribers on a topic.
1021#[cfg(feature = "defi")]
1022pub fn publish_defi_liquidity(topic: MStr<Topic>, update: &PoolLiquidityUpdate) {
1023    publish_typed(
1024        &DEFI_LIQUIDITY_HANDLERS,
1025        |bus, h| bus.router_defi_liquidity.fill_matching_handlers(topic, h),
1026        update,
1027    );
1028}
1029
1030/// Publishes a DeFi fee collect to subscribers on a topic.
1031#[cfg(feature = "defi")]
1032pub fn publish_defi_collect(topic: MStr<Topic>, collect: &PoolFeeCollect) {
1033    publish_typed(
1034        &DEFI_COLLECT_HANDLERS,
1035        |bus, h| bus.router_defi_collects.fill_matching_handlers(topic, h),
1036        collect,
1037    );
1038}
1039
1040/// Publishes a DeFi flash loan to subscribers on a topic.
1041#[cfg(feature = "defi")]
1042pub fn publish_defi_flash(topic: MStr<Topic>, flash: &PoolFlash) {
1043    publish_typed(
1044        &DEFI_FLASH_HANDLERS,
1045        |bus, h| bus.router_defi_flash.fill_matching_handlers(topic, h),
1046        flash,
1047    );
1048}
1049
1050/// Publishes a message to typed handlers using thread-local buffer reuse.
1051///
1052/// The `fill_fn` receives a mutable reference to the MessageBus, avoiding
1053/// redundant TLS access and Rc clone/drop overhead per publish.
1054///
1055/// # Invariants
1056///
1057/// - `fill_fn` must not call any publish path (would panic from RefCell double-borrow).
1058/// - Handler panics drop the buffer, losing reuse optimization (acceptable as panics are fatal).
1059#[inline]
1060fn publish_typed<T: 'static>(
1061    tls: &'static LocalKey<RefCell<SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>>>,
1062    fill_fn: impl FnOnce(&mut MessageBus, &mut SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>),
1063    message: &T,
1064) {
1065    // Take buffer (re-entrancy safe)
1066    let mut handlers = tls.with_borrow_mut(std::mem::take);
1067
1068    // Borrow scope ends before dispatch to support re-entrant publishes
1069    let bus_rc = get_message_bus();
1070    fill_fn(&mut bus_rc.borrow_mut(), &mut handlers);
1071
1072    for handler in &handlers {
1073        handler.handle(message);
1074    }
1075
1076    handlers.clear(); // Release refs before restore
1077    tls.with_borrow_mut(|buf| *buf = handlers);
1078}
1079
1080/// Sends a message to an endpoint handler using runtime type dispatch (Any).
1081pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
1082    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
1083
1084    if let Some(handler) = handler {
1085        handler.0.handle(message);
1086    } else {
1087        log::error!("send_any: no registered endpoint '{endpoint}'");
1088    }
1089}
1090
1091/// Sends a message to an endpoint, converting to Any (convenience wrapper).
1092pub fn send_any_value<T: 'static>(endpoint: MStr<Endpoint>, message: &T) {
1093    let handler = get_message_bus().borrow().get_endpoint(endpoint).cloned();
1094
1095    if let Some(handler) = handler {
1096        handler.0.handle(message);
1097    } else {
1098        log::error!("send_any_value: no registered endpoint '{endpoint}'");
1099    }
1100}
1101
1102/// Sends the [`DataResponse`] to the registered correlation ID handler.
1103pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
1104    let handler = get_message_bus()
1105        .borrow()
1106        .get_response_handler(correlation_id)
1107        .cloned();
1108
1109    if let Some(handler) = handler {
1110        match message {
1111            DataResponse::Data(resp) => handler.0.handle(resp),
1112            DataResponse::Instrument(resp) => handler.0.handle(resp.as_ref()),
1113            DataResponse::Instruments(resp) => handler.0.handle(resp),
1114            DataResponse::Book(resp) => handler.0.handle(resp),
1115            DataResponse::Quotes(resp) => handler.0.handle(resp),
1116            DataResponse::Trades(resp) => handler.0.handle(resp),
1117            DataResponse::FundingRates(resp) => handler.0.handle(resp),
1118            DataResponse::ForwardPrices(resp) => handler.0.handle(resp),
1119            DataResponse::Bars(resp) => handler.0.handle(resp),
1120        }
1121    } else {
1122        log::error!("send_response: handler not found for correlation_id '{correlation_id}'");
1123    }
1124}
1125
1126/// Sends a quote tick to an endpoint handler.
1127pub fn send_quote(endpoint: MStr<Endpoint>, quote: &QuoteTick) {
1128    send_endpoint_ref(
1129        endpoint,
1130        quote,
1131        |bus| bus.endpoints_quotes.get(endpoint),
1132        "send_quote",
1133    );
1134}
1135
1136/// Sends a trade tick to an endpoint handler.
1137pub fn send_trade(endpoint: MStr<Endpoint>, trade: &TradeTick) {
1138    send_endpoint_ref(
1139        endpoint,
1140        trade,
1141        |bus| bus.endpoints_trades.get(endpoint),
1142        "send_trade",
1143    );
1144}
1145
1146/// Sends a bar to an endpoint handler.
1147pub fn send_bar(endpoint: MStr<Endpoint>, bar: &Bar) {
1148    send_endpoint_ref(
1149        endpoint,
1150        bar,
1151        |bus| bus.endpoints_bars.get(endpoint),
1152        "send_bar",
1153    );
1154}
1155
1156/// Sends an order event to an endpoint handler, transferring ownership.
1157pub fn send_order_event(endpoint: MStr<Endpoint>, event: OrderEventAny) {
1158    send_endpoint_owned(
1159        endpoint,
1160        event,
1161        |bus| bus.endpoints_order_events.get(endpoint),
1162        "send_order_event",
1163    );
1164}
1165
1166/// Sends an account state to an endpoint handler.
1167pub fn send_account_state(endpoint: MStr<Endpoint>, state: &AccountState) {
1168    send_endpoint_ref(
1169        endpoint,
1170        state,
1171        |bus| bus.endpoints_account_state.get(endpoint),
1172        "send_account_state",
1173    );
1174}
1175
1176/// Sends a trading command to an endpoint handler, transferring ownership.
1177pub fn send_trading_command(endpoint: MStr<Endpoint>, command: TradingCommand) {
1178    send_endpoint_owned(
1179        endpoint,
1180        command,
1181        |bus| bus.endpoints_trading_commands.get(endpoint),
1182        "send_trading_command",
1183    );
1184}
1185
1186/// Sends a data command to an endpoint handler, transferring ownership.
1187pub fn send_data_command(endpoint: MStr<Endpoint>, command: DataCommand) {
1188    send_endpoint_owned(
1189        endpoint,
1190        command,
1191        |bus| bus.endpoints_data_commands.get(endpoint),
1192        "send_data_command",
1193    );
1194}
1195
1196/// Sends a data response to an endpoint handler, transferring ownership.
1197pub fn send_data_response(endpoint: MStr<Endpoint>, response: DataResponse) {
1198    send_endpoint_owned(
1199        endpoint,
1200        response,
1201        |bus| bus.endpoints_data_responses.get(endpoint),
1202        "send_data_response",
1203    );
1204}
1205
1206/// Sends an execution report to an endpoint handler, transferring ownership.
1207pub fn send_execution_report(endpoint: MStr<Endpoint>, report: ExecutionReport) {
1208    send_endpoint_owned(
1209        endpoint,
1210        report,
1211        |bus| bus.endpoints_exec_reports.get(endpoint),
1212        "send_execution_report",
1213    );
1214}
1215
1216/// Sends data to an endpoint handler, transferring ownership.
1217pub fn send_data(endpoint: MStr<Endpoint>, data: Data) {
1218    send_endpoint_owned(
1219        endpoint,
1220        data,
1221        |bus| bus.endpoints_data.get(endpoint),
1222        "send_data",
1223    );
1224}
1225
1226/// Sends DeFi data to an endpoint handler, transferring ownership.
1227#[cfg(feature = "defi")]
1228pub fn send_defi_data(endpoint: MStr<Endpoint>, data: DefiData) {
1229    send_endpoint_owned(
1230        endpoint,
1231        data,
1232        |bus| bus.endpoints_defi_data.get(endpoint),
1233        "send_defi_data",
1234    );
1235}
1236
1237#[inline]
1238fn send_endpoint_ref<T: 'static, F>(
1239    endpoint: MStr<Endpoint>,
1240    message: &T,
1241    get_handler: F,
1242    fn_name: &str,
1243) where
1244    F: FnOnce(&MessageBus) -> Option<&TypedHandler<T>>,
1245{
1246    let handler = {
1247        let bus = get_message_bus();
1248        get_handler(&bus.borrow()).cloned()
1249    };
1250
1251    if let Some(handler) = handler {
1252        handler.handle(message);
1253    } else {
1254        log::error!("{fn_name}: no registered endpoint '{endpoint}'");
1255    }
1256}
1257
1258#[inline]
1259fn send_endpoint_owned<T: 'static, F>(
1260    endpoint: MStr<Endpoint>,
1261    message: T,
1262    get_handler: F,
1263    fn_name: &str,
1264) where
1265    F: FnOnce(&MessageBus) -> Option<&TypedIntoHandler<T>>,
1266{
1267    let handler = {
1268        let bus = get_message_bus();
1269        get_handler(&bus.borrow()).cloned()
1270    };
1271
1272    if let Some(handler) = handler {
1273        handler.handle(message);
1274    } else {
1275        log::error!("{fn_name}: no registered endpoint '{endpoint}'");
1276    }
1277}
1278
1279#[cfg(test)]
1280mod tests {
1281    //! Tests for the message bus API functions.
1282    //!
1283    //! Includes re-entrancy tests that verify handlers can call back into the
1284    //! message bus without causing RefCell borrow conflicts. This is the scenario
1285    //! where `send_*` holds a borrow, calls the handler, and the handler needs to
1286    //! call `borrow_mut()` for topic getters or other operations.
1287
1288    use std::{cell::RefCell, rc::Rc};
1289
1290    use nautilus_core::UUID4;
1291    use nautilus_model::{
1292        data::{Bar, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
1293        enums::OrderSide,
1294        events::OrderDenied,
1295        identifiers::{ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId},
1296    };
1297    use rstest::rstest;
1298
1299    use super::*;
1300    use crate::messages::{
1301        data::{DataCommand, SubscribeCommand, SubscribeQuotes},
1302        execution::{CancelAllOrders, TradingCommand},
1303    };
1304
1305    #[rstest]
1306    fn test_typed_quote_publish_subscribe_integration() {
1307        let _msgbus = get_message_bus();
1308        let received = Rc::new(RefCell::new(Vec::new()));
1309        let received_clone = received.clone();
1310
1311        let handler = TypedHandler::from(move |quote: &QuoteTick| {
1312            received_clone.borrow_mut().push(*quote);
1313        });
1314
1315        subscribe_quotes("data.quotes.*".into(), handler, None);
1316
1317        let quote = QuoteTick::default();
1318        publish_quote("data.quotes.TEST".into(), &quote);
1319        publish_quote("data.quotes.TEST".into(), &quote);
1320
1321        assert_eq!(received.borrow().len(), 2);
1322    }
1323
1324    #[rstest]
1325    fn test_typed_trade_publish_subscribe_integration() {
1326        let _msgbus = get_message_bus();
1327        let received = Rc::new(RefCell::new(Vec::new()));
1328        let received_clone = received.clone();
1329
1330        let handler = TypedHandler::from(move |trade: &TradeTick| {
1331            received_clone.borrow_mut().push(*trade);
1332        });
1333
1334        subscribe_trades("data.trades.*".into(), handler, None);
1335
1336        let trade = TradeTick::default();
1337        publish_trade("data.trades.TEST".into(), &trade);
1338
1339        assert_eq!(received.borrow().len(), 1);
1340    }
1341
1342    #[rstest]
1343    fn test_typed_bar_publish_subscribe_integration() {
1344        let _msgbus = get_message_bus();
1345        let received = Rc::new(RefCell::new(Vec::new()));
1346        let received_clone = received.clone();
1347
1348        let handler = TypedHandler::from(move |bar: &Bar| {
1349            received_clone.borrow_mut().push(*bar);
1350        });
1351
1352        subscribe_bars("data.bars.*".into(), handler, None);
1353
1354        let bar = Bar::default();
1355        publish_bar("data.bars.TEST".into(), &bar);
1356
1357        assert_eq!(received.borrow().len(), 1);
1358    }
1359
1360    #[rstest]
1361    fn test_typed_deltas_publish_subscribe_integration() {
1362        let _msgbus = get_message_bus();
1363        let received = Rc::new(RefCell::new(Vec::new()));
1364        let received_clone = received.clone();
1365
1366        let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1367            received_clone.borrow_mut().push(deltas.clone());
1368        });
1369
1370        subscribe_book_deltas("data.book.deltas.*".into(), handler, None);
1371
1372        let instrument_id = InstrumentId::from("TEST.VENUE");
1373        let delta = OrderBookDelta::clear(instrument_id, 0, 1.into(), 2.into());
1374        let deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1375        publish_deltas("data.book.deltas.TEST".into(), &deltas);
1376
1377        assert_eq!(received.borrow().len(), 1);
1378    }
1379
1380    #[rstest]
1381    fn test_typed_unsubscribe_stops_delivery() {
1382        let _msgbus = get_message_bus();
1383        let received = Rc::new(RefCell::new(Vec::new()));
1384        let received_clone = received.clone();
1385
1386        let handler = TypedHandler::from_with_id("unsub-test", move |quote: &QuoteTick| {
1387            received_clone.borrow_mut().push(*quote);
1388        });
1389
1390        subscribe_quotes("data.quotes.UNSUB".into(), handler.clone(), None);
1391
1392        let quote = QuoteTick::default();
1393        publish_quote("data.quotes.UNSUB".into(), &quote);
1394        assert_eq!(received.borrow().len(), 1);
1395
1396        unsubscribe_quotes("data.quotes.UNSUB".into(), &handler);
1397
1398        publish_quote("data.quotes.UNSUB".into(), &quote);
1399        assert_eq!(received.borrow().len(), 1);
1400    }
1401
1402    #[rstest]
1403    fn test_typed_wildcard_pattern_matching() {
1404        let _msgbus = get_message_bus();
1405        let received = Rc::new(RefCell::new(Vec::new()));
1406        let received_clone = received.clone();
1407
1408        let handler = TypedHandler::from(move |quote: &QuoteTick| {
1409            received_clone.borrow_mut().push(*quote);
1410        });
1411
1412        subscribe_quotes("data.quotes.WILD.*".into(), handler, None);
1413
1414        let quote = QuoteTick::default();
1415        publish_quote("data.quotes.WILD.AAPL".into(), &quote);
1416        publish_quote("data.quotes.WILD.MSFT".into(), &quote);
1417        publish_quote("data.quotes.OTHER.AAPL".into(), &quote);
1418
1419        assert_eq!(received.borrow().len(), 2);
1420    }
1421
1422    #[rstest]
1423    fn test_typed_priority_ordering() {
1424        let _msgbus = get_message_bus();
1425        let order = Rc::new(RefCell::new(Vec::new()));
1426
1427        let order1 = order.clone();
1428        let handler_low = TypedHandler::from_with_id("low-priority", move |_: &QuoteTick| {
1429            order1.borrow_mut().push("low");
1430        });
1431
1432        let order2 = order.clone();
1433        let handler_high = TypedHandler::from_with_id("high-priority", move |_: &QuoteTick| {
1434            order2.borrow_mut().push("high");
1435        });
1436
1437        subscribe_quotes("data.quotes.PRIO.*".into(), handler_low, Some(1));
1438        subscribe_quotes("data.quotes.PRIO.*".into(), handler_high, Some(10));
1439
1440        let quote = QuoteTick::default();
1441        publish_quote("data.quotes.PRIO.TEST".into(), &quote);
1442
1443        assert_eq!(*order.borrow(), vec!["high", "low"]);
1444    }
1445
1446    #[rstest]
1447    fn test_typed_routing_isolation() {
1448        let _msgbus = get_message_bus();
1449        let quote_received = Rc::new(RefCell::new(false));
1450        let trade_received = Rc::new(RefCell::new(false));
1451
1452        let qr = quote_received.clone();
1453        let quote_handler = TypedHandler::from(move |_: &QuoteTick| {
1454            *qr.borrow_mut() = true;
1455        });
1456
1457        let tr = trade_received.clone();
1458        let trade_handler = TypedHandler::from(move |_: &TradeTick| {
1459            *tr.borrow_mut() = true;
1460        });
1461
1462        subscribe_quotes("data.iso.*".into(), quote_handler, None);
1463        subscribe_trades("data.iso.*".into(), trade_handler, None);
1464
1465        let quote = QuoteTick::default();
1466        publish_quote("data.iso.TEST".into(), &quote);
1467
1468        assert!(*quote_received.borrow());
1469        assert!(!*trade_received.borrow());
1470    }
1471
1472    #[rstest]
1473    fn test_send_data_allows_reentrant_topic_access() {
1474        use crate::msgbus::switchboard::get_quotes_topic;
1475
1476        let _msgbus = get_message_bus();
1477        let topic_retrieved = Rc::new(RefCell::new(false));
1478        let topic_clone = topic_retrieved.clone();
1479
1480        let handler = TypedIntoHandler::from(move |data: Data| {
1481            let instrument_id = data.instrument_id();
1482            let _topic = get_quotes_topic(instrument_id);
1483            *topic_clone.borrow_mut() = true;
1484        });
1485
1486        let endpoint: MStr<Endpoint> = "ReentrantTest.data".into();
1487        register_data_endpoint(endpoint, handler);
1488
1489        let quote = QuoteTick::default();
1490        send_data(endpoint, Data::Quote(quote));
1491
1492        assert!(*topic_retrieved.borrow());
1493    }
1494
1495    #[rstest]
1496    fn test_send_trading_command_allows_reentrant_topic_access() {
1497        use nautilus_model::{
1498            enums::OrderSide,
1499            identifiers::{StrategyId, TraderId},
1500        };
1501
1502        use crate::{
1503            messages::execution::{TradingCommand, cancel::CancelAllOrders},
1504            msgbus::switchboard::get_trades_topic,
1505        };
1506
1507        let _msgbus = get_message_bus();
1508        let topic_retrieved = Rc::new(RefCell::new(false));
1509        let topic_clone = topic_retrieved.clone();
1510
1511        let handler = TypedIntoHandler::from(move |cmd: TradingCommand| {
1512            let instrument_id = cmd.instrument_id();
1513            let _topic = get_trades_topic(instrument_id);
1514            *topic_clone.borrow_mut() = true;
1515        });
1516
1517        let endpoint: MStr<Endpoint> = "ReentrantTest.tradingCmd".into();
1518        register_trading_command_endpoint(endpoint, handler);
1519
1520        let cmd = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1521            TraderId::new("TESTER-001"),
1522            None,
1523            StrategyId::new("S-001"),
1524            InstrumentId::from("TEST.VENUE"),
1525            OrderSide::NoOrderSide,
1526            UUID4::new(),
1527            0.into(),
1528            None,
1529        ));
1530        send_trading_command(endpoint, cmd);
1531
1532        assert!(*topic_retrieved.borrow());
1533    }
1534
1535    #[rstest]
1536    fn test_send_account_state_allows_reentrant_topic_access() {
1537        use nautilus_model::{enums::AccountType, identifiers::AccountId, types::Currency};
1538
1539        use crate::msgbus::switchboard::get_quotes_topic;
1540
1541        let _msgbus = get_message_bus();
1542        let topic_retrieved = Rc::new(RefCell::new(false));
1543        let topic_clone = topic_retrieved.clone();
1544
1545        let handler = TypedHandler::from(move |_state: &AccountState| {
1546            let instrument_id = InstrumentId::from("TEST.VENUE");
1547            let _topic = get_quotes_topic(instrument_id);
1548            *topic_clone.borrow_mut() = true;
1549        });
1550
1551        let endpoint: MStr<Endpoint> = "ReentrantTest.accountState".into();
1552        register_account_state_endpoint(endpoint, handler);
1553
1554        let state = AccountState::new(
1555            AccountId::new("SIM-001"),
1556            AccountType::Cash,
1557            vec![],
1558            vec![],
1559            true,
1560            UUID4::new(),
1561            0.into(),
1562            0.into(),
1563            Some(Currency::USD()),
1564        );
1565        send_account_state(endpoint, &state);
1566
1567        assert!(*topic_retrieved.borrow());
1568    }
1569
1570    #[rstest]
1571    fn test_send_order_event_allows_reentrant_topic_access() {
1572        use nautilus_model::{
1573            events::OrderDenied,
1574            identifiers::{ClientOrderId, StrategyId, TraderId},
1575        };
1576
1577        use crate::msgbus::switchboard::get_quotes_topic;
1578
1579        let _msgbus = get_message_bus();
1580        let topic_retrieved = Rc::new(RefCell::new(false));
1581        let topic_clone = topic_retrieved.clone();
1582
1583        let handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1584            let instrument_id = InstrumentId::from("TEST.VENUE");
1585            let _topic = get_quotes_topic(instrument_id);
1586            *topic_clone.borrow_mut() = true;
1587        });
1588
1589        let endpoint: MStr<Endpoint> = "ReentrantTest.orderEvent".into();
1590        register_order_event_endpoint(endpoint, handler);
1591
1592        let event = OrderEventAny::Denied(OrderDenied::new(
1593            TraderId::new("TESTER-001"),
1594            StrategyId::new("S-001"),
1595            InstrumentId::from("TEST.VENUE"),
1596            ClientOrderId::new("O-001"),
1597            "test denied".into(),
1598            UUID4::new(),
1599            0.into(),
1600            0.into(),
1601        ));
1602        send_order_event(endpoint, event);
1603
1604        assert!(*topic_retrieved.borrow());
1605    }
1606
1607    #[rstest]
1608    fn test_send_data_command_allows_reentrant_topic_access() {
1609        use nautilus_model::identifiers::ClientId;
1610
1611        use crate::{
1612            messages::data::{DataCommand, SubscribeCommand, SubscribeQuotes},
1613            msgbus::switchboard::get_trades_topic,
1614        };
1615
1616        let _msgbus = get_message_bus();
1617        let topic_retrieved = Rc::new(RefCell::new(false));
1618        let topic_clone = topic_retrieved.clone();
1619
1620        let handler = TypedIntoHandler::from(move |_cmd: DataCommand| {
1621            let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
1622            *topic_clone.borrow_mut() = true;
1623        });
1624
1625        let endpoint: MStr<Endpoint> = "ReentrantTest.dataCmd".into();
1626        register_data_command_endpoint(endpoint, handler);
1627
1628        let cmd = DataCommand::Subscribe(SubscribeCommand::Quotes(SubscribeQuotes::new(
1629            InstrumentId::from("TEST.VENUE"),
1630            Some(ClientId::new("SIM")),
1631            None,
1632            UUID4::new(),
1633            0.into(),
1634            None,
1635            None,
1636        )));
1637        send_data_command(endpoint, cmd);
1638
1639        assert!(*topic_retrieved.borrow());
1640    }
1641
1642    #[rstest]
1643    fn test_send_data_response_allows_reentrant_topic_access() {
1644        use nautilus_model::identifiers::ClientId;
1645
1646        use crate::{
1647            messages::data::{DataResponse, QuotesResponse},
1648            msgbus::switchboard::get_quotes_topic,
1649        };
1650
1651        let _msgbus = get_message_bus();
1652        let topic_retrieved = Rc::new(RefCell::new(false));
1653        let topic_clone = topic_retrieved.clone();
1654
1655        let handler = TypedIntoHandler::from(move |_resp: DataResponse| {
1656            let _topic = get_quotes_topic(InstrumentId::from("TEST.VENUE"));
1657            *topic_clone.borrow_mut() = true;
1658        });
1659
1660        let endpoint: MStr<Endpoint> = "ReentrantTest.dataResp".into();
1661        register_data_response_endpoint(endpoint, handler);
1662
1663        let resp = DataResponse::Quotes(QuotesResponse {
1664            correlation_id: UUID4::new(),
1665            client_id: ClientId::new("SIM"),
1666            instrument_id: InstrumentId::from("TEST.VENUE"),
1667            data: vec![],
1668            start: None,
1669            end: None,
1670            ts_init: 0.into(),
1671            params: None,
1672        });
1673        send_data_response(endpoint, resp);
1674
1675        assert!(*topic_retrieved.borrow());
1676    }
1677
1678    #[rstest]
1679    fn test_send_execution_report_allows_reentrant_topic_access() {
1680        use nautilus_model::{
1681            identifiers::{AccountId, ClientId, Venue},
1682            reports::ExecutionMassStatus,
1683        };
1684
1685        use crate::{messages::execution::ExecutionReport, msgbus::switchboard::get_trades_topic};
1686
1687        let _msgbus = get_message_bus();
1688        let topic_retrieved = Rc::new(RefCell::new(false));
1689        let topic_clone = topic_retrieved.clone();
1690
1691        let handler = TypedIntoHandler::from(move |_report: ExecutionReport| {
1692            let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
1693            *topic_clone.borrow_mut() = true;
1694        });
1695
1696        let endpoint: MStr<Endpoint> = "ReentrantTest.execReport".into();
1697        register_execution_report_endpoint(endpoint, handler);
1698
1699        let report = ExecutionReport::MassStatus(Box::new(ExecutionMassStatus::new(
1700            ClientId::new("SIM"),
1701            AccountId::new("SIM-001"),
1702            Venue::new("TEST"),
1703            0.into(),
1704            None,
1705        )));
1706        send_execution_report(endpoint, report);
1707
1708        assert!(*topic_retrieved.borrow());
1709    }
1710
1711    #[rstest]
1712    fn test_order_event_handler_can_send_trading_command() {
1713        // Tests that a handler processing an order event can send a trading command
1714        // without causing a borrow conflict. This simulates the scenario where a
1715        // strategy's on_order_accepted() handler calls cancel_order().
1716        let _msgbus = get_message_bus();
1717        let command_sent = Rc::new(RefCell::new(false));
1718        let command_sent_clone = command_sent.clone();
1719
1720        let cmd_received = Rc::new(RefCell::new(false));
1721        let cmd_received_clone = cmd_received.clone();
1722        let cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1723            *cmd_received_clone.borrow_mut() = true;
1724        });
1725        let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.execCmd".into();
1726        register_trading_command_endpoint(cmd_endpoint, cmd_handler);
1727
1728        let event_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1729            // Simulate strategy calling cancel_order from on_order_accepted
1730            let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1731                TraderId::new("TESTER-001"),
1732                None,
1733                StrategyId::new("S-001"),
1734                InstrumentId::from("TEST.VENUE"),
1735                OrderSide::Buy,
1736                UUID4::new(),
1737                0.into(),
1738                None,
1739            ));
1740            send_trading_command(cmd_endpoint, command);
1741            *command_sent_clone.borrow_mut() = true;
1742        });
1743
1744        let event_endpoint: MStr<Endpoint> = "ReentrantTest.orderEvt".into();
1745        register_order_event_endpoint(event_endpoint, event_handler);
1746
1747        let event = OrderEventAny::Denied(OrderDenied::new(
1748            TraderId::new("TESTER-001"),
1749            StrategyId::new("S-001"),
1750            InstrumentId::from("TEST.VENUE"),
1751            ClientOrderId::new("O-001"),
1752            "Test denial".into(),
1753            UUID4::new(),
1754            0.into(),
1755            0.into(),
1756        ));
1757        send_order_event(event_endpoint, event);
1758
1759        assert!(
1760            *command_sent.borrow(),
1761            "Order event handler should have run"
1762        );
1763        assert!(
1764            *cmd_received.borrow(),
1765            "Trading command should have been received"
1766        );
1767    }
1768
1769    #[rstest]
1770    fn test_data_handler_can_send_data_command() {
1771        // Tests that a handler processing data can send a data command
1772        // without causing a borrow conflict.
1773        let _msgbus = get_message_bus();
1774        let command_sent = Rc::new(RefCell::new(false));
1775        let command_sent_clone = command_sent.clone();
1776
1777        let cmd_received = Rc::new(RefCell::new(false));
1778        let cmd_received_clone = cmd_received.clone();
1779        let cmd_handler = TypedIntoHandler::from(move |_cmd: DataCommand| {
1780            *cmd_received_clone.borrow_mut() = true;
1781        });
1782        let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.dataCmd2".into();
1783        register_data_command_endpoint(cmd_endpoint, cmd_handler);
1784
1785        let data_handler = TypedIntoHandler::from(move |_data: Data| {
1786            let command = DataCommand::Subscribe(SubscribeCommand::Quotes(SubscribeQuotes::new(
1787                InstrumentId::from("TEST.VENUE"),
1788                Some(ClientId::new("SIM")),
1789                None,
1790                UUID4::new(),
1791                0.into(),
1792                None,
1793                None,
1794            )));
1795            send_data_command(cmd_endpoint, command);
1796            *command_sent_clone.borrow_mut() = true;
1797        });
1798
1799        let data_endpoint: MStr<Endpoint> = "ReentrantTest.data2".into();
1800        register_data_endpoint(data_endpoint, data_handler);
1801
1802        let quote = QuoteTick::default();
1803        send_data(data_endpoint, Data::Quote(quote));
1804
1805        assert!(*command_sent.borrow(), "Data handler should have run");
1806        assert!(
1807            *cmd_received.borrow(),
1808            "Data command should have been received"
1809        );
1810    }
1811
1812    #[rstest]
1813    fn test_trading_command_handler_can_send_order_event() {
1814        // Tests that a handler processing a trading command can send an order event
1815        // without causing a borrow conflict. This is the reverse direction of the
1816        // common re-entrancy scenario.
1817        let _msgbus = get_message_bus();
1818        let event_sent = Rc::new(RefCell::new(false));
1819        let event_sent_clone = event_sent.clone();
1820
1821        let evt_received = Rc::new(RefCell::new(false));
1822        let evt_received_clone = evt_received.clone();
1823        let evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1824            *evt_received_clone.borrow_mut() = true;
1825        });
1826        let evt_endpoint: MStr<Endpoint> = "ReentrantTest.orderEvt2".into();
1827        register_order_event_endpoint(evt_endpoint, evt_handler);
1828
1829        let cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1830            let event = OrderEventAny::Denied(OrderDenied::new(
1831                TraderId::new("TESTER-001"),
1832                StrategyId::new("S-001"),
1833                InstrumentId::from("TEST.VENUE"),
1834                ClientOrderId::new("O-001"),
1835                "Test denial".into(),
1836                UUID4::new(),
1837                0.into(),
1838                0.into(),
1839            ));
1840            send_order_event(evt_endpoint, event);
1841            *event_sent_clone.borrow_mut() = true;
1842        });
1843
1844        let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.execCmd2".into();
1845        register_trading_command_endpoint(cmd_endpoint, cmd_handler);
1846
1847        let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1848            TraderId::new("TESTER-001"),
1849            None,
1850            StrategyId::new("S-001"),
1851            InstrumentId::from("TEST.VENUE"),
1852            OrderSide::Buy,
1853            UUID4::new(),
1854            0.into(),
1855            None,
1856        ));
1857        send_trading_command(cmd_endpoint, command);
1858
1859        assert!(
1860            *event_sent.borrow(),
1861            "Trading command handler should have run"
1862        );
1863        assert!(
1864            *evt_received.borrow(),
1865            "Order event should have been received"
1866        );
1867    }
1868
1869    #[rstest]
1870    fn test_nested_reentrant_calls() {
1871        // Tests deeply nested re-entrant calls: order event -> trading command -> order event.
1872        // This simulates a complex scenario where handlers chain multiple calls.
1873        let _msgbus = get_message_bus();
1874        let call_depth = Rc::new(RefCell::new(0u32));
1875
1876        let final_received = Rc::new(RefCell::new(false));
1877        let final_received_clone = final_received.clone();
1878        let final_evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1879            *final_received_clone.borrow_mut() = true;
1880        });
1881        let final_evt_endpoint: MStr<Endpoint> = "ReentrantTest.finalEvt".into();
1882        register_order_event_endpoint(final_evt_endpoint, final_evt_handler);
1883
1884        let call_depth_clone2 = call_depth.clone();
1885        let mid_cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
1886            *call_depth_clone2.borrow_mut() += 1;
1887            let event = OrderEventAny::Denied(OrderDenied::new(
1888                TraderId::new("TESTER-001"),
1889                StrategyId::new("S-001"),
1890                InstrumentId::from("TEST.VENUE"),
1891                ClientOrderId::new("O-002"),
1892                "Nested denial".into(),
1893                UUID4::new(),
1894                0.into(),
1895                0.into(),
1896            ));
1897            send_order_event(final_evt_endpoint, event);
1898        });
1899        let mid_cmd_endpoint: MStr<Endpoint> = "ReentrantTest.midCmd".into();
1900        register_trading_command_endpoint(mid_cmd_endpoint, mid_cmd_handler);
1901
1902        let call_depth_clone1 = call_depth.clone();
1903        let init_evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
1904            *call_depth_clone1.borrow_mut() += 1;
1905            let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
1906                TraderId::new("TESTER-001"),
1907                None,
1908                StrategyId::new("S-001"),
1909                InstrumentId::from("TEST.VENUE"),
1910                OrderSide::Buy,
1911                UUID4::new(),
1912                0.into(),
1913                None,
1914            ));
1915            send_trading_command(mid_cmd_endpoint, command);
1916        });
1917        let init_evt_endpoint: MStr<Endpoint> = "ReentrantTest.initEvt".into();
1918        register_order_event_endpoint(init_evt_endpoint, init_evt_handler);
1919
1920        let event = OrderEventAny::Denied(OrderDenied::new(
1921            TraderId::new("TESTER-001"),
1922            StrategyId::new("S-001"),
1923            InstrumentId::from("TEST.VENUE"),
1924            ClientOrderId::new("O-001"),
1925            "Initial denial".into(),
1926            UUID4::new(),
1927            0.into(),
1928            0.into(),
1929        ));
1930        send_order_event(init_evt_endpoint, event);
1931
1932        assert_eq!(
1933            *call_depth.borrow(),
1934            2,
1935            "Both intermediate handlers should have run"
1936        );
1937        assert!(
1938            *final_received.borrow(),
1939            "Final event handler should have received the event"
1940        );
1941    }
1942}