1use std::{num::NonZeroUsize, sync::OnceLock};
17
18use ahash::AHashMap;
19use nautilus_model::{
20 data::{BarType, DataType},
21 identifiers::{ClientOrderId, InstrumentId, OptionSeriesId, PositionId, StrategyId, Venue},
22};
23
24use super::mstr::{Endpoint, MStr, Pattern, Topic};
25use crate::msgbus::get_message_bus;
26
/// Topic string literal `"CLOSE"`.
// NOTE(review): no consumer of this constant is visible in this file — confirm usage at call sites.
pub const CLOSE_TOPIC: &str = "CLOSE";

// Lazily-initialised cells for the fixed endpoint/topic names. Each cell is
// filled on first access by the matching `MessagingSwitchboard` associated
// function (via `get_or_init`) and the stored value is copied out on every
// later call.
static DATA_QUEUE_COMMAND_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static DATA_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static DATA_PROCESS_ANY_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static DATA_PROCESS_DATA_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
static DATA_PROCESS_DEFI_DATA_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static DATA_RESPONSE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static EXEC_QUEUE_COMMAND_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static EXEC_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static EXEC_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static EXEC_RECONCILE_REPORT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static RISK_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static RISK_QUEUE_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static RISK_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static ORDER_EMULATOR_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static PORTFOLIO_ACCOUNT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
// The only cell here holding a `Topic` rather than an `Endpoint`.
static SHUTDOWN_SYSTEM_TOPIC: OnceLock<MStr<Topic>> = OnceLock::new();
46
// Generates `MessagingSwitchboard`, its `Default` impl and its accessors.
//
// The macro input is a `;`-separated list of entries, each shaped like:
//
//     field: KeyType,
//     method(arg: ArgType, ...) -> key_expr,
//     "format string", format_arg, ...
//
// For every entry the expansion contains:
//   * a private cache field `field: AHashMap<KeyType, MStr<Topic>>`;
//   * a method `method(&mut self, args...) -> MStr<Topic>` which evaluates
//     `key_expr`, and on a cache miss formats the topic string and stores it.
//
// The fixed endpoint getters, the instruments pattern cache and the signal
// topic/pattern caches are emitted once, independent of the entries.
macro_rules! define_switchboard {
    ($(
        $field:ident: $key_ty:ty,
        $method:ident($($arg_name:ident: $arg_ty:ty),*) -> $key_expr:expr,
        $val_fmt:expr,
        $($val_args:expr),*
    );* $(;)?) => {
        /// Caches message bus topic and pattern strings keyed by the
        /// identifiers they are derived from, and exposes the fixed endpoint
        /// names as associated functions.
        #[derive(Clone, Debug)]
        pub struct MessagingSwitchboard {
            $(
                $field: AHashMap<$key_ty, MStr<Topic>>,
            )*
            instruments_patterns: AHashMap<Venue, MStr<Pattern>>,
            signal_topics: AHashMap<String, MStr<Topic>>,
            signal_patterns: AHashMap<String, MStr<Pattern>>,
            #[cfg(feature = "defi")]
            pub(crate) defi: crate::defi::switchboard::DefiSwitchboard,
        }

        impl Default for MessagingSwitchboard {
            /// Creates a switchboard with every cache empty.
            fn default() -> Self {
                Self {
                    $(
                        $field: AHashMap::new(),
                    )*
                    instruments_patterns: AHashMap::new(),
                    signal_topics: AHashMap::new(),
                    signal_patterns: AHashMap::new(),
                    #[cfg(feature = "defi")]
                    defi: crate::defi::switchboard::DefiSwitchboard::default(),
                }
            }
        }

        impl MessagingSwitchboard {
            /// Returns the `DataEngine.queue_execute` endpoint.
            #[inline]
            #[must_use]
            pub fn data_engine_queue_execute() -> MStr<Endpoint> {
                *DATA_QUEUE_COMMAND_ENDPOINT.get_or_init(|| "DataEngine.queue_execute".into())
            }

            /// Returns the `DataEngine.execute` endpoint.
            #[inline]
            #[must_use]
            pub fn data_engine_execute() -> MStr<Endpoint> {
                *DATA_EXECUTE_ENDPOINT.get_or_init(|| "DataEngine.execute".into())
            }

            /// Returns the `DataEngine.process` endpoint.
            #[inline]
            #[must_use]
            pub fn data_engine_process() -> MStr<Endpoint> {
                *DATA_PROCESS_ANY_ENDPOINT.get_or_init(|| "DataEngine.process".into())
            }

            /// Returns the `DataEngine.process_data` endpoint.
            #[inline]
            #[must_use]
            pub fn data_engine_process_data() -> MStr<Endpoint> {
                *DATA_PROCESS_DATA_ENDPOINT.get_or_init(|| "DataEngine.process_data".into())
            }

            /// Returns the `DataEngine.process_defi_data` endpoint
            /// (only available with the `defi` feature).
            #[cfg(feature = "defi")]
            #[inline]
            #[must_use]
            pub fn data_engine_process_defi_data() -> MStr<Endpoint> {
                *DATA_PROCESS_DEFI_DATA_ENDPOINT
                    .get_or_init(|| "DataEngine.process_defi_data".into())
            }

            /// Returns the `DataEngine.response` endpoint.
            #[inline]
            #[must_use]
            pub fn data_engine_response() -> MStr<Endpoint> {
                *DATA_RESPONSE_ENDPOINT.get_or_init(|| "DataEngine.response".into())
            }

            /// Returns the `ExecEngine.execute` endpoint.
            #[inline]
            #[must_use]
            pub fn exec_engine_execute() -> MStr<Endpoint> {
                *EXEC_EXECUTE_ENDPOINT.get_or_init(|| "ExecEngine.execute".into())
            }

            /// Returns the `ExecEngine.queue_execute` endpoint.
            #[inline]
            #[must_use]
            pub fn exec_engine_queue_execute() -> MStr<Endpoint> {
                *EXEC_QUEUE_COMMAND_ENDPOINT.get_or_init(|| "ExecEngine.queue_execute".into())
            }

            /// Returns the `ExecEngine.process` endpoint.
            #[inline]
            #[must_use]
            pub fn exec_engine_process() -> MStr<Endpoint> {
                *EXEC_PROCESS_ENDPOINT.get_or_init(|| "ExecEngine.process".into())
            }

            /// Returns the `ExecEngine.reconcile_execution_report` endpoint.
            #[inline]
            #[must_use]
            pub fn exec_engine_reconcile_execution_report() -> MStr<Endpoint> {
                *EXEC_RECONCILE_REPORT_ENDPOINT
                    .get_or_init(|| "ExecEngine.reconcile_execution_report".into())
            }

            /// Returns the `RiskEngine.execute` endpoint.
            #[inline]
            #[must_use]
            pub fn risk_engine_execute() -> MStr<Endpoint> {
                *RISK_EXECUTE_ENDPOINT.get_or_init(|| "RiskEngine.execute".into())
            }

            /// Returns the `RiskEngine.queue_execute` endpoint.
            #[inline]
            #[must_use]
            pub fn risk_engine_queue_execute() -> MStr<Endpoint> {
                *RISK_QUEUE_EXECUTE_ENDPOINT.get_or_init(|| "RiskEngine.queue_execute".into())
            }

            /// Returns the `RiskEngine.process` endpoint.
            #[inline]
            #[must_use]
            pub fn risk_engine_process() -> MStr<Endpoint> {
                *RISK_PROCESS_ENDPOINT.get_or_init(|| "RiskEngine.process".into())
            }

            /// Returns the `OrderEmulator.execute` endpoint.
            #[inline]
            #[must_use]
            pub fn order_emulator_execute() -> MStr<Endpoint> {
                *ORDER_EMULATOR_ENDPOINT.get_or_init(|| "OrderEmulator.execute".into())
            }

            /// Returns the `Portfolio.update_account` endpoint.
            #[inline]
            #[must_use]
            pub fn portfolio_update_account() -> MStr<Endpoint> {
                *PORTFOLIO_ACCOUNT_ENDPOINT.get_or_init(|| "Portfolio.update_account".into())
            }

            /// Returns the `commands.system.shutdown` topic.
            #[inline]
            #[must_use]
            pub fn shutdown_system_topic() -> MStr<Topic> {
                *SHUTDOWN_SYSTEM_TOPIC.get_or_init(|| "commands.system.shutdown".into())
            }

            /// Returns the wildcard pattern `data.instrument.{venue}.*` for
            /// `venue`, building and caching it on first use.
            #[must_use]
            pub fn instruments_pattern(&mut self, venue: Venue) -> MStr<Pattern> {
                *self
                    .instruments_patterns
                    .entry(venue)
                    .or_insert_with(|| format!("data.instrument.{venue}.*").into())
            }

            /// Returns the topic `data.Signal{Name}` for the signal `name`,
            /// where `{Name}` is `name` passed through `title_case`.
            ///
            /// The topic is cached under the raw `name`; repeated calls with
            /// the same name return the cached value without allocating.
            #[must_use]
            pub fn signal_topic(&mut self, name: &str) -> MStr<Topic> {
                // Probe with the borrowed `&str` first: the entry API would
                // need an owned `String` key even when the topic is cached.
                if let Some(cached) = self.signal_topics.get(name) {
                    return *cached;
                }

                let topic: MStr<Topic> = format!(
                    "data.Signal{}",
                    nautilus_core::string::conversions::title_case(name)
                )
                .into();
                self.signal_topics.insert(name.to_string(), topic);
                topic
            }

            /// Returns the pattern `data.Signal{Name}*` for the signal `name`,
            /// where `{Name}` is `name` passed through `title_case`.
            ///
            /// The pattern is cached under the raw `name`; repeated calls with
            /// the same name return the cached value without allocating.
            #[must_use]
            pub fn signal_pattern(&mut self, name: &str) -> MStr<Pattern> {
                // Same borrowed-key probe as `signal_topic` to keep cache hits
                // allocation-free.
                if let Some(cached) = self.signal_patterns.get(name) {
                    return *cached;
                }

                let pattern: MStr<Pattern> = format!(
                    "data.Signal{}*",
                    nautilus_core::string::conversions::title_case(name)
                )
                .into();
                self.signal_patterns.insert(name.to_string(), pattern);
                pattern
            }

            $(
                /// Returns the topic for the given arguments, formatting and
                /// caching it on the first request for that key.
                #[must_use]
                pub fn $method(&mut self, $($arg_name: $arg_ty),*) -> MStr<Topic> {
                    // The key expression is evaluated on every call; the
                    // format string is only rendered on a cache miss.
                    let key = $key_expr;
                    *self.$field
                        .entry(key)
                        .or_insert_with(|| format!($val_fmt, $($val_args),*).into())
                }
            )*
        }
    };
}
252
// Topic table for `MessagingSwitchboard`.
//
// Each entry reads: cache field and key type, accessor signature and the key
// expression it stores under, then the topic format string with its arguments.
// Only `//` comments are used in here: `///` would become `#[doc]` tokens and
// break the macro's input pattern.
define_switchboard! {
    // --- Data topics -------------------------------------------------------

    // Keyed by a clone of the `DataType`; the topic suffix comes from `DataType::topic()`.
    custom_topics: DataType,
    get_custom_topic(data_type: &DataType) -> data_type.clone(),
    "data.{}", data_type.topic();

    // Venue-level instrument topic (no symbol component).
    instruments_topics: Venue,
    get_instruments_topic(venue: Venue) -> venue,
    "data.instrument.{}", venue;

    // Per-instrument topics below are laid out as `<prefix>.{venue}.{symbol}`.
    instrument_topics: InstrumentId,
    get_instrument_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.instrument.{}.{}", instrument_id.venue, instrument_id.symbol;

    book_deltas_topics: InstrumentId,
    get_book_deltas_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.book.deltas.{}.{}", instrument_id.venue, instrument_id.symbol;

    book_depth10_topics: InstrumentId,
    get_book_depth10_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.book.depth10.{}.{}", instrument_id.venue, instrument_id.symbol;

    // Snapshots are keyed by (instrument, interval), so each interval gets its own topic.
    book_snapshots_topics: (InstrumentId, NonZeroUsize),
    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> (instrument_id, interval_ms),
    "data.book.snapshots.{}.{}.{}", instrument_id.venue, instrument_id.symbol, interval_ms;

    quote_topics: InstrumentId,
    get_quotes_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.quotes.{}.{}", instrument_id.venue, instrument_id.symbol;

    trade_topics: InstrumentId,
    get_trades_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.trades.{}.{}", instrument_id.venue, instrument_id.symbol;

    // The whole `BarType` is rendered with its `Display` impl.
    bar_topics: BarType,
    get_bars_topic(bar_type: BarType) -> bar_type,
    "data.bars.{}", bar_type;

    mark_price_topics: InstrumentId,
    get_mark_price_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.mark_prices.{}.{}", instrument_id.venue, instrument_id.symbol;

    index_price_topics: InstrumentId,
    get_index_price_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.index_prices.{}.{}", instrument_id.venue, instrument_id.symbol;

    funding_rate_topics: InstrumentId,
    get_funding_rate_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.funding_rates.{}.{}", instrument_id.venue, instrument_id.symbol;

    instrument_status_topics: InstrumentId,
    get_instrument_status_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.status.{}.{}", instrument_id.venue, instrument_id.symbol;

    instrument_close_topics: InstrumentId,
    get_instrument_close_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.close.{}.{}", instrument_id.venue, instrument_id.symbol;

    option_greeks_topics: InstrumentId,
    get_option_greeks_topic(instrument_id: InstrumentId) -> instrument_id,
    "data.option_greeks.{}.{}", instrument_id.venue, instrument_id.symbol;

    option_chain_topics: OptionSeriesId,
    get_option_chain_topic(series_id: OptionSeriesId) -> series_id,
    "data.option_chain.{}", series_id;

    // --- Event and snapshot topics -------------------------------------------
    // These render the identifier with its `Display` impl rather than as
    // separate venue/symbol components.

    order_fills_topics: InstrumentId,
    get_order_fills_topic(instrument_id: InstrumentId) -> instrument_id,
    "events.fills.{}", instrument_id;

    order_cancels_topics: InstrumentId,
    get_order_cancels_topic(instrument_id: InstrumentId) -> instrument_id,
    "events.cancels.{}", instrument_id;

    // NOTE(review): prefix is singular `order.` here but plural `positions.` below — confirm intentional.
    order_snapshots_topics: ClientOrderId,
    get_order_snapshots_topic(client_order_id: ClientOrderId) -> client_order_id,
    "order.snapshots.{}", client_order_id;

    positions_snapshots_topics: PositionId,
    get_positions_snapshots_topic(position_id: PositionId) -> position_id,
    "positions.snapshots.{}", position_id;

    event_orders_topics: StrategyId,
    get_event_orders_topic(strategy_id: StrategyId) -> strategy_id,
    "events.order.{}", strategy_id;

    event_positions_topics: StrategyId,
    get_event_positions_topic(strategy_id: StrategyId) -> strategy_id,
    "events.position.{}", strategy_id;
}
342
// Generates free functions that forward to the same-named method on the
// switchboard owned by the message bus returned from `get_message_bus()`.
//
// Input is a comma-separated list of `name(arg: Type, ...) -> ReturnType`
// signatures; each one expands to a `pub fn` with that exact signature.
macro_rules! define_wrappers {
    ($($method:ident($($arg_name:ident: $arg_ty:ty),*) -> $ret:ty),* $(,)?) => {
        $(
            /// Forwards to the method of the same name on the message bus
            /// switchboard.
            #[must_use]
            pub fn $method($($arg_name: $arg_ty),*) -> $ret {
                let bus = get_message_bus();
                // Mutable access is required because lookups may populate the cache.
                let mut bus_guard = bus.borrow_mut();
                bus_guard.switchboard.$method($($arg_name),*)
            }
        )*
    }
}
362
// Free-function wrappers over the switchboard held by the message bus.
// Each signature must match a `MessagingSwitchboard` method generated by the
// `define_switchboard!` invocation earlier in this file, because the wrapper
// calls the method with the same name and arguments.
define_wrappers! {
    get_custom_topic(data_type: &DataType) -> MStr<Topic>,
    get_instruments_topic(venue: Venue) -> MStr<Topic>,
    get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> MStr<Topic>,
    get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_bars_topic(bar_type: BarType) -> MStr<Topic>,
    get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_option_greeks_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_option_chain_topic(series_id: OptionSeriesId) -> MStr<Topic>,
    get_order_fills_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_order_cancels_topic(instrument_id: InstrumentId) -> MStr<Topic>,
    get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic>,
    get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic>,
    get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic>,
    get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic>,
}
387
388#[must_use]
394pub fn get_instruments_pattern(venue: Venue) -> MStr<Pattern> {
395 get_message_bus()
396 .borrow_mut()
397 .switchboard
398 .instruments_pattern(venue)
399}
400
401#[must_use]
403pub fn get_signal_topic(name: &str) -> MStr<Topic> {
404 get_message_bus()
405 .borrow_mut()
406 .switchboard
407 .signal_topic(name)
408}
409
410#[must_use]
414pub fn get_signal_pattern(name: &str) -> MStr<Pattern> {
415 get_message_bus()
416 .borrow_mut()
417 .switchboard
418 .signal_pattern(name)
419}
420
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::{BarType, DataType},
        identifiers::{InstrumentId, Venue},
    };
    use rstest::*;

    use super::*;
    use crate::msgbus::matching::is_matching_backtracking;

    // Fixture names double as rstest injection points: test parameters must
    // keep the names `switchboard` and `instrument_id`.

    /// A switchboard with empty caches.
    #[fixture]
    fn switchboard() -> MessagingSwitchboard {
        MessagingSwitchboard::default()
    }

    /// The instrument used by the per-instrument topic tests.
    #[fixture]
    fn instrument_id() -> InstrumentId {
        InstrumentId::from("ESZ24.XCME")
    }

    #[rstest]
    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
        let data_type = DataType::new("ExampleDataType", None, None);
        let expected: MStr<Topic> = "data.ExampleDataType".into();

        let topic = switchboard.get_custom_topic(&data_type);

        assert_eq!(topic, expected);
        // The lookup must have populated the cache.
        assert!(switchboard.custom_topics.contains_key(&data_type));
    }

    #[rstest]
    fn test_get_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected: MStr<Topic> = "data.instrument.XCME.ESZ24".into();

        let topic = switchboard.get_instrument_topic(instrument_id);

        assert_eq!(topic, expected);
        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_deltas_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected: MStr<Topic> = "data.book.deltas.XCME.ESZ24".into();

        let topic = switchboard.get_book_deltas_topic(instrument_id);

        assert_eq!(topic, expected);
        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_depth10_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected: MStr<Topic> = "data.book.depth10.XCME.ESZ24".into();

        let topic = switchboard.get_book_depth10_topic(instrument_id);

        assert_eq!(topic, expected);
        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_snapshots_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let interval_ms = NonZeroUsize::new(1000).unwrap();
        let expected: MStr<Topic> = "data.book.snapshots.XCME.ESZ24.1000".into();

        let topic = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);

        assert_eq!(topic, expected);
        // The cache key is the (instrument, interval) pair.
        let cache_key = (instrument_id, interval_ms);
        assert!(switchboard.book_snapshots_topics.contains_key(&cache_key));
    }

    #[rstest]
    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected: MStr<Topic> = "data.quotes.XCME.ESZ24".into();

        let topic = switchboard.get_quotes_topic(instrument_id);

        assert_eq!(topic, expected);
        assert!(switchboard.quote_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected: MStr<Topic> = "data.trades.XCME.ESZ24".into();

        let topic = switchboard.get_trades_topic(instrument_id);

        assert_eq!(topic, expected);
        assert!(switchboard.trade_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
        let expected: MStr<Topic> = format!("data.bars.{bar_type}").into();

        let topic = switchboard.get_bars_topic(bar_type);

        assert_eq!(topic, expected);
        assert!(switchboard.bar_topics.contains_key(&bar_type));
    }

    #[rstest]
    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
        let client_order_id = ClientOrderId::from("O-123456789");
        let expected: MStr<Topic> = format!("order.snapshots.{client_order_id}").into();

        let topic = switchboard.get_order_snapshots_topic(client_order_id);

        assert_eq!(topic, expected);
        assert!(switchboard.order_snapshots_topics.contains_key(&client_order_id));
    }

    #[rstest]
    fn test_instruments_pattern_matches_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let pattern = switchboard.instruments_pattern(instrument_id.venue);
        let topic = switchboard.get_instrument_topic(instrument_id);

        assert_eq!(pattern.as_ref(), "data.instrument.XCME.*");
        assert!(is_matching_backtracking(topic, pattern));
    }

    #[rstest]
    fn test_instruments_pattern_does_not_match_other_venue(mut switchboard: MessagingSwitchboard) {
        // Pattern for one venue, topic for an instrument on a different venue.
        let other_venue = Venue::from("BINANCE");
        let xcme_instrument = InstrumentId::from("ESZ24.XCME");

        let pattern = switchboard.instruments_pattern(other_venue);
        let topic = switchboard.get_instrument_topic(xcme_instrument);

        assert!(!is_matching_backtracking(topic, pattern));
    }
}