1use std::{fmt::Debug, path::PathBuf};
19
20use nautilus_core::{
21 python::{IntoPyObjectNautilusExt, to_pyexception, to_pyvalue_err},
22 time::get_atomic_clock_realtime,
23};
24use nautilus_model::{
25 enums::BarAggregation, identifiers::InstrumentId,
26 python::instruments::instrument_any_to_pyobject,
27};
28use pyo3::{
29 IntoPyObjectExt,
30 prelude::*,
31 types::{PyDict, PyList},
32};
33
34use crate::{
35 common::Credential,
36 historical::{DatabentoHistoricalClient as CoreDatabentoHistoricalClient, RangeQueryParams},
37};
38
39#[cfg_attr(
41 feature = "python",
42 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
43)]
44#[cfg_attr(
45 feature = "python",
46 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
47)]
48pub struct DatabentoHistoricalClient {
49 inner: CoreDatabentoHistoricalClient,
50}
51
52impl Debug for DatabentoHistoricalClient {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct(stringify!(DatabentoHistoricalClient))
55 .field("inner", &self.inner)
56 .finish()
57 }
58}
59
60#[pymethods]
61#[pyo3_stub_gen::derive::gen_stub_pymethods]
62impl DatabentoHistoricalClient {
63 #[new]
68 fn py_new(
69 key: String,
70 publishers_filepath: PathBuf,
71 use_exchange_as_venue: bool,
72 ) -> PyResult<Self> {
73 let clock = get_atomic_clock_realtime();
74 let inner = CoreDatabentoHistoricalClient::new(
75 Credential::new(key),
76 publishers_filepath,
77 clock,
78 use_exchange_as_venue,
79 )
80 .map_err(to_pyvalue_err)?;
81
82 Ok(Self { inner })
83 }
84
85 #[getter]
87 #[pyo3(name = "api_key")]
88 fn py_api_key(&self) -> &str {
89 self.inner.api_key()
90 }
91
92 #[pyo3(name = "get_dataset_range")]
94 fn py_get_dataset_range<'py>(
95 &self,
96 py: Python<'py>,
97 dataset: String,
98 ) -> PyResult<Bound<'py, PyAny>> {
99 let inner = self.inner.clone();
100
101 pyo3_async_runtimes::tokio::future_into_py(py, async move {
102 let response = inner.get_dataset_range(&dataset).await;
103 match response {
104 Ok(res) => Python::attach(|py| {
105 let dict = PyDict::new(py);
106 dict.set_item("start", res.start)?;
107 dict.set_item("end", res.end)?;
108 dict.into_py_any(py)
109 }),
110 Err(e) => Err(to_pyexception(format!("Error handling response: {e}"))),
111 }
112 })
113 }
114
115 #[pyo3(name = "get_range_instruments")]
117 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
118 #[expect(clippy::needless_pass_by_value)]
119 fn py_get_range_instruments<'py>(
120 &self,
121 py: Python<'py>,
122 dataset: String,
123 instrument_ids: Vec<InstrumentId>,
124 start: u64,
125 end: Option<u64>,
126 limit: Option<u64>,
127 ) -> PyResult<Bound<'py, PyAny>> {
128 let inner = self.inner.clone();
129 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
130
131 let params = RangeQueryParams {
132 dataset,
133 symbols,
134 start: start.into(),
135 end: end.map(Into::into),
136 limit,
137 price_precision: None,
138 };
139
140 pyo3_async_runtimes::tokio::future_into_py(py, async move {
141 let instruments = inner
142 .get_range_instruments(params)
143 .await
144 .map_err(to_pyvalue_err)?;
145
146 Python::attach(|py| -> PyResult<Py<PyAny>> {
147 let objs: Vec<Py<PyAny>> = instruments
148 .into_iter()
149 .map(|inst| instrument_any_to_pyobject(py, inst))
150 .collect::<PyResult<Vec<Py<PyAny>>>>()?;
151
152 let list = PyList::new(py, &objs).expect("Invalid `ExactSizeIterator`");
153 Ok(list.into_py_any_unwrap(py))
154 })
155 })
156 }
157
158 #[pyo3(name = "get_range_quotes")]
160 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
161 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
162 fn py_get_range_quotes<'py>(
163 &self,
164 py: Python<'py>,
165 dataset: String,
166 instrument_ids: Vec<InstrumentId>,
167 start: u64,
168 end: Option<u64>,
169 limit: Option<u64>,
170 price_precision: Option<u8>,
171 schema: Option<String>,
172 ) -> PyResult<Bound<'py, PyAny>> {
173 let inner = self.inner.clone();
174 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
175
176 let params = RangeQueryParams {
177 dataset,
178 symbols,
179 start: start.into(),
180 end: end.map(Into::into),
181 limit,
182 price_precision,
183 };
184
185 pyo3_async_runtimes::tokio::future_into_py(py, async move {
186 let quotes = inner
187 .get_range_quotes(params, schema)
188 .await
189 .map_err(to_pyvalue_err)?;
190 Python::attach(|py| quotes.into_py_any(py))
191 })
192 }
193
194 #[pyo3(name = "get_range_trades")]
196 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
197 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
198 fn py_get_range_trades<'py>(
199 &self,
200 py: Python<'py>,
201 dataset: String,
202 instrument_ids: Vec<InstrumentId>,
203 start: u64,
204 end: Option<u64>,
205 limit: Option<u64>,
206 price_precision: Option<u8>,
207 ) -> PyResult<Bound<'py, PyAny>> {
208 let inner = self.inner.clone();
209 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
210
211 let params = RangeQueryParams {
212 dataset,
213 symbols,
214 start: start.into(),
215 end: end.map(Into::into),
216 limit,
217 price_precision,
218 };
219
220 pyo3_async_runtimes::tokio::future_into_py(py, async move {
221 let trades = inner
222 .get_range_trades(params)
223 .await
224 .map_err(to_pyvalue_err)?;
225 Python::attach(|py| trades.into_py_any(py))
226 })
227 }
228
229 #[pyo3(name = "get_range_bars")]
231 #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None, timestamp_on_close=true))]
232 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
233 fn py_get_range_bars<'py>(
234 &self,
235 py: Python<'py>,
236 dataset: String,
237 instrument_ids: Vec<InstrumentId>,
238 aggregation: BarAggregation,
239 start: u64,
240 end: Option<u64>,
241 limit: Option<u64>,
242 price_precision: Option<u8>,
243 timestamp_on_close: bool,
244 ) -> PyResult<Bound<'py, PyAny>> {
245 let inner = self.inner.clone();
246 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
247
248 let params = RangeQueryParams {
249 dataset,
250 symbols,
251 start: start.into(),
252 end: end.map(Into::into),
253 limit,
254 price_precision,
255 };
256
257 pyo3_async_runtimes::tokio::future_into_py(py, async move {
258 let bars = inner
259 .get_range_bars(params, aggregation, timestamp_on_close)
260 .await
261 .map_err(to_pyvalue_err)?;
262 Python::attach(|py| bars.into_py_any(py))
263 })
264 }
265
266 #[pyo3(name = "get_order_book_depth10")]
267 #[pyo3(signature = (dataset, instrument_ids, start, end=None, depth=None))]
268 #[expect(clippy::needless_pass_by_value)]
269 fn py_get_order_book_depth10<'py>(
270 &self,
271 py: Python<'py>,
272 dataset: String,
273 instrument_ids: Vec<InstrumentId>,
274 start: u64,
275 end: Option<u64>,
276 depth: Option<usize>,
277 ) -> PyResult<Bound<'py, PyAny>> {
278 let inner = self.inner.clone();
279 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
280
281 let params = RangeQueryParams {
282 dataset,
283 symbols,
284 start: start.into(),
285 end: end.map(Into::into),
286 limit: None,
287 price_precision: None,
288 };
289
290 pyo3_async_runtimes::tokio::future_into_py(py, async move {
291 let depths = inner
292 .get_range_order_book_depth10(params, depth)
293 .await
294 .map_err(to_pyvalue_err)?;
295 Python::attach(|py| depths.into_py_any(py))
296 })
297 }
298
299 #[pyo3(name = "get_range_order_book_deltas")]
301 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
302 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
303 fn py_get_range_order_book_deltas<'py>(
304 &self,
305 py: Python<'py>,
306 dataset: String,
307 instrument_ids: Vec<InstrumentId>,
308 start: u64,
309 end: Option<u64>,
310 limit: Option<u64>,
311 price_precision: Option<u8>,
312 ) -> PyResult<Bound<'py, PyAny>> {
313 let inner = self.inner.clone();
314 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
315
316 let params = RangeQueryParams {
317 dataset,
318 symbols,
319 start: start.into(),
320 end: end.map(Into::into),
321 limit,
322 price_precision,
323 };
324
325 pyo3_async_runtimes::tokio::future_into_py(py, async move {
326 let deltas = inner
327 .get_range_order_book_deltas(params)
328 .await
329 .map_err(to_pyvalue_err)?;
330 Python::attach(|py| deltas.into_py_any(py))
331 })
332 }
333
334 #[pyo3(name = "get_range_imbalance")]
336 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
337 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
338 fn py_get_range_imbalance<'py>(
339 &self,
340 py: Python<'py>,
341 dataset: String,
342 instrument_ids: Vec<InstrumentId>,
343 start: u64,
344 end: Option<u64>,
345 limit: Option<u64>,
346 price_precision: Option<u8>,
347 ) -> PyResult<Bound<'py, PyAny>> {
348 let inner = self.inner.clone();
349 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
350
351 let params = RangeQueryParams {
352 dataset,
353 symbols,
354 start: start.into(),
355 end: end.map(Into::into),
356 limit,
357 price_precision,
358 };
359
360 pyo3_async_runtimes::tokio::future_into_py(py, async move {
361 let imbalances = inner
362 .get_range_imbalance(params)
363 .await
364 .map_err(to_pyvalue_err)?;
365 Python::attach(|py| imbalances.into_py_any(py))
366 })
367 }
368
369 #[pyo3(name = "get_range_statistics")]
371 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
372 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
373 fn py_get_range_statistics<'py>(
374 &self,
375 py: Python<'py>,
376 dataset: String,
377 instrument_ids: Vec<InstrumentId>,
378 start: u64,
379 end: Option<u64>,
380 limit: Option<u64>,
381 price_precision: Option<u8>,
382 ) -> PyResult<Bound<'py, PyAny>> {
383 let inner = self.inner.clone();
384 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
385
386 let params = RangeQueryParams {
387 dataset,
388 symbols,
389 start: start.into(),
390 end: end.map(Into::into),
391 limit,
392 price_precision,
393 };
394
395 pyo3_async_runtimes::tokio::future_into_py(py, async move {
396 let statistics = inner
397 .get_range_statistics(params)
398 .await
399 .map_err(to_pyvalue_err)?;
400 Python::attach(|py| statistics.into_py_any(py))
401 })
402 }
403
404 #[pyo3(name = "get_range_status")]
406 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
407 #[expect(clippy::needless_pass_by_value)]
408 fn py_get_range_status<'py>(
409 &self,
410 py: Python<'py>,
411 dataset: String,
412 instrument_ids: Vec<InstrumentId>,
413 start: u64,
414 end: Option<u64>,
415 limit: Option<u64>,
416 ) -> PyResult<Bound<'py, PyAny>> {
417 let inner = self.inner.clone();
418 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
419
420 let params = RangeQueryParams {
421 dataset,
422 symbols,
423 start: start.into(),
424 end: end.map(Into::into),
425 limit,
426 price_precision: None,
427 };
428
429 pyo3_async_runtimes::tokio::future_into_py(py, async move {
430 let statuses = inner
431 .get_range_status(params)
432 .await
433 .map_err(to_pyvalue_err)?;
434 Python::attach(|py| statuses.into_py_any(py))
435 })
436 }
437}