1use std::{fmt::Debug, sync::Arc, time::Duration};
32
33use ahash::{AHashMap, HashSet, HashSetExt};
34use databento::{
35 dbn::{self, PitSymbolMap, Record, SymbolIndex},
36 live::Subscription,
37};
38use indexmap::IndexMap;
39use nautilus_core::{
40 AtomicMap, UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43 data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
44 enums::RecordFlag,
45 identifiers::{InstrumentId, Symbol, Venue},
46 instruments::{Instrument, InstrumentAny},
47};
48use nautilus_network::backoff::ExponentialBackoff;
49
50use super::{
51 decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
52 types::{DatabentoImbalance, DatabentoStatistics, SubscriptionAckEvent},
53};
54use crate::{
55 common::Credential,
56 decode::{decode_instrument_def_msg, decode_record},
57 types::PublisherId,
58};
59
/// Commands accepted by [`DatabentoFeedHandler`] over its command channel.
#[derive(Debug)]
pub enum HandlerCommand {
    /// Register a subscription with the live client; the handler also retains it
    /// (with `start` cleared) so it can be re-sent after a reconnect.
    Subscribe(Subscription),
    /// Start the live session so records begin to flow.
    Start,
    /// Shut the handler down.
    Close,
}
66
/// Messages emitted by [`DatabentoFeedHandler`] on its outbound message channel.
#[derive(Debug)]
pub enum DatabentoMessage {
    /// Decoded market data (including batched order book deltas).
    Data(Data),
    /// A decoded instrument definition.
    Instrument(Box<InstrumentAny>),
    /// A decoded instrument status update.
    Status(InstrumentStatus),
    /// A decoded imbalance message.
    Imbalance(DatabentoImbalance),
    /// A decoded statistics message.
    Statistics(DatabentoStatistics),
    /// A subscription acknowledgement received from the gateway.
    SubscriptionAck(SubscriptionAckEvent),
    /// A handler-level failure (sent when the reconnection timeout is exceeded).
    Error(anyhow::Error),
    /// Sent when the handler processes a `Close` command inside a session.
    Close,
}
78
/// Drives a Databento live session: connects, applies subscription commands,
/// decodes incoming records and forwards them as [`DatabentoMessage`]s, and
/// reconnects with exponential backoff when a session fails.
pub struct DatabentoFeedHandler {
    /// Credential whose API key is used to build the live client.
    credential: Credential,
    /// Dataset passed to the live client builder.
    dataset: String,
    /// Inbound command channel.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Outbound message channel.
    msg_tx: tokio::sync::mpsc::Sender<DatabentoMessage>,
    /// Fallback venue lookup keyed by publisher ID.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Preferred venue lookup keyed by symbol; extended from instrument
    /// definitions when `use_exchange_as_venue` is set.
    symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
    /// Set once a subscription carrying a `start` has been seen; when set, a
    /// `Start` command records the current time as the MBO buffering threshold.
    replay: bool,
    /// When `true`, a non-empty exchange on an instrument definition is used as the venue.
    use_exchange_as_venue: bool,
    /// Forwarded to record decoding.
    bars_timestamp_on_close: bool,
    /// Maximum length of a reconnection cycle in minutes; `None` retries indefinitely.
    reconnect_timeout_mins: Option<u64>,
    /// Backoff generator for the delay between reconnection attempts.
    backoff: ExponentialBackoff,
    /// Subscriptions to re-send on reconnect.
    subscriptions: Vec<Subscription>,
    /// Commands received while waiting in backoff, applied at the next session start.
    buffered_commands: Vec<HandlerCommand>,
    /// Optional gateway address override for the live client.
    gateway_addr: Option<String>,
    /// Minimum session duration for a terminated session to count as successful.
    success_threshold: Duration,
}
109
110impl Debug for DatabentoFeedHandler {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct(stringify!(DatabentoFeedHandler))
113 .field("credential", &self.credential)
114 .field("dataset", &self.dataset)
115 .field("replay", &self.replay)
116 .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
117 .field("subscriptions", &self.subscriptions.len())
118 .finish()
119 }
120}
121
122impl DatabentoFeedHandler {
123 #[must_use]
129 #[expect(clippy::too_many_arguments)]
130 pub fn new(
131 credential: Credential,
132 dataset: String,
133 rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
134 tx: tokio::sync::mpsc::Sender<DatabentoMessage>,
135 publisher_venue_map: IndexMap<PublisherId, Venue>,
136 symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
137 use_exchange_as_venue: bool,
138 bars_timestamp_on_close: bool,
139 reconnect_timeout_mins: Option<u64>,
140 ) -> Self {
141 let delay_max = if reconnect_timeout_mins.is_some() {
145 Duration::from_secs(60)
146 } else {
147 Duration::from_secs(600)
148 };
149
150 let backoff = ExponentialBackoff::new(Duration::from_secs(1), delay_max, 2.0, 1000, false)
151 .expect("hardcoded backoff parameters are valid");
152
153 Self {
154 credential,
155 dataset,
156 cmd_rx: rx,
157 msg_tx: tx,
158 publisher_venue_map,
159 symbol_venue_map,
160 replay: false,
161 use_exchange_as_venue,
162 bars_timestamp_on_close,
163 reconnect_timeout_mins,
164 backoff,
165 subscriptions: Vec::new(),
166 buffered_commands: Vec::new(),
167 gateway_addr: None,
168 success_threshold: Duration::from_secs(60),
169 }
170 }
171
172 #[must_use]
174 pub fn with_gateway_addr(mut self, addr: String) -> Self {
175 self.gateway_addr = Some(addr);
176 self
177 }
178
179 #[must_use]
184 pub fn with_success_threshold(mut self, threshold: Duration) -> Self {
185 self.success_threshold = threshold;
186 self
187 }
188
189 pub async fn run(&mut self) -> anyhow::Result<()> {
202 log::debug!("Running feed handler");
203
204 let mut reconnect_start: Option<tokio::time::Instant> = None;
205 let mut attempt = 0;
206
207 loop {
208 attempt += 1;
209
210 match self.run_session(attempt).await {
211 Ok(ran_successfully) => {
212 if ran_successfully {
213 log::info!("Resetting reconnection cycle after successful session");
214 reconnect_start = None;
215 attempt = 0;
216 self.backoff.reset();
217 } else {
218 log::info!("Session ended normally");
219 break Ok(());
220 }
221 }
222 Err(e) => {
223 let cycle_start = reconnect_start.get_or_insert_with(tokio::time::Instant::now);
224
225 if let Some(timeout_mins) = self.reconnect_timeout_mins {
226 let elapsed = cycle_start.elapsed();
227 let timeout = Duration::from_mins(timeout_mins);
228
229 if elapsed >= timeout {
230 log::error!("Giving up reconnection after {timeout_mins} minutes");
231 self.send_msg(DatabentoMessage::Error(anyhow::anyhow!(
232 "Reconnection timeout after {timeout_mins} minutes: {e}"
233 )))
234 .await;
235 break Err(e);
236 }
237 }
238
239 let delay = self.backoff.next_duration();
240
241 log::warn!(
242 "Connection lost (attempt {}): {}. Reconnecting in {}s...",
243 attempt,
244 e,
245 delay.as_secs()
246 );
247
248 tokio::select! {
249 () = tokio::time::sleep(delay) => {}
250 cmd = self.cmd_rx.recv() => {
251 match cmd {
252 Some(HandlerCommand::Close) => {
253 log::info!("Close received during backoff");
254 return Ok(());
255 }
256 None => {
257 log::debug!("Command channel closed during backoff");
258 return Ok(());
259 }
260 Some(cmd) => {
261 log::debug!("Buffering command received during backoff: {cmd:?}");
262 self.buffered_commands.push(cmd);
263 }
264 }
265 }
266 }
267 }
268 }
269 }
270 }
271
/// Runs a single live session: connect, (re)subscribe, then process commands
/// and records until the session ends.
///
/// All symbol/instrument/precision maps and MBO buffers are local to the
/// session and are rebuilt from scratch on every call.
///
/// Returns `Ok(false)` when the session ended normally (close command, command
/// channel disconnected, or message channel closed) and `Ok(true)` when the
/// stream terminated or errored after running for at least `success_threshold`.
///
/// # Errors
///
/// Returns an error if connecting, subscribing, starting or record handling
/// fails, or if the stream terminates before `success_threshold` has elapsed.
async fn run_session(&mut self, attempt: usize) -> anyhow::Result<bool> {
    if attempt > 1 {
        log::info!("Reconnecting (attempt {attempt})...");
    }

    let session_start = tokio::time::Instant::now();
    let clock = get_atomic_clock_realtime();
    let mut symbol_map = PitSymbolMap::new();
    let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
    let mut price_precision_map: AHashMap<u32, u8> = AHashMap::new();

    // MBO batching state: replay threshold, per-instrument pending deltas, and
    // the instruments for which at least one delta has been decoded
    let mut buffering_start = None;
    let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
    let mut initialized_books = HashSet::new();
    // Upper bound on building the live client (connect + authenticate)
    let timeout = Duration::from_secs(5);
    let gateway_addr = self.gateway_addr.clone();
    let api_key = self.credential.api_key().to_owned();
    let dataset = self.dataset.clone();

    // The outer `?` surfaces the timeout; the inner result is matched below
    let result = tokio::time::timeout(timeout, async move {
        let base = databento::LiveClient::builder();
        let base = if let Some(addr) = gateway_addr {
            base.addr(addr).await?
        } else {
            base
        };
        base.user_agent_extension(NAUTILUS_USER_AGENT.into())
            .key(api_key)?
            .dataset(dataset)
            .build()
            .await
    })
    .await?;

    let mut client = match result {
        Ok(client) => {
            if attempt > 1 {
                log::info!("Reconnected successfully");
            } else {
                log::info!("Connected");
            }
            client
        }
        Err(e) => {
            anyhow::bail!("Failed to connect to Databento LSG: {e}");
        }
    };

    let mut start_buffered = false;

    // Apply commands that arrived while the handler was waiting in backoff
    if !self.buffered_commands.is_empty() {
        log::info!(
            "Processing {} buffered commands",
            self.buffered_commands.len()
        );

        for cmd in self.buffered_commands.drain(..) {
            match cmd {
                HandlerCommand::Subscribe(sub) => {
                    if !self.replay && sub.start.is_some() {
                        self.replay = true;
                    }
                    self.subscriptions.push(sub);
                }
                HandlerCommand::Start => {
                    start_buffered = true;
                }
                HandlerCommand::Close => {
                    log::warn!("Close command was buffered, shutting down");
                    return Ok(false);
                }
            }
        }
    }

    let mut running = false;

    if !self.subscriptions.is_empty() {
        log::info!(
            "Resubscribing to {} subscriptions",
            self.subscriptions.len()
        );

        for sub in self.subscriptions.clone() {
            client.subscribe(sub).await?;
        }
        // Clear `start` once sent so later reconnects do not request it again
        for sub in &mut self.subscriptions {
            sub.start = None;
        }
        client.start().await?;
        running = true;
        log::info!("Resubscription complete");
    } else if start_buffered {
        log::info!("Starting session from buffered Start command");
        buffering_start = if self.replay {
            Some(clock.get_time_ns())
        } else {
            None
        };
        client.start().await?;
        running = true;
    }

    loop {
        if self.msg_tx.is_closed() {
            log::debug!("Message channel was closed: stopping");
            return Ok(false);
        }

        // Before the session is started only commands are processed; records
        // are not polled until `client.start()` has been called
        if !running {
            match self.cmd_rx.recv().await {
                Some(HandlerCommand::Subscribe(sub)) => {
                    log::debug!("Received command: Subscribe");

                    if !self.replay && sub.start.is_some() {
                        self.replay = true;
                    }
                    client.subscribe(sub.clone()).await?;
                    // Retain without `start` for use on reconnect
                    let mut sub_for_reconnect = sub;
                    sub_for_reconnect.start = None;
                    self.subscriptions.push(sub_for_reconnect);
                    continue;
                }
                Some(HandlerCommand::Start) => {
                    log::debug!("Received command: Start");
                    // In replay mode, MBO batches are held until events pass this time
                    buffering_start = if self.replay {
                        Some(clock.get_time_ns())
                    } else {
                        None
                    };
                    client.start().await?;
                    running = true;
                    continue;
                }
                Some(HandlerCommand::Close) => {
                    self.msg_tx.send(DatabentoMessage::Close).await?;
                    return Ok(false);
                }
                None => {
                    log::debug!("Command channel disconnected");
                    return Ok(false);
                }
            }
        }

        // Once running, race incoming commands against the next record
        let record_opt = tokio::select! {
            cmd = self.cmd_rx.recv() =>
                match cmd {
                    Some(HandlerCommand::Subscribe(sub)) => {
                        log::debug!("Received command: Subscribe");

                        if sub.start.is_some() {
                            // NOTE(review): `replay` is set here even though the log says the
                            // start anchor is ignored - confirm this is intended
                            self.replay = true;
                            log::error!(
                                "Ignoring `start` on {} subscribe, session already running, Databento drops replay anchors sent after session start",
                                self.dataset,
                            );
                        }
                        client.subscribe(sub.clone()).await?;
                        // Retain without `start` for use on reconnect
                        let mut sub_for_reconnect = sub;
                        sub_for_reconnect.start = None;
                        self.subscriptions.push(sub_for_reconnect);
                        continue;
                    }
                    Some(HandlerCommand::Start) => {
                        log::warn!("Received Start command but session already running");
                        continue;
                    }
                    Some(HandlerCommand::Close) => {
                        self.msg_tx.send(DatabentoMessage::Close).await?;
                        client.close().await?;
                        log::debug!("Closed inner client");
                        return Ok(false);
                    }
                    None => {
                        log::debug!("Command channel disconnected");
                        return Ok(false);
                    }
                },
            result = client.next_record() => result,
        };

        // A stream that ends or errors after running at least `success_threshold`
        // is reported as a successful run; otherwise it is a failure
        let record = match record_opt {
            Ok(Some(record)) => record,
            Ok(None) => {
                if session_start.elapsed() >= self.success_threshold {
                    log::info!("Session ended after successful run");
                    return Ok(true);
                }
                anyhow::bail!("Session ended by gateway");
            }
            Err(e) => {
                if session_start.elapsed() >= self.success_threshold {
                    log::info!("Connection error after successful run: {e}");
                    return Ok(true);
                }
                anyhow::bail!("Connection error: {e}");
            }
        };

        let ts_init = clock.get_time_ns();

        // Dispatch on record type; anything not matched explicitly goes to `handle_record`
        if let Some(msg) = record.get::<dbn::ErrorMsg>() {
            handle_error_msg(msg);
        } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
            if let Some(ack) = handle_system_msg(msg, ts_init) {
                self.send_msg(DatabentoMessage::SubscriptionAck(ack)).await;
            }
        } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
            // Drop the cached instrument ID so it is re-resolved against the new mapping
            // NOTE(review): `handle_symbol_mapping_msg` performs the same removal
            instrument_id_map.remove(&msg.hd.instrument_id);
            handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
        } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
            if self.use_exchange_as_venue {
                let exchange = msg.exchange()?;
                if !exchange.is_empty() {
                    update_instrument_id_map_with_exchange(
                        &symbol_map,
                        &self.symbol_venue_map,
                        &mut instrument_id_map,
                        msg.hd.instrument_id,
                        exchange,
                    )?;
                }
            }
            let data = {
                let sym_map = self.symbol_venue_map.load();
                handle_instrument_def_msg(
                    msg,
                    &record,
                    &symbol_map,
                    &self.publisher_venue_map,
                    &sym_map,
                    &mut instrument_id_map,
                    ts_init,
                )?
            };
            // Remember the precision so later records for this instrument can use it
            price_precision_map.insert(msg.hd.instrument_id, data.price_precision());
            self.send_msg(DatabentoMessage::Instrument(Box::new(data)))
                .await;
        } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
            let data = {
                let sym_map = self.symbol_venue_map.load();
                handle_status_msg(
                    msg,
                    &record,
                    &symbol_map,
                    &self.publisher_venue_map,
                    &sym_map,
                    &mut instrument_id_map,
                    ts_init,
                )?
            };
            self.send_msg(DatabentoMessage::Status(data)).await;
        } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
            let data = {
                let sym_map = self.symbol_venue_map.load();
                handle_imbalance_msg(
                    msg,
                    &record,
                    &symbol_map,
                    &self.publisher_venue_map,
                    &sym_map,
                    &mut instrument_id_map,
                    &price_precision_map,
                    ts_init,
                )?
            };
            self.send_msg(DatabentoMessage::Imbalance(data)).await;
        } else if let Some(msg) = record.get::<dbn::StatMsg>() {
            let data = {
                let sym_map = self.symbol_venue_map.load();
                handle_statistics_msg(
                    msg,
                    &record,
                    &symbol_map,
                    &self.publisher_venue_map,
                    &sym_map,
                    &mut instrument_id_map,
                    &price_precision_map,
                    ts_init,
                )?
            };
            self.send_msg(DatabentoMessage::Statistics(data)).await;
        } else {
            let res = {
                let sym_map = self.symbol_venue_map.load();
                handle_record(
                    record,
                    &symbol_map,
                    &self.publisher_venue_map,
                    &sym_map,
                    &mut instrument_id_map,
                    &price_precision_map,
                    ts_init,
                    &initialized_books,
                    self.bars_timestamp_on_close,
                )
            };
            // Decode failures are logged and skipped rather than ending the session
            let (mut data1, data2) = match res {
                Ok(decoded) => decoded,
                Err(e) => {
                    log::error!("Error decoding record: {e}");
                    continue;
                }
            };

            if let Some(msg) = record.get::<dbn::MboMsg>() {
                // An MBO record that produced no delta has nothing to buffer or emit
                if let Some(Data::Delta(delta)) = &data1 {
                    initialized_books.insert(delta.instrument_id);
                } else {
                    continue;
                }

                if let Some(Data::Delta(delta)) = &data1 {
                    log::trace!(
                        "Buffering delta: {} {buffering_start:?} flags={}",
                        delta.ts_event,
                        msg.flags.raw(),
                    );

                    // Replace the single delta with the completed batch, or keep buffering
                    match process_mbo_delta(
                        *delta,
                        msg.flags.raw(),
                        &mut buffering_start,
                        &mut buffered_deltas,
                    )? {
                        Some(deltas) => data1 = Some(Data::Deltas(deltas)),
                        None => continue,
                    }
                }
            }

            if let Some(data) = data1 {
                self.send_msg(DatabentoMessage::Data(data)).await;
            }

            if let Some(data) = data2 {
                self.send_msg(DatabentoMessage::Data(data)).await;
            }
        }
    }
}
632
633 async fn send_msg(&self, msg: DatabentoMessage) {
635 log::trace!("Sending {msg:?}");
636 match self.msg_tx.send(msg).await {
637 Ok(()) => {}
638 Err(e) => log::error!("Error sending message: {e}"),
639 }
640 }
641}
642
/// Logs a gateway error message at error level using its `Debug` representation.
fn handle_error_msg(msg: &dbn::ErrorMsg) {
    log::error!("{msg:?}");
}
647
648fn handle_system_msg(msg: &dbn::SystemMsg, ts_received: UnixNanos) -> Option<SubscriptionAckEvent> {
650 match msg.code() {
651 Ok(dbn::SystemCode::SubscriptionAck) => {
652 let message = msg.msg().unwrap_or("<invalid utf-8>");
653 log::info!("Subscription acknowledged: {message}");
654
655 let schema = parse_ack_message(message);
656
657 Some(SubscriptionAckEvent {
658 schema,
659 message: message.to_string(),
660 ts_received,
661 })
662 }
663 Ok(dbn::SystemCode::Heartbeat) => {
664 log::trace!("Heartbeat received");
665 None
666 }
667 Ok(dbn::SystemCode::SlowReaderWarning) => {
668 let message = msg.msg().unwrap_or("<invalid utf-8>");
669 log::warn!("Slow reader warning: {message}");
670 None
671 }
672 Ok(dbn::SystemCode::ReplayCompleted) => {
673 let message = msg.msg().unwrap_or("<invalid utf-8>");
674 log::info!("Replay completed: {message}");
675 None
676 }
677 _ => {
678 log::debug!("{msg:?}");
679 None
680 }
681 }
682}
683
/// Extracts the schema name from a subscription acknowledgement message.
///
/// Expects text of the form
/// `Subscription request <n> for <schema> data succeeded` and returns the
/// trimmed `<schema>` part. Returns an empty string when the message does not
/// match that shape.
fn parse_ack_message(message: &str) -> String {
    fn schema_of(message: &str) -> Option<&str> {
        let rest = message.strip_prefix("Subscription request ")?;
        let (_, after_num) = rest.split_once(" for ")?;
        let schema = after_num.strip_suffix(" data succeeded")?;
        Some(schema.trim())
    }

    match schema_of(message) {
        Some(schema) => schema.to_string(),
        None => String::new(),
    }
}
694
695fn handle_symbol_mapping_msg(
701 msg: &dbn::SymbolMappingMsg,
702 symbol_map: &mut PitSymbolMap,
703 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
704) -> anyhow::Result<()> {
705 symbol_map
706 .on_symbol_mapping(msg)
707 .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
708 instrument_id_map.remove(&msg.header().instrument_id);
709 Ok(())
710}
711
712fn update_instrument_id_map_with_exchange(
714 symbol_map: &PitSymbolMap,
715 symbol_venue_map: &AtomicMap<Symbol, Venue>,
716 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
717 raw_instrument_id: u32,
718 exchange: &str,
719) -> anyhow::Result<InstrumentId> {
720 let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
721 anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
722 })?;
723 let symbol = Symbol::from(raw_symbol.as_str());
724 let venue = Venue::from_code(exchange)
725 .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
726 let instrument_id = InstrumentId::new(symbol, venue);
727 symbol_venue_map.rcu(|m| {
728 m.entry(symbol).or_insert(venue);
729 });
730 instrument_id_map.insert(raw_instrument_id, instrument_id);
731 Ok(instrument_id)
732}
733
734fn update_instrument_id_map(
735 record: &dbn::RecordRef,
736 symbol_map: &PitSymbolMap,
737 publisher_venue_map: &IndexMap<PublisherId, Venue>,
738 symbol_venue_map: &AHashMap<Symbol, Venue>,
739 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
740) -> anyhow::Result<InstrumentId> {
741 let header = record.header();
742
743 if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
745 return Ok(instrument_id);
746 }
747
748 let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
749 anyhow::anyhow!(
750 "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
751 header.instrument_id
752 )
753 })?;
754
755 let symbol = Symbol::from_str_unchecked(raw_symbol);
756
757 let publisher_id = header.publisher_id;
758 let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
759 *venue
760 } else {
761 let venue = publisher_venue_map
762 .get(&publisher_id)
763 .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
764 *venue
765 };
766 let instrument_id = InstrumentId::new(symbol, venue);
767
768 instrument_id_map.insert(header.instrument_id, instrument_id);
769 Ok(instrument_id)
770}
771
772fn handle_instrument_def_msg(
778 msg: &dbn::InstrumentDefMsg,
779 record: &dbn::RecordRef,
780 symbol_map: &PitSymbolMap,
781 publisher_venue_map: &IndexMap<PublisherId, Venue>,
782 symbol_venue_map: &AHashMap<Symbol, Venue>,
783 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
784 ts_init: UnixNanos,
785) -> anyhow::Result<InstrumentAny> {
786 let instrument_id = update_instrument_id_map(
787 record,
788 symbol_map,
789 publisher_venue_map,
790 symbol_venue_map,
791 instrument_id_map,
792 )?;
793
794 decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
795}
796
797fn handle_status_msg(
798 msg: &dbn::StatusMsg,
799 record: &dbn::RecordRef,
800 symbol_map: &PitSymbolMap,
801 publisher_venue_map: &IndexMap<PublisherId, Venue>,
802 symbol_venue_map: &AHashMap<Symbol, Venue>,
803 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
804 ts_init: UnixNanos,
805) -> anyhow::Result<InstrumentStatus> {
806 let instrument_id = update_instrument_id_map(
807 record,
808 symbol_map,
809 publisher_venue_map,
810 symbol_venue_map,
811 instrument_id_map,
812 )?;
813
814 decode_status_msg(msg, instrument_id, Some(ts_init))
815}
816
817#[expect(clippy::too_many_arguments)]
818fn handle_imbalance_msg(
819 msg: &dbn::ImbalanceMsg,
820 record: &dbn::RecordRef,
821 symbol_map: &PitSymbolMap,
822 publisher_venue_map: &IndexMap<PublisherId, Venue>,
823 symbol_venue_map: &AHashMap<Symbol, Venue>,
824 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
825 price_precision_map: &AHashMap<u32, u8>,
826 ts_init: UnixNanos,
827) -> anyhow::Result<DatabentoImbalance> {
828 let instrument_id = update_instrument_id_map(
829 record,
830 symbol_map,
831 publisher_venue_map,
832 symbol_venue_map,
833 instrument_id_map,
834 )?;
835
836 let price_precision = price_precision_map
837 .get(&msg.hd.instrument_id)
838 .copied()
839 .unwrap_or(2);
840
841 decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
842}
843
844#[expect(clippy::too_many_arguments)]
845fn handle_statistics_msg(
846 msg: &dbn::StatMsg,
847 record: &dbn::RecordRef,
848 symbol_map: &PitSymbolMap,
849 publisher_venue_map: &IndexMap<PublisherId, Venue>,
850 symbol_venue_map: &AHashMap<Symbol, Venue>,
851 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
852 price_precision_map: &AHashMap<u32, u8>,
853 ts_init: UnixNanos,
854) -> anyhow::Result<DatabentoStatistics> {
855 let instrument_id = update_instrument_id_map(
856 record,
857 symbol_map,
858 publisher_venue_map,
859 symbol_venue_map,
860 instrument_id_map,
861 )?;
862
863 let price_precision = price_precision_map
864 .get(&msg.hd.instrument_id)
865 .copied()
866 .unwrap_or(2);
867
868 decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
869}
870
871#[expect(clippy::too_many_arguments)]
872fn handle_record(
873 record: dbn::RecordRef,
874 symbol_map: &PitSymbolMap,
875 publisher_venue_map: &IndexMap<PublisherId, Venue>,
876 symbol_venue_map: &AHashMap<Symbol, Venue>,
877 instrument_id_map: &mut AHashMap<u32, InstrumentId>,
878 price_precision_map: &AHashMap<u32, u8>,
879 ts_init: UnixNanos,
880 initialized_books: &HashSet<InstrumentId>,
881 bars_timestamp_on_close: bool,
882) -> anyhow::Result<(Option<Data>, Option<Data>)> {
883 let instrument_id = update_instrument_id_map(
884 &record,
885 symbol_map,
886 publisher_venue_map,
887 symbol_venue_map,
888 instrument_id_map,
889 )?;
890
891 let price_precision = price_precision_map
892 .get(&record.header().instrument_id)
893 .copied()
894 .unwrap_or(2);
895
896 let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
899 || record.get::<dbn::TbboMsg>().is_some()
900 || record.get::<dbn::Cmbp1Msg>().is_some()
901 {
902 true } else {
904 initialized_books.contains(&instrument_id) };
906
907 decode_record(
908 &record,
909 instrument_id,
910 price_precision,
911 Some(ts_init),
912 include_trades,
913 bars_timestamp_on_close,
914 )
915}
916
917fn process_mbo_delta(
922 delta: OrderBookDelta,
923 flags: u8,
924 buffering_start: &mut Option<UnixNanos>,
925 buffered_deltas: &mut AHashMap<InstrumentId, Vec<OrderBookDelta>>,
926) -> anyhow::Result<Option<OrderBookDeltas_API>> {
927 let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
928 buffer.push(delta);
929
930 if !RecordFlag::F_LAST.matches(flags) {
931 return Ok(None);
932 }
933
934 if RecordFlag::F_SNAPSHOT.matches(flags) {
935 return Ok(None);
936 }
937
938 if let Some(start_ns) = *buffering_start {
939 if delta.ts_event <= start_ns {
940 return Ok(None);
941 }
942 *buffering_start = None;
943 }
944
945 let buffer = buffered_deltas
946 .remove(&delta.instrument_id)
947 .ok_or_else(|| {
948 anyhow::anyhow!(
949 "Internal error: no buffered deltas for instrument {id}",
950 id = delta.instrument_id
951 )
952 })?;
953 let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
954 Ok(Some(OrderBookDeltas_API::new(deltas)))
955}
956
/// Unit and property tests for handler construction and MBO delta buffering.
#[cfg(test)]
mod tests {
    use databento::live::Subscription;
    use indexmap::IndexMap;
    use rstest::*;
    use time::macros::datetime;

