Skip to main content

nautilus_databento/python/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Databento live client.
17
18use std::{fmt::Debug, fs, path::PathBuf, str::FromStr, sync::Arc};
19
20use databento::{dbn, live::Subscription};
21use indexmap::IndexMap;
22use nautilus_core::{
23    AtomicMap,
24    python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err},
25};
26use nautilus_model::{
27    identifiers::{InstrumentId, Symbol, Venue},
28    python::{data::data_to_pycapsule, instruments::instrument_any_to_pyobject},
29};
30use pyo3::prelude::*;
31use time::OffsetDateTime;
32
33use super::types::DatabentoSubscriptionAck;
34use crate::{
35    common::Credential,
36    live::{DatabentoFeedHandler, DatabentoMessage, HandlerCommand},
37    symbology::{check_consistent_symbology, infer_symbology_type},
38    types::DatabentoPublisher,
39};
40
41#[cfg_attr(
42    feature = "python",
43    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
44)]
45#[cfg_attr(
46    feature = "python",
47    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
48)]
49pub struct DatabentoLiveClient {
50    credential: Credential,
51    #[pyo3(get)]
52    pub dataset: String,
53    is_running: bool,
54    is_closed: bool,
55    cmd_tx: tokio::sync::mpsc::UnboundedSender<HandlerCommand>,
56    cmd_rx: Option<tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>>,
57    buffer_size: usize,
58    publisher_venue_map: IndexMap<u16, Venue>,
59    symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
60    use_exchange_as_venue: bool,
61    bars_timestamp_on_close: bool,
62    reconnect_timeout_mins: Option<u64>,
63}
64
65impl Debug for DatabentoLiveClient {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct(stringify!(DatabentoLiveClient))
68            .field("credential", &self.credential)
69            .field("dataset", &self.dataset)
70            .field("is_running", &self.is_running)
71            .field("is_closed", &self.is_closed)
72            .finish()
73    }
74}
75
76impl DatabentoLiveClient {
77    #[must_use]
78    pub fn is_closed(&self) -> bool {
79        self.cmd_tx.is_closed()
80    }
81
82    async fn process_messages(
83        mut msg_rx: tokio::sync::mpsc::Receiver<DatabentoMessage>,
84        callback: Py<PyAny>,
85        callback_pyo3: Py<PyAny>,
86    ) -> PyResult<()> {
87        log::debug!("Processing messages...");
88        // Continue to process messages until channel is hung up
89        while let Some(msg) = msg_rx.recv().await {
90            log::trace!("Received message: {msg:?}");
91
92            match msg {
93                DatabentoMessage::Data(data) => Python::attach(|py| {
94                    let py_obj = data_to_pycapsule(py, data);
95                    call_python(py, &callback, py_obj);
96                }),
97                DatabentoMessage::Instrument(data) => {
98                    Python::attach(|py| match instrument_any_to_pyobject(py, *data) {
99                        Ok(py_obj) => call_python(py, &callback, py_obj),
100                        Err(e) => log::error!("Failed creating instrument: {e}"),
101                    });
102                }
103                DatabentoMessage::Status(data) => Python::attach(|py| {
104                    let py_obj = data.into_py_any_unwrap(py);
105                    call_python(py, &callback_pyo3, py_obj);
106                }),
107                DatabentoMessage::Imbalance(data) => Python::attach(|py| {
108                    let py_obj = data.into_py_any_unwrap(py);
109                    call_python(py, &callback_pyo3, py_obj);
110                }),
111                DatabentoMessage::Statistics(data) => Python::attach(|py| {
112                    let py_obj = data.into_py_any_unwrap(py);
113                    call_python(py, &callback_pyo3, py_obj);
114                }),
115                DatabentoMessage::SubscriptionAck(ack) => Python::attach(|py| {
116                    let py_obj: DatabentoSubscriptionAck = ack.into();
117                    let py_obj = py_obj.into_py_any_unwrap(py);
118                    call_python(py, &callback_pyo3, py_obj);
119                }),
120                DatabentoMessage::Close => {
121                    // Graceful close
122                    break;
123                }
124                DatabentoMessage::Error(e) => {
125                    // Return error to Python
126                    return Err(to_pyruntime_err(e));
127                }
128            }
129        }
130
131        msg_rx.close();
132        log::debug!("Closed message receiver");
133
134        Ok(())
135    }
136
137    fn send_command(&self, cmd: HandlerCommand) -> PyResult<()> {
138        self.cmd_tx.send(cmd).map_err(to_pyruntime_err)
139    }
140}
141
142fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
143    if let Err(e) = callback.call1(py, (py_obj,)) {
144        // TODO: Improve this by checking for the actual exception type
145        if !e.to_string().contains("CancelledError") {
146            log::error!("Error calling Python: {e}");
147        }
148    }
149}
150
151#[pymethods]
152#[pyo3_stub_gen::derive::gen_stub_pymethods]
153impl DatabentoLiveClient {
154    /// # Errors
155    ///
156    /// Returns a `PyErr` if reading or parsing the publishers file fails.
157    #[new]
158    #[pyo3(signature = (key, dataset, publishers_filepath, use_exchange_as_venue, bars_timestamp_on_close=None, reconnect_timeout_mins=None))]
159    pub fn py_new(
160        key: String,
161        dataset: String,
162        publishers_filepath: PathBuf,
163        use_exchange_as_venue: bool,
164        bars_timestamp_on_close: Option<bool>,
165        reconnect_timeout_mins: Option<i64>,
166    ) -> PyResult<Self> {
167        let publishers_json = fs::read_to_string(publishers_filepath).map_err(to_pyvalue_err)?;
168        let publishers_vec: Vec<DatabentoPublisher> =
169            serde_json::from_str(&publishers_json).map_err(to_pyvalue_err)?;
170        let publisher_venue_map = publishers_vec
171            .into_iter()
172            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
173            .collect::<IndexMap<u16, Venue>>();
174
175        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
176
177        // Hardcoded to a reasonable size for now
178        let buffer_size = 100_000;
179
180        // Convert i64 to u64: None/negative = infinite retries, 0 = no retries, positive = timeout in minutes
181        let reconnect_timeout_mins = reconnect_timeout_mins
182            .and_then(|mins| if mins >= 0 { Some(mins as u64) } else { None });
183
184        Ok(Self {
185            credential: Credential::new(key),
186            dataset,
187            cmd_tx,
188            cmd_rx: Some(cmd_rx),
189            buffer_size,
190            is_running: false,
191            is_closed: false,
192            publisher_venue_map,
193            symbol_venue_map: Arc::new(AtomicMap::new()),
194            use_exchange_as_venue,
195            bars_timestamp_on_close: bars_timestamp_on_close.unwrap_or(true),
196            reconnect_timeout_mins,
197        })
198    }
199
200    #[pyo3(name = "is_running")]
201    const fn py_is_running(&self) -> bool {
202        self.is_running
203    }
204
205    #[pyo3(name = "is_closed")]
206    const fn py_is_closed(&self) -> bool {
207        self.is_closed
208    }
209
210    #[pyo3(name = "subscribe")]
211    #[pyo3(signature = (schema, instrument_ids, start=None, snapshot=None))]
212    #[expect(clippy::needless_pass_by_value)]
213    fn py_subscribe(
214        &mut self,
215        schema: String,
216        instrument_ids: Vec<InstrumentId>,
217        start: Option<u64>,
218        snapshot: Option<bool>,
219    ) -> PyResult<()> {
220        self.symbol_venue_map.rcu(|m| {
221            for id in &instrument_ids {
222                m.entry(id.symbol).or_insert(id.venue);
223            }
224        });
225        let symbols: Vec<String> = instrument_ids
226            .iter()
227            .map(|id| id.symbol.to_string())
228            .collect();
229        let first_symbol = symbols
230            .first()
231            .ok_or_else(|| to_pyvalue_err("No symbols provided"))?;
232        let stype_in = infer_symbology_type(first_symbol);
233        let symbols: Vec<&str> = symbols.iter().map(String::as_str).collect();
234        check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
235        let mut sub = Subscription::builder()
236            .symbols(symbols)
237            .schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
238            .stype_in(stype_in)
239            .build();
240
241        if let Some(start) = start {
242            let start = OffsetDateTime::from_unix_timestamp_nanos(i128::from(start))
243                .map_err(to_pyvalue_err)?;
244            sub.start = Some(start);
245        }
246        sub.use_snapshot = snapshot.unwrap_or(false);
247
248        self.send_command(HandlerCommand::Subscribe(sub))
249    }
250
251    #[pyo3(name = "start")]
252    fn py_start<'py>(
253        &mut self,
254        py: Python<'py>,
255        callback: Py<PyAny>,
256        callback_pyo3: Py<PyAny>,
257    ) -> PyResult<Bound<'py, PyAny>> {
258        if self.is_closed {
259            return Err(to_pyruntime_err("Client already closed"));
260        }
261
262        if self.is_running {
263            return Err(to_pyruntime_err("Client already running"));
264        }
265
266        log::debug!("Starting client");
267
268        self.is_running = true;
269
270        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel::<DatabentoMessage>(self.buffer_size);
271
272        // Consume the receiver
273        // We guard the client from being started more than once with the
274        // `is_running` flag, so here it is safe to unwrap the command receiver.
275        let cmd_rx = self
276            .cmd_rx
277            .take()
278            .ok_or_else(|| to_pyruntime_err("Command receiver already taken"))?;
279
280        let mut feed_handler = DatabentoFeedHandler::new(
281            self.credential.clone(),
282            self.dataset.clone(),
283            cmd_rx,
284            msg_tx,
285            self.publisher_venue_map.clone(),
286            self.symbol_venue_map.clone(),
287            self.use_exchange_as_venue,
288            self.bars_timestamp_on_close,
289            self.reconnect_timeout_mins,
290        );
291
292        self.send_command(HandlerCommand::Start)?;
293
294        pyo3_async_runtimes::tokio::future_into_py(py, async move {
295            let (proc_handle, feed_handle) = tokio::join!(
296                Self::process_messages(msg_rx, callback, callback_pyo3),
297                feed_handler.run(),
298            );
299
300            match proc_handle {
301                Ok(()) => log::debug!("Message processor completed"),
302                Err(e) => log::error!("Message processor error: {e}"),
303            }
304
305            match feed_handle {
306                Ok(()) => log::debug!("Feed handler completed"),
307                Err(e) => log::error!("Feed handler error: {e}"),
308            }
309
310            Ok(())
311        })
312    }
313
314    #[pyo3(name = "close")]
315    fn py_close(&mut self) -> PyResult<()> {
316        if !self.is_running {
317            return Err(to_pyruntime_err("Client never started"));
318        }
319
320        if self.is_closed {
321            return Err(to_pyruntime_err("Client already closed"));
322        }
323
324        log::debug!("Closing client");
325
326        if !self.is_closed() {
327            self.send_command(HandlerCommand::Close)?;
328        }
329
330        self.is_running = false;
331        self.is_closed = true;
332
333        Ok(())
334    }
335}