Skip to main content

nautilus_common/msgbus/
switchboard.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::{num::NonZeroUsize, sync::OnceLock};
17
18use ahash::AHashMap;
19use nautilus_model::{
20    data::{BarType, DataType},
21    identifiers::{ClientOrderId, InstrumentId, OptionSeriesId, PositionId, StrategyId, Venue},
22};
23
24use super::mstr::{Endpoint, MStr, Pattern, Topic};
25use crate::msgbus::get_message_bus;
26
27pub const CLOSE_TOPIC: &str = "CLOSE";
28
29static DATA_QUEUE_COMMAND_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
30static DATA_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
31static DATA_PROCESS_ANY_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
32static DATA_PROCESS_DATA_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
33#[cfg(feature = "defi")]
34static DATA_PROCESS_DEFI_DATA_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
35static DATA_RESPONSE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
36static EXEC_QUEUE_COMMAND_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
37static EXEC_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
38static EXEC_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
39static EXEC_RECONCILE_REPORT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
40static RISK_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
41static RISK_QUEUE_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
42static RISK_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
43static ORDER_EMULATOR_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
44static PORTFOLIO_ACCOUNT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
45static SHUTDOWN_SYSTEM_TOPIC: OnceLock<MStr<Topic>> = OnceLock::new();
46
/// Generates `MessagingSwitchboard` together with its topic caches and accessors.
///
/// Each entry has the shape
/// `field: KeyType, method(args) -> key_expr, "format", format_args;` and expands to:
/// - a private `field: AHashMap<KeyType, MStr<Topic>>` cache on the struct, and
/// - a `pub fn method(&mut self, args) -> MStr<Topic>` returning the cached topic for
///   `key_expr`, formatting and storing it on first use.
macro_rules! define_switchboard {
    ($(
        $field:ident: $key_ty:ty,
        $method:ident($($arg_name:ident: $arg_ty:ty),*) -> $key_expr:expr,
        $val_fmt:expr,
        $($val_args:expr),*
    );* $(;)?) => {
        /// Represents a switchboard of built-in messaging endpoint names.
        #[derive(Clone, Debug)]
        pub struct MessagingSwitchboard {
            // One topic cache per macro entry.
            $(
                $field: AHashMap<$key_ty, MStr<Topic>>,
            )*
            // Per-venue wildcard patterns (`data.instrument.<venue>.*`).
            instruments_patterns: AHashMap<Venue, MStr<Pattern>>,
            // Signal publish topics and subscription patterns, keyed by the raw signal name.
            signal_topics: AHashMap<String, MStr<Topic>>,
            signal_patterns: AHashMap<String, MStr<Pattern>>,
            #[cfg(feature = "defi")]
            pub(crate) defi: crate::defi::switchboard::DefiSwitchboard,
        }

        impl Default for MessagingSwitchboard {
            /// Creates a new default [`MessagingSwitchboard`] instance with empty caches.
            fn default() -> Self {
                Self {
                    $(
                        $field: AHashMap::new(),
                    )*
                    instruments_patterns: AHashMap::new(),
                    signal_topics: AHashMap::new(),
                    signal_patterns: AHashMap::new(),
                    #[cfg(feature = "defi")]
                    defi: crate::defi::switchboard::DefiSwitchboard::default(),
                }
            }
        }

        impl MessagingSwitchboard {
            // Static endpoints
            //
            // Each accessor initializes its process-wide `OnceLock` on first use and
            // returns a copy of the stored name afterwards.

            /// Returns the `DataEngine.queue_execute` endpoint.
            #[inline]
            #[must_use]
            pub fn data_engine_queue_execute() -> MStr<Endpoint> {
                *DATA_QUEUE_COMMAND_ENDPOINT.get_or_init(|| "DataEngine.queue_execute".into())
            }

            /// Returns the `DataEngine.execute` endpoint.
            #[inline]
            #[must_use]
            pub fn data_engine_execute() -> MStr<Endpoint> {
                *DATA_EXECUTE_ENDPOINT.get_or_init(|| "DataEngine.execute".into())
            }

            /// Returns the `DataEngine.process` endpoint.
            #[inline]
            #[must_use]
            pub fn data_engine_process() -> MStr<Endpoint> {
                *DATA_PROCESS_ANY_ENDPOINT.get_or_init(|| "DataEngine.process".into())
            }

            /// Returns the `DataEngine.process_data` endpoint.
            #[inline]
            #[must_use]
            pub fn data_engine_process_data() -> MStr<Endpoint> {
                *DATA_PROCESS_DATA_ENDPOINT.get_or_init(|| "DataEngine.process_data".into())
            }

            /// Returns the `DataEngine.process_defi_data` endpoint.
            #[cfg(feature = "defi")]
            #[inline]
            #[must_use]
            pub fn data_engine_process_defi_data() -> MStr<Endpoint> {
                *DATA_PROCESS_DEFI_DATA_ENDPOINT
                    .get_or_init(|| "DataEngine.process_defi_data".into())
            }

            /// Returns the `DataEngine.response` endpoint.
            #[inline]
            #[must_use]
            pub fn data_engine_response() -> MStr<Endpoint> {
                *DATA_RESPONSE_ENDPOINT.get_or_init(|| "DataEngine.response".into())
            }

            /// Returns the `ExecEngine.execute` endpoint.
            #[inline]
            #[must_use]
            pub fn exec_engine_execute() -> MStr<Endpoint> {
                *EXEC_EXECUTE_ENDPOINT.get_or_init(|| "ExecEngine.execute".into())
            }

            /// Returns the `ExecEngine.queue_execute` endpoint.
            #[inline]
            #[must_use]
            pub fn exec_engine_queue_execute() -> MStr<Endpoint> {
                *EXEC_QUEUE_COMMAND_ENDPOINT.get_or_init(|| "ExecEngine.queue_execute".into())
            }

            /// Returns the `ExecEngine.process` endpoint.
            #[inline]
            #[must_use]
            pub fn exec_engine_process() -> MStr<Endpoint> {
                *EXEC_PROCESS_ENDPOINT.get_or_init(|| "ExecEngine.process".into())
            }

            /// Returns the `ExecEngine.reconcile_execution_report` endpoint.
            #[inline]
            #[must_use]
            pub fn exec_engine_reconcile_execution_report() -> MStr<Endpoint> {
                *EXEC_RECONCILE_REPORT_ENDPOINT
                    .get_or_init(|| "ExecEngine.reconcile_execution_report".into())
            }

            /// Returns the `RiskEngine.execute` endpoint.
            #[inline]
            #[must_use]
            pub fn risk_engine_execute() -> MStr<Endpoint> {
                *RISK_EXECUTE_ENDPOINT.get_or_init(|| "RiskEngine.execute".into())
            }

            /// Queued endpoint for deferred command execution (re-entrancy safe).
            /// Falls back to direct endpoint when no `TradingCommandSender` is
            /// available (backtest / test environments).
            #[inline]
            #[must_use]
            pub fn risk_engine_queue_execute() -> MStr<Endpoint> {
                *RISK_QUEUE_EXECUTE_ENDPOINT.get_or_init(|| "RiskEngine.queue_execute".into())
            }

            /// Returns the `RiskEngine.process` endpoint.
            #[inline]
            #[must_use]
            pub fn risk_engine_process() -> MStr<Endpoint> {
                *RISK_PROCESS_ENDPOINT.get_or_init(|| "RiskEngine.process".into())
            }

            /// Returns the `OrderEmulator.execute` endpoint.
            #[inline]
            #[must_use]
            pub fn order_emulator_execute() -> MStr<Endpoint> {
                *ORDER_EMULATOR_ENDPOINT.get_or_init(|| "OrderEmulator.execute".into())
            }

            /// Returns the `Portfolio.update_account` endpoint.
            #[inline]
            #[must_use]
            pub fn portfolio_update_account() -> MStr<Endpoint> {
                *PORTFOLIO_ACCOUNT_ENDPOINT.get_or_init(|| "Portfolio.update_account".into())
            }

            /// Pub/sub topic carrying `ShutdownSystem` commands published by
            /// actors, engines, and strategies.
            ///
            /// Matches the Python topic. The kernel subscribes to validate the
            /// command and signal graceful shutdown; additional components may
            /// subscribe to react to the same signal.
            #[inline]
            #[must_use]
            pub fn shutdown_system_topic() -> MStr<Topic> {
                *SHUTDOWN_SYSTEM_TOPIC.get_or_init(|| "commands.system.shutdown".into())
            }

            /// Returns a wildcard pattern for matching all instrument topics for a venue.
            ///
            /// The pattern (`data.instrument.<venue>.*`) is formatted once per venue and
            /// served from the cache afterwards.
            #[must_use]
            pub fn instruments_pattern(&mut self, venue: Venue) -> MStr<Pattern> {
                *self.instruments_patterns
                    .entry(venue)
                    .or_insert_with(|| format!("data.instrument.{venue}.*").into())
            }

            /// Returns the exact signal publish topic for `name`
            /// (`data.Signal<TitleName>`).
            ///
            /// The title-cased encoding mirrors the v1 Python convention so
            /// subscribers keyed on either a specific name or the global
            /// `data.Signal*` wildcard receive published signals.
            #[must_use]
            pub fn signal_topic(&mut self, name: &str) -> MStr<Topic> {
                // Borrowed lookup first: a cache hit must not allocate an owned `String` key.
                if let Some(topic) = self.signal_topics.get(name) {
                    return *topic;
                }

                let topic: MStr<Topic> = format!(
                    "data.Signal{}",
                    nautilus_core::string::conversions::title_case(name)
                )
                .into();
                self.signal_topics.insert(name.to_string(), topic);
                topic
            }

            /// Returns the subscription pattern for `name`
            /// (`data.Signal<TitleName>*`).
            ///
            /// An empty `name` yields the wildcard `data.Signal*` that matches
            /// every signal topic.
            #[must_use]
            pub fn signal_pattern(&mut self, name: &str) -> MStr<Pattern> {
                // Borrowed lookup first: a cache hit must not allocate an owned `String` key.
                if let Some(pattern) = self.signal_patterns.get(name) {
                    return *pattern;
                }

                let pattern: MStr<Pattern> = format!(
                    "data.Signal{}*",
                    nautilus_core::string::conversions::title_case(name)
                )
                .into();
                self.signal_patterns.insert(name.to_string(), pattern);
                pattern
            }

            // Dynamic topics
            //
            // NOTE: `$key_expr` is evaluated on every call (cache hit or miss), so an
            // entry whose key expression clones its argument pays that cost each time.
            $(
                /// Returns the topic for the given key, formatting and caching it on first use.
                #[must_use]
                pub fn $method(&mut self, $($arg_name: $arg_ty),*) -> MStr<Topic> {
                    let key = $key_expr;
                    *self.$field
                        .entry(key)
                        .or_insert_with(|| format!($val_fmt, $($val_args),*).into())
                }
            )*
        }
    };
}
252
253define_switchboard! {
254    custom_topics: DataType,
255    get_custom_topic(data_type: &DataType) -> data_type.clone(),
256    "data.{}", data_type.topic();
257
258    instruments_topics: Venue,
259    get_instruments_topic(venue: Venue) -> venue,
260    "data.instrument.{}", venue;
261
262    instrument_topics: InstrumentId,
263    get_instrument_topic(instrument_id: InstrumentId) -> instrument_id,
264    "data.instrument.{}.{}", instrument_id.venue, instrument_id.symbol;
265
266    book_deltas_topics: InstrumentId,
267    get_book_deltas_topic(instrument_id: InstrumentId) -> instrument_id,
268    "data.book.deltas.{}.{}", instrument_id.venue, instrument_id.symbol;
269
270    book_depth10_topics: InstrumentId,
271    get_book_depth10_topic(instrument_id: InstrumentId) -> instrument_id,
272    "data.book.depth10.{}.{}", instrument_id.venue, instrument_id.symbol;
273
274    book_snapshots_topics: (InstrumentId, NonZeroUsize),
275    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> (instrument_id, interval_ms),
276    "data.book.snapshots.{}.{}.{}", instrument_id.venue, instrument_id.symbol, interval_ms;
277
278    quote_topics: InstrumentId,
279    get_quotes_topic(instrument_id: InstrumentId) -> instrument_id,
280    "data.quotes.{}.{}", instrument_id.venue, instrument_id.symbol;
281
282    trade_topics: InstrumentId,
283    get_trades_topic(instrument_id: InstrumentId) -> instrument_id,
284    "data.trades.{}.{}", instrument_id.venue, instrument_id.symbol;
285
286    bar_topics: BarType,
287    get_bars_topic(bar_type: BarType) -> bar_type,
288    "data.bars.{}", bar_type;
289
290    mark_price_topics: InstrumentId,
291    get_mark_price_topic(instrument_id: InstrumentId) -> instrument_id,
292    "data.mark_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
293
294    index_price_topics: InstrumentId,
295    get_index_price_topic(instrument_id: InstrumentId) -> instrument_id,
296    "data.index_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
297
298    funding_rate_topics: InstrumentId,
299    get_funding_rate_topic(instrument_id: InstrumentId) -> instrument_id,
300    "data.funding_rates.{}.{}", instrument_id.venue, instrument_id.symbol;
301
302    instrument_status_topics: InstrumentId,
303    get_instrument_status_topic(instrument_id: InstrumentId) -> instrument_id,
304    "data.status.{}.{}", instrument_id.venue, instrument_id.symbol;
305
306    instrument_close_topics: InstrumentId,
307    get_instrument_close_topic(instrument_id: InstrumentId) -> instrument_id,
308    "data.close.{}.{}", instrument_id.venue, instrument_id.symbol;
309
310    option_greeks_topics: InstrumentId,
311    get_option_greeks_topic(instrument_id: InstrumentId) -> instrument_id,
312    "data.option_greeks.{}.{}", instrument_id.venue, instrument_id.symbol;
313
314    option_chain_topics: OptionSeriesId,
315    get_option_chain_topic(series_id: OptionSeriesId) -> series_id,
316    "data.option_chain.{}", series_id;
317
318    order_fills_topics: InstrumentId,
319    get_order_fills_topic(instrument_id: InstrumentId) -> instrument_id,
320    "events.fills.{}", instrument_id;
321
322    order_cancels_topics: InstrumentId,
323    get_order_cancels_topic(instrument_id: InstrumentId) -> instrument_id,
324    "events.cancels.{}", instrument_id;
325
326    order_snapshots_topics: ClientOrderId,
327    get_order_snapshots_topic(client_order_id: ClientOrderId) -> client_order_id,
328    "order.snapshots.{}", client_order_id;
329
330    positions_snapshots_topics: PositionId,
331    get_positions_snapshots_topic(position_id: PositionId) -> position_id,
332    "positions.snapshots.{}", position_id;
333
334    event_orders_topics: StrategyId,
335    get_event_orders_topic(strategy_id: StrategyId) -> strategy_id,
336    "events.order.{}", strategy_id;
337
338    event_positions_topics: StrategyId,
339    get_event_positions_topic(strategy_id: StrategyId) -> strategy_id,
340    "events.position.{}", strategy_id;
341}
342
////////////////////////////////////////////////////////////////////////////////
// Topic wrapper functions
////////////////////////////////////////////////////////////////////////////////
// Free-function shortcuts for the switchboard topic methods. Each one fetches the
// thread-local message bus instance and forwards to the method of the same name
// on its `switchboard`.

