nautilus_common/msgbus/
mod.rs1mod api;
40pub mod core;
41pub mod database;
42pub mod matching;
43pub mod message;
44pub mod mstr;
45pub mod stubs;
46pub mod switchboard;
47pub mod typed_endpoints;
48pub mod typed_handler;
49pub mod typed_router;
50
51use std::{cell::RefCell, rc::Rc};
52
53#[cfg(feature = "defi")]
54use nautilus_model::defi::{Block, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap};
55use nautilus_model::{
56 data::{
57 Bar, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas,
58 OrderBookDepth10, QuoteTick, TradeTick,
59 option_chain::{OptionChainSlice, OptionGreeks},
60 },
61 events::{AccountState, OrderEventAny, PositionEvent},
62 orderbook::OrderBook,
63};
64use smallvec::SmallVec;
65
66pub use self::{
67 api::*,
68 core::{MessageBus, Subscription},
69 message::BusMessage,
70 mstr::{Endpoint, MStr, Pattern, Topic},
71 switchboard::MessagingSwitchboard,
72 typed_endpoints::{EndpointMap, IntoEndpointMap},
73 typed_handler::{
74 CallbackHandler, Handler, IntoHandler, ShareableMessageHandler, TypedHandler,
75 TypedIntoHandler,
76 },
77 typed_router::{TopicRouter, TypedSubscription},
78};
79
80pub(super) const HANDLER_BUFFER_CAP: usize = 64;
82
83thread_local! {
92 pub(super) static MESSAGE_BUS: RefCell<Option<Rc<RefCell<MessageBus>>>> = const { RefCell::new(None) };
93
94 pub(super) static ANY_HANDLERS: RefCell<SmallVec<[ShareableMessageHandler; HANDLER_BUFFER_CAP]>> =
95 RefCell::new(SmallVec::new());
96
97 pub(super) static DELTAS_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBookDeltas>; HANDLER_BUFFER_CAP]>> =
98 RefCell::new(SmallVec::new());
99 pub(super) static DEPTH10_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBookDepth10>; HANDLER_BUFFER_CAP]>> =
100 RefCell::new(SmallVec::new());
101 pub(super) static BOOK_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBook>; HANDLER_BUFFER_CAP]>> =
102 RefCell::new(SmallVec::new());
103 pub(super) static QUOTE_HANDLERS: RefCell<SmallVec<[TypedHandler<QuoteTick>; HANDLER_BUFFER_CAP]>> =
104 RefCell::new(SmallVec::new());
105 pub(super) static TRADE_HANDLERS: RefCell<SmallVec<[TypedHandler<TradeTick>; HANDLER_BUFFER_CAP]>> =
106 RefCell::new(SmallVec::new());
107 pub(super) static BAR_HANDLERS: RefCell<SmallVec<[TypedHandler<Bar>; HANDLER_BUFFER_CAP]>> =
108 RefCell::new(SmallVec::new());
109 pub(super) static MARK_PRICE_HANDLERS: RefCell<SmallVec<[TypedHandler<MarkPriceUpdate>; HANDLER_BUFFER_CAP]>> =
110 RefCell::new(SmallVec::new());
111 pub(super) static INDEX_PRICE_HANDLERS: RefCell<SmallVec<[TypedHandler<IndexPriceUpdate>; HANDLER_BUFFER_CAP]>> =
112 RefCell::new(SmallVec::new());
113 pub(super) static FUNDING_RATE_HANDLERS: RefCell<SmallVec<[TypedHandler<FundingRateUpdate>; HANDLER_BUFFER_CAP]>> =
114 RefCell::new(SmallVec::new());
115 pub(super) static GREEKS_HANDLERS: RefCell<SmallVec<[TypedHandler<GreeksData>; HANDLER_BUFFER_CAP]>> =
116 RefCell::new(SmallVec::new());
117 pub(super) static OPTION_GREEKS_HANDLERS: RefCell<SmallVec<[TypedHandler<OptionGreeks>; HANDLER_BUFFER_CAP]>> =
118 RefCell::new(SmallVec::new());
119 pub(super) static OPTION_CHAIN_HANDLERS: RefCell<SmallVec<[TypedHandler<OptionChainSlice>; HANDLER_BUFFER_CAP]>> =
120 RefCell::new(SmallVec::new());
121 pub(super) static ACCOUNT_STATE_HANDLERS: RefCell<SmallVec<[TypedHandler<AccountState>; HANDLER_BUFFER_CAP]>> =
122 RefCell::new(SmallVec::new());
123 pub(super) static ORDER_EVENT_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderEventAny>; HANDLER_BUFFER_CAP]>> =
124 RefCell::new(SmallVec::new());
125 pub(super) static POSITION_EVENT_HANDLERS: RefCell<SmallVec<[TypedHandler<PositionEvent>; HANDLER_BUFFER_CAP]>> =
126 RefCell::new(SmallVec::new());
127
128 #[cfg(feature = "defi")]
129 pub(super) static DEFI_BLOCK_HANDLERS: RefCell<SmallVec<[TypedHandler<Block>; HANDLER_BUFFER_CAP]>> =
130 RefCell::new(SmallVec::new());
131 #[cfg(feature = "defi")]
132 pub(super) static DEFI_POOL_HANDLERS: RefCell<SmallVec<[TypedHandler<Pool>; HANDLER_BUFFER_CAP]>> =
133 RefCell::new(SmallVec::new());
134 #[cfg(feature = "defi")]
135 pub(super) static DEFI_SWAP_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolSwap>; HANDLER_BUFFER_CAP]>> =
136 RefCell::new(SmallVec::new());
137 #[cfg(feature = "defi")]
138 pub(super) static DEFI_LIQUIDITY_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolLiquidityUpdate>; HANDLER_BUFFER_CAP]>> =
139 RefCell::new(SmallVec::new());
140 #[cfg(feature = "defi")]
141 pub(super) static DEFI_COLLECT_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolFeeCollect>; HANDLER_BUFFER_CAP]>> =
142 RefCell::new(SmallVec::new());
143 #[cfg(feature = "defi")]
144 pub(super) static DEFI_FLASH_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolFlash>; HANDLER_BUFFER_CAP]>> =
145 RefCell::new(SmallVec::new());
146}
147
148pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
150 MESSAGE_BUS.with(|bus| {
151 *bus.borrow_mut() = Some(msgbus);
152 });
153}
154
155pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
159 MESSAGE_BUS.with(|bus| {
160 let mut slot = bus.borrow_mut();
161 let rc = slot.get_or_insert_with(|| Rc::new(RefCell::new(MessageBus::default())));
162 rc.clone()
163 })
164}