1use std::{
22 fmt::Debug,
23 path::PathBuf,
24 sync::{
25 Arc, Mutex,
26 atomic::{AtomicBool, Ordering},
27 },
28};
29
30use ahash::AHashMap;
31use databento::live::Subscription;
32use indexmap::IndexMap;
33use nautilus_common::{
34 clients::DataClient,
35 live::{runner::get_data_event_sender, runtime::get_runtime},
36 messages::{
37 DataEvent,
38 data::{
39 RequestBars, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBookDeltas,
40 SubscribeInstrument, SubscribeInstrumentStatus, SubscribeQuotes, SubscribeTrades,
41 UnsubscribeBookDeltas, UnsubscribeInstrumentStatus, UnsubscribeQuotes,
42 UnsubscribeTrades,
43 },
44 },
45};
46use nautilus_core::{AtomicMap, MUTEX_POISONED, string::secret::REDACTED, time::AtomicTime};
47use nautilus_model::{
48 enums::BarAggregation,
49 identifiers::{ClientId, Symbol, Venue},
50 instruments::Instrument,
51};
52use tokio::task::JoinHandle;
53use tokio_util::sync::CancellationToken;
54
55use crate::{
56 common::Credential,
57 historical::{DatabentoHistoricalClient, RangeQueryParams},
58 live::{DatabentoFeedHandler, DatabentoMessage, HandlerCommand},
59 loader::DatabentoDataLoader,
60 symbology::instrument_id_to_symbol_string,
61 types::PublisherId,
62};
63
/// Configuration for a [`DatabentoDataClient`] instance.
#[derive(Clone)]
pub struct DatabentoDataClientConfig {
    /// Databento API credential (redacted by the `Debug` implementation).
    pub(crate) credential: Credential,
    /// Path to the publishers JSON file mapping publisher IDs to venues.
    pub publishers_filepath: PathBuf,
    /// If the exchange should be used as the venue for instruments.
    pub use_exchange_as_venue: bool,
    /// If bar timestamps should be taken at the close of the interval.
    pub bars_timestamp_on_close: bool,
    /// Optional reconnect timeout in minutes, forwarded to the live feed handler
    /// (exact reconnect semantics are defined by `DatabentoFeedHandler`).
    pub reconnect_timeout_mins: Option<u64>,
}
78
79impl Debug for DatabentoDataClientConfig {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct(stringify!(DatabentoDataClientConfig))
82 .field("credential", &REDACTED)
83 .field("publishers_filepath", &self.publishers_filepath)
84 .field("use_exchange_as_venue", &self.use_exchange_as_venue)
85 .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
86 .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
87 .finish()
88 }
89}
90
91impl DatabentoDataClientConfig {
92 #[must_use]
94 pub fn new(
95 api_key: impl Into<String>,
96 publishers_filepath: PathBuf,
97 use_exchange_as_venue: bool,
98 bars_timestamp_on_close: bool,
99 ) -> Self {
100 Self {
101 credential: Credential::new(api_key),
102 publishers_filepath,
103 use_exchange_as_venue,
104 bars_timestamp_on_close,
105 reconnect_timeout_mins: Some(10), }
107 }
108
109 #[must_use]
111 pub fn api_key(&self) -> &str {
112 self.credential.api_key()
113 }
114
115 #[must_use]
117 pub fn api_key_masked(&self) -> String {
118 self.credential.api_key_masked()
119 }
120}
121
/// Live data client backed by Databento, combining lazily-created live feed
/// handlers (one per dataset) with a historical client for data requests.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
)]
#[derive(Debug)]
pub struct DatabentoDataClient {
    // Identifier for this client instance
    client_id: ClientId,
    // Client configuration (credential, publishers file, flags)
    config: DatabentoDataClientConfig,
    // Connection state flag, toggled by connect/disconnect/stop/reset
    is_connected: AtomicBool,
    // Historical client used to serve request_* methods
    historical: DatabentoHistoricalClient,
    // Loader used for venue -> dataset resolution
    loader: DatabentoDataLoader,
    // Command senders keyed by dataset, one per live feed handler
    cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>>,
    // Handles of spawned feed/message-processing tasks, joined on disconnect
    task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    // Token used to cancel all spawned tasks
    cancellation_token: CancellationToken,
    // Publisher ID -> venue map built from the publishers file
    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
    // Symbol -> venue map, populated as subscriptions are made
    symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
    // Channel for forwarding data events to the async runner
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
}
157
impl DatabentoDataClient {
    /// Creates a new [`DatabentoDataClient`] from the given configuration.
    ///
    /// Eagerly reads and parses the publishers file to build the
    /// publisher-ID -> venue map; live feed handlers are created lazily
    /// per dataset on first subscription.
    ///
    /// # Errors
    ///
    /// Returns an error if the historical client or loader cannot be
    /// constructed, or if the publishers file cannot be read or parsed.
    pub fn new(
        client_id: ClientId,
        config: DatabentoDataClientConfig,
        clock: &'static AtomicTime,
    ) -> anyhow::Result<Self> {
        let historical = DatabentoHistoricalClient::new(
            config.credential.clone(),
            config.publishers_filepath.clone(),
            clock,
            config.use_exchange_as_venue,
        )?;

        let loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;

        // Build the publisher ID -> venue map from the publishers JSON file
        let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
        let publishers_vec: Vec<crate::types::DatabentoPublisher> =
            serde_json::from_str(&file_content)?;

        let publisher_venue_map = publishers_vec
            .into_iter()
            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
            .collect::<IndexMap<u16, Venue>>();

        let data_sender = get_data_event_sender();

        Ok(Self {
            client_id,
            config,
            is_connected: AtomicBool::new(false),
            historical,
            loader,
            cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
            task_handles: Arc::new(Mutex::new(Vec::new())),
            cancellation_token: CancellationToken::new(),
            publisher_venue_map: Arc::new(publisher_venue_map),
            symbol_venue_map: Arc::new(AtomicMap::new()),
            data_sender,
        })
    }

