nautilus_databento/
symbology.rs1use ahash::AHashMap;
17use databento::dbn::{self, PitSymbolMap, SType};
18use dbn::{Publisher, Record};
19use indexmap::IndexMap;
20use nautilus_model::identifiers::{InstrumentId, Symbol, Venue};
21
22use super::types::PublisherId;
23
24#[derive(Debug)]
25pub struct MetadataCache {
26 metadata: dbn::Metadata,
27 date_metadata_map: AHashMap<time::Date, PitSymbolMap>,
28}
29
30impl MetadataCache {
31 #[must_use]
32 pub fn new(metadata: dbn::Metadata) -> Self {
33 Self {
34 metadata,
35 date_metadata_map: AHashMap::new(),
36 }
37 }
38
39 pub fn symbol_map_for_date(&mut self, date: time::Date) -> dbn::Result<&PitSymbolMap> {
43 if !self.date_metadata_map.contains_key(&date) {
44 let map = self.metadata.symbol_map_for_date(date)?;
45 self.date_metadata_map.insert(date, map);
46 }
47
48 self.date_metadata_map
49 .get(&date)
50 .ok_or_else(|| dbn::Error::decode(format!("metadata cache missing for date {date}")))
51 }
52}
53
54pub fn instrument_id_to_symbol_string(
55 instrument_id: InstrumentId,
56 symbol_venue_map: &mut AHashMap<Symbol, Venue>,
57) -> String {
58 symbol_venue_map
59 .entry(instrument_id.symbol)
60 .or_insert(instrument_id.venue);
61 instrument_id.symbol.to_string()
62}
63
64pub fn decode_nautilus_instrument_id(
73 record: &dbn::RecordRef,
74 metadata: &mut MetadataCache,
75 publisher_venue_map: &IndexMap<PublisherId, Venue>,
76 symbol_venue_map: &AHashMap<Symbol, Venue>,
77) -> anyhow::Result<InstrumentId> {
78 let publisher = record
79 .publisher()
80 .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
81 let publisher_id = publisher as PublisherId;
82 let venue = publisher_venue_map
83 .get(&publisher_id)
84 .ok_or_else(|| anyhow::anyhow!("`Venue` not found for `publisher_id` {publisher_id}"))?;
85 let mut instrument_id = get_nautilus_instrument_id_for_record(record, metadata, *venue)?;
86
87 if publisher == Publisher::GlbxMdp3Glbx
88 && let Some(venue) = symbol_venue_map.get(&instrument_id.symbol)
89 {
90 instrument_id.venue = *venue;
91 }
92
93 Ok(instrument_id)
94}
95
96pub fn get_nautilus_instrument_id_for_record(
106 record: &dbn::RecordRef,
107 metadata: &mut MetadataCache,
108 venue: Venue,
109) -> anyhow::Result<InstrumentId> {
110 let (instrument_id, nanoseconds) = if let Some(msg) = record.get::<dbn::MboMsg>() {
111 (msg.hd.instrument_id, msg.ts_recv)
112 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
113 (msg.hd.instrument_id, msg.ts_recv)
114 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
115 (msg.hd.instrument_id, msg.ts_recv)
116 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
117 (msg.hd.instrument_id, msg.ts_recv)
118 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
119 (msg.hd.instrument_id, msg.ts_recv)
120 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
121 (msg.hd.instrument_id, msg.ts_recv)
122 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
123 (msg.hd.instrument_id, msg.hd.ts_event)
124 } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
125 (msg.hd.instrument_id, msg.ts_recv)
126 } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
127 (msg.hd.instrument_id, msg.ts_recv)
128 } else if let Some(msg) = record.get::<dbn::StatMsg>() {
129 (msg.hd.instrument_id, msg.ts_recv)
130 } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
131 (msg.hd.instrument_id, msg.ts_recv)
132 } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
133 (msg.hd.instrument_id, msg.ts_recv)
134 } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
135 (msg.hd.instrument_id, msg.ts_recv)
136 } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
137 (msg.hd.instrument_id, msg.ts_recv)
138 } else {
139 anyhow::bail!("DBN message type is not currently supported")
140 };
141
142 let duration = time::Duration::nanoseconds(nanoseconds as i64);
143 let datetime = time::OffsetDateTime::UNIX_EPOCH
144 .checked_add(duration)
145 .ok_or_else(|| anyhow::anyhow!("Timestamp overflow for record"))?;
146 let date = datetime.date();
147 let symbol_map = metadata.symbol_map_for_date(date)?;
148 let raw_symbol = symbol_map
149 .get(instrument_id)
150 .ok_or_else(|| anyhow::anyhow!("No raw symbol found for {instrument_id}"))?;
151
152 let symbol = Symbol::from_str_unchecked(raw_symbol);
153
154 Ok(InstrumentId::new(symbol, venue))
155}
156
157#[must_use]
158pub fn infer_symbology_type(symbol: &str) -> SType {
159 if symbol.ends_with(".FUT") || symbol.ends_with(".OPT") {
160 return SType::Parent;
161 }
162
163 let parts: Vec<&str> = symbol.split('.').collect();
164 if parts.len() == 3 && parts[2].chars().all(|c| c.is_ascii_digit()) {
165 return SType::Continuous;
166 }
167
168 if symbol.chars().all(|c| c.is_ascii_digit()) {
169 return SType::InstrumentId;
170 }
171
172 SType::RawSymbol
173}
174
175pub fn check_consistent_symbology(symbols: &[&str]) -> anyhow::Result<()> {
179 if symbols.is_empty() {
180 anyhow::bail!("No symbols provided");
181 }
182 let first_symbol = symbols[0];
183 let first_stype = infer_symbology_type(first_symbol);
184
185 for symbol in symbols {
186 let next_stype = infer_symbology_type(symbol);
187 if next_stype != first_stype {
188 anyhow::bail!(
189 "Inconsistent symbology types: '{first_stype}' for {first_symbol} vs '{next_stype}' for {symbol}"
190 );
191 }
192 }
193
194 Ok(())
195}
196
#[cfg(test)]
mod tests {
    use rstest::*;

