1use std::{fmt::Debug, fs, num::NonZeroU64, path::PathBuf, str::FromStr, sync::Arc};
19
20use databento::{
21 dbn::{self, decode::DbnMetadata},
22 historical::timeseries::GetRangeParams,
23};
24use indexmap::IndexMap;
25use nautilus_core::{AtomicMap, UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
26use nautilus_model::{
27 data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
28 enums::BarAggregation,
29 identifiers::{InstrumentId, Symbol, Venue},
30 instruments::InstrumentAny,
31 types::Currency,
32};
33
34use crate::{
35 common::{Credential, get_date_time_range},
36 decode::{
37 decode_imbalance_msg, decode_instrument_def_msg, decode_mbo_msg, decode_mbp10_msg,
38 decode_record, decode_statistics_msg, decode_status_msg,
39 },
40 symbology::{
41 MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
42 infer_symbology_type,
43 },
44 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
45};
46
/// Client for requesting historical market data from Databento and decoding it
/// into Nautilus data types.
///
/// The underlying Databento client and both venue maps are held behind [`Arc`],
/// so clones of this struct share them.
#[derive(Clone)]
pub struct DatabentoHistoricalClient {
    /// Credential whose API key was used to build the underlying client.
    credential: Credential,
    /// Clock used to default the `end` of a range request when none is given.
    clock: &'static AtomicTime,
    /// Underlying Databento historical client, guarded by an async mutex.
    inner: Arc<tokio::sync::Mutex<databento::HistoricalClient>>,
    /// Publisher ID to venue mapping, loaded from the publishers file in `new`.
    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
    /// Symbol to venue mapping, filled by `prepare_symbols_from_instrument_ids`
    /// and consulted when decoding instrument IDs.
    symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
    /// When `true`, instrument definitions whose venue is `GLBX` get their venue
    /// replaced by the exchange carried in the record.
    use_exchange_as_venue: bool,
}
60
/// Parameters shared by the `get_range_*` requests of
/// [`DatabentoHistoricalClient`].
#[derive(Debug)]
pub struct RangeQueryParams {
    /// Dataset code passed through to the Databento request.
    pub dataset: String,
    /// Symbols to request; the first symbol is used to infer the input
    /// symbology type, and all symbols are checked for consistent symbology.
    pub symbols: Vec<String>,
    /// Start of the requested time range.
    pub start: UnixNanos,
    /// End of the requested time range; the client's clock time is used when `None`.
    pub end: Option<UnixNanos>,
    /// Maximum number of records; `None` or `Some(0)` sends no limit.
    pub limit: Option<u64>,
    /// Price precision used when decoding; requests that decode prices fall back
    /// to the USD currency precision when `None`.
    pub price_precision: Option<u8>,
}
71
/// Available range of a dataset, as returned by
/// [`DatabentoHistoricalClient::get_dataset_range`].
#[derive(Debug, Clone)]
pub struct DatasetRange {
    /// Start of the range, as the string form of the value returned by Databento.
    pub start: String,
    /// End of the range, as the string form of the value returned by Databento.
    pub end: String,
}
78
79impl Debug for DatabentoHistoricalClient {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct(stringify!(DatabentoHistoricalClient))
82 .field("credential", &self.credential)
83 .finish()
84 }
85}
86
87impl DatabentoHistoricalClient {
88 #[must_use]
90 pub fn api_key(&self) -> &str {
91 self.credential.api_key()
92 }
93
94 pub fn new(
100 credential: Credential,
101 publishers_filepath: PathBuf,
102 clock: &'static AtomicTime,
103 use_exchange_as_venue: bool,
104 ) -> anyhow::Result<Self> {
105 let client = databento::HistoricalClient::builder()
106 .user_agent_extension(NAUTILUS_USER_AGENT.into())
107 .key(credential.api_key())
108 .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
109 .build()
110 .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
111
112 let file_content = fs::read_to_string(publishers_filepath)?;
113 let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
114
115 let publisher_venue_map = publishers_vec
116 .into_iter()
117 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
118 .collect::<IndexMap<u16, Venue>>();
119
120 Ok(Self {
121 clock,
122 inner: Arc::new(tokio::sync::Mutex::new(client)),
123 publisher_venue_map: Arc::new(publisher_venue_map),
124 symbol_venue_map: Arc::new(AtomicMap::new()),
125 credential,
126 use_exchange_as_venue,
127 })
128 }
129
130 pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
136 let mut client = self.inner.lock().await;
137 let response = client
138 .metadata()
139 .get_dataset_range(dataset)
140 .await
141 .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
142
143 Ok(DatasetRange {
144 start: response.start.to_string(),
145 end: response.end.to_string(),
146 })
147 }
148
149 pub async fn get_range_instruments(
155 &self,
156 params: RangeQueryParams,
157 ) -> anyhow::Result<Vec<InstrumentAny>> {
158 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
159 check_consistent_symbology(&symbols)?;
160
161 let first_symbol = params
162 .symbols
163 .first()
164 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
165 let stype_in = infer_symbology_type(first_symbol);
166 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
167 let time_range = get_date_time_range(params.start, end)?;
168
169 let range_params = GetRangeParams::builder()
170 .dataset(params.dataset)
171 .date_time_range(time_range)
172 .symbols(symbols)
173 .stype_in(stype_in)
174 .schema(dbn::Schema::Definition)
175 .maybe_limit(params.limit.and_then(NonZeroU64::new))
176 .build();
177
178 let mut client = self.inner.lock().await;
179 let mut decoder = client
180 .timeseries()
181 .get_range(&range_params)
182 .await
183 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
184
185 let metadata = decoder.metadata().clone();
186 let mut metadata_cache = MetadataCache::new(metadata);
187 let mut instruments = Vec::new();
188
189 while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
190 let record = dbn::RecordRef::from(msg);
191 let sym_map = self.symbol_venue_map.load();
192 let mut instrument_id = decode_nautilus_instrument_id(
193 &record,
194 &mut metadata_cache,
195 &self.publisher_venue_map,
196 &sym_map,
197 )?;
198
199 if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
200 let exchange = msg
201 .exchange()
202 .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
203 let venue = Venue::from_code(exchange)
204 .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
205 instrument_id.venue = venue;
206 }
207
208 match decode_instrument_def_msg(msg, instrument_id, None) {
209 Ok(instrument) => instruments.push(instrument),
210 Err(e) => log::error!("Failed to decode instrument: {e:?}"),
211 }
212 }
213
214 Ok(instruments)
215 }
216
217 pub async fn get_range_quotes(
223 &self,
224 params: RangeQueryParams,
225 schema: Option<String>,
226 ) -> anyhow::Result<Vec<QuoteTick>> {
227 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
228 check_consistent_symbology(&symbols)?;
229
230 let first_symbol = params
231 .symbols
232 .first()
233 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
234 let stype_in = infer_symbology_type(first_symbol);
235 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
236 let time_range = get_date_time_range(params.start, end)?;
237 let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
238 let dbn_schema = dbn::Schema::from_str(&schema)?;
239
240 match dbn_schema {
241 dbn::Schema::Mbp1
242 | dbn::Schema::Bbo1S
243 | dbn::Schema::Bbo1M
244 | dbn::Schema::Cmbp1
245 | dbn::Schema::Cbbo1S
246 | dbn::Schema::Cbbo1M => (),
247 _ => anyhow::bail!(
248 "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m, cmbp-1, cbbo-1s, cbbo-1m"
249 ),
250 }
251
252 let range_params = GetRangeParams::builder()
253 .dataset(params.dataset)
254 .date_time_range(time_range)
255 .symbols(symbols)
256 .stype_in(stype_in)
257 .schema(dbn_schema)
258 .maybe_limit(params.limit.and_then(NonZeroU64::new))
259 .build();
260
261 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
262
263 let mut client = self.inner.lock().await;
264 let mut decoder = client
265 .timeseries()
266 .get_range(&range_params)
267 .await
268 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
269
270 let metadata = decoder.metadata().clone();
271 let mut metadata_cache = MetadataCache::new(metadata);
272 let mut result: Vec<QuoteTick> = Vec::new();
273
274 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
275 let sym_map = self.symbol_venue_map.load();
276 let instrument_id = decode_nautilus_instrument_id(
277 &record,
278 &mut metadata_cache,
279 &self.publisher_venue_map,
280 &sym_map,
281 )?;
282
283 let (data, _) = decode_record(
284 &record,
285 instrument_id,
286 price_precision,
287 None,
288 false, true,
290 )?;
291
292 match data {
293 Some(Data::Quote(quote)) => result.push(quote),
294 None => {} _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
296 }
297 Ok(())
298 };
299
300 match dbn_schema {
301 dbn::Schema::Mbp1 => {
302 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
303 process_record(dbn::RecordRef::from(msg))?;
304 }
305 }
306 dbn::Schema::Cmbp1 => {
307 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Cmbp1Msg>().await {
308 process_record(dbn::RecordRef::from(msg))?;
309 }
310 }
311 dbn::Schema::Bbo1M => {
312 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
313 process_record(dbn::RecordRef::from(msg))?;
314 }
315 }
316 dbn::Schema::Bbo1S => {
317 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
318 process_record(dbn::RecordRef::from(msg))?;
319 }
320 }
321 dbn::Schema::Cbbo1S | dbn::Schema::Cbbo1M => {
322 while let Ok(Some(msg)) = decoder.decode_record::<dbn::CbboMsg>().await {
323 process_record(dbn::RecordRef::from(msg))?;
324 }
325 }
326 _ => anyhow::bail!("Invalid schema {dbn_schema}"),
327 }
328
329 Ok(result)
330 }
331
332 pub async fn get_range_order_book_depth10(
338 &self,
339 params: RangeQueryParams,
340 depth: Option<usize>,
341 ) -> anyhow::Result<Vec<OrderBookDepth10>> {
342 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
343 check_consistent_symbology(&symbols)?;
344
345 let first_symbol = params
346 .symbols
347 .first()
348 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
349 let stype_in = infer_symbology_type(first_symbol);
350 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
351 let time_range = get_date_time_range(params.start, end)?;
352
353 let _depth = depth.unwrap_or(10);
355 if _depth != 10 {
356 anyhow::bail!("Only depth=10 is currently supported for order book depths");
357 }
358
359 let range_params = GetRangeParams::builder()
360 .dataset(params.dataset)
361 .date_time_range(time_range)
362 .symbols(symbols)
363 .stype_in(stype_in)
364 .schema(dbn::Schema::Mbp10)
365 .maybe_limit(params.limit.and_then(NonZeroU64::new))
366 .build();
367
368 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
369
370 let mut client = self.inner.lock().await;
371 let mut decoder = client
372 .timeseries()
373 .get_range(&range_params)
374 .await
375 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
376
377 let metadata = decoder.metadata().clone();
378 let mut metadata_cache = MetadataCache::new(metadata);
379 let mut result: Vec<OrderBookDepth10> = Vec::new();
380
381 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
382 let sym_map = self.symbol_venue_map.load();
383 let instrument_id = decode_nautilus_instrument_id(
384 &record,
385 &mut metadata_cache,
386 &self.publisher_venue_map,
387 &sym_map,
388 )?;
389
390 if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
391 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
392 result.push(depth);
393 }
394
395 Ok(())
396 };
397
398 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp10Msg>().await {
399 process_record(dbn::RecordRef::from(msg))?;
400 }
401
402 Ok(result)
403 }
404
405 pub async fn get_range_order_book_deltas(
411 &self,
412 params: RangeQueryParams,
413 ) -> anyhow::Result<Vec<OrderBookDelta>> {
414 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
415 check_consistent_symbology(&symbols)?;
416
417 let first_symbol = params
418 .symbols
419 .first()
420 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
421 let stype_in = infer_symbology_type(first_symbol);
422 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
423 let time_range = get_date_time_range(params.start, end)?;
424
425 let range_params = GetRangeParams::builder()
426 .dataset(params.dataset)
427 .date_time_range(time_range)
428 .symbols(symbols)
429 .stype_in(stype_in)
430 .schema(dbn::Schema::Mbo)
431 .maybe_limit(params.limit.and_then(NonZeroU64::new))
432 .build();
433
434 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
435
436 let mut client = self.inner.lock().await;
437 let mut decoder = client
438 .timeseries()
439 .get_range(&range_params)
440 .await
441 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
442
443 let metadata = decoder.metadata().clone();
444 let mut metadata_cache = MetadataCache::new(metadata);
445 let mut result: Vec<OrderBookDelta> = Vec::new();
446
447 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
448 let sym_map = self.symbol_venue_map.load();
449 let instrument_id = decode_nautilus_instrument_id(
450 &record,
451 &mut metadata_cache,
452 &self.publisher_venue_map,
453 &sym_map,
454 )?;
455
456 if let Some(msg) = record.get::<dbn::MboMsg>() {
457 let (delta, _trade) =
458 decode_mbo_msg(msg, instrument_id, price_precision, None, false)?;
459
460 if let Some(delta) = delta {
461 result.push(delta);
462 }
463 }
464
465 Ok(())
466 };
467
468 while let Ok(Some(msg)) = decoder.decode_record::<dbn::MboMsg>().await {
469 process_record(dbn::RecordRef::from(msg))?;
470 }
471
472 Ok(result)
473 }
474
475 pub async fn get_range_trades(
481 &self,
482 params: RangeQueryParams,
483 ) -> anyhow::Result<Vec<TradeTick>> {
484 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
485 check_consistent_symbology(&symbols)?;
486
487 let first_symbol = params
488 .symbols
489 .first()
490 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
491 let stype_in = infer_symbology_type(first_symbol);
492 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
493 let time_range = get_date_time_range(params.start, end)?;
494
495 let range_params = GetRangeParams::builder()
496 .dataset(params.dataset)
497 .date_time_range(time_range)
498 .symbols(symbols)
499 .stype_in(stype_in)
500 .schema(dbn::Schema::Trades)
501 .maybe_limit(params.limit.and_then(NonZeroU64::new))
502 .build();
503
504 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
505
506 let mut client = self.inner.lock().await;
507 let mut decoder = client
508 .timeseries()
509 .get_range(&range_params)
510 .await
511 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
512
513 let metadata = decoder.metadata().clone();
514 let mut metadata_cache = MetadataCache::new(metadata);
515 let mut result: Vec<TradeTick> = Vec::new();
516
517 while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
518 let record = dbn::RecordRef::from(msg);
519 let sym_map = self.symbol_venue_map.load();
520 let instrument_id = decode_nautilus_instrument_id(
521 &record,
522 &mut metadata_cache,
523 &self.publisher_venue_map,
524 &sym_map,
525 )?;
526
527 let (data, _) = decode_record(
528 &record,
529 instrument_id,
530 price_precision,
531 None,
532 false, true,
534 )?;
535
536 match data {
537 Some(Data::Trade(trade)) => {
538 result.push(trade);
539 }
540 _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
541 }
542 }
543
544 Ok(result)
545 }
546
547 pub async fn get_range_bars(
553 &self,
554 params: RangeQueryParams,
555 aggregation: BarAggregation,
556 timestamp_on_close: bool,
557 ) -> anyhow::Result<Vec<Bar>> {
558 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
559 check_consistent_symbology(&symbols)?;
560
561 let first_symbol = params
562 .symbols
563 .first()
564 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
565 let stype_in = infer_symbology_type(first_symbol);
566 let schema = match aggregation {
567 BarAggregation::Second => dbn::Schema::Ohlcv1S,
568 BarAggregation::Minute => dbn::Schema::Ohlcv1M,
569 BarAggregation::Hour => dbn::Schema::Ohlcv1H,
570 BarAggregation::Day => dbn::Schema::Ohlcv1D,
571 _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
572 };
573
574 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
575 let time_range = get_date_time_range(params.start, end)?;
576
577 let range_params = GetRangeParams::builder()
578 .dataset(params.dataset)
579 .date_time_range(time_range)
580 .symbols(symbols)
581 .stype_in(stype_in)
582 .schema(schema)
583 .maybe_limit(params.limit.and_then(NonZeroU64::new))
584 .build();
585
586 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
587
588 let mut client = self.inner.lock().await;
589 let mut decoder = client
590 .timeseries()
591 .get_range(&range_params)
592 .await
593 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
594
595 let metadata = decoder.metadata().clone();
596 let mut metadata_cache = MetadataCache::new(metadata);
597 let mut result: Vec<Bar> = Vec::new();
598
599 while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
600 let record = dbn::RecordRef::from(msg);
601 let sym_map = self.symbol_venue_map.load();
602 let instrument_id = decode_nautilus_instrument_id(
603 &record,
604 &mut metadata_cache,
605 &self.publisher_venue_map,
606 &sym_map,
607 )?;
608
609 let (data, _) = decode_record(
610 &record,
611 instrument_id,
612 price_precision,
613 None,
614 false, timestamp_on_close,
616 )?;
617
618 match data {
619 Some(Data::Bar(bar)) => {
620 result.push(bar);
621 }
622 _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
623 }
624 }
625
626 Ok(result)
627 }
628
629 pub async fn get_range_imbalance(
635 &self,
636 params: RangeQueryParams,
637 ) -> anyhow::Result<Vec<DatabentoImbalance>> {
638 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
639 check_consistent_symbology(&symbols)?;
640
641 let first_symbol = params
642 .symbols
643 .first()
644 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
645 let stype_in = infer_symbology_type(first_symbol);
646 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
647 let time_range = get_date_time_range(params.start, end)?;
648
649 let range_params = GetRangeParams::builder()
650 .dataset(params.dataset)
651 .date_time_range(time_range)
652 .symbols(symbols)
653 .stype_in(stype_in)
654 .schema(dbn::Schema::Imbalance)
655 .maybe_limit(params.limit.and_then(NonZeroU64::new))
656 .build();
657
658 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
659
660 let mut client = self.inner.lock().await;
661 let mut decoder = client
662 .timeseries()
663 .get_range(&range_params)
664 .await
665 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
666
667 let metadata = decoder.metadata().clone();
668 let mut metadata_cache = MetadataCache::new(metadata);
669 let mut result: Vec<DatabentoImbalance> = Vec::new();
670
671 while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
672 let record = dbn::RecordRef::from(msg);
673 let sym_map = self.symbol_venue_map.load();
674 let instrument_id = decode_nautilus_instrument_id(
675 &record,
676 &mut metadata_cache,
677 &self.publisher_venue_map,
678 &sym_map,
679 )?;
680
681 let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
682 result.push(imbalance);
683 }
684
685 Ok(result)
686 }
687
688 pub async fn get_range_statistics(
694 &self,
695 params: RangeQueryParams,
696 ) -> anyhow::Result<Vec<DatabentoStatistics>> {
697 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
698 check_consistent_symbology(&symbols)?;
699
700 let first_symbol = params
701 .symbols
702 .first()
703 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
704 let stype_in = infer_symbology_type(first_symbol);
705 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
706 let time_range = get_date_time_range(params.start, end)?;
707
708 let range_params = GetRangeParams::builder()
709 .dataset(params.dataset)
710 .date_time_range(time_range)
711 .symbols(symbols)
712 .stype_in(stype_in)
713 .schema(dbn::Schema::Statistics)
714 .maybe_limit(params.limit.and_then(NonZeroU64::new))
715 .build();
716
717 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
718
719 let mut client = self.inner.lock().await;
720 let mut decoder = client
721 .timeseries()
722 .get_range(&range_params)
723 .await
724 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
725
726 let metadata = decoder.metadata().clone();
727 let mut metadata_cache = MetadataCache::new(metadata);
728 let mut result: Vec<DatabentoStatistics> = Vec::new();
729
730 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
731 let record = dbn::RecordRef::from(msg);
732 let sym_map = self.symbol_venue_map.load();
733 let instrument_id = decode_nautilus_instrument_id(
734 &record,
735 &mut metadata_cache,
736 &self.publisher_venue_map,
737 &sym_map,
738 )?;
739
740 let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
741 result.push(statistics);
742 }
743
744 Ok(result)
745 }
746
747 pub async fn get_range_status(
753 &self,
754 params: RangeQueryParams,
755 ) -> anyhow::Result<Vec<InstrumentStatus>> {
756 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
757 check_consistent_symbology(&symbols)?;
758
759 let first_symbol = params
760 .symbols
761 .first()
762 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
763 let stype_in = infer_symbology_type(first_symbol);
764 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
765 let time_range = get_date_time_range(params.start, end)?;
766
767 let range_params = GetRangeParams::builder()
768 .dataset(params.dataset)
769 .date_time_range(time_range)
770 .symbols(symbols)
771 .stype_in(stype_in)
772 .schema(dbn::Schema::Status)
773 .maybe_limit(params.limit.and_then(NonZeroU64::new))
774 .build();
775
776 let mut client = self.inner.lock().await;
777 let mut decoder = client
778 .timeseries()
779 .get_range(&range_params)
780 .await
781 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
782
783 let metadata = decoder.metadata().clone();
784 let mut metadata_cache = MetadataCache::new(metadata);
785 let mut result: Vec<InstrumentStatus> = Vec::new();
786
787 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
788 let record = dbn::RecordRef::from(msg);
789 let sym_map = self.symbol_venue_map.load();
790 let instrument_id = decode_nautilus_instrument_id(
791 &record,
792 &mut metadata_cache,
793 &self.publisher_venue_map,
794 &sym_map,
795 )?;
796
797 let status = decode_status_msg(msg, instrument_id, None)?;
798 result.push(status);
799 }
800
801 Ok(result)
802 }
803
804 pub fn prepare_symbols_from_instrument_ids(
806 &self,
807 instrument_ids: &[InstrumentId],
808 ) -> Vec<String> {
809 self.symbol_venue_map.rcu(|m| {
810 for id in instrument_ids {
811 m.entry(id.symbol).or_insert(id.venue);
812 }
813 });
814
815 instrument_ids
816 .iter()
817 .map(|id| id.symbol.to_string())
818 .collect()
819 }
820}