Skip to main content

nautilus_databento/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a unified data client that combines Databento's live streaming and historical data capabilities.
17//!
18//! This module implements a data client that manages connections to multiple Databento datasets,
19//! handles live market data subscriptions, and provides access to historical data on demand.
20
21use std::{
22    fmt::Debug,
23    path::PathBuf,
24    sync::{
25        Arc, Mutex,
26        atomic::{AtomicBool, Ordering},
27    },
28};
29
30use ahash::AHashMap;
31use databento::live::Subscription;
32use indexmap::IndexMap;
33use nautilus_common::{
34    clients::DataClient,
35    live::{runner::get_data_event_sender, runtime::get_runtime},
36    messages::{
37        DataEvent,
38        data::{
39            RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
40            SubscribeInstrument, SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades,
41            UnsubscribeBookDeltas, UnsubscribeInstrumentStatus, UnsubscribeQuotes,
42            UnsubscribeTrades,
43        },
44    },
45};
46use nautilus_core::{AtomicMap, MUTEX_POISONED, string::secret::REDACTED, time::AtomicTime};
47use nautilus_model::{
48    enums::BarAggregation,
49    identifiers::{ClientId, Symbol, Venue},
50    instruments::Instrument,
51};
52use tokio::task::JoinHandle;
53use tokio_util::sync::CancellationToken;
54
55use crate::{
56    common::Credential,
57    historical::{DatabentoHistoricalClient, RangeQueryParams},
58    live::{DatabentoFeedHandler, DatabentoMessage, HandlerCommand},
59    loader::DatabentoDataLoader,
60    symbology::instrument_id_to_symbol_string,
61    types::PublisherId,
62};
63
64/// Configuration for the Databento data client.
/// Configuration for the Databento data client.
#[derive(Clone)]
pub struct DatabentoDataClientConfig {
    /// Databento API credential (redacted in `Debug` output).
    pub(crate) credential: Credential,
    /// Path to the `publishers.json` file used for publisher/venue mapping.
    pub publishers_filepath: PathBuf,
    /// Whether to use the exchange as the venue for GLBX instruments.
    pub use_exchange_as_venue: bool,
    /// Whether to timestamp bars on close.
    pub bars_timestamp_on_close: bool,
    /// Reconnection timeout in minutes (`None` for infinite retries).
    pub reconnect_timeout_mins: Option<u64>,
}
78
79impl Debug for DatabentoDataClientConfig {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct(stringify!(DatabentoDataClientConfig))
82            .field("credential", &REDACTED)
83            .field("publishers_filepath", &self.publishers_filepath)
84            .field("use_exchange_as_venue", &self.use_exchange_as_venue)
85            .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
86            .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
87            .finish()
88    }
89}
90
91impl DatabentoDataClientConfig {
92    /// Creates a new [`DatabentoDataClientConfig`] instance.
93    #[must_use]
94    pub fn new(
95        api_key: impl Into<String>,
96        publishers_filepath: PathBuf,
97        use_exchange_as_venue: bool,
98        bars_timestamp_on_close: bool,
99    ) -> Self {
100        Self {
101            credential: Credential::new(api_key),
102            publishers_filepath,
103            use_exchange_as_venue,
104            bars_timestamp_on_close,
105            reconnect_timeout_mins: Some(10), // Default: 10 minutes
106        }
107    }
108
109    /// Returns the API key associated with this config.
110    #[must_use]
111    pub fn api_key(&self) -> &str {
112        self.credential.api_key()
113    }
114
115    /// Returns a masked version of the API key for logging purposes.
116    #[must_use]
117    pub fn api_key_masked(&self) -> String {
118        self.credential.api_key_masked()
119    }
120}
121
122/// A Databento data client that combines live streaming and historical data functionality.
123///
124/// This client uses the existing `DatabentoFeedHandler` for live data subscriptions
125/// and `DatabentoHistoricalClient` for historical data requests. It supports multiple
126/// datasets simultaneously, with separate feed handlers per dataset.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
)]
#[derive(Debug)]
pub struct DatabentoDataClient {
    /// Client identifier.
    client_id: ClientId,
    /// Client configuration (the credential is redacted in `Debug` output).
    config: DatabentoDataClientConfig,
    /// Connection state flag (set on `connect`, cleared on `stop`/`disconnect`).
    is_connected: AtomicBool,
    /// Historical client for on-demand data requests.
    historical: DatabentoHistoricalClient,
    /// Data loader used for venue-to-dataset mapping.
    loader: DatabentoDataLoader,
    /// Feed handler command senders, keyed by dataset code.
    cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>>,
    /// Handles of spawned feed/message tasks, awaited during disconnect.
    task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Cancellation token for graceful shutdown of all spawned tasks.
    cancellation_token: CancellationToken,
    /// Publisher ID to venue mapping loaded from `publishers.json`.
    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
    /// Symbol to venue mapping, populated as subscriptions are made (cache).
    symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
    /// Data event sender for forwarding data to the async runner.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
}
157
impl DatabentoDataClient {
    /// Creates a new [`DatabentoDataClient`] instance.
    ///
    /// Eagerly constructs the historical client and loads the publisher
    /// configuration from `publishers.json`; live feed handlers are created
    /// lazily per dataset on first subscription.
    ///
    /// # Errors
    ///
    /// Returns an error if client creation or publisher configuration loading fails.
    pub fn new(
        client_id: ClientId,
        config: DatabentoDataClientConfig,
        clock: &'static AtomicTime,
    ) -> anyhow::Result<Self> {
        let historical = DatabentoHistoricalClient::new(
            config.credential.clone(),
            config.publishers_filepath.clone(),
            clock,
            config.use_exchange_as_venue,
        )?;

        // Create data loader for venue-to-dataset mapping
        let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;

        // Load publisher configuration (maps publisher IDs to venues)
        let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
        let publishers_vec: Vec<crate::types::DatabentoPublisher> =
            serde_json::from_str(&file_content)?;

        let publisher_venue_map = publishers_vec
            .into_iter()
            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
            .collect::<IndexMap<u16, Venue>>();

        let data_sender = get_data_event_sender();

        Ok(Self {
            client_id,
            config,
            is_connected: AtomicBool::new(false),
            historical,
            loader,
            cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
            task_handles: Arc::new(Mutex::new(Vec::new())),
            cancellation_token: CancellationToken::new(),
            publisher_venue_map: Arc::new(publisher_venue_map),
            symbol_venue_map: Arc::new(AtomicMap::new()),
            data_sender,
        })
    }

