Skip to main content

nautilus_databento/python/
historical.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Databento historical client.
17
18use std::{fmt::Debug, path::PathBuf};
19
20use nautilus_core::{
21    python::{IntoPyObjectNautilusExt, to_pyexception, to_pyvalue_err},
22    time::get_atomic_clock_realtime,
23};
24use nautilus_model::{
25    enums::BarAggregation, identifiers::InstrumentId,
26    python::instruments::instrument_any_to_pyobject,
27};
28use pyo3::{
29    IntoPyObjectExt,
30    prelude::*,
31    types::{PyDict, PyList},
32};
33
34use crate::{
35    common::Credential,
36    historical::{DatabentoHistoricalClient as CoreDatabentoHistoricalClient, RangeQueryParams},
37};
38
39/// Python wrapper for the core Databento historical client.
40#[cfg_attr(
41    feature = "python",
42    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
43)]
44#[cfg_attr(
45    feature = "python",
46    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
47)]
48pub struct DatabentoHistoricalClient {
49    inner: CoreDatabentoHistoricalClient,
50}
51
52impl Debug for DatabentoHistoricalClient {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct(stringify!(DatabentoHistoricalClient))
55            .field("inner", &self.inner)
56            .finish()
57    }
58}
59
60#[pymethods]
61#[pyo3_stub_gen::derive::gen_stub_pymethods]
62impl DatabentoHistoricalClient {
63    /// Core Databento historical client for fetching historical market data.
64    ///
65    /// This client provides both synchronous and asynchronous interfaces for fetching
66    /// various types of historical market data from Databento.
67    #[new]
68    fn py_new(
69        key: String,
70        publishers_filepath: PathBuf,
71        use_exchange_as_venue: bool,
72    ) -> PyResult<Self> {
73        let clock = get_atomic_clock_realtime();
74        let inner = CoreDatabentoHistoricalClient::new(
75            Credential::new(key),
76            publishers_filepath,
77            clock,
78            use_exchange_as_venue,
79        )
80        .map_err(to_pyvalue_err)?;
81
82        Ok(Self { inner })
83    }
84
85    /// Returns the API key from the stored credential.
86    #[getter]
87    #[pyo3(name = "api_key")]
88    fn py_api_key(&self) -> &str {
89        self.inner.api_key()
90    }
91
92    /// Gets the date range for a specific dataset.
93    #[pyo3(name = "get_dataset_range")]
94    fn py_get_dataset_range<'py>(
95        &self,
96        py: Python<'py>,
97        dataset: String,
98    ) -> PyResult<Bound<'py, PyAny>> {
99        let inner = self.inner.clone();
100
101        pyo3_async_runtimes::tokio::future_into_py(py, async move {
102            let response = inner.get_dataset_range(&dataset).await;
103            match response {
104                Ok(res) => Python::attach(|py| {
105                    let dict = PyDict::new(py);
106                    dict.set_item("start", res.start)?;
107                    dict.set_item("end", res.end)?;
108                    dict.into_py_any(py)
109                }),
110                Err(e) => Err(to_pyexception(format!("Error handling response: {e}"))),
111            }
112        })
113    }
114
115    /// Fetches instrument definitions for the given parameters.
116    #[pyo3(name = "get_range_instruments")]
117    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
118    #[expect(clippy::needless_pass_by_value)]
119    fn py_get_range_instruments<'py>(
120        &self,
121        py: Python<'py>,
122        dataset: String,
123        instrument_ids: Vec<InstrumentId>,
124        start: u64,
125        end: Option<u64>,
126        limit: Option<u64>,
127    ) -> PyResult<Bound<'py, PyAny>> {
128        let inner = self.inner.clone();
129        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
130
131        let params = RangeQueryParams {
132            dataset,
133            symbols,
134            start: start.into(),
135            end: end.map(Into::into),
136            limit,
137            price_precision: None,
138        };
139
140        pyo3_async_runtimes::tokio::future_into_py(py, async move {
141            let instruments = inner
142                .get_range_instruments(params)
143                .await
144                .map_err(to_pyvalue_err)?;
145
146            Python::attach(|py| -> PyResult<Py<PyAny>> {
147                let objs: Vec<Py<PyAny>> = instruments
148                    .into_iter()
149                    .map(|inst| instrument_any_to_pyobject(py, inst))
150                    .collect::<PyResult<Vec<Py<PyAny>>>>()?;
151
152                let list = PyList::new(py, &objs).expect("Invalid `ExactSizeIterator`");
153                Ok(list.into_py_any_unwrap(py))
154            })
155        })
156    }
157
158    /// Fetches quote ticks for the given parameters.
159    #[pyo3(name = "get_range_quotes")]
160    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
161    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
162    fn py_get_range_quotes<'py>(
163        &self,
164        py: Python<'py>,
165        dataset: String,
166        instrument_ids: Vec<InstrumentId>,
167        start: u64,
168        end: Option<u64>,
169        limit: Option<u64>,
170        price_precision: Option<u8>,
171        schema: Option<String>,
172    ) -> PyResult<Bound<'py, PyAny>> {
173        let inner = self.inner.clone();
174        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
175
176        let params = RangeQueryParams {
177            dataset,
178            symbols,
179            start: start.into(),
180            end: end.map(Into::into),
181            limit,
182            price_precision,
183        };
184
185        pyo3_async_runtimes::tokio::future_into_py(py, async move {
186            let quotes = inner
187                .get_range_quotes(params, schema)
188                .await
189                .map_err(to_pyvalue_err)?;
190            Python::attach(|py| quotes.into_py_any(py))
191        })
192    }
193
194    /// Fetches trade ticks for the given parameters.
195    #[pyo3(name = "get_range_trades")]
196    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
197    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
198    fn py_get_range_trades<'py>(
199        &self,
200        py: Python<'py>,
201        dataset: String,
202        instrument_ids: Vec<InstrumentId>,
203        start: u64,
204        end: Option<u64>,
205        limit: Option<u64>,
206        price_precision: Option<u8>,
207    ) -> PyResult<Bound<'py, PyAny>> {
208        let inner = self.inner.clone();
209        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
210
211        let params = RangeQueryParams {
212            dataset,
213            symbols,
214            start: start.into(),
215            end: end.map(Into::into),
216            limit,
217            price_precision,
218        };
219
220        pyo3_async_runtimes::tokio::future_into_py(py, async move {
221            let trades = inner
222                .get_range_trades(params)
223                .await
224                .map_err(to_pyvalue_err)?;
225            Python::attach(|py| trades.into_py_any(py))
226        })
227    }
228
229    /// Fetches bars for the given parameters.
230    #[pyo3(name = "get_range_bars")]
231    #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None, timestamp_on_close=true))]
232    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
233    fn py_get_range_bars<'py>(
234        &self,
235        py: Python<'py>,
236        dataset: String,
237        instrument_ids: Vec<InstrumentId>,
238        aggregation: BarAggregation,
239        start: u64,
240        end: Option<u64>,
241        limit: Option<u64>,
242        price_precision: Option<u8>,
243        timestamp_on_close: bool,
244    ) -> PyResult<Bound<'py, PyAny>> {
245        let inner = self.inner.clone();
246        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
247
248        let params = RangeQueryParams {
249            dataset,
250            symbols,
251            start: start.into(),
252            end: end.map(Into::into),
253            limit,
254            price_precision,
255        };
256
257        pyo3_async_runtimes::tokio::future_into_py(py, async move {
258            let bars = inner
259                .get_range_bars(params, aggregation, timestamp_on_close)
260                .await
261                .map_err(to_pyvalue_err)?;
262            Python::attach(|py| bars.into_py_any(py))
263        })
264    }
265
266    #[pyo3(name = "get_order_book_depth10")]
267    #[pyo3(signature = (dataset, instrument_ids, start, end=None, depth=None))]
268    #[expect(clippy::needless_pass_by_value)]
269    fn py_get_order_book_depth10<'py>(
270        &self,
271        py: Python<'py>,
272        dataset: String,
273        instrument_ids: Vec<InstrumentId>,
274        start: u64,
275        end: Option<u64>,
276        depth: Option<usize>,
277    ) -> PyResult<Bound<'py, PyAny>> {
278        let inner = self.inner.clone();
279        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
280
281        let params = RangeQueryParams {
282            dataset,
283            symbols,
284            start: start.into(),
285            end: end.map(Into::into),
286            limit: None,
287            price_precision: None,
288        };
289
290        pyo3_async_runtimes::tokio::future_into_py(py, async move {
291            let depths = inner
292                .get_range_order_book_depth10(params, depth)
293                .await
294                .map_err(to_pyvalue_err)?;
295            Python::attach(|py| depths.into_py_any(py))
296        })
297    }
298
299    /// Fetches order book deltas for the given parameters.
300    #[pyo3(name = "get_range_order_book_deltas")]
301    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
302    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
303    fn py_get_range_order_book_deltas<'py>(
304        &self,
305        py: Python<'py>,
306        dataset: String,
307        instrument_ids: Vec<InstrumentId>,
308        start: u64,
309        end: Option<u64>,
310        limit: Option<u64>,
311        price_precision: Option<u8>,
312    ) -> PyResult<Bound<'py, PyAny>> {
313        let inner = self.inner.clone();
314        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
315
316        let params = RangeQueryParams {
317            dataset,
318            symbols,
319            start: start.into(),
320            end: end.map(Into::into),
321            limit,
322            price_precision,
323        };
324
325        pyo3_async_runtimes::tokio::future_into_py(py, async move {
326            let deltas = inner
327                .get_range_order_book_deltas(params)
328                .await
329                .map_err(to_pyvalue_err)?;
330            Python::attach(|py| deltas.into_py_any(py))
331        })
332    }
333
334    /// Fetches imbalance data for the given parameters.
335    #[pyo3(name = "get_range_imbalance")]
336    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
337    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
338    fn py_get_range_imbalance<'py>(
339        &self,
340        py: Python<'py>,
341        dataset: String,
342        instrument_ids: Vec<InstrumentId>,
343        start: u64,
344        end: Option<u64>,
345        limit: Option<u64>,
346        price_precision: Option<u8>,
347    ) -> PyResult<Bound<'py, PyAny>> {
348        let inner = self.inner.clone();
349        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
350
351        let params = RangeQueryParams {
352            dataset,
353            symbols,
354            start: start.into(),
355            end: end.map(Into::into),
356            limit,
357            price_precision,
358        };
359
360        pyo3_async_runtimes::tokio::future_into_py(py, async move {
361            let imbalances = inner
362                .get_range_imbalance(params)
363                .await
364                .map_err(to_pyvalue_err)?;
365            Python::attach(|py| imbalances.into_py_any(py))
366        })
367    }
368
369    /// Fetches statistics data for the given parameters.
370    #[pyo3(name = "get_range_statistics")]
371    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
372    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
373    fn py_get_range_statistics<'py>(
374        &self,
375        py: Python<'py>,
376        dataset: String,
377        instrument_ids: Vec<InstrumentId>,
378        start: u64,
379        end: Option<u64>,
380        limit: Option<u64>,
381        price_precision: Option<u8>,
382    ) -> PyResult<Bound<'py, PyAny>> {
383        let inner = self.inner.clone();
384        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
385
386        let params = RangeQueryParams {
387            dataset,
388            symbols,
389            start: start.into(),
390            end: end.map(Into::into),
391            limit,
392            price_precision,
393        };
394
395        pyo3_async_runtimes::tokio::future_into_py(py, async move {
396            let statistics = inner
397                .get_range_statistics(params)
398                .await
399                .map_err(to_pyvalue_err)?;
400            Python::attach(|py| statistics.into_py_any(py))
401        })
402    }
403
404    /// Fetches status data for the given parameters.
405    #[pyo3(name = "get_range_status")]
406    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
407    #[expect(clippy::needless_pass_by_value)]
408    fn py_get_range_status<'py>(
409        &self,
410        py: Python<'py>,
411        dataset: String,
412        instrument_ids: Vec<InstrumentId>,
413        start: u64,
414        end: Option<u64>,
415        limit: Option<u64>,
416    ) -> PyResult<Bound<'py, PyAny>> {
417        let inner = self.inner.clone();
418        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
419
420        let params = RangeQueryParams {
421            dataset,
422            symbols,
423            start: start.into(),
424            end: end.map(Into::into),
425            limit,
426            price_precision: None,
427        };
428
429        pyo3_async_runtimes::tokio::future_into_py(py, async move {
430            let statuses = inner
431                .get_range_status(params)
432                .await
433                .map_err(to_pyvalue_err)?;
434            Python::attach(|py| statuses.into_py_any(py))
435        })
436    }
437}