nautilus_databento/python/
live.rs1use std::{fmt::Debug, fs, path::PathBuf, str::FromStr, sync::Arc};
19
20use databento::{dbn, live::Subscription};
21use indexmap::IndexMap;
22use nautilus_core::{
23 AtomicMap,
24 python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err},
25};
26use nautilus_model::{
27 identifiers::{InstrumentId, Symbol, Venue},
28 python::{data::data_to_pycapsule, instruments::instrument_any_to_pyobject},
29};
30use pyo3::prelude::*;
31use time::OffsetDateTime;
32
33use super::types::DatabentoSubscriptionAck;
34use crate::{
35 common::Credential,
36 live::{DatabentoFeedHandler, DatabentoMessage, HandlerCommand},
37 symbology::{check_consistent_symbology, infer_symbology_type},
38 types::DatabentoPublisher,
39};
40
41#[cfg_attr(
42 feature = "python",
43 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
44)]
45#[cfg_attr(
46 feature = "python",
47 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
48)]
49pub struct DatabentoLiveClient {
50 credential: Credential,
51 #[pyo3(get)]
52 pub dataset: String,
53 is_running: bool,
54 is_closed: bool,
55 cmd_tx: tokio::sync::mpsc::UnboundedSender<HandlerCommand>,
56 cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>>,
57 buffer_size: usize,
58 publisher_venue_map: IndexMap<u16, Venue>,
59 symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
60 use_exchange_as_venue: bool,
61 bars_timestamp_on_close: bool,
62 reconnect_timeout_mins: Option<u64>,
63}
64
65impl Debug for DatabentoLiveClient {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 f.debug_struct(stringify!(DatabentoLiveClient))
68 .field("credential", &self.credential)
69 .field("dataset", &self.dataset)
70 .field("is_running", &self.is_running)
71 .field("is_closed", &self.is_closed)
72 .finish()
73 }
74}
75
76impl DatabentoLiveClient {
77 #[must_use]
78 pub fn is_closed(&self) -> bool {
79 self.cmd_tx.is_closed()
80 }
81
82 async fn process_messages(
83 mut msg_rx: tokio::sync::mpsc::Receiver<DatabentoMessage>,
84 callback: Py<PyAny>,
85 callback_pyo3: Py<PyAny>,
86 ) -> PyResult<()> {
87 log::debug!("Processing messages...");
88 while let Some(msg) = msg_rx.recv().await {
90 log::trace!("Received message: {msg:?}");
91
92 match msg {
93 DatabentoMessage::Data(data) => Python::attach(|py| {
94 let py_obj = data_to_pycapsule(py, data);
95 call_python(py, &callback, py_obj);
96 }),
97 DatabentoMessage::Instrument(data) => {
98 Python::attach(|py| match instrument_any_to_pyobject(py, *data) {
99 Ok(py_obj) => call_python(py, &callback, py_obj),
100 Err(e) => log::error!("Failed creating instrument: {e}"),
101 });
102 }
103 DatabentoMessage::Status(data) => Python::attach(|py| {
104 let py_obj = data.into_py_any_unwrap(py);
105 call_python(py, &callback_pyo3, py_obj);
106 }),
107 DatabentoMessage::Imbalance(data) => Python::attach(|py| {
108 let py_obj = data.into_py_any_unwrap(py);
109 call_python(py, &callback_pyo3, py_obj);
110 }),
111 DatabentoMessage::Statistics(data) => Python::attach(|py| {
112 let py_obj = data.into_py_any_unwrap(py);
113 call_python(py, &callback_pyo3, py_obj);
114 }),
115 DatabentoMessage::SubscriptionAck(ack) => Python::attach(|py| {
116 let py_obj: DatabentoSubscriptionAck = ack.into();
117 let py_obj = py_obj.into_py_any_unwrap(py);
118 call_python(py, &callback_pyo3, py_obj);
119 }),
120 DatabentoMessage::Close => {
121 break;
123 }
124 DatabentoMessage::Error(e) => {
125 return Err(to_pyruntime_err(e));
127 }
128 }
129 }
130
131 msg_rx.close();
132 log::debug!("Closed message receiver");
133
134 Ok(())
135 }
136
137 fn send_command(&self, cmd: HandlerCommand) -> PyResult<()> {
138 self.cmd_tx.send(cmd).map_err(to_pyruntime_err)
139 }
140}
141
142fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
143 if let Err(e) = callback.call1(py, (py_obj,)) {
144 if !e.to_string().contains("CancelledError") {
146 log::error!("Error calling Python: {e}");
147 }
148 }
149}
150
151#[pymethods]
152#[pyo3_stub_gen::derive::gen_stub_pymethods]
153impl DatabentoLiveClient {
154 #[new]
158 #[pyo3(signature = (key, dataset, publishers_filepath, use_exchange_as_venue, bars_timestamp_on_close=None, reconnect_timeout_mins=None))]
159 pub fn py_new(
160 key: String,
161 dataset: String,
162 publishers_filepath: PathBuf,
163 use_exchange_as_venue: bool,
164 bars_timestamp_on_close: Option<bool>,
165 reconnect_timeout_mins: Option<i64>,
166 ) -> PyResult<Self> {
167 let publishers_json = fs::read_to_string(publishers_filepath).map_err(to_pyvalue_err)?;
168 let publishers_vec: Vec<DatabentoPublisher> =
169 serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
170 let publisher_venue_map = publishers_vec
171 .into_iter()
172 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
173 .collect::<IndexMap<u16, Venue>>();
174
175 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
176
177 let buffer_size = 100_000;
179
180 let reconnect_timeout_mins = reconnect_timeout_mins
182 .and_then(|mins| if mins >= 0 { Some(mins as u64) } else { None });
183
184 Ok(Self {
185 credential: Credential::new(key),
186 dataset,
187 cmd_tx,
188 cmd_rx: Some(cmd_rx),
189 buffer_size,
190 is_running: false,
191 is_closed: false,
192 publisher_venue_map,
193 symbol_venue_map: Arc::new(AtomicMap::new()),
194 use_exchange_as_venue,
195 bars_timestamp_on_close: bars_timestamp_on_close.unwrap_or(true),
196 reconnect_timeout_mins,
197 })
198 }
199
200 #[pyo3(name = "is_running")]
201 const fn py_is_running(&self) -> bool {
202 self.is_running
203 }
204
205 #[pyo3(name = "is_closed")]
206 const fn py_is_closed(&self) -> bool {
207 self.is_closed
208 }
209
210 #[pyo3(name = "subscribe")]
211 #[pyo3(signature = (schema, instrument_ids, start=None, snapshot=None))]
212 #[expect(clippy::needless_pass_by_value)]
213 fn py_subscribe(
214 &mut self,
215 schema: String,
216 instrument_ids: Vec<InstrumentId>,
217 start: Option<u64>,
218 snapshot: Option<bool>,
219 ) -> PyResult<()> {
220 self.symbol_venue_map.rcu(|m| {
221 for id in &instrument_ids {
222 m.entry(id.symbol).or_insert(id.venue);
223 }
224 });
225 let symbols: Vec<String> = instrument_ids
226 .iter()
227 .map(|id| id.symbol.to_string())
228 .collect();
229 let first_symbol = symbols
230 .first()
231 .ok_or_else(|| to_pyvalue_err("No symbols provided"))?;
232 let stype_in = infer_symbology_type(first_symbol);
233 let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
234 check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
235 let mut sub = Subscription::builder()
236 .symbols(symbols)
237 .schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
238 .stype_in(stype_in)
239 .build();
240
241 if let Some(start) = start {
242 let start = OffsetDateTime::from_unix_timestamp_nanos(i128::from(start))
243 .map_err(to_pyvalue_err)?;
244 sub.start = Some(start);
245 }
246 sub.use_snapshot = snapshot.unwrap_or(false);
247
248 self.send_command(HandlerCommand::Subscribe(sub))
249 }
250
251 #[pyo3(name = "start")]
252 fn py_start<'py>(
253 &mut self,
254 py: Python<'py>,
255 callback: Py<PyAny>,
256 callback_pyo3: Py<PyAny>,
257 ) -> PyResult<Bound<'py, PyAny>> {
258 if self.is_closed {
259 return Err(to_pyruntime_err("Client already closed"));
260 }
261
262 if self.is_running {
263 return Err(to_pyruntime_err("Client already running"));
264 }
265
266 log::debug!("Starting client");
267
268 self.is_running = true;
269
270 let (msg_tx, msg_rx) = tokio::sync::mpsc::channel::<DatabentoMessage>(self.buffer_size);
271
272 let cmd_rx = self
276 .cmd_rx
277 .take()
278 .ok_or_else(|| to_pyruntime_err("Command receiver already taken"))?;
279
280 let mut feed_handler = DatabentoFeedHandler::new(
281 self.credential.clone(),
282 self.dataset.clone(),
283 cmd_rx,
284 msg_tx,
285 self.publisher_venue_map.clone(),
286 self.symbol_venue_map.clone(),
287 self.use_exchange_as_venue,
288 self.bars_timestamp_on_close,
289 self.reconnect_timeout_mins,
290 );
291
292 self.send_command(HandlerCommand::Start)?;
293
294 pyo3_async_runtimes::tokio::future_into_py(py, async move {
295 let (proc_handle, feed_handle) = tokio::join!(
296 Self::process_messages(msg_rx, callback, callback_pyo3),
297 feed_handler.run(),
298 );
299
300 match proc_handle {
301 Ok(()) => log::debug!("Message processor completed"),
302 Err(e) => log::error!("Message processor error: {e}"),
303 }
304
305 match feed_handle {
306 Ok(()) => log::debug!("Feed handler completed"),
307 Err(e) => log::error!("Feed handler error: {e}"),
308 }
309
310 Ok(())
311 })
312 }
313
314 #[pyo3(name = "close")]
315 fn py_close(&mut self) -> PyResult<()> {
316 if !self.is_running {
317 return Err(to_pyruntime_err("Client never started"));
318 }
319
320 if self.is_closed {
321 return Err(to_pyruntime_err("Client already closed"));
322 }
323
324 log::debug!("Closing client");
325
326 if !self.is_closed() {
327 self.send_command(HandlerCommand::Close)?;
328 }
329
330 self.is_running = false;
331 self.is_closed = true;
332
333 Ok(())
334 }
335}