    /// Resolves the Databento dataset code for the given venue.
    ///
    /// Errors when the loader has no dataset mapping for the venue.
    fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
        self.loader
            .get_dataset_for_venue(&venue)
            .map(ToString::to_string)
            .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
    }

    /// Ensures a live feed handler exists for `dataset`, creating and
    /// registering one (with its command channel) if absent.
    fn get_or_create_feed_handler(&self, dataset: &str) {
        let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);

        if !channels.contains_key(dataset) {
            log::info!("Creating new feed handler for dataset: {dataset}");
            let cmd_tx = self.initialize_live_feed(dataset.to_string());
            channels.insert(dataset.to_string(), cmd_tx);

            log::debug!("Feed handler created for dataset: {dataset}, channel stored");
        }
    }

    /// Sends a command to the feed handler for `dataset`.
    ///
    /// Errors if no handler is registered for the dataset or if the
    /// command channel is closed.
    fn send_command_to_dataset(&self, dataset: &str, cmd: HandlerCommand) -> anyhow::Result<()> {
        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
        if let Some(tx) = channels.get(dataset) {
            tx.send(cmd)
                .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
        } else {
            anyhow::bail!("No feed handler found for dataset: {dataset}");
        }
        Ok(())
    }

    /// Spawns the live feed for `dataset`: one task running the
    /// [`DatabentoFeedHandler`] and one task draining its message channel,
    /// both cancellable via this client's cancellation token.
    ///
    /// Returns the command sender used to drive the feed handler.
    fn initialize_live_feed(
        &self,
        dataset: String,
    ) -> tokio::sync::mpsc::UnboundedSender<HandlerCommand> {
        // Unbounded for commands; bounded (1000) for feed messages to
        // apply backpressure on the handler side
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(1000);

        let mut feed_handler = DatabentoFeedHandler::new(
            self.config.credential.clone(),
            dataset,
            cmd_rx,
            msg_tx,
            (*self.publisher_venue_map).clone(),
            self.symbol_venue_map.clone(),
            self.config.use_exchange_as_venue,
            self.config.bars_timestamp_on_close,
            self.config.reconnect_timeout_mins,
        );

        let cancellation_token = self.cancellation_token.clone();

        // Task 1: run the feed handler until it finishes or is cancelled
        let feed_handle = get_runtime().spawn(async move {
            tokio::select! {
                result = feed_handler.run() => {
                    if let Err(e) = result {
                        log::error!("Feed handler error: {e}");
                    }
                }
                () = cancellation_token.cancelled() => {
                    log::debug!("Feed handler cancelled");
                }
            }
        });

        let cancellation_token = self.cancellation_token.clone();
        let data_sender = self.data_sender.clone();

        // Task 2: drain handler messages, forwarding data/instruments to
        // the data event channel; other message kinds are only logged
        let msg_handle = get_runtime().spawn(async move {
            let mut msg_rx = msg_rx;

            loop {
                tokio::select! {
                    msg = msg_rx.recv() => {
                        match msg {
                            Some(DatabentoMessage::Data(data)) => {
                                log::debug!("Received data: {data:?}");
                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
                                    log::error!("Failed to send data event: {e}");
                                }
                            }
                            Some(DatabentoMessage::Instrument(instrument)) => {
                                log::info!("Received instrument definition: {}", instrument.id());
                                if let Err(e) = data_sender.send(DataEvent::Instrument(*instrument)) {
                                    log::error!("Failed to send instrument: {e}");
                                }
                            }
                            Some(DatabentoMessage::Status(status)) => {
                                log::debug!("Received status: {status:?}");
                            }
                            Some(DatabentoMessage::Imbalance(imbalance)) => {
                                log::debug!("Received imbalance: {imbalance:?}");
                            }
                            Some(DatabentoMessage::Statistics(statistics)) => {
                                log::debug!("Received statistics: {statistics:?}");
                            }
                            Some(DatabentoMessage::SubscriptionAck(ack)) => {
                                log::debug!("Received subscription ack: {}", ack.message);
                            }
                            Some(DatabentoMessage::Error(error)) => {
                                log::error!("Feed handler error: {error}");
                            }
                            Some(DatabentoMessage::Close) => {
                                log::info!("Feed handler closed");
                                break;
                            }
                            None => {
                                log::debug!("Message channel closed");
                                break;
                            }
                        }
                    }
                    () = cancellation_token.cancelled() => {
                        log::debug!("Message processing cancelled");
                        break;
                    }
                }
            }
        });

        // Record both task handles so disconnect() can join them
        {
            let mut handles = self.task_handles.lock().expect(MUTEX_POISONED);
            handles.push(feed_handle);
            handles.push(msg_handle);
        }

        cmd_tx
    }
}
352
#[async_trait::async_trait(?Send)]
impl DataClient for DatabentoDataClient {
    /// Returns this client's identifier.
    fn client_id(&self) -> ClientId {
        self.client_id
    }

