1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 clients::DataClient,
28 live::{runner::get_data_event_sender, runtime::get_runtime},
29 messages::{
30 DataEvent,
31 data::{
32 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34 SubscribeBookDeltas, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
35 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
36 UnsubscribeQuotes, UnsubscribeTrades, subscribe::SubscribeInstrumentStatus,
37 unsubscribe::UnsubscribeInstrumentStatus,
38 },
39 },
40};
41use nautilus_core::{
42 AtomicMap,
43 datetime::datetime_to_unix_nanos,
44 time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_model::{
47 data::{Data, OrderBookDeltas_API},
48 enums::{BookType, MarketStatusAction},
49 identifiers::{ClientId, InstrumentId, Symbol, Venue},
50 instruments::{Instrument, InstrumentAny},
51};
52use tokio::task::JoinHandle;
53use tokio_util::sync::CancellationToken;
54use ustr::Ustr;
55
56use crate::{
57 common::{
58 consts::BINANCE_VENUE, credential::resolve_credentials, enums::BinanceProductType,
59 parse::bar_spec_to_binance_interval, status::diff_and_emit_statuses,
60 },
61 config::BinanceDataClientConfig,
62 spot::{
63 http::client::BinanceSpotHttpClient,
64 sbe::generated::symbol_status::SymbolStatus,
65 websocket::streams::{
66 client::BinanceSpotWebSocketClient,
67 messages::BinanceSpotWsMessage,
68 parse::{parse_bbo_event, parse_depth_diff, parse_depth_snapshot, parse_trades_event},
69 },
70 },
71};
72
73#[derive(Debug)]
75pub struct BinanceSpotDataClient {
76 clock: &'static AtomicTime,
77 client_id: ClientId,
78 config: BinanceDataClientConfig,
79 http_client: BinanceSpotHttpClient,
80 ws_client: BinanceSpotWebSocketClient,
81 is_connected: AtomicBool,
82 cancellation_token: CancellationToken,
83 tasks: Vec<JoinHandle<()>>,
84 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
85 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
86 status_cache: Arc<AtomicMap<InstrumentId, MarketStatusAction>>,
87}
88
89impl BinanceSpotDataClient {
90 pub fn new(client_id: ClientId, config: BinanceDataClientConfig) -> anyhow::Result<Self> {
96 let clock = get_atomic_clock_realtime();
97
98 let http_client = BinanceSpotHttpClient::new(
99 config.environment,
100 clock,
101 config.api_key.clone(),
102 config.api_secret.clone(),
103 config.base_url_http.clone(),
104 None, None, None, )?;
108
109 let product_type = config
110 .product_types
111 .first()
112 .copied()
113 .unwrap_or(BinanceProductType::Spot);
114
115 let creds = resolve_credentials(
116 config.api_key.clone(),
117 config.api_secret.clone(),
118 config.environment,
119 product_type,
120 )
121 .ok();
122
123 let ws_client = BinanceSpotWebSocketClient::new(
125 config.base_url_ws.clone(),
126 creds.as_ref().map(|(k, _)| k.clone()),
127 creds.as_ref().map(|(_, s)| s.clone()),
128 Some(20), config.transport_backend,
130 )?;
131 let data_sender = get_data_event_sender();
132
133 Ok(Self {
134 clock,
135 client_id,
136 config,
137 http_client,
138 ws_client,
139 is_connected: AtomicBool::new(false),
140 cancellation_token: CancellationToken::new(),
141 tasks: Vec::new(),
142 data_sender,
143 instruments: Arc::new(AtomicMap::new()),
144 status_cache: Arc::new(AtomicMap::new()),
145 })
146 }
147
148 fn venue(&self) -> Venue {
149 *BINANCE_VENUE
150 }
151
152 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
153 if let Err(e) = sender.send(DataEvent::Data(data)) {
154 log::error!("Failed to emit data event: {e}");
155 }
156 }
157
158 fn spawn_ws<F>(&self, fut: F, context: &'static str)
159 where
160 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
161 {
162 get_runtime().spawn(async move {
163 if let Err(e) = fut.await {
164 log::error!("{context}: {e:?}");
165 }
166 });
167 }
168
169 fn handle_ws_message(
170 msg: BinanceSpotWsMessage,
171 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
172 ws_instruments: &Arc<AtomicMap<Ustr, InstrumentAny>>,
173 ) {
174 match msg {
175 BinanceSpotWsMessage::Trades(ref event) => {
176 let symbol = Ustr::from(&event.symbol);
177 let cache = ws_instruments.load();
178 if let Some(instrument) = cache.get(&symbol) {
179 let trades = parse_trades_event(event, instrument);
180 for data in trades {
181 Self::send_data(data_sender, data);
182 }
183 }
184 }
185 BinanceSpotWsMessage::BestBidAsk(ref event) => {
186 let symbol = Ustr::from(&event.symbol);
187 let cache = ws_instruments.load();
188 if let Some(instrument) = cache.get(&symbol) {
189 let quote = parse_bbo_event(event, instrument);
190 Self::send_data(data_sender, Data::from(quote));
191 }
192 }
193 BinanceSpotWsMessage::DepthSnapshot(ref event) => {
194 let symbol = Ustr::from(&event.symbol);
195 let cache = ws_instruments.load();
196 if let Some(instrument) = cache.get(&symbol)
197 && let Some(deltas) = parse_depth_snapshot(event, instrument)
198 {
199 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
200 }
201 }
202 BinanceSpotWsMessage::DepthDiff(ref event) => {
203 let symbol = Ustr::from(&event.symbol);
204 let cache = ws_instruments.load();
205 if let Some(instrument) = cache.get(&symbol)
206 && let Some(deltas) = parse_depth_diff(event, instrument)
207 {
208 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
209 }
210 }
211 BinanceSpotWsMessage::RawBinary(data) => {
212 log::debug!("Unhandled binary message: {} bytes", data.len());
213 }
214 BinanceSpotWsMessage::RawJson(value) => {
215 log::debug!("Unhandled JSON message: {value:?}");
216 }
217 BinanceSpotWsMessage::Error(e) => {
218 log::error!("Binance WebSocket error: code={}, msg={}", e.code, e.msg);
219 }
220 BinanceSpotWsMessage::Reconnected => {
221 log::info!("WebSocket reconnected");
222 }
223 }
224 }
225}
226
227fn upsert_instrument(
228 cache: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
229 instrument: InstrumentAny,
230) {
231 cache.insert(instrument.id(), instrument);
232}
233
234#[async_trait::async_trait(?Send)]
235impl DataClient for BinanceSpotDataClient {
236 fn client_id(&self) -> ClientId {
237 self.client_id
238 }
239
240 fn venue(&self) -> Option<Venue> {
241 Some(self.venue())
242 }
243
244 fn start(&mut self) -> anyhow::Result<()> {
245 log::info!(
246 "Started: client_id={}, product_types={:?}, environment={:?}",
247 self.client_id,
248 self.config.product_types,
249 self.config.environment,
250 );
251 Ok(())
252 }
253
254 fn stop(&mut self) -> anyhow::Result<()> {
255 log::info!("Stopping {id}", id = self.client_id);
256 self.cancellation_token.cancel();
257 self.is_connected.store(false, Ordering::Relaxed);
258 Ok(())
259 }
260
261 fn reset(&mut self) -> anyhow::Result<()> {
262 log::debug!("Resetting {id}", id = self.client_id);
263
264 self.cancellation_token.cancel();
265
266 for task in self.tasks.drain(..) {
267 task.abort();
268 }
269
270 let mut ws = self.ws_client.clone();
271 get_runtime().spawn(async move {
272 let _ = ws.close().await;
273 });
274
275 self.is_connected.store(false, Ordering::Relaxed);
276 self.cancellation_token = CancellationToken::new();
277 Ok(())
278 }
279
280 fn dispose(&mut self) -> anyhow::Result<()> {
281 log::debug!("Disposing {id}", id = self.client_id);
282 self.stop()
283 }
284
285 async fn connect(&mut self) -> anyhow::Result<()> {
286 if self.is_connected() {
287 return Ok(());
288 }
289
290 self.cancellation_token = CancellationToken::new();
292
293 let exchange_info = self
295 .http_client
296 .exchange_info()
297 .await
298 .map_err(|e| anyhow::anyhow!("failed to request Binance exchange info: {e}"))?;
299
300 let instruments = self
301 .http_client
302 .request_instruments()
303 .await
304 .context("failed to request Binance instruments")?;
305
306 self.http_client.cache_instruments(instruments.clone());
307
308 {
309 let mut inst_map = AHashMap::new();
310 let mut status_map = AHashMap::new();
311
312 for instrument in &instruments {
313 inst_map.insert(instrument.id(), instrument.clone());
314 }
315
316 for symbol_info in &exchange_info.symbols {
318 let instrument_id =
319 InstrumentId::new(Symbol::from(symbol_info.symbol.as_str()), *BINANCE_VENUE);
320
321 if inst_map.contains_key(&instrument_id) {
322 let action = MarketStatusAction::from(SymbolStatus::from(symbol_info.status));
323 status_map.insert(instrument_id, action);
324 }
325 }
326
327 self.instruments.store(inst_map);
328 self.status_cache.store(status_map);
329 }
330
331 for instrument in instruments.clone() {
332 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
333 log::warn!("Failed to send instrument: {e}");
334 }
335 }
336
337 self.ws_client.cache_instruments(&instruments);
338
339 log::info!("Connecting to Binance SBE WebSocket...");
340 self.ws_client.connect().await.map_err(|e| {
341 log::error!("Binance WebSocket connection failed: {e:?}");
342 anyhow::anyhow!("failed to connect Binance WebSocket: {e}")
343 })?;
344 log::info!("Binance SBE WebSocket connected");
345
346 let stream = self.ws_client.stream();
347 let sender = self.data_sender.clone();
348 let ws_insts = self.ws_client.instruments_cache();
349 let cancel = self.cancellation_token.clone();
350
351 let handle = get_runtime().spawn(async move {
352 pin_mut!(stream);
353
354 loop {
355 tokio::select! {
356 Some(message) = stream.next() => {
357 Self::handle_ws_message(message, &sender, &ws_insts);
358 }
359 () = cancel.cancelled() => {
360 log::debug!("WebSocket stream task cancelled");
361 break;
362 }
363 }
364 }
365 });
366 self.tasks.push(handle);
367
368 let poll_secs = self.config.instrument_status_poll_secs;
370 if poll_secs > 0 {
371 let http = self.http_client.clone();
372 let poll_sender = self.data_sender.clone();
373 let poll_instruments = self.instruments.clone();
374 let poll_status_cache = self.status_cache.clone();
375 let poll_cancel = self.cancellation_token.clone();
376 let clock = self.clock;
377
378 let poll_handle = get_runtime().spawn(async move {
379 let mut interval =
380 tokio::time::interval(tokio::time::Duration::from_secs(poll_secs));
381 interval.tick().await; loop {
384 tokio::select! {
385 _ = interval.tick() => {
386 match http.exchange_info().await {
387 Ok(info) => {
388 let ts = clock.get_time_ns();
389 let inst_guard = poll_instruments.load();
390
391 let mut new_statuses = AHashMap::new();
392 for symbol_info in &info.symbols {
393 let instrument_id = InstrumentId::new(
394 Symbol::from(
395 symbol_info.symbol.as_str(),
396 ),
397 *BINANCE_VENUE,
398 );
399
400 if inst_guard.contains_key(&instrument_id) {
401 let action = MarketStatusAction::from(
402 SymbolStatus::from(symbol_info.status),
403 );
404 new_statuses.insert(instrument_id, action);
405 }
406 }
407 drop(inst_guard);
408
409 let mut cache =
410 (**poll_status_cache.load()).clone();
411 diff_and_emit_statuses(
412 &new_statuses, &mut cache, &poll_sender, ts, ts,
413 );
414 poll_status_cache.store(cache);
415 }
416 Err(e) => {
417 log::warn!("Instrument status poll failed: {e}");
418 }
419 }
420 }
421 () = poll_cancel.cancelled() => {
422 log::debug!("Instrument status polling task cancelled");
423 break;
424 }
425 }
426 }
427 });
428 self.tasks.push(poll_handle);
429 log::info!("Instrument status polling started: interval={poll_secs}s");
430 }
431
432 self.is_connected.store(true, Ordering::Release);
433 log::info!("Connected: client_id={}", self.client_id);
434 Ok(())
435 }
436
437 async fn disconnect(&mut self) -> anyhow::Result<()> {
438 if self.is_disconnected() {
439 return Ok(());
440 }
441
442 self.cancellation_token.cancel();
443
444 let _ = self.ws_client.close().await;
445
446 let handles: Vec<_> = self.tasks.drain(..).collect();
447 for handle in handles {
448 if let Err(e) = handle.await {
449 log::error!("Error joining WebSocket task: {e}");
450 }
451 }
452
453 self.is_connected.store(false, Ordering::Release);
454 log::info!("Disconnected: client_id={}", self.client_id);
455 Ok(())
456 }
457
458 fn is_connected(&self) -> bool {
459 self.is_connected.load(Ordering::Relaxed)
460 }
461
462 fn is_disconnected(&self) -> bool {
463 !self.is_connected()
464 }
465
466 fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
467 log::debug!("subscribe_instruments: Binance instruments are fetched via HTTP on connect");
468 Ok(())
469 }
470
471 fn subscribe_instrument(&mut self, _cmd: SubscribeInstrument) -> anyhow::Result<()> {
472 log::debug!("subscribe_instrument: Binance instruments are fetched via HTTP on connect");
473 Ok(())
474 }
475
476 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
477 if cmd.book_type != BookType::L2_MBP {
478 anyhow::bail!("Binance SBE only supports L2_MBP order book deltas");
479 }
480
481 let instrument_id = cmd.instrument_id;
482 let ws = self.ws_client.clone();
483 let depth = cmd.depth.map_or(20, |d| d.get());
484
485 let depth_level = match depth {
487 1..=5 => 5,
488 6..=10 => 10,
489 _ => 20,
490 };
491
492 let stream = format!(
493 "{}@depth{}",
494 instrument_id.symbol.as_str().to_lowercase(),
495 depth_level
496 );
497
498 self.spawn_ws(
499 async move {
500 ws.subscribe(vec![stream])
501 .await
502 .context("book deltas subscription")
503 },
504 "order book subscription",
505 );
506 Ok(())
507 }
508
509 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
510 let instrument_id = cmd.instrument_id;
511 let ws = self.ws_client.clone();
512
513 let stream = format!(
514 "{}@bestBidAsk",
515 instrument_id.symbol.as_str().to_lowercase()
516 );
517
518 self.spawn_ws(
519 async move {
520 ws.subscribe(vec![stream])
521 .await
522 .context("quotes subscription")
523 },
524 "quote subscription",
525 );
526 Ok(())
527 }
528
529 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
530 let instrument_id = cmd.instrument_id;
531 let ws = self.ws_client.clone();
532
533 let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
534
535 self.spawn_ws(
536 async move {
537 ws.subscribe(vec![stream])
538 .await
539 .context("trades subscription")
540 },
541 "trade subscription",
542 );
543 Ok(())
544 }
545
546 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
547 let bar_type = cmd.bar_type;
548 let ws = self.ws_client.clone();
549 let interval = bar_spec_to_binance_interval(bar_type.spec())?;
550
551 let stream = format!(
552 "{}@kline_{}",
553 bar_type.instrument_id().symbol.as_str().to_lowercase(),
554 interval.as_str()
555 );
556
557 self.spawn_ws(
558 async move {
559 ws.subscribe(vec![stream])
560 .await
561 .context("bars subscription")
562 },
563 "bar subscription",
564 );
565 Ok(())
566 }
567
568 fn subscribe_instrument_status(
569 &mut self,
570 cmd: SubscribeInstrumentStatus,
571 ) -> anyhow::Result<()> {
572 log::debug!(
573 "subscribe_instrument_status: {id} (status changes detected via periodic exchange info polling)",
574 id = cmd.instrument_id,
575 );
576 Ok(())
577 }
578
579 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
580 let instrument_id = cmd.instrument_id;
581 let ws = self.ws_client.clone();
582
583 let symbol_lower = instrument_id.symbol.as_str().to_lowercase();
585 let streams = vec![
586 format!("{symbol_lower}@depth5"),
587 format!("{symbol_lower}@depth10"),
588 format!("{symbol_lower}@depth20"),
589 ];
590
591 self.spawn_ws(
592 async move {
593 ws.unsubscribe(streams)
594 .await
595 .context("book deltas unsubscribe")
596 },
597 "order book unsubscribe",
598 );
599 Ok(())
600 }
601
602 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
603 let instrument_id = cmd.instrument_id;
604 let ws = self.ws_client.clone();
605
606 let stream = format!(
607 "{}@bestBidAsk",
608 instrument_id.symbol.as_str().to_lowercase()
609 );
610
611 self.spawn_ws(
612 async move {
613 ws.unsubscribe(vec![stream])
614 .await
615 .context("quotes unsubscribe")
616 },
617 "quote unsubscribe",
618 );
619 Ok(())
620 }
621
622 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
623 let instrument_id = cmd.instrument_id;
624 let ws = self.ws_client.clone();
625
626 let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
627
628 self.spawn_ws(
629 async move {
630 ws.unsubscribe(vec![stream])
631 .await
632 .context("trades unsubscribe")
633 },
634 "trade unsubscribe",
635 );
636 Ok(())
637 }
638
639 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
640 let bar_type = cmd.bar_type;
641 let ws = self.ws_client.clone();
642 let interval = bar_spec_to_binance_interval(bar_type.spec())?;
643
644 let stream = format!(
645 "{}@kline_{}",
646 bar_type.instrument_id().symbol.as_str().to_lowercase(),
647 interval.as_str()
648 );
649
650 self.spawn_ws(
651 async move {
652 ws.unsubscribe(vec![stream])
653 .await
654 .context("bars unsubscribe")
655 },
656 "bar unsubscribe",
657 );
658 Ok(())
659 }
660
661 fn unsubscribe_instrument_status(
662 &mut self,
663 cmd: &UnsubscribeInstrumentStatus,
664 ) -> anyhow::Result<()> {
665 log::debug!(
666 "unsubscribe_instrument_status: {id}",
667 id = cmd.instrument_id,
668 );
669 Ok(())
670 }
671
672 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
673 let http = self.http_client.clone();
674 let sender = self.data_sender.clone();
675 let instruments_cache = self.instruments.clone();
676 let request_id = request.request_id;
677 let client_id = request.client_id.unwrap_or(self.client_id);
678 let venue = self.venue();
679 let start = request.start;
680 let end = request.end;
681 let params = request.params;
682 let clock = self.clock;
683 let start_nanos = datetime_to_unix_nanos(start);
684 let end_nanos = datetime_to_unix_nanos(end);
685
686 get_runtime().spawn(async move {
687 match http.request_instruments().await {
688 Ok(instruments) => {
689 for instrument in &instruments {
690 upsert_instrument(&instruments_cache, instrument.clone());
691 }
692
693 let response = DataResponse::Instruments(InstrumentsResponse::new(
694 request_id,
695 client_id,
696 venue,
697 instruments,
698 start_nanos,
699 end_nanos,
700 clock.get_time_ns(),
701 params,
702 ));
703
704 if let Err(e) = sender.send(DataEvent::Response(response)) {
705 log::error!("Failed to send instruments response: {e}");
706 }
707 }
708 Err(e) => log::error!("Instruments request failed: {e:?}"),
709 }
710 });
711
712 Ok(())
713 }
714
715 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
716 let http = self.http_client.clone();
717 let sender = self.data_sender.clone();
718 let instruments = self.instruments.clone();
719 let instrument_id = request.instrument_id;
720 let request_id = request.request_id;
721 let client_id = request.client_id.unwrap_or(self.client_id);
722 let start = request.start;
723 let end = request.end;
724 let params = request.params;
725 let clock = self.clock;
726 let start_nanos = datetime_to_unix_nanos(start);
727 let end_nanos = datetime_to_unix_nanos(end);
728
729 get_runtime().spawn(async move {
730 match http.request_instruments().await {
731 Ok(all_instruments) => {
732 for instrument in &all_instruments {
733 upsert_instrument(&instruments, instrument.clone());
734 }
735
736 let instrument = all_instruments
737 .into_iter()
738 .find(|i| i.id() == instrument_id);
739
740 if let Some(instrument) = instrument {
741 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
742 request_id,
743 client_id,
744 instrument.id(),
745 instrument,
746 start_nanos,
747 end_nanos,
748 clock.get_time_ns(),
749 params,
750 )));
751
752 if let Err(e) = sender.send(DataEvent::Response(response)) {
753 log::error!("Failed to send instrument response: {e}");
754 }
755 } else {
756 log::error!("Instrument not found: {instrument_id}");
757 }
758 }
759 Err(e) => log::error!("Instrument request failed: {e:?}"),
760 }
761 });
762
763 Ok(())
764 }
765
766 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
767 let http = self.http_client.clone();
768 let sender = self.data_sender.clone();
769 let instrument_id = request.instrument_id;
770 let limit = request.limit.map(|n| n.get() as u32);
771 let request_id = request.request_id;
772 let client_id = request.client_id.unwrap_or(self.client_id);
773 let params = request.params;
774 let clock = self.clock;
775 let start_nanos = datetime_to_unix_nanos(request.start);
776 let end_nanos = datetime_to_unix_nanos(request.end);
777
778 get_runtime().spawn(async move {
779 match http
780 .request_trades(instrument_id, limit)
781 .await
782 .context("failed to request trades from Binance")
783 {
784 Ok(trades) => {
785 let response = DataResponse::Trades(TradesResponse::new(
786 request_id,
787 client_id,
788 instrument_id,
789 trades,
790 start_nanos,
791 end_nanos,
792 clock.get_time_ns(),
793 params,
794 ));
795
796 if let Err(e) = sender.send(DataEvent::Response(response)) {
797 log::error!("Failed to send trades response: {e}");
798 }
799 }
800 Err(e) => log::error!("Trade request failed: {e:?}"),
801 }
802 });
803
804 Ok(())
805 }
806
807 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
808 let http = self.http_client.clone();
809 let sender = self.data_sender.clone();
810 let bar_type = request.bar_type;
811 let start = request.start;
812 let end = request.end;
813 let limit = request.limit.map(|n| n.get() as u32);
814 let request_id = request.request_id;
815 let client_id = request.client_id.unwrap_or(self.client_id);
816 let params = request.params;
817 let clock = self.clock;
818 let start_nanos = datetime_to_unix_nanos(start);
819 let end_nanos = datetime_to_unix_nanos(end);
820
821 get_runtime().spawn(async move {
822 match http
823 .request_bars(bar_type, start, end, limit)
824 .await
825 .context("failed to request bars from Binance")
826 {
827 Ok(bars) => {
828 let response = DataResponse::Bars(BarsResponse::new(
829 request_id,
830 client_id,
831 bar_type,
832 bars,
833 start_nanos,
834 end_nanos,
835 clock.get_time_ns(),
836 params,
837 ));
838
839 if let Err(e) = sender.send(DataEvent::Response(response)) {
840 log::error!("Failed to send bars response: {e}");
841 }
842 }
843 Err(e) => log::error!("Bar request failed: {e:?}"),
844 }
845 });
846
847 Ok(())
848 }
849}