    use super::*;

    /// Builds a handler with throwaway channels and empty venue maps.
    ///
    /// The opposite channel ends are dropped when this function returns, so the
    /// handler is only suitable for inspecting its initial state.
    fn create_test_handler(reconnect_timeout_mins: Option<u64>) -> DatabentoFeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (msg_tx, _msg_rx) = tokio::sync::mpsc::channel(100);

        DatabentoFeedHandler::new(
            Credential::new("test_key"),
            "GLBX.MDP3".to_string(),
            cmd_rx,
            msg_tx,
            IndexMap::new(),
            Arc::new(AtomicMap::new()),
            false,
            false,
            reconnect_timeout_mins,
        )
    }

    /// The handler stores the timeout as given and starts with empty state.
    #[rstest]
    #[case(Some(10))]
    #[case(None)]
    fn test_backoff_initialization(#[case] reconnect_timeout_mins: Option<u64>) {
        let handler = create_test_handler(reconnect_timeout_mins);

        assert_eq!(handler.reconnect_timeout_mins, reconnect_timeout_mins);
        assert!(handler.subscriptions.is_empty());
        assert!(handler.buffered_commands.is_empty());
    }

    /// Clearing `start` on a cloned subscription leaves schema and symbols intact.
    #[rstest]
    fn test_subscription_with_and_without_start() {
        let start_time = datetime!(2024-01-01 00:00:00 UTC);
        let sub_with_start = Subscription::builder()
            .symbols("ES.FUT")
            .schema(databento::dbn::Schema::Mbp1)
            .start(start_time)
            .build();

        let mut sub_without_start = sub_with_start.clone();
        sub_without_start.start = None;

        assert!(sub_with_start.start.is_some());
        assert!(sub_without_start.start.is_none());
        assert_eq!(sub_with_start.schema, sub_without_start.schema);
        assert_eq!(sub_with_start.symbols, sub_without_start.symbols);
    }

    /// A new handler is not in replay mode and keeps its constructor arguments.
    #[rstest]
    fn test_handler_initialization_state() {
        let handler = create_test_handler(Some(10));

        assert!(!handler.replay);
        assert_eq!(handler.dataset, "GLBX.MDP3");
        assert_eq!(handler.credential.api_key(), "test_key");
        assert!(handler.subscriptions.is_empty());
        assert!(handler.buffered_commands.is_empty());
    }

    /// `None` is accepted as the reconnection timeout.
    #[rstest]
    fn test_handler_with_no_timeout() {
        let handler = create_test_handler(None);

        assert_eq!(handler.reconnect_timeout_mins, None);
        assert!(!handler.replay);
    }

    /// A zero-minute reconnection timeout is stored unchanged.
    #[rstest]
    fn test_handler_with_zero_timeout() {
        let handler = create_test_handler(Some(0));

        assert_eq!(handler.reconnect_timeout_mins, Some(0));
        assert!(!handler.replay);
    }

    /// Builds a `clear` delta with the given event timestamp for buffering tests.
    fn test_delta(instrument_id: InstrumentId, ts_event: u64) -> OrderBookDelta {
        OrderBookDelta::clear(instrument_id, 0, ts_event.into(), 0.into())
    }

    /// A delta without `F_LAST` is buffered and nothing is emitted.
    #[rstest]
    fn test_mbo_delta_without_f_last_buffers() {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let delta = test_delta(instrument_id, 1_000_000_000);
        let mut buffering_start = None;
        let mut buffered = AHashMap::new();

        let result = process_mbo_delta(delta, 0, &mut buffering_start, &mut buffered).unwrap();

        assert!(result.is_none());
        assert_eq!(buffered[&instrument_id].len(), 1);
    }

    /// A delta with `F_LAST` flushes everything buffered for the instrument.
    #[rstest]
    fn test_mbo_delta_with_f_last_emits() {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let mut buffering_start = None;
        let mut buffered = AHashMap::new();

        process_mbo_delta(
            test_delta(instrument_id, 1_000_000_000),
            0,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        let result = process_mbo_delta(
            test_delta(instrument_id, 2_000_000_000),
            128, // F_LAST
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().deltas.len(), 2);
        assert!(buffered.is_empty());
    }

    /// A snapshot delta stays buffered even when `F_LAST` is set.
    #[rstest]
    fn test_mbo_snapshot_with_f_last_buffers() {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let mut buffering_start = None;
        let mut buffered = AHashMap::new();

        let result = process_mbo_delta(
            test_delta(instrument_id, 1_000_000_000),
            128 | 32, // F_LAST | F_SNAPSHOT
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        assert!(result.is_none());
        assert_eq!(buffered[&instrument_id].len(), 1);
    }

    /// With a replay threshold set, batches are held until an event strictly
    /// after the threshold arrives, which also clears the threshold.
    #[rstest]
    fn test_mbo_replay_buffers_until_past_start() {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let start_ns = 5_000_000_000u64;
        let mut buffering_start = Some(start_ns.into());
        let mut buffered = AHashMap::new();

        // Before the threshold: held
        let result = process_mbo_delta(
            test_delta(instrument_id, 4_000_000_000),
            128, // F_LAST
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();
        assert!(result.is_none());

        // Exactly at the threshold: still held
        let result = process_mbo_delta(
            test_delta(instrument_id, 5_000_000_000),
            128,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();
        assert!(result.is_none());

        // Past the threshold: emitted
        let result = process_mbo_delta(
            test_delta(instrument_id, 6_000_000_000),
            128,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();
        assert!(result.is_some());
        assert!(buffering_start.is_none());
    }

    /// All deltas buffered before `F_LAST` are emitted together with the last one.
    #[rstest]
    fn test_mbo_multiple_deltas_accumulated() {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let mut buffering_start = None;
        let mut buffered = AHashMap::new();

        for i in 0..5 {
            process_mbo_delta(
                test_delta(instrument_id, 1_000_000_000 + i),
                0,
                &mut buffering_start,
                &mut buffered,
            )
            .unwrap();
        }

        let result = process_mbo_delta(
            test_delta(instrument_id, 2_000_000_000),
            128,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().deltas.len(), 6);
    }

    /// Flushing one instrument leaves another instrument's buffer untouched.
    #[rstest]
    fn test_mbo_multi_instrument_isolation() {
        let id_a = InstrumentId::from("ESM4.GLBX");
        let id_b = InstrumentId::from("NQM4.GLBX");
        let mut buffering_start = None;
        let mut buffered = AHashMap::new();

        process_mbo_delta(
            test_delta(id_a, 1_000_000_000),
            0,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();
        process_mbo_delta(
            test_delta(id_b, 1_000_000_000),
            0,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        // Only instrument A receives F_LAST
        let result = process_mbo_delta(
            test_delta(id_a, 2_000_000_000),
            128,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().instrument_id, id_a);
        assert!(buffered.contains_key(&id_b));
        assert!(!buffered.contains_key(&id_a));
    }

    /// Property tests for `process_mbo_delta`.
    mod property_tests {
        use proptest::prelude::*;
        use rstest::rstest;

        use super::*;

        proptest! {
            // Every buffered delta is emitted exactly once when F_LAST arrives
            #[rstest]
            fn mbo_buffering_conserves_deltas(
                num_non_last in 0usize..=20,
            ) {
                let instrument_id = InstrumentId::from("ESM4.GLBX");
                let mut buffering_start = None;
                let mut buffered = AHashMap::new();
                let total = num_non_last + 1;

                for i in 0..num_non_last {
                    let result = process_mbo_delta(
                        test_delta(instrument_id, 1_000_000_000 + i as u64),
                        0, // no flags
                        &mut buffering_start,
                        &mut buffered,
                    ).unwrap();
                    prop_assert!(result.is_none());
                }

                let result = process_mbo_delta(
                    test_delta(instrument_id, 2_000_000_000),
                    128, // F_LAST
                    &mut buffering_start,
                    &mut buffered,
                ).unwrap();

                prop_assert!(result.is_some());
                let emitted = result.unwrap();
                prop_assert_eq!(emitted.deltas.len(), total);
                prop_assert!(buffered.is_empty());
            }

            // Snapshot deltas accumulate in the buffer and never trigger emission
            #[rstest]
            fn mbo_snapshots_never_emit(
                num_snapshots in 1usize..=20,
            ) {
                let instrument_id = InstrumentId::from("ESM4.GLBX");
                let mut buffering_start = None;
                let mut buffered = AHashMap::new();

                for i in 0..num_snapshots {
                    let result = process_mbo_delta(
                        test_delta(instrument_id, 1_000_000_000 + i as u64),
                        128 | 32, // F_LAST | F_SNAPSHOT
                        &mut buffering_start,
                        &mut buffered,
                    ).unwrap();
                    prop_assert!(result.is_none());
                }

                prop_assert_eq!(buffered[&instrument_id].len(), num_snapshots);
            }

            // Batches before the replay threshold are held, then flushed together
            // with the first batch after it
            #[rstest]
            fn mbo_replay_delays_emission(
                start_offset in 1u64..=100,
                num_before in 1usize..=10,
            ) {
                let instrument_id = InstrumentId::from("ESM4.GLBX");
                let start_ns = 1_000_000_000u64 * start_offset;
                let mut buffering_start = Some(start_ns.into());
                let mut buffered = AHashMap::new();

                for i in 0..num_before {
                    let ts = start_ns - (num_before as u64 - i as u64);
                    let result = process_mbo_delta(
                        test_delta(instrument_id, ts),
                        128, // F_LAST
                        &mut buffering_start,
                        &mut buffered,
                    ).unwrap();
                    prop_assert!(result.is_none());
                }

                let result = process_mbo_delta(
                    test_delta(instrument_id, start_ns + 1),
                    128,
                    &mut buffering_start,
                    &mut buffered,
                ).unwrap();

                prop_assert!(result.is_some());
                prop_assert!(buffering_start.is_none());
                let total = num_before + 1;
                prop_assert_eq!(result.unwrap().deltas.len(), total);
            }
        }
    }
}