    /// Gets the dataset for a given venue using the data loader.
    ///
    /// # Errors
    ///
    /// Returns an error if the venue-to-dataset mapping cannot be found.
    fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
        self.loader
            .get_dataset_for_venue(&venue)
            .map(ToString::to_string)
            .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
    }

    /// Gets or creates a feed handler for the specified dataset.
    ///
    /// Idempotent: if a handler already exists for `dataset` this is a no-op.
    /// The `cmd_channels` lock is held across the check-and-insert so handler
    /// creation for a dataset happens at most once.
    fn get_or_create_feed_handler(&self, dataset: &str) {
        let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);

        if !channels.contains_key(dataset) {
            log::info!("Creating new feed handler for dataset: {dataset}");
            let cmd_tx = self.initialize_live_feed(dataset.to_string());
            channels.insert(dataset.to_string(), cmd_tx);

            log::debug!("Feed handler created for dataset: {dataset}, channel stored");
        }
    }

    /// Sends a command to a specific dataset's feed handler.
    ///
    /// # Errors
    ///
    /// Returns an error if no handler exists for `dataset` or if the command
    /// cannot be sent (e.g. the handler task has already terminated).
    fn send_command_to_dataset(&self, dataset: &str, cmd: HandlerCommand) -> anyhow::Result<()> {
        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
        if let Some(tx) = channels.get(dataset) {
            tx.send(cmd)
                .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
        } else {
            anyhow::bail!("No feed handler found for dataset: {dataset}");
        }
        Ok(())
    }

    /// Initializes the live feed handler for streaming data.
    ///
    /// Spawns two tasks on the shared runtime: the feed handler itself, and a
    /// message pump that forwards handler output to the data event sender.
    /// Both tasks observe the client's cancellation token; their join handles
    /// are stored for awaiting during disconnect. Returns the command sender
    /// used to drive the new handler.
    fn initialize_live_feed(
        &self,
        dataset: String,
    ) -> tokio::sync::mpsc::UnboundedSender<HandlerCommand> {
        // Unbounded for commands (low volume); bounded (1000) for messages to
        // provide backpressure from the feed handler to the message pump.
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);

        let mut feed_handler = DatabentoFeedHandler::new(
            self.config.credential.clone(),
            dataset,
            cmd_rx,
            msg_tx,
            (*self.publisher_venue_map).clone(),
            self.symbol_venue_map.clone(),
            self.config.use_exchange_as_venue,
            self.config.bars_timestamp_on_close,
            self.config.reconnect_timeout_mins,
        );

        let cancellation_token = self.cancellation_token.clone();

        // Spawn the feed handler task with cancellation support
        let feed_handle = get_runtime().spawn(async move {
            tokio::select! {
                result = feed_handler.run() => {
                    if let Err(e) = result {
                        log::error!("Feed handler error: {e}");
                    }
                }
                () = cancellation_token.cancelled() => {
                    log::debug!("Feed handler cancelled");
                }
            }
        });

        let cancellation_token = self.cancellation_token.clone();
        let data_sender = self.data_sender.clone();

        // Spawn message processing task with cancellation support; loops until
        // the handler closes, the channel drops, or cancellation fires.
        let msg_handle = get_runtime().spawn(async move {
            let mut msg_rx = msg_rx;

            loop {
                tokio::select! {
                    msg = msg_rx.recv() => {
                        match msg {
                            Some(DatabentoMessage::Data(data)) => {
                                log::debug!("Received data: {data:?}");
                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
                                    log::error!("Failed to send data event: {e}");
                                }
                            }
                            Some(DatabentoMessage::Instrument(instrument)) => {
                                log::info!("Received instrument definition: {}", instrument.id());
                                if let Err(e) = data_sender.send(DataEvent::Instrument(*instrument)) {
                                    log::error!("Failed to send instrument: {e}");
                                }
                            }
                            Some(DatabentoMessage::Status(status)) => {
                                log::debug!("Received status: {status:?}");
                                // TODO: Forward to appropriate handler
                            }
                            Some(DatabentoMessage::Imbalance(imbalance)) => {
                                log::debug!("Received imbalance: {imbalance:?}");
                                // TODO: Forward to appropriate handler
                            }
                            Some(DatabentoMessage::Statistics(statistics)) => {
                                log::debug!("Received statistics: {statistics:?}");
                                // TODO: Forward to appropriate handler
                            }
                            Some(DatabentoMessage::SubscriptionAck(ack)) => {
                                log::debug!("Received subscription ack: {}", ack.message);
                            }
                            Some(DatabentoMessage::Error(error)) => {
                                log::error!("Feed handler error: {error}");
                                // TODO: Handle error appropriately
                            }
                            Some(DatabentoMessage::Close) => {
                                log::info!("Feed handler closed");
                                break;
                            }
                            None => {
                                // All senders dropped: the feed handler task ended
                                log::debug!("Message channel closed");
                                break;
                            }
                        }
                    }
                    () = cancellation_token.cancelled() => {
                        log::debug!("Message processing cancelled");
                        break;
                    }
                }
            }
        });

        // Record both task handles so `disconnect` can await them
        {
            let mut handles = self.task_handles.lock().expect(MUTEX_POISONED);
            handles.push(feed_handle);
            handles.push(msg_handle);
        }

        cmd_tx
    }
}
352
353#[async_trait::async_trait(?Send)]
354impl DataClient for DatabentoDataClient {
355    /// Returns the client identifier.
356    fn client_id(&self) -> ClientId {
357        self.client_id
358    }
359
360    /// Returns the venue associated with this client (None for multi-venue clients).
361    fn venue(&self) -> Option<Venue> {
362        None
363    }
364
365    /// Starts the data client.
366    ///
367    /// # Errors
368    ///
369    /// Returns an error if the client fails to start.
370    fn start(&mut self) -> anyhow::Result<()> {
371        log::debug!("Starting");
372        Ok(())
373    }
374
375    /// Stops the data client and cancels all active subscriptions.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if the client fails to stop cleanly.
380    fn stop(&mut self) -> anyhow::Result<()> {
381        log::debug!("Stopping");
382
383        // Signal cancellation to all running tasks
384        self.cancellation_token.cancel();
385
386        // Send close command to all active feed handlers
387        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
388        for (dataset, tx) in channels.iter() {
389            if let Err(e) = tx.send(HandlerCommand::Close) {
390                log::error!("Failed to send close command to dataset {dataset}: {e}");
391            }
392        }
393
394        self.is_connected.store(false, Ordering::Relaxed);
395        Ok(())
396    }
397
398    fn reset(&mut self) -> anyhow::Result<()> {
399        log::debug!("Resetting");
400        self.is_connected.store(false, Ordering::Relaxed);
401        Ok(())
402    }
403
404    fn dispose(&mut self) -> anyhow::Result<()> {
405        log::debug!("Disposing");
406        self.stop()
407    }
408
409    async fn connect(&mut self) -> anyhow::Result<()> {
410        log::debug!("Connecting...");
411
412        // Connection will happen lazily when subscriptions are made
413        // No need to create feed handlers upfront since we don't know which datasets will be needed
414        self.is_connected.store(true, Ordering::Relaxed);
415
416        log::info!("Connected");
417        Ok(())
418    }
419
420    async fn disconnect(&mut self) -> anyhow::Result<()> {
421        log::debug!("Disconnecting...");
422
423        // Signal cancellation to all running tasks
424        self.cancellation_token.cancel();
425
426        // Send close command to all active feed handlers
427        {
428            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
429            for (dataset, tx) in channels.iter() {
430                if let Err(e) = tx.send(HandlerCommand::Close) {
431                    log::error!("Failed to send close command to dataset {dataset}: {e}");
432                }
433            }
434        }
435
436        // Wait for all spawned tasks to complete
437        let handles = {
438            let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
439            std::mem::take(&mut *task_handles)
440        };
441
442        for handle in handles {
443            if let Err(e) = handle.await
444                && !e.is_cancelled()
445            {
446                log::error!("Task join error: {e}");
447            }
448        }
449
450        self.is_connected.store(false, Ordering::Relaxed);
451
452        {
453            let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
454            channels.clear();
455        }
456
457        log::info!("Disconnected");
458        Ok(())
459    }
460
461    /// Returns whether the client is currently connected.
462    fn is_connected(&self) -> bool {
463        self.is_connected.load(Ordering::Relaxed)
464    }
465
466    fn is_disconnected(&self) -> bool {
467        !self.is_connected()
468    }
469
470    /// Subscribes to instrument definition data for the specified instrument.
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if the subscription request fails.
475    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
476        log::debug!("Subscribe instrument: {cmd:?}");
477
478        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
479        let was_new_handler = {
480            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
481            !channels.contains_key(&dataset)
482        };
483
484        self.get_or_create_feed_handler(&dataset);
485
486        // Start the feed handler if it was newly created
487        if was_new_handler {
488            self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
489        }
490
491        self.symbol_venue_map
492            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
493        let symbol = cmd.instrument_id.symbol.to_string();
494
495        let subscription = Subscription::builder()
496            .schema(databento::dbn::Schema::Definition)
497            .symbols(symbol)
498            .build();
499
500        self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))?;
501
502        Ok(())
503    }
504
505    /// Subscribes to quote tick data for the specified instruments.
506    ///
507    /// # Errors
508    ///
509    /// Returns an error if the subscription request fails.
510    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
511        log::debug!("Subscribe quotes: {cmd:?}");
512
513        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
514        let was_new_handler = {
515            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
516            !channels.contains_key(&dataset)
517        };
518
519        self.get_or_create_feed_handler(&dataset);
520
521        // Start the feed handler if it was newly created
522        if was_new_handler {
523            self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
524        }
525
526        self.symbol_venue_map
527            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
528        let symbol = cmd.instrument_id.symbol.to_string();
529
530        let subscription = Subscription::builder()
531            .schema(databento::dbn::Schema::Mbp1) // Market by price level 1 for quotes
532            .symbols(symbol)
533            .build();
534
535        self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))?;
536
537        Ok(())
538    }
539
540    /// Subscribes to trade tick data for the specified instruments.
541    ///
542    /// # Errors
543    ///
544    /// Returns an error if the subscription request fails.
545    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
546        log::debug!("Subscribe trades: {cmd:?}");
547
548        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
549        let was_new_handler = {
550            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
551            !channels.contains_key(&dataset)
552        };
553
554        self.get_or_create_feed_handler(&dataset);
555
556        // Start the feed handler if it was newly created
557        if was_new_handler {
558            self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
559        }
560
561        self.symbol_venue_map
562            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
563        let symbol = cmd.instrument_id.symbol.to_string();
564
565        let subscription = Subscription::builder()
566            .schema(databento::dbn::Schema::Trades)
567            .symbols(symbol)
568            .build();
569
570        self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))?;
571
572        Ok(())
573    }
574
575    /// Subscribes to order book delta updates for the specified instruments.
576    ///
577    /// # Errors
578    ///
579    /// Returns an error if the subscription request fails.
580    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
581        log::debug!("Subscribe book deltas: {cmd:?}");
582
583        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
584        let was_new_handler = {
585            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
586            !channels.contains_key(&dataset)
587        };
588
589        self.get_or_create_feed_handler(&dataset);
590
591        // Start the feed handler if it was newly created
592        if was_new_handler {
593            self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
594        }
595
596        self.symbol_venue_map
597            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
598        let symbol = cmd.instrument_id.symbol.to_string();
599
600        let subscription = Subscription::builder()
601            .schema(databento::dbn::Schema::Mbo) // Market by order for book deltas
602            .symbols(symbol)
603            .build();
604
605        self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))?;
606
607        Ok(())
608    }
609
610    /// Subscribes to instrument status updates for the specified instruments.
611    ///
612    /// # Errors
613    ///
614    /// Returns an error if the subscription request fails.
615    fn subscribe_instrument_status(
616        &mut self,
617        cmd: SubscribeInstrumentStatus,
618    ) -> anyhow::Result<()> {
619        log::debug!("Subscribe instrument status: {cmd:?}");
620
621        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
622        let was_new_handler = {
623            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
624            !channels.contains_key(&dataset)
625        };
626
627        self.get_or_create_feed_handler(&dataset);
628
629        // Start the feed handler if it was newly created
630        if was_new_handler {
631            self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
632        }
633
634        self.symbol_venue_map
635            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
636        let symbol = cmd.instrument_id.symbol.to_string();
637
638        let subscription = Subscription::builder()
639            .schema(databento::dbn::Schema::Status)
640            .symbols(symbol)
641            .build();
642
643        self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))?;
644
645        Ok(())
646    }
647
648    // Unsubscribe methods
649    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
650        log::debug!("Unsubscribe quotes: {cmd:?}");
651
652        // Note: Databento live API doesn't support granular unsubscribing.
653        // The feed handler manages subscriptions and can handle reconnections
654        // with the appropriate subscription state.
655        log::warn!(
656            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
657            cmd.instrument_id
658        );
659
660        Ok(())
661    }
662
663    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
664        log::debug!("Unsubscribe trades: {cmd:?}");
665
666        // Note: Databento live API doesn't support granular unsubscribing.
667        // The feed handler manages subscriptions and can handle reconnections
668        // with the appropriate subscription state.
669        log::warn!(
670            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
671            cmd.instrument_id
672        );
673
674        Ok(())
675    }
676
677    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
678        log::debug!("Unsubscribe book deltas: {cmd:?}");
679
680        // Note: Databento live API doesn't support granular unsubscribing.
681        // The feed handler manages subscriptions and can handle reconnections
682        // with the appropriate subscription state.
683        log::warn!(
684            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
685            cmd.instrument_id
686        );
687
688        Ok(())
689    }
690
691    fn unsubscribe_instrument_status(
692        &mut self,
693        cmd: &UnsubscribeInstrumentStatus,
694    ) -> anyhow::Result<()> {
695        log::debug!("Unsubscribe instrument status: {cmd:?}");
696
697        // Note: Databento live API doesn't support granular unsubscribing.
698        // The feed handler manages subscriptions and can handle reconnections
699        // with the appropriate subscription state.
700        log::warn!(
701            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
702            cmd.instrument_id
703        );
704
705        Ok(())
706    }
707
708    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
709        log::debug!("Request instruments: {request:?}");
710
711        let historical_client = self.historical.clone();
712        let data_sender = self.data_sender.clone();
713
714        get_runtime().spawn(async move {
715            let symbols = vec!["ALL_SYMBOLS".to_string()]; // TODO: Improve symbol handling
716
717            let params = RangeQueryParams {
718                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
719                symbols,
720                start: request
721                    .start
722                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
723                    .into(),
724                end: request
725                    .end
726                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
727                    .map(Into::into),
728                limit: None,
729                price_precision: None,
730            };
731
732            match historical_client.get_range_instruments(params).await {
733                Ok(instruments) => {
734                    log::info!("Retrieved {} instruments", instruments.len());
735                    for instrument in instruments {
736                        if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
737                            log::error!("Failed to send instrument: {e}");
738                        }
739                    }
740                }
741                Err(e) => {
742                    log::error!("Failed to request instruments: {e}");
743                }
744            }
745        });
746
747        Ok(())
748    }
749
750    fn request_quotes(&self, request: RequestQuotes) -> anyhow::Result<()> {
751        log::debug!("Request quotes: {request:?}");
752
753        let historical_client = self.historical.clone();
754
755        get_runtime().spawn(async move {
756            let symbols = vec![instrument_id_to_symbol_string(
757                request.instrument_id,
758                &mut AHashMap::new(), // TODO: Use proper symbol map
759            )];
760
761            let params = RangeQueryParams {
762                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
763                symbols,
764                start: request
765                    .start
766                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
767                    .into(),
768                end: request
769                    .end
770                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
771                    .map(Into::into),
772                limit: request.limit.map(|l| l.get() as u64),
773                price_precision: None,
774            };
775
776            match historical_client.get_range_quotes(params, None).await {
777                Ok(quotes) => {
778                    log::info!("Retrieved {} quotes", quotes.len());
779                    // TODO: Send quotes to message bus
780                }
781                Err(e) => {
782                    log::error!("Failed to request quotes: {e}");
783                }
784            }
785        });
786
787        Ok(())
788    }
789
790    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
791        log::debug!("Request trades: {request:?}");
792
793        let historical_client = self.historical.clone();
794
795        get_runtime().spawn(async move {
796            let symbols = vec![instrument_id_to_symbol_string(
797                request.instrument_id,
798                &mut AHashMap::new(), // TODO: Use proper symbol map
799            )];
800
801            let params = RangeQueryParams {
802                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
803                symbols,
804                start: request
805                    .start
806                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
807                    .into(),
808                end: request
809                    .end
810                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
811                    .map(Into::into),
812                limit: request.limit.map(|l| l.get() as u64),
813                price_precision: None,
814            };
815
816            match historical_client.get_range_trades(params).await {
817                Ok(trades) => {
818                    log::info!("Retrieved {} trades", trades.len());
819                    // TODO: Send trades to message bus
820                }
821                Err(e) => {
822                    log::error!("Failed to request trades: {e}");
823                }
824            }
825        });
826
827        Ok(())
828    }
829
830    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
831        log::debug!("Request bars: {request:?}");
832
833        let historical_client = self.historical.clone();
834
835        get_runtime().spawn(async move {
836            let symbols = vec![instrument_id_to_symbol_string(
837                request.bar_type.instrument_id(),
838                &mut AHashMap::new(), // TODO: Use proper symbol map
839            )];
840
841            let params = RangeQueryParams {
842                dataset: "GLBX.MDP3".to_string(), // TODO: Make configurable
843                symbols,
844                start: request
845                    .start
846                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
847                    .into(),
848                end: request
849                    .end
850                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
851                    .map(Into::into),
852                limit: request.limit.map(|l| l.get() as u64),
853                price_precision: None,
854            };
855
856            // Map bar aggregation from the request
857            let aggregation = match request.bar_type.spec().aggregation {
858                BarAggregation::Second => BarAggregation::Second,
859                BarAggregation::Minute => BarAggregation::Minute,
860                BarAggregation::Hour => BarAggregation::Hour,
861                BarAggregation::Day => BarAggregation::Day,
862                _ => {
863                    log::error!(
864                        "Unsupported bar aggregation: {:?}",
865                        request.bar_type.spec().aggregation
866                    );
867                    return;
868                }
869            };
870
871            match historical_client
872                .get_range_bars(params, aggregation, true)
873                .await
874            {
875                Ok(bars) => {
876                    log::info!("Retrieved {} bars", bars.len());
877                    // TODO: Send bars to message bus
878                }
879                Err(e) => {
880                    log::error!("Failed to request bars: {e}");
881                }
882            }
883        });
884
885        Ok(())
886    }
887}