Skip to main content

nautilus_databento/
historical.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core Databento historical client for both Rust and Python usage.
17
18use std::{fmt::Debug, fs, num::NonZeroU64, path::PathBuf, str::FromStr, sync::Arc};
19
20use databento::{
21    dbn::{self, decode::DbnMetadata},
22    historical::timeseries::GetRangeParams,
23};
24use indexmap::IndexMap;
25use nautilus_core::{AtomicMap, UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
26use nautilus_model::{
27    data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
28    enums::BarAggregation,
29    identifiers::{InstrumentId, Symbol, Venue},
30    instruments::InstrumentAny,
31    types::Currency,
32};
33
34use crate::{
35    common::{Credential, get_date_time_range},
36    decode::{
37        decode_imbalance_msg, decode_instrument_def_msg, decode_mbo_msg, decode_mbp10_msg,
38        decode_record, decode_statistics_msg, decode_status_msg,
39    },
40    symbology::{
41        MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
42        infer_symbology_type,
43    },
44    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
45};
46
47/// Core Databento historical client for fetching historical market data.
48///
49/// This client provides both synchronous and asynchronous interfaces for fetching
50/// various types of historical market data from Databento.
51#[derive(Clone)]
52pub struct DatabentoHistoricalClient {
53    credential: Credential,
54    clock: &'static AtomicTime,
55    inner: Arc<tokio::sync::Mutex<databento::HistoricalClient>>,
56    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
57    symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
58    use_exchange_as_venue: bool,
59}
60
61/// Parameters for range queries to Databento historical API.
62#[derive(Debug)]
63pub struct RangeQueryParams {
64    pub dataset: String,
65    pub symbols: Vec<String>,
66    pub start: UnixNanos,
67    pub end: Option<UnixNanos>,
68    pub limit: Option<u64>,
69    pub price_precision: Option<u8>,
70}
71
/// Available date range of a dataset, as reported by the Databento metadata API.
#[derive(Debug, Clone)]
pub struct DatasetRange {
    /// String form of the range start returned by the API.
    pub start: String,
    /// String form of the range end returned by the API.
    pub end: String,
}
78
79impl Debug for DatabentoHistoricalClient {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct(stringify!(DatabentoHistoricalClient))
82            .field("credential", &self.credential)
83            .finish()
84    }
85}
86
87impl DatabentoHistoricalClient {
88    /// Returns the API key from the stored credential.
89    #[must_use]
90    pub fn api_key(&self) -> &str {
91        self.credential.api_key()
92    }
93
94    /// Creates a new [`DatabentoHistoricalClient`] instance.
95    ///
96    /// # Errors
97    ///
98    /// Returns an error if client creation or publisher loading fails.
99    pub fn new(
100        credential: Credential,
101        publishers_filepath: PathBuf,
102        clock: &'static AtomicTime,
103        use_exchange_as_venue: bool,
104    ) -> anyhow::Result<Self> {
105        let client = databento::HistoricalClient::builder()
106            .user_agent_extension(NAUTILUS_USER_AGENT.into())
107            .key(credential.api_key())
108            .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
109            .build()
110            .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
111
112        let file_content = fs::read_to_string(publishers_filepath)?;
113        let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
114
115        let publisher_venue_map = publishers_vec
116            .into_iter()
117            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
118            .collect::<IndexMap<u16, Venue>>();
119
120        Ok(Self {
121            clock,
122            inner: Arc::new(tokio::sync::Mutex::new(client)),
123            publisher_venue_map: Arc::new(publisher_venue_map),
124            symbol_venue_map: Arc::new(AtomicMap::new()),
125            credential,
126            use_exchange_as_venue,
127        })
128    }
129
130    /// Gets the date range for a specific dataset.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the API request fails.
135    pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
136        let mut client = self.inner.lock().await;
137        let response = client
138            .metadata()
139            .get_dataset_range(dataset)
140            .await
141            .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
142
143        Ok(DatasetRange {
144            start: response.start.to_string(),
145            end: response.end.to_string(),
146        })
147    }
148
149    /// Fetches instrument definitions for the given parameters.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the API request or data processing fails.
154    pub async fn get_range_instruments(
155        &self,
156        params: RangeQueryParams,
157    ) -> anyhow::Result<Vec<InstrumentAny>> {
158        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
159        check_consistent_symbology(&symbols)?;
160
161        let first_symbol = params
162            .symbols
163            .first()
164            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
165        let stype_in = infer_symbology_type(first_symbol);
166        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
167        let time_range = get_date_time_range(params.start, end)?;
168
169        let range_params = GetRangeParams::builder()
170            .dataset(params.dataset)
171            .date_time_range(time_range)
172            .symbols(symbols)
173            .stype_in(stype_in)
174            .schema(dbn::Schema::Definition)
175            .maybe_limit(params.limit.and_then(NonZeroU64::new))
176            .build();
177
178        let mut client = self.inner.lock().await;
179        let mut decoder = client
180            .timeseries()
181            .get_range(&range_params)
182            .await
183            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
184
185        let metadata = decoder.metadata().clone();
186        let mut metadata_cache = MetadataCache::new(metadata);
187        let mut instruments = Vec::new();
188
189        while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
190            let record = dbn::RecordRef::from(msg);
191            let sym_map = self.symbol_venue_map.load();
192            let mut instrument_id = decode_nautilus_instrument_id(
193                &record,
194                &mut metadata_cache,
195                &self.publisher_venue_map,
196                &sym_map,
197            )?;
198
199            if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
200                let exchange = msg
201                    .exchange()
202                    .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
203                let venue = Venue::from_code(exchange)
204                    .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
205                instrument_id.venue = venue;
206            }
207
208            match decode_instrument_def_msg(msg, instrument_id, None) {
209                Ok(instrument) => instruments.push(instrument),
210                Err(e) => log::error!("Failed to decode instrument: {e:?}"),
211            }
212        }
213
214        Ok(instruments)
215    }
216
217    /// Fetches quote ticks for the given parameters.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if the API request or data processing fails.
222    pub async fn get_range_quotes(
223        &self,
224        params: RangeQueryParams,
225        schema: Option<String>,
226    ) -> anyhow::Result<Vec<QuoteTick>> {
227        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
228        check_consistent_symbology(&symbols)?;
229
230        let first_symbol = params
231            .symbols
232            .first()
233            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
234        let stype_in = infer_symbology_type(first_symbol);
235        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
236        let time_range = get_date_time_range(params.start, end)?;
237        let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
238        let dbn_schema = dbn::Schema::from_str(&schema)?;
239
240        match dbn_schema {
241            dbn::Schema::Mbp1
242            | dbn::Schema::Bbo1S
243            | dbn::Schema::Bbo1M
244            | dbn::Schema::Cmbp1
245            | dbn::Schema::Cbbo1S
246            | dbn::Schema::Cbbo1M => (),
247            _ => anyhow::bail!(
248                "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m, cmbp-1, cbbo-1s, cbbo-1m"
249            ),
250        }
251
252        let range_params = GetRangeParams::builder()
253            .dataset(params.dataset)
254            .date_time_range(time_range)
255            .symbols(symbols)
256            .stype_in(stype_in)
257            .schema(dbn_schema)
258            .maybe_limit(params.limit.and_then(NonZeroU64::new))
259            .build();
260
261        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
262
263        let mut client = self.inner.lock().await;
264        let mut decoder = client
265            .timeseries()
266            .get_range(&range_params)
267            .await
268            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
269
270        let metadata = decoder.metadata().clone();
271        let mut metadata_cache = MetadataCache::new(metadata);
272        let mut result: Vec<QuoteTick> = Vec::new();
273
274        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
275            let sym_map = self.symbol_venue_map.load();
276            let instrument_id = decode_nautilus_instrument_id(
277                &record,
278                &mut metadata_cache,
279                &self.publisher_venue_map,
280                &sym_map,
281            )?;
282
283            let (data, _) = decode_record(
284                &record,
285                instrument_id,
286                price_precision,
287                None,
288                false, // Don't include trades
289                true,
290            )?;
291
292            match data {
293                Some(Data::Quote(quote)) => result.push(quote),
294                None => {} // Skip records with undefined bid/ask prices
295                _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
296            }
297            Ok(())
298        };
299
300        match dbn_schema {
301            dbn::Schema::Mbp1 => {
302                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
303                    process_record(dbn::RecordRef::from(msg))?;
304                }
305            }
306            dbn::Schema::Cmbp1 => {
307                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Cmbp1Msg>().await {
308                    process_record(dbn::RecordRef::from(msg))?;
309                }
310            }
311            dbn::Schema::Bbo1M => {
312                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
313                    process_record(dbn::RecordRef::from(msg))?;
314                }
315            }
316            dbn::Schema::Bbo1S => {
317                while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
318                    process_record(dbn::RecordRef::from(msg))?;
319                }
320            }
321            dbn::Schema::Cbbo1S | dbn::Schema::Cbbo1M => {
322                while let Ok(Some(msg)) = decoder.decode_record::<dbn::CbboMsg>().await {
323                    process_record(dbn::RecordRef::from(msg))?;
324                }
325            }
326            _ => anyhow::bail!("Invalid schema {dbn_schema}"),
327        }
328
329        Ok(result)
330    }
331
332    /// Fetches order book depth10 snapshots for the given parameters.
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if the API request or data processing fails.
337    pub async fn get_range_order_book_depth10(
338        &self,
339        params: RangeQueryParams,
340        depth: Option<usize>,
341    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
342        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
343        check_consistent_symbology(&symbols)?;
344
345        let first_symbol = params
346            .symbols
347            .first()
348            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
349        let stype_in = infer_symbology_type(first_symbol);
350        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
351        let time_range = get_date_time_range(params.start, end)?;
352
353        // For now, only support MBP_10 schema for depth 10
354        let _depth = depth.unwrap_or(10);
355        if _depth != 10 {
356            anyhow::bail!("Only depth=10 is currently supported for order book depths");
357        }
358
359        let range_params = GetRangeParams::builder()
360            .dataset(params.dataset)
361            .date_time_range(time_range)
362            .symbols(symbols)
363            .stype_in(stype_in)
364            .schema(dbn::Schema::Mbp10)
365            .maybe_limit(params.limit.and_then(NonZeroU64::new))
366            .build();
367
368        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
369
370        let mut client = self.inner.lock().await;
371        let mut decoder = client
372            .timeseries()
373            .get_range(&range_params)
374            .await
375            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
376
377        let metadata = decoder.metadata().clone();
378        let mut metadata_cache = MetadataCache::new(metadata);
379        let mut result: Vec<OrderBookDepth10> = Vec::new();
380
381        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
382            let sym_map = self.symbol_venue_map.load();
383            let instrument_id = decode_nautilus_instrument_id(
384                &record,
385                &mut metadata_cache,
386                &self.publisher_venue_map,
387                &sym_map,
388            )?;
389
390            if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
391                let depth = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
392                result.push(depth);
393            }
394
395            Ok(())
396        };
397
398        while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp10Msg>().await {
399            process_record(dbn::RecordRef::from(msg))?;
400        }
401
402        Ok(result)
403    }
404
405    /// Fetches order book deltas for the given parameters.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if the API request or data processing fails.
410    pub async fn get_range_order_book_deltas(
411        &self,
412        params: RangeQueryParams,
413    ) -> anyhow::Result<Vec<OrderBookDelta>> {
414        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
415        check_consistent_symbology(&symbols)?;
416
417        let first_symbol = params
418            .symbols
419            .first()
420            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
421        let stype_in = infer_symbology_type(first_symbol);
422        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
423        let time_range = get_date_time_range(params.start, end)?;
424
425        let range_params = GetRangeParams::builder()
426            .dataset(params.dataset)
427            .date_time_range(time_range)
428            .symbols(symbols)
429            .stype_in(stype_in)
430            .schema(dbn::Schema::Mbo)
431            .maybe_limit(params.limit.and_then(NonZeroU64::new))
432            .build();
433
434        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
435
436        let mut client = self.inner.lock().await;
437        let mut decoder = client
438            .timeseries()
439            .get_range(&range_params)
440            .await
441            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
442
443        let metadata = decoder.metadata().clone();
444        let mut metadata_cache = MetadataCache::new(metadata);
445        let mut result: Vec<OrderBookDelta> = Vec::new();
446
447        let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
448            let sym_map = self.symbol_venue_map.load();
449            let instrument_id = decode_nautilus_instrument_id(
450                &record,
451                &mut metadata_cache,
452                &self.publisher_venue_map,
453                &sym_map,
454            )?;
455
456            if let Some(msg) = record.get::<dbn::MboMsg>() {
457                let (delta, _trade) =
458                    decode_mbo_msg(msg, instrument_id, price_precision, None, false)?;
459
460                if let Some(delta) = delta {
461                    result.push(delta);
462                }
463            }
464
465            Ok(())
466        };
467
468        while let Ok(Some(msg)) = decoder.decode_record::<dbn::MboMsg>().await {
469            process_record(dbn::RecordRef::from(msg))?;
470        }
471
472        Ok(result)
473    }
474
475    /// Fetches trade ticks for the given parameters.
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if the API request or data processing fails.
480    pub async fn get_range_trades(
481        &self,
482        params: RangeQueryParams,
483    ) -> anyhow::Result<Vec<TradeTick>> {
484        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
485        check_consistent_symbology(&symbols)?;
486
487        let first_symbol = params
488            .symbols
489            .first()
490            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
491        let stype_in = infer_symbology_type(first_symbol);
492        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
493        let time_range = get_date_time_range(params.start, end)?;
494
495        let range_params = GetRangeParams::builder()
496            .dataset(params.dataset)
497            .date_time_range(time_range)
498            .symbols(symbols)
499            .stype_in(stype_in)
500            .schema(dbn::Schema::Trades)
501            .maybe_limit(params.limit.and_then(NonZeroU64::new))
502            .build();
503
504        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
505
506        let mut client = self.inner.lock().await;
507        let mut decoder = client
508            .timeseries()
509            .get_range(&range_params)
510            .await
511            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
512
513        let metadata = decoder.metadata().clone();
514        let mut metadata_cache = MetadataCache::new(metadata);
515        let mut result: Vec<TradeTick> = Vec::new();
516
517        while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
518            let record = dbn::RecordRef::from(msg);
519            let sym_map = self.symbol_venue_map.load();
520            let instrument_id = decode_nautilus_instrument_id(
521                &record,
522                &mut metadata_cache,
523                &self.publisher_venue_map,
524                &sym_map,
525            )?;
526
527            let (data, _) = decode_record(
528                &record,
529                instrument_id,
530                price_precision,
531                None,
532                false, // Not applicable (trade will be decoded regardless)
533                true,
534            )?;
535
536            match data {
537                Some(Data::Trade(trade)) => {
538                    result.push(trade);
539                }
540                _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
541            }
542        }
543
544        Ok(result)
545    }
546
547    /// Fetches bars for the given parameters.
548    ///
549    /// # Errors
550    ///
551    /// Returns an error if the API request or data processing fails.
552    pub async fn get_range_bars(
553        &self,
554        params: RangeQueryParams,
555        aggregation: BarAggregation,
556        timestamp_on_close: bool,
557    ) -> anyhow::Result<Vec<Bar>> {
558        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
559        check_consistent_symbology(&symbols)?;
560
561        let first_symbol = params
562            .symbols
563            .first()
564            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
565        let stype_in = infer_symbology_type(first_symbol);
566        let schema = match aggregation {
567            BarAggregation::Second => dbn::Schema::Ohlcv1S,
568            BarAggregation::Minute => dbn::Schema::Ohlcv1M,
569            BarAggregation::Hour => dbn::Schema::Ohlcv1H,
570            BarAggregation::Day => dbn::Schema::Ohlcv1D,
571            _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
572        };
573
574        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
575        let time_range = get_date_time_range(params.start, end)?;
576
577        let range_params = GetRangeParams::builder()
578            .dataset(params.dataset)
579            .date_time_range(time_range)
580            .symbols(symbols)
581            .stype_in(stype_in)
582            .schema(schema)
583            .maybe_limit(params.limit.and_then(NonZeroU64::new))
584            .build();
585
586        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
587
588        let mut client = self.inner.lock().await;
589        let mut decoder = client
590            .timeseries()
591            .get_range(&range_params)
592            .await
593            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
594
595        let metadata = decoder.metadata().clone();
596        let mut metadata_cache = MetadataCache::new(metadata);
597        let mut result: Vec<Bar> = Vec::new();
598
599        while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
600            let record = dbn::RecordRef::from(msg);
601            let sym_map = self.symbol_venue_map.load();
602            let instrument_id = decode_nautilus_instrument_id(
603                &record,
604                &mut metadata_cache,
605                &self.publisher_venue_map,
606                &sym_map,
607            )?;
608
609            let (data, _) = decode_record(
610                &record,
611                instrument_id,
612                price_precision,
613                None,
614                false, // Not applicable
615                timestamp_on_close,
616            )?;
617
618            match data {
619                Some(Data::Bar(bar)) => {
620                    result.push(bar);
621                }
622                _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
623            }
624        }
625
626        Ok(result)
627    }
628
629    /// Fetches imbalance data for the given parameters.
630    ///
631    /// # Errors
632    ///
633    /// Returns an error if the API request or data processing fails.
634    pub async fn get_range_imbalance(
635        &self,
636        params: RangeQueryParams,
637    ) -> anyhow::Result<Vec<DatabentoImbalance>> {
638        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
639        check_consistent_symbology(&symbols)?;
640
641        let first_symbol = params
642            .symbols
643            .first()
644            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
645        let stype_in = infer_symbology_type(first_symbol);
646        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
647        let time_range = get_date_time_range(params.start, end)?;
648
649        let range_params = GetRangeParams::builder()
650            .dataset(params.dataset)
651            .date_time_range(time_range)
652            .symbols(symbols)
653            .stype_in(stype_in)
654            .schema(dbn::Schema::Imbalance)
655            .maybe_limit(params.limit.and_then(NonZeroU64::new))
656            .build();
657
658        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
659
660        let mut client = self.inner.lock().await;
661        let mut decoder = client
662            .timeseries()
663            .get_range(&range_params)
664            .await
665            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
666
667        let metadata = decoder.metadata().clone();
668        let mut metadata_cache = MetadataCache::new(metadata);
669        let mut result: Vec<DatabentoImbalance> = Vec::new();
670
671        while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
672            let record = dbn::RecordRef::from(msg);
673            let sym_map = self.symbol_venue_map.load();
674            let instrument_id = decode_nautilus_instrument_id(
675                &record,
676                &mut metadata_cache,
677                &self.publisher_venue_map,
678                &sym_map,
679            )?;
680
681            let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
682            result.push(imbalance);
683        }
684
685        Ok(result)
686    }
687
688    /// Fetches statistics data for the given parameters.
689    ///
690    /// # Errors
691    ///
692    /// Returns an error if the API request or data processing fails.
693    pub async fn get_range_statistics(
694        &self,
695        params: RangeQueryParams,
696    ) -> anyhow::Result<Vec<DatabentoStatistics>> {
697        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
698        check_consistent_symbology(&symbols)?;
699
700        let first_symbol = params
701            .symbols
702            .first()
703            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
704        let stype_in = infer_symbology_type(first_symbol);
705        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
706        let time_range = get_date_time_range(params.start, end)?;
707
708        let range_params = GetRangeParams::builder()
709            .dataset(params.dataset)
710            .date_time_range(time_range)
711            .symbols(symbols)
712            .stype_in(stype_in)
713            .schema(dbn::Schema::Statistics)
714            .maybe_limit(params.limit.and_then(NonZeroU64::new))
715            .build();
716
717        let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
718
719        let mut client = self.inner.lock().await;
720        let mut decoder = client
721            .timeseries()
722            .get_range(&range_params)
723            .await
724            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
725
726        let metadata = decoder.metadata().clone();
727        let mut metadata_cache = MetadataCache::new(metadata);
728        let mut result: Vec<DatabentoStatistics> = Vec::new();
729
730        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
731            let record = dbn::RecordRef::from(msg);
732            let sym_map = self.symbol_venue_map.load();
733            let instrument_id = decode_nautilus_instrument_id(
734                &record,
735                &mut metadata_cache,
736                &self.publisher_venue_map,
737                &sym_map,
738            )?;
739
740            let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
741            result.push(statistics);
742        }
743
744        Ok(result)
745    }
746
747    /// Fetches status data for the given parameters.
748    ///
749    /// # Errors
750    ///
751    /// Returns an error if the API request or data processing fails.
752    pub async fn get_range_status(
753        &self,
754        params: RangeQueryParams,
755    ) -> anyhow::Result<Vec<InstrumentStatus>> {
756        let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
757        check_consistent_symbology(&symbols)?;
758
759        let first_symbol = params
760            .symbols
761            .first()
762            .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
763        let stype_in = infer_symbology_type(first_symbol);
764        let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
765        let time_range = get_date_time_range(params.start, end)?;
766
767        let range_params = GetRangeParams::builder()
768            .dataset(params.dataset)
769            .date_time_range(time_range)
770            .symbols(symbols)
771            .stype_in(stype_in)
772            .schema(dbn::Schema::Status)
773            .maybe_limit(params.limit.and_then(NonZeroU64::new))
774            .build();
775
776        let mut client = self.inner.lock().await;
777        let mut decoder = client
778            .timeseries()
779            .get_range(&range_params)
780            .await
781            .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
782
783        let metadata = decoder.metadata().clone();
784        let mut metadata_cache = MetadataCache::new(metadata);
785        let mut result: Vec<InstrumentStatus> = Vec::new();
786
787        while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
788            let record = dbn::RecordRef::from(msg);
789            let sym_map = self.symbol_venue_map.load();
790            let instrument_id = decode_nautilus_instrument_id(
791                &record,
792                &mut metadata_cache,
793                &self.publisher_venue_map,
794                &sym_map,
795            )?;
796
797            let status = decode_status_msg(msg, instrument_id, None)?;
798            result.push(status);
799        }
800
801        Ok(result)
802    }
803
804    /// Helper method to prepare symbols from instrument IDs.
805    pub fn prepare_symbols_from_instrument_ids(
806        &self,
807        instrument_ids: &[InstrumentId],
808    ) -> Vec<String> {
809        self.symbol_venue_map.rcu(|m| {
810            for id in instrument_ids {
811                m.entry(id.symbol).or_insert(id.venue);
812            }
813        });
814
815        instrument_ids
816            .iter()
817            .map(|id| id.symbol.to_string())
818            .collect()
819    }
820}