Skip to main content

nautilus_binance/spot/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Binance Spot adapter.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    clients::DataClient,
28    live::{runner::get_data_event_sender, runtime::get_runtime},
29    messages::{
30        DataEvent,
31        data::{
32            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34            SubscribeBookDeltas, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
35            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
36            UnsubscribeQuotes, UnsubscribeTrades, subscribe::SubscribeInstrumentStatus,
37            unsubscribe::UnsubscribeInstrumentStatus,
38        },
39    },
40};
41use nautilus_core::{
42    AtomicMap,
43    datetime::datetime_to_unix_nanos,
44    time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_model::{
47    data::{Data, OrderBookDeltas_API},
48    enums::{BookType, MarketStatusAction},
49    identifiers::{ClientId, InstrumentId, Symbol, Venue},
50    instruments::{Instrument, InstrumentAny},
51};
52use tokio::task::JoinHandle;
53use tokio_util::sync::CancellationToken;
54use ustr::Ustr;
55
56use crate::{
57    common::{
58        consts::BINANCE_VENUE, credential::resolve_credentials, enums::BinanceProductType,
59        parse::bar_spec_to_binance_interval, status::diff_and_emit_statuses,
60    },
61    config::BinanceDataClientConfig,
62    spot::{
63        http::client::BinanceSpotHttpClient,
64        sbe::generated::symbol_status::SymbolStatus,
65        websocket::streams::{
66            client::BinanceSpotWebSocketClient,
67            messages::BinanceSpotWsMessage,
68            parse::{parse_bbo_event, parse_depth_diff, parse_depth_snapshot, parse_trades_event},
69        },
70    },
71};
72
73/// Binance Spot data client for SBE market data.
74#[derive(Debug)]
75pub struct BinanceSpotDataClient {
76    clock: &'static AtomicTime,
77    client_id: ClientId,
78    config: BinanceDataClientConfig,
79    http_client: BinanceSpotHttpClient,
80    ws_client: BinanceSpotWebSocketClient,
81    is_connected: AtomicBool,
82    cancellation_token: CancellationToken,
83    tasks: Vec<JoinHandle<()>>,
84    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
85    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
86    status_cache: Arc<AtomicMap<InstrumentId, MarketStatusAction>>,
87}
88
89impl BinanceSpotDataClient {
90    /// Creates a new [`BinanceSpotDataClient`] instance.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the client fails to initialize.
95    pub fn new(client_id: ClientId, config: BinanceDataClientConfig) -> anyhow::Result<Self> {
96        let clock = get_atomic_clock_realtime();
97
98        let http_client = BinanceSpotHttpClient::new(
99            config.environment,
100            clock,
101            config.api_key.clone(),
102            config.api_secret.clone(),
103            config.base_url_http.clone(),
104            None, // recv_window
105            None, // timeout_secs
106            None, // proxy_url
107        )?;
108
109        let product_type = config
110            .product_types
111            .first()
112            .copied()
113            .unwrap_or(BinanceProductType::Spot);
114
115        let creds = resolve_credentials(
116            config.api_key.clone(),
117            config.api_secret.clone(),
118            config.environment,
119            product_type,
120        )
121        .ok();
122
123        // SBE streams require Ed25519 authentication
124        let ws_client = BinanceSpotWebSocketClient::new(
125            config.base_url_ws.clone(),
126            creds.as_ref().map(|(k, _)| k.clone()),
127            creds.as_ref().map(|(_, s)| s.clone()),
128            Some(20), // Heartbeat interval
129            config.transport_backend,
130        )?;
131        let data_sender = get_data_event_sender();
132
133        Ok(Self {
134            clock,
135            client_id,
136            config,
137            http_client,
138            ws_client,
139            is_connected: AtomicBool::new(false),
140            cancellation_token: CancellationToken::new(),
141            tasks: Vec::new(),
142            data_sender,
143            instruments: Arc::new(AtomicMap::new()),
144            status_cache: Arc::new(AtomicMap::new()),
145        })
146    }
147
148    fn venue(&self) -> Venue {
149        *BINANCE_VENUE
150    }
151
152    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
153        if let Err(e) = sender.send(DataEvent::Data(data)) {
154            log::error!("Failed to emit data event: {e}");
155        }
156    }
157
158    fn spawn_ws<F>(&self, fut: F, context: &'static str)
159    where
160        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
161    {
162        get_runtime().spawn(async move {
163            if let Err(e) = fut.await {
164                log::error!("{context}: {e:?}");
165            }
166        });
167    }
168
169    fn handle_ws_message(
170        msg: BinanceSpotWsMessage,
171        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
172        ws_instruments: &Arc<AtomicMap<Ustr, InstrumentAny>>,
173    ) {
174        match msg {
175            BinanceSpotWsMessage::Trades(ref event) => {
176                let symbol = Ustr::from(&event.symbol);
177                let cache = ws_instruments.load();
178                if let Some(instrument) = cache.get(&symbol) {
179                    let trades = parse_trades_event(event, instrument);
180                    for data in trades {
181                        Self::send_data(data_sender, data);
182                    }
183                }
184            }
185            BinanceSpotWsMessage::BestBidAsk(ref event) => {
186                let symbol = Ustr::from(&event.symbol);
187                let cache = ws_instruments.load();
188                if let Some(instrument) = cache.get(&symbol) {
189                    let quote = parse_bbo_event(event, instrument);
190                    Self::send_data(data_sender, Data::from(quote));
191                }
192            }
193            BinanceSpotWsMessage::DepthSnapshot(ref event) => {
194                let symbol = Ustr::from(&event.symbol);
195                let cache = ws_instruments.load();
196                if let Some(instrument) = cache.get(&symbol)
197                    && let Some(deltas) = parse_depth_snapshot(event, instrument)
198                {
199                    Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
200                }
201            }
202            BinanceSpotWsMessage::DepthDiff(ref event) => {
203                let symbol = Ustr::from(&event.symbol);
204                let cache = ws_instruments.load();
205                if let Some(instrument) = cache.get(&symbol)
206                    && let Some(deltas) = parse_depth_diff(event, instrument)
207                {
208                    Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
209                }
210            }
211            BinanceSpotWsMessage::RawBinary(data) => {
212                log::debug!("Unhandled binary message: {} bytes", data.len());
213            }
214            BinanceSpotWsMessage::RawJson(value) => {
215                log::debug!("Unhandled JSON message: {value:?}");
216            }
217            BinanceSpotWsMessage::Error(e) => {
218                log::error!("Binance WebSocket error: code={}, msg={}", e.code, e.msg);
219            }
220            BinanceSpotWsMessage::Reconnected => {
221                log::info!("WebSocket reconnected");
222            }
223        }
224    }
225}
226
227fn upsert_instrument(
228    cache: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
229    instrument: InstrumentAny,
230) {
231    cache.insert(instrument.id(), instrument);
232}
233
234#[async_trait::async_trait(?Send)]
235impl DataClient for BinanceSpotDataClient {
236    fn client_id(&self) -> ClientId {
237        self.client_id
238    }
239
240    fn venue(&self) -> Option<Venue> {
241        Some(self.venue())
242    }
243
244    fn start(&mut self) -> anyhow::Result<()> {
245        log::info!(
246            "Started: client_id={}, product_types={:?}, environment={:?}",
247            self.client_id,
248            self.config.product_types,
249            self.config.environment,
250        );
251        Ok(())
252    }
253
254    fn stop(&mut self) -> anyhow::Result<()> {
255        log::info!("Stopping {id}", id = self.client_id);
256        self.cancellation_token.cancel();
257        self.is_connected.store(false, Ordering::Relaxed);
258        Ok(())
259    }
260
261    fn reset(&mut self) -> anyhow::Result<()> {
262        log::debug!("Resetting {id}", id = self.client_id);
263
264        self.cancellation_token.cancel();
265
266        for task in self.tasks.drain(..) {
267            task.abort();
268        }
269
270        let mut ws = self.ws_client.clone();
271        get_runtime().spawn(async move {
272            let _ = ws.close().await;
273        });
274
275        self.is_connected.store(false, Ordering::Relaxed);
276        self.cancellation_token = CancellationToken::new();
277        Ok(())
278    }
279
280    fn dispose(&mut self) -> anyhow::Result<()> {
281        log::debug!("Disposing {id}", id = self.client_id);
282        self.stop()
283    }
284
285    async fn connect(&mut self) -> anyhow::Result<()> {
286        if self.is_connected() {
287            return Ok(());
288        }
289
290        // Reinitialize token in case of reconnection after disconnect
291        self.cancellation_token = CancellationToken::new();
292
293        // Fetch exchange info for both instruments and initial status cache
294        let exchange_info = self
295            .http_client
296            .exchange_info()
297            .await
298            .map_err(|e| anyhow::anyhow!("failed to request Binance exchange info: {e}"))?;
299
300        let instruments = self
301            .http_client
302            .request_instruments()
303            .await
304            .context("failed to request Binance instruments")?;
305
306        self.http_client.cache_instruments(instruments.clone());
307
308        {
309            let mut inst_map = AHashMap::new();
310            let mut status_map = AHashMap::new();
311
312            for instrument in &instruments {
313                inst_map.insert(instrument.id(), instrument.clone());
314            }
315
316            // Seed status cache from exchange info (no events emitted on initial connect)
317            for symbol_info in &exchange_info.symbols {
318                let instrument_id =
319                    InstrumentId::new(Symbol::from(symbol_info.symbol.as_str()), *BINANCE_VENUE);
320
321                if inst_map.contains_key(&instrument_id) {
322                    let action = MarketStatusAction::from(SymbolStatus::from(symbol_info.status));
323                    status_map.insert(instrument_id, action);
324                }
325            }
326
327            self.instruments.store(inst_map);
328            self.status_cache.store(status_map);
329        }
330
331        for instrument in instruments.clone() {
332            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
333                log::warn!("Failed to send instrument: {e}");
334            }
335        }
336
337        self.ws_client.cache_instruments(&instruments);
338
339        log::info!("Connecting to Binance SBE WebSocket...");
340        self.ws_client.connect().await.map_err(|e| {
341            log::error!("Binance WebSocket connection failed: {e:?}");
342            anyhow::anyhow!("failed to connect Binance WebSocket: {e}")
343        })?;
344        log::info!("Binance SBE WebSocket connected");
345
346        let stream = self.ws_client.stream();
347        let sender = self.data_sender.clone();
348        let ws_insts = self.ws_client.instruments_cache();
349        let cancel = self.cancellation_token.clone();
350
351        let handle = get_runtime().spawn(async move {
352            pin_mut!(stream);
353
354            loop {
355                tokio::select! {
356                    Some(message) = stream.next() => {
357                        Self::handle_ws_message(message, &sender, &ws_insts);
358                    }
359                    () = cancel.cancelled() => {
360                        log::debug!("WebSocket stream task cancelled");
361                        break;
362                    }
363                }
364            }
365        });
366        self.tasks.push(handle);
367
368        // Spawn instrument status polling task
369        let poll_secs = self.config.instrument_status_poll_secs;
370        if poll_secs > 0 {
371            let http = self.http_client.clone();
372            let poll_sender = self.data_sender.clone();
373            let poll_instruments = self.instruments.clone();
374            let poll_status_cache = self.status_cache.clone();
375            let poll_cancel = self.cancellation_token.clone();
376            let clock = self.clock;
377
378            let poll_handle = get_runtime().spawn(async move {
379                let mut interval =
380                    tokio::time::interval(tokio::time::Duration::from_secs(poll_secs));
381                interval.tick().await; // Skip first immediate tick
382
383                loop {
384                    tokio::select! {
385                        _ = interval.tick() => {
386                            match http.exchange_info().await {
387                                Ok(info) => {
388                                    let ts = clock.get_time_ns();
389                                    let inst_guard = poll_instruments.load();
390
391                                    let mut new_statuses = AHashMap::new();
392                                    for symbol_info in &info.symbols {
393                                        let instrument_id = InstrumentId::new(
394                                            Symbol::from(
395                                                symbol_info.symbol.as_str(),
396                                            ),
397                                            *BINANCE_VENUE,
398                                        );
399
400                                        if inst_guard.contains_key(&instrument_id) {
401                                            let action = MarketStatusAction::from(
402                                                SymbolStatus::from(symbol_info.status),
403                                            );
404                                            new_statuses.insert(instrument_id, action);
405                                        }
406                                    }
407                                    drop(inst_guard);
408
409                                    let mut cache =
410                                        (**poll_status_cache.load()).clone();
411                                    diff_and_emit_statuses(
412                                        &new_statuses, &mut cache, &poll_sender, ts, ts,
413                                    );
414                                    poll_status_cache.store(cache);
415                                }
416                                Err(e) => {
417                                    log::warn!("Instrument status poll failed: {e}");
418                                }
419                            }
420                        }
421                        () = poll_cancel.cancelled() => {
422                            log::debug!("Instrument status polling task cancelled");
423                            break;
424                        }
425                    }
426                }
427            });
428            self.tasks.push(poll_handle);
429            log::info!("Instrument status polling started: interval={poll_secs}s");
430        }
431
432        self.is_connected.store(true, Ordering::Release);
433        log::info!("Connected: client_id={}", self.client_id);
434        Ok(())
435    }
436
437    async fn disconnect(&mut self) -> anyhow::Result<()> {
438        if self.is_disconnected() {
439            return Ok(());
440        }
441
442        self.cancellation_token.cancel();
443
444        let _ = self.ws_client.close().await;
445
446        let handles: Vec<_> = self.tasks.drain(..).collect();
447        for handle in handles {
448            if let Err(e) = handle.await {
449                log::error!("Error joining WebSocket task: {e}");
450            }
451        }
452
453        self.is_connected.store(false, Ordering::Release);
454        log::info!("Disconnected: client_id={}", self.client_id);
455        Ok(())
456    }
457
458    fn is_connected(&self) -> bool {
459        self.is_connected.load(Ordering::Relaxed)
460    }
461
462    fn is_disconnected(&self) -> bool {
463        !self.is_connected()
464    }
465
466    fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
467        log::debug!("subscribe_instruments: Binance instruments are fetched via HTTP on connect");
468        Ok(())
469    }
470
471    fn subscribe_instrument(&mut self, _cmd: SubscribeInstrument) -> anyhow::Result<()> {
472        log::debug!("subscribe_instrument: Binance instruments are fetched via HTTP on connect");
473        Ok(())
474    }
475
476    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
477        if cmd.book_type != BookType::L2_MBP {
478            anyhow::bail!("Binance SBE only supports L2_MBP order book deltas");
479        }
480
481        let instrument_id = cmd.instrument_id;
482        let ws = self.ws_client.clone();
483        let depth = cmd.depth.map_or(20, |d| d.get());
484
485        // Binance SBE depth streams: depth5, depth10, depth20
486        let depth_level = match depth {
487            1..=5 => 5,
488            6..=10 => 10,
489            _ => 20,
490        };
491
492        let stream = format!(
493            "{}@depth{}",
494            instrument_id.symbol.as_str().to_lowercase(),
495            depth_level
496        );
497
498        self.spawn_ws(
499            async move {
500                ws.subscribe(vec![stream])
501                    .await
502                    .context("book deltas subscription")
503            },
504            "order book subscription",
505        );
506        Ok(())
507    }
508
509    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
510        let instrument_id = cmd.instrument_id;
511        let ws = self.ws_client.clone();
512
513        let stream = format!(
514            "{}@bestBidAsk",
515            instrument_id.symbol.as_str().to_lowercase()
516        );
517
518        self.spawn_ws(
519            async move {
520                ws.subscribe(vec![stream])
521                    .await
522                    .context("quotes subscription")
523            },
524            "quote subscription",
525        );
526        Ok(())
527    }
528
529    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
530        let instrument_id = cmd.instrument_id;
531        let ws = self.ws_client.clone();
532
533        let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
534
535        self.spawn_ws(
536            async move {
537                ws.subscribe(vec![stream])
538                    .await
539                    .context("trades subscription")
540            },
541            "trade subscription",
542        );
543        Ok(())
544    }
545
546    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
547        let bar_type = cmd.bar_type;
548        let ws = self.ws_client.clone();
549        let interval = bar_spec_to_binance_interval(bar_type.spec())?;
550
551        let stream = format!(
552            "{}@kline_{}",
553            bar_type.instrument_id().symbol.as_str().to_lowercase(),
554            interval.as_str()
555        );
556
557        self.spawn_ws(
558            async move {
559                ws.subscribe(vec![stream])
560                    .await
561                    .context("bars subscription")
562            },
563            "bar subscription",
564        );
565        Ok(())
566    }
567
568    fn subscribe_instrument_status(
569        &mut self,
570        cmd: SubscribeInstrumentStatus,
571    ) -> anyhow::Result<()> {
572        log::debug!(
573            "subscribe_instrument_status: {id} (status changes detected via periodic exchange info polling)",
574            id = cmd.instrument_id,
575        );
576        Ok(())
577    }
578
579    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
580        let instrument_id = cmd.instrument_id;
581        let ws = self.ws_client.clone();
582
583        // Unsubscribe from all depth levels for this symbol
584        let symbol_lower = instrument_id.symbol.as_str().to_lowercase();
585        let streams = vec![
586            format!("{symbol_lower}@depth5"),
587            format!("{symbol_lower}@depth10"),
588            format!("{symbol_lower}@depth20"),
589        ];
590
591        self.spawn_ws(
592            async move {
593                ws.unsubscribe(streams)
594                    .await
595                    .context("book deltas unsubscribe")
596            },
597            "order book unsubscribe",
598        );
599        Ok(())
600    }
601
602    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
603        let instrument_id = cmd.instrument_id;
604        let ws = self.ws_client.clone();
605
606        let stream = format!(
607            "{}@bestBidAsk",
608            instrument_id.symbol.as_str().to_lowercase()
609        );
610
611        self.spawn_ws(
612            async move {
613                ws.unsubscribe(vec![stream])
614                    .await
615                    .context("quotes unsubscribe")
616            },
617            "quote unsubscribe",
618        );
619        Ok(())
620    }
621
622    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
623        let instrument_id = cmd.instrument_id;
624        let ws = self.ws_client.clone();
625
626        let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
627
628        self.spawn_ws(
629            async move {
630                ws.unsubscribe(vec![stream])
631                    .await
632                    .context("trades unsubscribe")
633            },
634            "trade unsubscribe",
635        );
636        Ok(())
637    }
638
639    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
640        let bar_type = cmd.bar_type;
641        let ws = self.ws_client.clone();
642        let interval = bar_spec_to_binance_interval(bar_type.spec())?;
643
644        let stream = format!(
645            "{}@kline_{}",
646            bar_type.instrument_id().symbol.as_str().to_lowercase(),
647            interval.as_str()
648        );
649
650        self.spawn_ws(
651            async move {
652                ws.unsubscribe(vec![stream])
653                    .await
654                    .context("bars unsubscribe")
655            },
656            "bar unsubscribe",
657        );
658        Ok(())
659    }
660
661    fn unsubscribe_instrument_status(
662        &mut self,
663        cmd: &UnsubscribeInstrumentStatus,
664    ) -> anyhow::Result<()> {
665        log::debug!(
666            "unsubscribe_instrument_status: {id}",
667            id = cmd.instrument_id,
668        );
669        Ok(())
670    }
671
672    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
673        let http = self.http_client.clone();
674        let sender = self.data_sender.clone();
675        let instruments_cache = self.instruments.clone();
676        let request_id = request.request_id;
677        let client_id = request.client_id.unwrap_or(self.client_id);
678        let venue = self.venue();
679        let start = request.start;
680        let end = request.end;
681        let params = request.params;
682        let clock = self.clock;
683        let start_nanos = datetime_to_unix_nanos(start);
684        let end_nanos = datetime_to_unix_nanos(end);
685
686        get_runtime().spawn(async move {
687            match http.request_instruments().await {
688                Ok(instruments) => {
689                    for instrument in &instruments {
690                        upsert_instrument(&instruments_cache, instrument.clone());
691                    }
692
693                    let response = DataResponse::Instruments(InstrumentsResponse::new(
694                        request_id,
695                        client_id,
696                        venue,
697                        instruments,
698                        start_nanos,
699                        end_nanos,
700                        clock.get_time_ns(),
701                        params,
702                    ));
703
704                    if let Err(e) = sender.send(DataEvent::Response(response)) {
705                        log::error!("Failed to send instruments response: {e}");
706                    }
707                }
708                Err(e) => log::error!("Instruments request failed: {e:?}"),
709            }
710        });
711
712        Ok(())
713    }
714
715    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
716        let http = self.http_client.clone();
717        let sender = self.data_sender.clone();
718        let instruments = self.instruments.clone();
719        let instrument_id = request.instrument_id;
720        let request_id = request.request_id;
721        let client_id = request.client_id.unwrap_or(self.client_id);
722        let start = request.start;
723        let end = request.end;
724        let params = request.params;
725        let clock = self.clock;
726        let start_nanos = datetime_to_unix_nanos(start);
727        let end_nanos = datetime_to_unix_nanos(end);
728
729        get_runtime().spawn(async move {
730            match http.request_instruments().await {
731                Ok(all_instruments) => {
732                    for instrument in &all_instruments {
733                        upsert_instrument(&instruments, instrument.clone());
734                    }
735
736                    let instrument = all_instruments
737                        .into_iter()
738                        .find(|i| i.id() == instrument_id);
739
740                    if let Some(instrument) = instrument {
741                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
742                            request_id,
743                            client_id,
744                            instrument.id(),
745                            instrument,
746                            start_nanos,
747                            end_nanos,
748                            clock.get_time_ns(),
749                            params,
750                        )));
751
752                        if let Err(e) = sender.send(DataEvent::Response(response)) {
753                            log::error!("Failed to send instrument response: {e}");
754                        }
755                    } else {
756                        log::error!("Instrument not found: {instrument_id}");
757                    }
758                }
759                Err(e) => log::error!("Instrument request failed: {e:?}"),
760            }
761        });
762
763        Ok(())
764    }
765
766    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
767        let http = self.http_client.clone();
768        let sender = self.data_sender.clone();
769        let instrument_id = request.instrument_id;
770        let limit = request.limit.map(|n| n.get() as u32);
771        let request_id = request.request_id;
772        let client_id = request.client_id.unwrap_or(self.client_id);
773        let params = request.params;
774        let clock = self.clock;
775        let start_nanos = datetime_to_unix_nanos(request.start);
776        let end_nanos = datetime_to_unix_nanos(request.end);
777
778        get_runtime().spawn(async move {
779            match http
780                .request_trades(instrument_id, limit)
781                .await
782                .context("failed to request trades from Binance")
783            {
784                Ok(trades) => {
785                    let response = DataResponse::Trades(TradesResponse::new(
786                        request_id,
787                        client_id,
788                        instrument_id,
789                        trades,
790                        start_nanos,
791                        end_nanos,
792                        clock.get_time_ns(),
793                        params,
794                    ));
795
796                    if let Err(e) = sender.send(DataEvent::Response(response)) {
797                        log::error!("Failed to send trades response: {e}");
798                    }
799                }
800                Err(e) => log::error!("Trade request failed: {e:?}"),
801            }
802        });
803
804        Ok(())
805    }
806
807    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
808        let http = self.http_client.clone();
809        let sender = self.data_sender.clone();
810        let bar_type = request.bar_type;
811        let start = request.start;
812        let end = request.end;
813        let limit = request.limit.map(|n| n.get() as u32);
814        let request_id = request.request_id;
815        let client_id = request.client_id.unwrap_or(self.client_id);
816        let params = request.params;
817        let clock = self.clock;
818        let start_nanos = datetime_to_unix_nanos(start);
819        let end_nanos = datetime_to_unix_nanos(end);
820
821        get_runtime().spawn(async move {
822            match http
823                .request_bars(bar_type, start, end, limit)
824                .await
825                .context("failed to request bars from Binance")
826            {
827                Ok(bars) => {
828                    let response = DataResponse::Bars(BarsResponse::new(
829                        request_id,
830                        client_id,
831                        bar_type,
832                        bars,
833                        start_nanos,
834                        end_nanos,
835                        clock.get_time_ns(),
836                        params,
837                    ));
838
839                    if let Err(e) = sender.send(DataEvent::Response(response)) {
840                        log::error!("Failed to send bars response: {e}");
841                    }
842                }
843                Err(e) => log::error!("Bar request failed: {e:?}"),
844            }
845        });
846
847        Ok(())
848    }
849}