    /// Returns `None`: this client is multi-venue (venue is resolved per
    /// instrument via the publishers mapping).
    fn venue(&self) -> Option<Venue> {
        None
    }

    /// Starts the client (no-op: feed handlers are created lazily on subscribe).
    fn start(&mut self) -> anyhow::Result<()> {
        log::debug!("Starting");
        Ok(())
    }

    /// Stops the client: cancels all spawned tasks, sends `Close` to every
    /// feed handler, and clears the connected flag. Task handles are not
    /// joined here (see `disconnect`).
    fn stop(&mut self) -> anyhow::Result<()> {
        log::debug!("Stopping");

        self.cancellation_token.cancel();

        // Best-effort close: failures are logged, not propagated
        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
        for (dataset, tx) in channels.iter() {
            if let Err(e) = tx.send(HandlerCommand::Close) {
                log::error!("Failed to send close command to dataset {dataset}: {e}");
            }
        }

        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }

    /// Resets the client by clearing the connected flag.
    fn reset(&mut self) -> anyhow::Result<()> {
        log::debug!("Resetting");
        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }

    /// Disposes the client by delegating to `stop`.
    fn dispose(&mut self) -> anyhow::Result<()> {
        log::debug!("Disposing");
        self.stop()
    }

    /// Marks the client connected. No network activity happens here:
    /// live feeds are established lazily per dataset on first subscription.
    async fn connect(&mut self) -> anyhow::Result<()> {
        log::debug!("Connecting...");

        self.is_connected.store(true, Ordering::Relaxed);

        log::info!("Connected");
        Ok(())
    }

    /// Disconnects the client: cancels tasks, closes all feed handlers,
    /// awaits all spawned task handles, then clears registered channels.
    async fn disconnect(&mut self) -> anyhow::Result<()> {
        log::debug!("Disconnecting...");

        self.cancellation_token.cancel();

        // Scope the lock so it is released before awaiting task handles
        {
            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            for (dataset, tx) in channels.iter() {
                if let Err(e) = tx.send(HandlerCommand::Close) {
                    log::error!("Failed to send close command to dataset {dataset}: {e}");
                }
            }
        }

        // Take the handles out while holding the lock, await them after
        let handles = {
            let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
            std::mem::take(&mut *task_handles)
        };

        for handle in handles {
            // Cancellation is expected during shutdown; only log other errors
            if let Err(e) = handle.await
                && !e.is_cancelled()
            {
                log::error!("Task join error: {e}");
            }
        }

        self.is_connected.store(false, Ordering::Relaxed);

        {
            let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            channels.clear();
        }

        log::info!("Disconnected");
        Ok(())
    }