    use super::*;

    #[rstest]
    #[case("1", "instrument_id")]
    #[case("123456789", "instrument_id")]
    #[case("AAPL", "raw_symbol")]
    #[case("ESM4", "raw_symbol")]
    #[case("BRN FMM0024!", "raw_symbol")]
    #[case("BRN 99 5617289", "raw_symbol")]
    #[case("SPY 240319P00511000", "raw_symbol")]
    #[case("ES.FUT", "parent")]
    #[case("ES.OPT", "parent")]
    #[case("BRN.FUT", "parent")]
    #[case("SPX.OPT", "parent")]
    #[case("ES.c.0", "continuous")]
    #[case("SPX.n.0", "continuous")]
    fn test_infer_symbology_type(#[case] symbol: String, #[case] expected: SType) {
        let result = infer_symbology_type(&symbol);
        assert_eq!(result, expected);
    }

    #[rstest]
    fn test_check_consistent_symbology_when_empty_symbols() {
        let symbols: Vec<&str> = vec![];
        assert!(check_consistent_symbology(&symbols).is_err());
    }

    #[rstest]
    fn test_instrument_id_to_symbol_string_updates_map() {
        let symbol = Symbol::from("TEST");
        let venue = Venue::from("XNAS");
        let instrument_id = InstrumentId::new(symbol, venue);
        let mut map: AHashMap<Symbol, Venue> = AHashMap::new();

        // The first call registers the symbol's venue.
        let sym_str = instrument_id_to_symbol_string(instrument_id, &mut map);
        assert_eq!(sym_str, "TEST");
        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));

        // A second call with a different venue must not overwrite the existing mapping.
        let other = Venue::from("XLON");
        let inst2 = InstrumentId::new(Symbol::from("TEST"), other);
        let sym_str2 = instrument_id_to_symbol_string(inst2, &mut map);
        assert_eq!(sym_str2, "TEST");

        assert_eq!(map.get(&Symbol::from("TEST")), Some(&Venue::from("XNAS")));
    }

    #[rstest]
    fn test_check_consistent_symbology_when_inconsistent() {
        let symbols = vec!["ESM4", "ES.OPT"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for ESM4 vs 'parent' for ES.OPT"
        );
    }

    // Each case lists several separate symbols of one symbology type, so the comparison
    // across symbols is actually exercised (a single comma-joined string would not be).
    #[rstest]
    #[case(vec!["AAPL", "MSFT"])]
    #[case(vec!["ES.OPT", "ES.FUT"])]
    #[case(vec!["ES.c.0", "ES.c.1"])]
    fn test_check_consistent_symbology_when_consistent(#[case] symbols: Vec<&str>) {
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_ok());
    }
}