/// Generates one free function per listed signature; every function mutably borrows
/// the message bus and delegates to the identically named switchboard method.
macro_rules! define_wrappers {
    ($($method:ident($($arg_name:ident: $arg_ty:ty),*) -> $ret:ty),* $(,)?) => {
        $(
            #[must_use]
            pub fn $method($($arg_name: $arg_ty),*) -> $ret {
                let bus = get_message_bus();
                // The mutable borrow is held only for the duration of this call.
                let mut guard = bus.borrow_mut();
                guard.switchboard.$method($($arg_name),*)
            }
        )*
    }
}
362
363define_wrappers! {
364    get_custom_topic(data_type: &DataType) -> MStr<Topic>,
365    get_instruments_topic(venue: Venue) -> MStr<Topic>,
366    get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic>,
367    get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic>,
368    get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic>,
369    get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> MStr<Topic>,
370    get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic>,
371    get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic>,
372    get_bars_topic(bar_type: BarType) -> MStr<Topic>,
373    get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
374    get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
375    get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic>,
376    get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic>,
377    get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic>,
378    get_option_greeks_topic(instrument_id: InstrumentId) -> MStr<Topic>,
379    get_option_chain_topic(series_id: OptionSeriesId) -> MStr<Topic>,
380    get_order_fills_topic(instrument_id: InstrumentId) -> MStr<Topic>,
381    get_order_cancels_topic(instrument_id: InstrumentId) -> MStr<Topic>,
382    get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic>,
383    get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic>,
384    get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic>,
385    get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic>,
386}
387
388/// Returns a wildcard subscription pattern that matches all instrument topics
389/// for the given `venue`.
390///
391/// For example, venue `BINANCE` produces pattern `data.instrument.BINANCE.*`,
392/// which matches per-instrument topics like `data.instrument.BINANCE.BTCUSDT`.
393#[must_use]
394pub fn get_instruments_pattern(venue: Venue) -> MStr<Pattern> {
395    get_message_bus()
396        .borrow_mut()
397        .switchboard
398        .instruments_pattern(venue)
399}
400
401/// Returns the exact signal publish topic for `name` (`data.Signal<TitleName>`).
402#[must_use]
403pub fn get_signal_topic(name: &str) -> MStr<Topic> {
404    get_message_bus()
405        .borrow_mut()
406        .switchboard
407        .signal_topic(name)
408}
409
410/// Returns the signal subscription pattern for `name` (`data.Signal<TitleName>*`).
411///
412/// An empty `name` yields the wildcard `data.Signal*` matching every signal topic.
413#[must_use]
414pub fn get_signal_pattern(name: &str) -> MStr<Pattern> {
415    get_message_bus()
416        .borrow_mut()
417        .switchboard
418        .signal_pattern(name)
419}
420
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::{BarType, DataType},
        identifiers::{InstrumentId, Venue},
    };
    use rstest::*;

    use super::*;
    use crate::msgbus::matching::is_matching_backtracking;

    // A fresh switchboard with empty caches for every test.
    #[fixture]
    fn switchboard() -> MessagingSwitchboard {
        MessagingSwitchboard::default()
    }

    // Instrument shared by the per-instrument topic tests.
    #[fixture]
    fn instrument_id() -> InstrumentId {
        InstrumentId::from("ESZ24.XCME")
    }

    #[rstest]
    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
        let data_type = DataType::new("ExampleDataType", None, None);
        let expected: MStr<Topic> = "data.ExampleDataType".into();

        let topic = switchboard.get_custom_topic(&data_type);

        assert_eq!(topic, expected);
        assert!(switchboard.custom_topics.contains_key(&data_type));
    }

    #[rstest]
    fn test_get_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected: MStr<Topic> = "data.instrument.XCME.ESZ24".into();

        let topic = switchboard.get_instrument_topic(instrument_id);

        assert_eq!(topic, expected);
        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_deltas_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected: MStr<Topic> = "data.book.deltas.XCME.ESZ24".into();

        let topic = switchboard.get_book_deltas_topic(instrument_id);

        assert_eq!(topic, expected);
        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_depth10_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let expected: MStr<Topic> = "data.book.depth10.XCME.ESZ24".into();

        let topic = switchboard.get_book_depth10_topic(instrument_id);

        assert_eq!(topic, expected);
        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_snapshots_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let interval_ms = NonZeroUsize::new(1000).unwrap();
        let expected: MStr<Topic> = "data.book.snapshots.XCME.ESZ24.1000".into();

        let topic = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);

        assert_eq!(topic, expected);
        // The cache key is the (instrument, interval) pair.
        let cache_key = (instrument_id, interval_ms);
        assert!(switchboard.book_snapshots_topics.contains_key(&cache_key));
    }

    #[rstest]
    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected: MStr<Topic> = "data.quotes.XCME.ESZ24".into();

        let topic = switchboard.get_quotes_topic(instrument_id);

        assert_eq!(topic, expected);
        assert!(switchboard.quote_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let expected: MStr<Topic> = "data.trades.XCME.ESZ24".into();

        let topic = switchboard.get_trades_topic(instrument_id);

        assert_eq!(topic, expected);
        assert!(switchboard.trade_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");
        let expected: MStr<Topic> = format!("data.bars.{bar_type}").into();

        let topic = switchboard.get_bars_topic(bar_type);

        assert_eq!(topic, expected);
        assert!(switchboard.bar_topics.contains_key(&bar_type));
    }

    #[rstest]
    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
        let client_order_id = ClientOrderId::from("O-123456789");
        let expected: MStr<Topic> = format!("order.snapshots.{client_order_id}").into();

        let topic = switchboard.get_order_snapshots_topic(client_order_id);

        assert_eq!(topic, expected);
        assert!(
            switchboard
                .order_snapshots_topics
                .contains_key(&client_order_id)
        );
    }

    #[rstest]
    fn test_instruments_pattern_matches_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let pattern = switchboard.instruments_pattern(instrument_id.venue);
        let topic = switchboard.get_instrument_topic(instrument_id);

        assert_eq!(pattern.as_ref(), "data.instrument.XCME.*");
        assert!(is_matching_backtracking(topic, pattern));
    }

    #[rstest]
    fn test_instruments_pattern_does_not_match_other_venue(mut switchboard: MessagingSwitchboard) {
        let other_venue_pattern = switchboard.instruments_pattern(Venue::from("BINANCE"));
        let xcme_topic = switchboard.get_instrument_topic(InstrumentId::from("ESZ24.XCME"));

        assert!(!is_matching_backtracking(xcme_topic, other_venue_pattern));
    }
}
559}