    /// Returns `true` if the client is marked connected.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }

    /// Returns `true` if the client is not marked connected.
    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }

    /// Subscribes to instrument definitions for the command's instrument.
    ///
    /// Creates the dataset's feed handler on first use and sends `Start`
    /// before the subscription. NOTE(review): the existence check and the
    /// handler creation take the lock separately (not atomic); presumably
    /// this client is driven from a single thread — confirm.
    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
        log::debug!("Subscribe instrument: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset);

        if was_new_handler {
            self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
        }

        // Record the symbol -> venue mapping for downstream decoding
        self.symbol_venue_map
            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
        let symbol = cmd.instrument_id.symbol.to_string();

        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Definition)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))?;

        Ok(())
    }

    /// Subscribes to quotes (MBP-1 schema) for the command's instrument.
    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
        log::debug!("Subscribe quotes: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset);

        if was_new_handler {
            self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
        }

        self.symbol_venue_map
            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
        let symbol = cmd.instrument_id.symbol.to_string();

        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Mbp1)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))?;

        Ok(())
    }

    /// Subscribes to trades (Trades schema) for the command's instrument.
    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
        log::debug!("Subscribe trades: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset);

        if was_new_handler {
            self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
        }

        self.symbol_venue_map
            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
        let symbol = cmd.instrument_id.symbol.to_string();

        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Trades)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))?;

        Ok(())
    }

    /// Subscribes to order book deltas (MBO schema) for the command's instrument.
    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
        log::debug!("Subscribe book deltas: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset);

        if was_new_handler {
            self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
        }

        self.symbol_venue_map
            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
        let symbol = cmd.instrument_id.symbol.to_string();

        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Mbo)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))?;

        Ok(())
    }

    /// Subscribes to instrument status (Status schema) for the command's instrument.
    fn subscribe_instrument_status(
        &mut self,
        cmd: SubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        log::debug!("Subscribe instrument status: {cmd:?}");

        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
        let was_new_handler = {
            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
            !channels.contains_key(&dataset)
        };

        self.get_or_create_feed_handler(&dataset);

        if was_new_handler {
            self.send_command_to_dataset(&dataset, HandlerCommand::Start)?;
        }

        self.symbol_venue_map
            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
        let symbol = cmd.instrument_id.symbol.to_string();

        let subscription = Subscription::builder()
            .schema(databento::dbn::Schema::Status)
            .symbols(symbol)
            .build();

        self.send_command_to_dataset(&dataset, HandlerCommand::Subscribe(subscription))?;

        Ok(())
    }

    /// No-op unsubscribe: Databento live feeds do not support granular
    /// unsubscription, so the request is logged and ignored.
    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
        log::debug!("Unsubscribe quotes: {cmd:?}");

        log::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

    /// No-op unsubscribe: logged and ignored (see `unsubscribe_quotes`).
    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
        log::debug!("Unsubscribe trades: {cmd:?}");

        log::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

    /// No-op unsubscribe: logged and ignored (see `unsubscribe_quotes`).
    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
        log::debug!("Unsubscribe book deltas: {cmd:?}");

        log::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

    /// No-op unsubscribe: logged and ignored (see `unsubscribe_quotes`).
    fn unsubscribe_instrument_status(
        &mut self,
        cmd: &UnsubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        log::debug!("Unsubscribe instrument status: {cmd:?}");

        log::warn!(
            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
            cmd.instrument_id
        );

        Ok(())
    }

    /// Requests instrument definitions over the historical API and forwards
    /// each instrument to the data event channel.
    ///
    /// NOTE(review): the dataset is hardcoded to "GLBX.MDP3" and symbols to
    /// ALL_SYMBOLS — presumably a placeholder; should likely be resolved from
    /// the request's venue. Confirm against the adapter roadmap.
    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
        log::debug!("Request instruments: {request:?}");

        let historical_client = self.historical.clone();
        let data_sender = self.data_sender.clone();

        get_runtime().spawn(async move {
            let symbols = vec!["ALL_SYMBOLS".to_string()];
            let params = RangeQueryParams {
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                // Missing start defaults to 0; timestamps are nanoseconds
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: None,
                price_precision: None,
            };

            match historical_client.get_range_instruments(params).await {
                Ok(instruments) => {
                    log::info!("Retrieved {} instruments", instruments.len());
                    for instrument in instruments {
                        if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
                            log::error!("Failed to send instrument: {e}");
                        }
                    }
                }
                Err(e) => {
                    log::error!("Failed to request instruments: {e}");
                }
            }
        });

        Ok(())
    }

    /// Requests historical quotes for the instrument in the request.
    ///
    /// NOTE(review): dataset is hardcoded to "GLBX.MDP3", and retrieved
    /// quotes are only logged — they are not forwarded to `data_sender`.
    /// This looks incomplete; confirm intended behavior.
    fn request_quotes(&self, request: RequestQuotes) -> anyhow::Result<()> {
        log::debug!("Request quotes: {request:?}");

        let historical_client = self.historical.clone();

        get_runtime().spawn(async move {
            let symbols = vec![instrument_id_to_symbol_string(
                request.instrument_id,
                &mut AHashMap::new(),
            )];

            let params = RangeQueryParams {
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: request.limit.map(|l| l.get() as u64),
                price_precision: None,
            };

            match historical_client.get_range_quotes(params, None).await {
                Ok(quotes) => {
                    log::info!("Retrieved {} quotes", quotes.len());
                }
                Err(e) => {
                    log::error!("Failed to request quotes: {e}");
                }
            }
        });

        Ok(())
    }

    /// Requests historical trades for the instrument in the request.
    ///
    /// NOTE(review): dataset is hardcoded to "GLBX.MDP3", and retrieved
    /// trades are only logged, not forwarded — see `request_quotes`.
    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
        log::debug!("Request trades: {request:?}");

        let historical_client = self.historical.clone();

        get_runtime().spawn(async move {
            let symbols = vec![instrument_id_to_symbol_string(
                request.instrument_id,
                &mut AHashMap::new(),
            )];

            let params = RangeQueryParams {
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: request.limit.map(|l| l.get() as u64),
                price_precision: None,
            };

            match historical_client.get_range_trades(params).await {
                Ok(trades) => {
                    log::info!("Retrieved {} trades", trades.len());
                }
                Err(e) => {
                    log::error!("Failed to request trades: {e}");
                }
            }
        });

        Ok(())
    }

    /// Requests historical bars for the bar type in the request.
    ///
    /// Only Second/Minute/Hour/Day aggregations are supported; other
    /// aggregations are logged as errors and dropped.
    ///
    /// NOTE(review): dataset is hardcoded to "GLBX.MDP3", and retrieved
    /// bars are only logged, not forwarded — see `request_quotes`.
    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
        log::debug!("Request bars: {request:?}");

        let historical_client = self.historical.clone();

        get_runtime().spawn(async move {
            let symbols = vec![instrument_id_to_symbol_string(
                request.bar_type.instrument_id(),
                &mut AHashMap::new(),
            )];

            let params = RangeQueryParams {
                dataset: "GLBX.MDP3".to_string(),
                symbols,
                start: request
                    .start
                    .map_or(0, |dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .into(),
                end: request
                    .end
                    .map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64)
                    .map(Into::into),
                limit: request.limit.map(|l| l.get() as u64),
                price_precision: None,
            };

            // Whitelist the aggregations Databento can serve as bar schemas
            let aggregation = match request.bar_type.spec().aggregation {
                BarAggregation::Second => BarAggregation::Second,
                BarAggregation::Minute => BarAggregation::Minute,
                BarAggregation::Hour => BarAggregation::Hour,
                BarAggregation::Day => BarAggregation::Day,
                _ => {
                    log::error!(
                        "Unsupported bar aggregation: {:?}",
                        request.bar_type.spec().aggregation
                    );
                    return;
                }
            };

            match historical_client
                .get_range_bars(params, aggregation, true)
                .await
            {
                Ok(bars) => {
                    log::info!("Retrieved {} bars", bars.len());
                }
                Err(e) => {
                    log::error!("Failed to request bars: {e}");
                }
            }
        });

        Ok(())
    }
}