Skip to main content

nautilus_databento/
live.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Databento live feed handler.
//!
//! The feed handler runs a single async task per dataset. It receives
//! [`HandlerCommand`] messages over an unbounded channel and streams decoded
//! market data back as [`DatabentoMessage`]s on a bounded tokio channel.
//!
//! The inner loop uses `tokio::select!` to concurrently await the next record
//! from the Databento gateway and the next command from the engine, giving
//! near-zero idle CPU and immediate command responsiveness.
//!
//! Heartbeat detection is delegated to the upstream `databento` client, which
//! returns `Error::HeartbeatTimeout` when no data arrives within
//! `heartbeat_interval + 5 s` (default 35 s). The handler treats this as a
//! connection error and reconnects: immediately if the session had already run
//! for its success threshold, otherwise after an exponential backoff delay.
30
31use std::{fmt::Debug, sync::Arc, time::Duration};
32
33use ahash::{AHashMap, HashSet, HashSetExt};
34use databento::{
35    dbn::{self, PitSymbolMap, Record, SymbolIndex},
36    live::Subscription,
37};
38use indexmap::IndexMap;
39use nautilus_core::{
40    AtomicMap, UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43    data::{Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
44    enums::RecordFlag,
45    identifiers::{InstrumentId, Symbol, Venue},
46    instruments::{Instrument, InstrumentAny},
47};
48use nautilus_network::backoff::ExponentialBackoff;
49
50use super::{
51    decode::{decode_imbalance_msg, decode_statistics_msg, decode_status_msg},
52    types::{DatabentoImbalance, DatabentoStatistics, SubscriptionAckEvent},
53};
54use crate::{
55    common::Credential,
56    decode::{decode_instrument_def_msg, decode_record},
57    types::PublisherId,
58};
59
60#[derive(Debug)]
61pub enum HandlerCommand {
62    Subscribe(Subscription),
63    Start,
64    Close,
65}
66
67#[derive(Debug)]
68pub enum DatabentoMessage {
69    Data(Data),
70    Instrument(Box<InstrumentAny>),
71    Status(InstrumentStatus),
72    Imbalance(DatabentoImbalance),
73    Statistics(DatabentoStatistics),
74    SubscriptionAck(SubscriptionAckEvent),
75    Error(anyhow::Error),
76    Close,
77}
78
79/// Handles a raw TCP data feed from the Databento LSG for a single dataset.
80///
81/// [`HandlerCommand`] messages are received synchronously across a channel,
82/// decoded records are sent asynchronously on a tokio channel as [`DatabentoMessage`]s
83/// back to a message processing task.
84///
85/// # Crash Policy
86///
87/// This handler intentionally crashes on catastrophic feed issues rather than
88/// attempting recovery. If excessive buffering occurs (indicating severe feed
89/// misbehavior), the process will run out of memory and terminate. This is by
90/// design - such scenarios indicate fundamental problems that require external
91/// intervention.
92pub struct DatabentoFeedHandler {
93    credential: Credential,
94    dataset: String,
95    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
96    msg_tx: tokio::sync::mpsc::Sender<DatabentoMessage>,
97    publisher_venue_map: IndexMap<PublisherId, Venue>,
98    symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
99    replay: bool,
100    use_exchange_as_venue: bool,
101    bars_timestamp_on_close: bool,
102    reconnect_timeout_mins: Option<u64>,
103    backoff: ExponentialBackoff,
104    subscriptions: Vec<Subscription>,
105    buffered_commands: Vec<HandlerCommand>,
106    gateway_addr: Option<String>,
107    success_threshold: Duration,
108}
109
110impl Debug for DatabentoFeedHandler {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.debug_struct(stringify!(DatabentoFeedHandler))
113            .field("credential", &self.credential)
114            .field("dataset", &self.dataset)
115            .field("replay", &self.replay)
116            .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
117            .field("subscriptions", &self.subscriptions.len())
118            .finish()
119    }
120}
121
122impl DatabentoFeedHandler {
123    /// Creates a new [`DatabentoFeedHandler`] instance.
124    ///
125    /// # Panics
126    ///
127    /// Panics if exponential backoff creation fails (should never happen with valid hardcoded parameters).
128    #[must_use]
129    #[expect(clippy::too_many_arguments)]
130    pub fn new(
131        credential: Credential,
132        dataset: String,
133        rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
134        tx: tokio::sync::mpsc::Sender<DatabentoMessage>,
135        publisher_venue_map: IndexMap<PublisherId, Venue>,
136        symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
137        use_exchange_as_venue: bool,
138        bars_timestamp_on_close: bool,
139        reconnect_timeout_mins: Option<u64>,
140    ) -> Self {
141        // Choose max delay based on timeout configuration:
142        // - With timeout: 60s max (quick recovery to reconnect within window)
143        // - Without timeout (None): 600s max (patient recovery, respectful of infrastructure)
144        let delay_max = if reconnect_timeout_mins.is_some() {
145            Duration::from_secs(60)
146        } else {
147            Duration::from_secs(600)
148        };
149
150        let backoff = ExponentialBackoff::new(Duration::from_secs(1), delay_max, 2.0, 1000, false)
151            .expect("hardcoded backoff parameters are valid");
152
153        Self {
154            credential,
155            dataset,
156            cmd_rx: rx,
157            msg_tx: tx,
158            publisher_venue_map,
159            symbol_venue_map,
160            replay: false,
161            use_exchange_as_venue,
162            bars_timestamp_on_close,
163            reconnect_timeout_mins,
164            backoff,
165            subscriptions: Vec::new(),
166            buffered_commands: Vec::new(),
167            gateway_addr: None,
168            success_threshold: Duration::from_secs(60),
169        }
170    }
171
172    /// Sets a custom gateway address, overriding the default Databento LSG endpoint.
173    #[must_use]
174    pub fn with_gateway_addr(mut self, addr: String) -> Self {
175        self.gateway_addr = Some(addr);
176        self
177    }
178
179    /// Sets the duration a session must run before it counts as successful.
180    ///
181    /// A successful session resets the reconnection backoff cycle.
182    /// Defaults to 60 seconds.
183    #[must_use]
184    pub fn with_success_threshold(mut self, threshold: Duration) -> Self {
185        self.success_threshold = threshold;
186        self
187    }
188
189    /// Runs the feed handler main loop, processing commands and streaming market data.
190    ///
191    /// Establishes a connection to the Databento LSG, subscribes to requested data feeds,
192    /// and continuously processes incoming market data messages until shutdown.
193    ///
194    /// Implements automatic reconnection with exponential backoff (1s to 60s with jitter).
195    /// Each successful session resets the reconnection cycle, giving the next disconnect
196    /// a fresh timeout window. Gives up after `reconnect_timeout_mins` if configured.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if any client operation or message handling fails.
201    pub async fn run(&mut self) -> anyhow::Result<()> {
202        log::debug!("Running feed handler");
203
204        let mut reconnect_start: Option<tokio::time::Instant> = None;
205        let mut attempt = 0;
206
207        loop {
208            attempt += 1;
209
210            match self.run_session(attempt).await {
211                Ok(ran_successfully) => {
212                    if ran_successfully {
213                        log::info!("Resetting reconnection cycle after successful session");
214                        reconnect_start = None;
215                        attempt = 0;
216                        self.backoff.reset();
217                    } else {
218                        log::info!("Session ended normally");
219                        break Ok(());
220                    }
221                }
222                Err(e) => {
223                    let cycle_start = reconnect_start.get_or_insert_with(tokio::time::Instant::now);
224
225                    if let Some(timeout_mins) = self.reconnect_timeout_mins {
226                        let elapsed = cycle_start.elapsed();
227                        let timeout = Duration::from_mins(timeout_mins);
228
229                        if elapsed >= timeout {
230                            log::error!("Giving up reconnection after {timeout_mins} minutes");
231                            self.send_msg(DatabentoMessage::Error(anyhow::anyhow!(
232                                "Reconnection timeout after {timeout_mins} minutes: {e}"
233                            )))
234                            .await;
235                            break Err(e);
236                        }
237                    }
238
239                    let delay = self.backoff.next_duration();
240
241                    log::warn!(
242                        "Connection lost (attempt {}): {}. Reconnecting in {}s...",
243                        attempt,
244                        e,
245                        delay.as_secs()
246                    );
247
248                    tokio::select! {
249                        () = tokio::time::sleep(delay) => {}
250                        cmd = self.cmd_rx.recv() => {
251                            match cmd {
252                                Some(HandlerCommand::Close) => {
253                                    log::info!("Close received during backoff");
254                                    return Ok(());
255                                }
256                                None => {
257                                    log::debug!("Command channel closed during backoff");
258                                    return Ok(());
259                                }
260                                Some(cmd) => {
261                                    log::debug!("Buffering command received during backoff: {cmd:?}");
262                                    self.buffered_commands.push(cmd);
263                                }
264                            }
265                        }
266                    }
267                }
268            }
269        }
270    }
271
272    /// Runs a single session, handling connection, subscriptions, and data streaming.
273    ///
274    /// Returns `Ok(bool)` where the bool indicates if the session ran successfully
275    /// for a meaningful duration (true) or was intentionally closed (false).
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if connection fails, subscription fails, or data streaming encounters an error.
280    async fn run_session(&mut self, attempt: usize) -> anyhow::Result<bool> {
281        if attempt > 1 {
282            log::info!("Reconnecting (attempt {attempt})...");
283        }
284
285        let session_start = tokio::time::Instant::now();
286        let clock = get_atomic_clock_realtime();
287        let mut symbol_map = PitSymbolMap::new();
288        let mut instrument_id_map: AHashMap<u32, InstrumentId> = AHashMap::new();
289        let mut price_precision_map: AHashMap<u32, u8> = AHashMap::new();
290
291        let mut buffering_start = None;
292        let mut buffered_deltas: AHashMap<InstrumentId, Vec<OrderBookDelta>> = AHashMap::new();
293        let mut initialized_books = HashSet::new();
294        let timeout = Duration::from_secs(5); // Hardcoded timeout for now
295
296        let gateway_addr = self.gateway_addr.clone();
297        let api_key = self.credential.api_key().to_owned();
298        let dataset = self.dataset.clone();
299
300        let result = tokio::time::timeout(timeout, async move {
301            let base = databento::LiveClient::builder();
302            let base = if let Some(addr) = gateway_addr {
303                base.addr(addr).await?
304            } else {
305                base
306            };
307            base.user_agent_extension(NAUTILUS_USER_AGENT.into())
308                .key(api_key)?
309                .dataset(dataset)
310                .build()
311                .await
312        })
313        .await?;
314
315        let mut client = match result {
316            Ok(client) => {
317                if attempt > 1 {
318                    log::info!("Reconnected successfully");
319                } else {
320                    log::info!("Connected");
321                }
322                client
323            }
324            Err(e) => {
325                anyhow::bail!("Failed to connect to Databento LSG: {e}");
326            }
327        };
328
329        // Process any commands buffered during reconnection backoff
330        let mut start_buffered = false;
331
332        if !self.buffered_commands.is_empty() {
333            log::info!(
334                "Processing {} buffered commands",
335                self.buffered_commands.len()
336            );
337
338            for cmd in self.buffered_commands.drain(..) {
339                match cmd {
340                    HandlerCommand::Subscribe(sub) => {
341                        if !self.replay && sub.start.is_some() {
342                            self.replay = true;
343                        }
344                        self.subscriptions.push(sub);
345                    }
346                    HandlerCommand::Start => {
347                        start_buffered = true;
348                    }
349                    HandlerCommand::Close => {
350                        log::warn!("Close command was buffered, shutting down");
351                        return Ok(false);
352                    }
353                }
354            }
355        }
356
357        let mut running = false;
358
359        if !self.subscriptions.is_empty() {
360            log::info!(
361                "Resubscribing to {} subscriptions",
362                self.subscriptions.len()
363            );
364
365            for sub in self.subscriptions.clone() {
366                client.subscribe(sub).await?;
367            }
368            // Strip start timestamps after successful subscription to avoid replaying history on future reconnects
369            for sub in &mut self.subscriptions {
370                sub.start = None;
371            }
372            client.start().await?;
373            running = true;
374            log::info!("Resubscription complete");
375        } else if start_buffered {
376            log::info!("Starting session from buffered Start command");
377            buffering_start = if self.replay {
378                Some(clock.get_time_ns())
379            } else {
380                None
381            };
382            client.start().await?;
383            running = true;
384        }
385
386        loop {
387            if self.msg_tx.is_closed() {
388                log::debug!("Message channel was closed: stopping");
389                return Ok(false);
390            }
391
392            // Wait for either a command or a record. When the session has not
393            // started yet (`!running`), only commands are awaited. Once running,
394            // `next_record` is cancel-safe so `tokio::select!` can safely
395            // race both futures.
396            if !running {
397                match self.cmd_rx.recv().await {
398                    Some(HandlerCommand::Subscribe(sub)) => {
399                        log::debug!("Received command: Subscribe");
400
401                        if !self.replay && sub.start.is_some() {
402                            self.replay = true;
403                        }
404                        client.subscribe(sub.clone()).await?;
405                        let mut sub_for_reconnect = sub;
406                        sub_for_reconnect.start = None;
407                        self.subscriptions.push(sub_for_reconnect);
408                        continue;
409                    }
410                    Some(HandlerCommand::Start) => {
411                        log::debug!("Received command: Start");
412                        buffering_start = if self.replay {
413                            Some(clock.get_time_ns())
414                        } else {
415                            None
416                        };
417                        client.start().await?;
418                        running = true;
419                        continue;
420                    }
421                    Some(HandlerCommand::Close) => {
422                        self.msg_tx.send(DatabentoMessage::Close).await?;
423                        return Ok(false);
424                    }
425                    None => {
426                        log::debug!("Command channel disconnected");
427                        return Ok(false);
428                    }
429                }
430            }
431
432            let record_opt = tokio::select! {
433                cmd = self.cmd_rx.recv() =>
434                match cmd {
435                    Some(HandlerCommand::Subscribe(sub)) => {
436                        log::debug!("Received command: Subscribe");
437
438                        if sub.start.is_some() {
439                            self.replay = true;
440                            log::error!(
441                                "Ignoring `start` on {} subscribe, session already running, Databento drops replay anchors sent after session start",
442                                self.dataset,
443                            );
444                        }
445                        client.subscribe(sub.clone()).await?;
446                        let mut sub_for_reconnect = sub;
447                        sub_for_reconnect.start = None;
448                        self.subscriptions.push(sub_for_reconnect);
449                        continue;
450                    }
451                    Some(HandlerCommand::Start) => {
452                        log::warn!("Received Start command but session already running");
453                        continue;
454                    }
455                    Some(HandlerCommand::Close) => {
456                        self.msg_tx.send(DatabentoMessage::Close).await?;
457                        client.close().await?;
458                        log::debug!("Closed inner client");
459                        return Ok(false);
460                    }
461                    None => {
462                        log::debug!("Command channel disconnected");
463                        return Ok(false);
464                    }
465                },
466                result = client.next_record() => result,
467            };
468
469            let record = match record_opt {
470                Ok(Some(record)) => record,
471                Ok(None) => {
472                    if session_start.elapsed() >= self.success_threshold {
473                        log::info!("Session ended after successful run");
474                        return Ok(true);
475                    }
476                    anyhow::bail!("Session ended by gateway");
477                }
478                Err(e) => {
479                    if session_start.elapsed() >= self.success_threshold {
480                        log::info!("Connection error after successful run: {e}");
481                        return Ok(true);
482                    }
483                    anyhow::bail!("Connection error: {e}");
484                }
485            };
486
487            let ts_init = clock.get_time_ns();
488
489            // Decode record
490            if let Some(msg) = record.get::<dbn::ErrorMsg>() {
491                handle_error_msg(msg);
492            } else if let Some(msg) = record.get::<dbn::SystemMsg>() {
493                if let Some(ack) = handle_system_msg(msg, ts_init) {
494                    self.send_msg(DatabentoMessage::SubscriptionAck(ack)).await;
495                }
496            } else if let Some(msg) = record.get::<dbn::SymbolMappingMsg>() {
497                // Remove instrument ID index as the raw symbol may have changed
498                instrument_id_map.remove(&msg.hd.instrument_id);
499                handle_symbol_mapping_msg(msg, &mut symbol_map, &mut instrument_id_map)?;
500            } else if let Some(msg) = record.get::<dbn::InstrumentDefMsg>() {
501                if self.use_exchange_as_venue {
502                    let exchange = msg.exchange()?;
503                    if !exchange.is_empty() {
504                        update_instrument_id_map_with_exchange(
505                            &symbol_map,
506                            &self.symbol_venue_map,
507                            &mut instrument_id_map,
508                            msg.hd.instrument_id,
509                            exchange,
510                        )?;
511                    }
512                }
513                let data = {
514                    let sym_map = self.symbol_venue_map.load();
515                    handle_instrument_def_msg(
516                        msg,
517                        &record,
518                        &symbol_map,
519                        &self.publisher_venue_map,
520                        &sym_map,
521                        &mut instrument_id_map,
522                        ts_init,
523                    )?
524                };
525                price_precision_map.insert(msg.hd.instrument_id, data.price_precision());
526                self.send_msg(DatabentoMessage::Instrument(Box::new(data)))
527                    .await;
528            } else if let Some(msg) = record.get::<dbn::StatusMsg>() {
529                let data = {
530                    let sym_map = self.symbol_venue_map.load();
531                    handle_status_msg(
532                        msg,
533                        &record,
534                        &symbol_map,
535                        &self.publisher_venue_map,
536                        &sym_map,
537                        &mut instrument_id_map,
538                        ts_init,
539                    )?
540                };
541                self.send_msg(DatabentoMessage::Status(data)).await;
542            } else if let Some(msg) = record.get::<dbn::ImbalanceMsg>() {
543                let data = {
544                    let sym_map = self.symbol_venue_map.load();
545                    handle_imbalance_msg(
546                        msg,
547                        &record,
548                        &symbol_map,
549                        &self.publisher_venue_map,
550                        &sym_map,
551                        &mut instrument_id_map,
552                        &price_precision_map,
553                        ts_init,
554                    )?
555                };
556                self.send_msg(DatabentoMessage::Imbalance(data)).await;
557            } else if let Some(msg) = record.get::<dbn::StatMsg>() {
558                let data = {
559                    let sym_map = self.symbol_venue_map.load();
560                    handle_statistics_msg(
561                        msg,
562                        &record,
563                        &symbol_map,
564                        &self.publisher_venue_map,
565                        &sym_map,
566                        &mut instrument_id_map,
567                        &price_precision_map,
568                        ts_init,
569                    )?
570                };
571                self.send_msg(DatabentoMessage::Statistics(data)).await;
572            } else {
573                // Decode a generic record with possible errors
574                let res = {
575                    let sym_map = self.symbol_venue_map.load();
576                    handle_record(
577                        record,
578                        &symbol_map,
579                        &self.publisher_venue_map,
580                        &sym_map,
581                        &mut instrument_id_map,
582                        &price_precision_map,
583                        ts_init,
584                        &initialized_books,
585                        self.bars_timestamp_on_close,
586                    )
587                };
588                let (mut data1, data2) = match res {
589                    Ok(decoded) => decoded,
590                    Err(e) => {
591                        log::error!("Error decoding record: {e}");
592                        continue;
593                    }
594                };
595
596                if let Some(msg) = record.get::<dbn::MboMsg>() {
597                    if let Some(Data::Delta(delta)) = &data1 {
598                        initialized_books.insert(delta.instrument_id);
599                    } else {
600                        continue;
601                    }
602
603                    if let Some(Data::Delta(delta)) = &data1 {
604                        log::trace!(
605                            "Buffering delta: {} {buffering_start:?} flags={}",
606                            delta.ts_event,
607                            msg.flags.raw(),
608                        );
609
610                        match process_mbo_delta(
611                            *delta,
612                            msg.flags.raw(),
613                            &mut buffering_start,
614                            &mut buffered_deltas,
615                        )? {
616                            Some(deltas) => data1 = Some(Data::Deltas(deltas)),
617                            None => continue,
618                        }
619                    }
620                }
621
622                if let Some(data) = data1 {
623                    self.send_msg(DatabentoMessage::Data(data)).await;
624                }
625
626                if let Some(data) = data2 {
627                    self.send_msg(DatabentoMessage::Data(data)).await;
628                }
629            }
630        }
631    }
632
633    /// Sends a message to the message processing task.
634    async fn send_msg(&self, msg: DatabentoMessage) {
635        log::trace!("Sending {msg:?}");
636        match self.msg_tx.send(msg).await {
637            Ok(()) => {}
638            Err(e) => log::error!("Error sending message: {e}"),
639        }
640    }
641}
642
643/// Handles Databento error messages by logging them.
644fn handle_error_msg(msg: &dbn::ErrorMsg) {
645    log::error!("{msg:?}");
646}
647
648/// Handles Databento system messages, returning a subscription ack event if applicable.
649fn handle_system_msg(msg: &dbn::SystemMsg, ts_received: UnixNanos) -> Option<SubscriptionAckEvent> {
650    match msg.code() {
651        Ok(dbn::SystemCode::SubscriptionAck) => {
652            let message = msg.msg().unwrap_or("<invalid utf-8>");
653            log::info!("Subscription acknowledged: {message}");
654
655            let schema = parse_ack_message(message);
656
657            Some(SubscriptionAckEvent {
658                schema,
659                message: message.to_string(),
660                ts_received,
661            })
662        }
663        Ok(dbn::SystemCode::Heartbeat) => {
664            log::trace!("Heartbeat received");
665            None
666        }
667        Ok(dbn::SystemCode::SlowReaderWarning) => {
668            let message = msg.msg().unwrap_or("<invalid utf-8>");
669            log::warn!("Slow reader warning: {message}");
670            None
671        }
672        Ok(dbn::SystemCode::ReplayCompleted) => {
673            let message = msg.msg().unwrap_or("<invalid utf-8>");
674            log::info!("Replay completed: {message}");
675            None
676        }
677        _ => {
678            log::debug!("{msg:?}");
679            None
680        }
681    }
682}
683
/// Extracts the schema name from a subscription acknowledgement.
///
/// Expects `"Subscription request N for <schema> data succeeded"` and returns
/// the trimmed `<schema>`. Any message that does not match this shape yields
/// an empty string.
fn parse_ack_message(message: &str) -> String {
    let Some(after_prefix) = message.strip_prefix("Subscription request ") else {
        return String::new();
    };
    let Some((_request_number, tail)) = after_prefix.split_once(" for ") else {
        return String::new();
    };

    tail.strip_suffix(" data succeeded")
        .map_or_else(String::new, |schema| schema.trim().to_owned())
}
694
695/// Handles symbol mapping messages and updates the instrument ID map.
696///
697/// # Errors
698///
699/// Returns an error if symbol mapping fails.
700fn handle_symbol_mapping_msg(
701    msg: &dbn::SymbolMappingMsg,
702    symbol_map: &mut PitSymbolMap,
703    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
704) -> anyhow::Result<()> {
705    symbol_map
706        .on_symbol_mapping(msg)
707        .map_err(|e| anyhow::anyhow!("on_symbol_mapping failed for {msg:?}: {e}"))?;
708    instrument_id_map.remove(&msg.header().instrument_id);
709    Ok(())
710}
711
712/// Updates the instrument ID map using exchange information from the symbol map.
713fn update_instrument_id_map_with_exchange(
714    symbol_map: &PitSymbolMap,
715    symbol_venue_map: &AtomicMap<Symbol, Venue>,
716    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
717    raw_instrument_id: u32,
718    exchange: &str,
719) -> anyhow::Result<InstrumentId> {
720    let raw_symbol = symbol_map.get(raw_instrument_id).ok_or_else(|| {
721        anyhow::anyhow!("Cannot resolve raw_symbol for instrument_id {raw_instrument_id}")
722    })?;
723    let symbol = Symbol::from(raw_symbol.as_str());
724    let venue = Venue::from_code(exchange)
725        .map_err(|e| anyhow::anyhow!("Invalid venue code '{exchange}': {e}"))?;
726    let instrument_id = InstrumentId::new(symbol, venue);
727    symbol_venue_map.rcu(|m| {
728        m.entry(symbol).or_insert(venue);
729    });
730    instrument_id_map.insert(raw_instrument_id, instrument_id);
731    Ok(instrument_id)
732}
733
734fn update_instrument_id_map(
735    record: &dbn::RecordRef,
736    symbol_map: &PitSymbolMap,
737    publisher_venue_map: &IndexMap<PublisherId, Venue>,
738    symbol_venue_map: &AHashMap<Symbol, Venue>,
739    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
740) -> anyhow::Result<InstrumentId> {
741    let header = record.header();
742
743    // Check if instrument ID is already in the map
744    if let Some(&instrument_id) = instrument_id_map.get(&header.instrument_id) {
745        return Ok(instrument_id);
746    }
747
748    let raw_symbol = symbol_map.get_for_rec(record).ok_or_else(|| {
749        anyhow::anyhow!(
750            "Cannot resolve `raw_symbol` from `symbol_map` for instrument_id {}",
751            header.instrument_id
752        )
753    })?;
754
755    let symbol = Symbol::from_str_unchecked(raw_symbol);
756
757    let publisher_id = header.publisher_id;
758    let venue = if let Some(venue) = symbol_venue_map.get(&symbol) {
759        *venue
760    } else {
761        let venue = publisher_venue_map
762            .get(&publisher_id)
763            .ok_or_else(|| anyhow::anyhow!("No venue found for `publisher_id` {publisher_id}"))?;
764        *venue
765    };
766    let instrument_id = InstrumentId::new(symbol, venue);
767
768    instrument_id_map.insert(header.instrument_id, instrument_id);
769    Ok(instrument_id)
770}
771
772/// Handles instrument definition messages and decodes them into Nautilus instruments.
773///
774/// # Errors
775///
776/// Returns an error if instrument decoding fails.
777fn handle_instrument_def_msg(
778    msg: &dbn::InstrumentDefMsg,
779    record: &dbn::RecordRef,
780    symbol_map: &PitSymbolMap,
781    publisher_venue_map: &IndexMap<PublisherId, Venue>,
782    symbol_venue_map: &AHashMap<Symbol, Venue>,
783    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
784    ts_init: UnixNanos,
785) -> anyhow::Result<InstrumentAny> {
786    let instrument_id = update_instrument_id_map(
787        record,
788        symbol_map,
789        publisher_venue_map,
790        symbol_venue_map,
791        instrument_id_map,
792    )?;
793
794    decode_instrument_def_msg(msg, instrument_id, Some(ts_init))
795}
796
797fn handle_status_msg(
798    msg: &dbn::StatusMsg,
799    record: &dbn::RecordRef,
800    symbol_map: &PitSymbolMap,
801    publisher_venue_map: &IndexMap<PublisherId, Venue>,
802    symbol_venue_map: &AHashMap<Symbol, Venue>,
803    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
804    ts_init: UnixNanos,
805) -> anyhow::Result<InstrumentStatus> {
806    let instrument_id = update_instrument_id_map(
807        record,
808        symbol_map,
809        publisher_venue_map,
810        symbol_venue_map,
811        instrument_id_map,
812    )?;
813
814    decode_status_msg(msg, instrument_id, Some(ts_init))
815}
816
817#[expect(clippy::too_many_arguments)]
818fn handle_imbalance_msg(
819    msg: &dbn::ImbalanceMsg,
820    record: &dbn::RecordRef,
821    symbol_map: &PitSymbolMap,
822    publisher_venue_map: &IndexMap<PublisherId, Venue>,
823    symbol_venue_map: &AHashMap<Symbol, Venue>,
824    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
825    price_precision_map: &AHashMap<u32, u8>,
826    ts_init: UnixNanos,
827) -> anyhow::Result<DatabentoImbalance> {
828    let instrument_id = update_instrument_id_map(
829        record,
830        symbol_map,
831        publisher_venue_map,
832        symbol_venue_map,
833        instrument_id_map,
834    )?;
835
836    let price_precision = price_precision_map
837        .get(&msg.hd.instrument_id)
838        .copied()
839        .unwrap_or(2);
840
841    decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init))
842}
843
844#[expect(clippy::too_many_arguments)]
845fn handle_statistics_msg(
846    msg: &dbn::StatMsg,
847    record: &dbn::RecordRef,
848    symbol_map: &PitSymbolMap,
849    publisher_venue_map: &IndexMap<PublisherId, Venue>,
850    symbol_venue_map: &AHashMap<Symbol, Venue>,
851    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
852    price_precision_map: &AHashMap<u32, u8>,
853    ts_init: UnixNanos,
854) -> anyhow::Result<DatabentoStatistics> {
855    let instrument_id = update_instrument_id_map(
856        record,
857        symbol_map,
858        publisher_venue_map,
859        symbol_venue_map,
860        instrument_id_map,
861    )?;
862
863    let price_precision = price_precision_map
864        .get(&msg.hd.instrument_id)
865        .copied()
866        .unwrap_or(2);
867
868    decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
869}
870
871#[expect(clippy::too_many_arguments)]
872fn handle_record(
873    record: dbn::RecordRef,
874    symbol_map: &PitSymbolMap,
875    publisher_venue_map: &IndexMap<PublisherId, Venue>,
876    symbol_venue_map: &AHashMap<Symbol, Venue>,
877    instrument_id_map: &mut AHashMap<u32, InstrumentId>,
878    price_precision_map: &AHashMap<u32, u8>,
879    ts_init: UnixNanos,
880    initialized_books: &HashSet<InstrumentId>,
881    bars_timestamp_on_close: bool,
882) -> anyhow::Result<(Option<Data>, Option<Data>)> {
883    let instrument_id = update_instrument_id_map(
884        &record,
885        symbol_map,
886        publisher_venue_map,
887        symbol_venue_map,
888        instrument_id_map,
889    )?;
890
891    let price_precision = price_precision_map
892        .get(&record.header().instrument_id)
893        .copied()
894        .unwrap_or(2);
895
896    // For MBP-1 and quote-based schemas, always include trades since they're integral to the data
897    // For MBO, only include trades after the book is initialized to maintain consistency
898    let include_trades = if record.get::<dbn::Mbp1Msg>().is_some()
899        || record.get::<dbn::TbboMsg>().is_some()
900        || record.get::<dbn::Cmbp1Msg>().is_some()
901    {
902        true // These schemas include trade information directly
903    } else {
904        initialized_books.contains(&instrument_id) // MBO requires initialized book
905    };
906
907    decode_record(
908        &record,
909        instrument_id,
910        price_precision,
911        Some(ts_init),
912        include_trades,
913        bars_timestamp_on_close,
914    )
915}
916
917/// Processes an MBO delta through the buffering state machine.
918///
919/// Returns `Some(deltas)` when a complete batch is ready to emit (non-snapshot
920/// F_LAST with replay buffering complete), or `None` when still accumulating.
921fn process_mbo_delta(
922    delta: OrderBookDelta,
923    flags: u8,
924    buffering_start: &mut Option<UnixNanos>,
925    buffered_deltas: &mut AHashMap<InstrumentId, Vec<OrderBookDelta>>,
926) -> anyhow::Result<Option<OrderBookDeltas_API>> {
927    let buffer = buffered_deltas.entry(delta.instrument_id).or_default();
928    buffer.push(delta);
929
930    if !RecordFlag::F_LAST.matches(flags) {
931        return Ok(None);
932    }
933
934    if RecordFlag::F_SNAPSHOT.matches(flags) {
935        return Ok(None);
936    }
937
938    if let Some(start_ns) = *buffering_start {
939        if delta.ts_event <= start_ns {
940            return Ok(None);
941        }
942        *buffering_start = None;
943    }
944
945    let buffer = buffered_deltas
946        .remove(&delta.instrument_id)
947        .ok_or_else(|| {
948            anyhow::anyhow!(
949                "Internal error: no buffered deltas for instrument {id}",
950                id = delta.instrument_id
951            )
952        })?;
953    let deltas = OrderBookDeltas::new(delta.instrument_id, buffer);
954    Ok(Some(OrderBookDeltas_API::new(deltas)))
955}
956
#[cfg(test)]
mod tests {
    use databento::live::Subscription;
    use indexmap::IndexMap;
    use rstest::*;
    use time::macros::datetime;

    use super::*;

    /// Builds a handler with empty maps and closed-over channels for state-only assertions.
    fn create_test_handler(reconnect_timeout_mins: Option<u64>) -> DatabentoFeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (msg_tx, _msg_rx) = tokio::sync::mpsc::channel(100);

        DatabentoFeedHandler::new(
            Credential::new("test_key"),
            "GLBX.MDP3".to_string(),
            cmd_rx,
            msg_tx,
            IndexMap::new(),
            Arc::new(AtomicMap::new()),
            false,
            false,
            reconnect_timeout_mins,
        )
    }

    #[rstest]
    #[case(Some(10))]
    #[case(None)]
    fn test_backoff_initialization(#[case] reconnect_timeout_mins: Option<u64>) {
        let handler = create_test_handler(reconnect_timeout_mins);

        assert_eq!(handler.reconnect_timeout_mins, reconnect_timeout_mins);
        assert!(handler.subscriptions.is_empty());
        assert!(handler.buffered_commands.is_empty());
    }

    #[rstest]
    fn test_subscription_with_and_without_start() {
        let start_time = datetime!(2024-01-01 00:00:00 UTC);
        let sub_with_start = Subscription::builder()
            .symbols("ES.FUT")
            .schema(databento::dbn::Schema::Mbp1)
            .start(start_time)
            .build();

        let mut sub_without_start = sub_with_start.clone();
        sub_without_start.start = None;

        assert!(sub_with_start.start.is_some());
        assert!(sub_without_start.start.is_none());
        assert_eq!(sub_with_start.schema, sub_without_start.schema);
        assert_eq!(sub_with_start.symbols, sub_without_start.symbols);
    }

    #[rstest]
    fn test_handler_initialization_state() {
        let handler = create_test_handler(Some(10));

        assert!(!handler.replay);
        assert_eq!(handler.dataset, "GLBX.MDP3");
        assert_eq!(handler.credential.api_key(), "test_key");
        assert!(handler.subscriptions.is_empty());
        assert!(handler.buffered_commands.is_empty());
    }

    #[rstest]
    fn test_handler_with_no_timeout() {
        let handler = create_test_handler(None);

        assert_eq!(handler.reconnect_timeout_mins, None);
        assert!(!handler.replay);
    }

    #[rstest]
    fn test_handler_with_zero_timeout() {
        let handler = create_test_handler(Some(0));

        assert_eq!(handler.reconnect_timeout_mins, Some(0));
        assert!(!handler.replay);
    }

    /// Minimal delta (a book clear) at the given `ts_event`; only ID and timestamp matter here.
    fn test_delta(instrument_id: InstrumentId, ts_event: u64) -> OrderBookDelta {
        OrderBookDelta::clear(instrument_id, 0, ts_event.into(), 0.into())
    }

    #[rstest]
    fn test_mbo_delta_without_f_last_buffers() {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let delta = test_delta(instrument_id, 1_000_000_000);
        let mut buffering_start = None;
        let mut buffered = AHashMap::new();

        let result = process_mbo_delta(delta, 0, &mut buffering_start, &mut buffered).unwrap();

        assert!(result.is_none());
        assert_eq!(buffered[&instrument_id].len(), 1);
    }

    #[rstest]
    fn test_mbo_delta_with_f_last_emits() {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let mut buffering_start = None;
        let mut buffered = AHashMap::new();

        process_mbo_delta(
            test_delta(instrument_id, 1_000_000_000),
            0,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        let result = process_mbo_delta(
            test_delta(instrument_id, 2_000_000_000),
            128, // F_LAST
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().deltas.len(), 2);
        assert!(buffered.is_empty());
    }

    #[rstest]
    fn test_mbo_snapshot_with_f_last_buffers() {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let mut buffering_start = None;
        let mut buffered = AHashMap::new();

        let result = process_mbo_delta(
            test_delta(instrument_id, 1_000_000_000),
            128 | 32, // F_LAST | F_SNAPSHOT
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        assert!(result.is_none());
        assert_eq!(buffered[&instrument_id].len(), 1);
    }

    #[rstest]
    fn test_mbo_snapshot_flushed_with_next_non_snapshot_f_last() {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let mut buffering_start = None;
        let mut buffered = AHashMap::new();

        let snapshot = process_mbo_delta(
            test_delta(instrument_id, 1_000_000_000),
            128 | 32, // F_LAST | F_SNAPSHOT
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();
        assert!(snapshot.is_none());

        // The first non-snapshot F_LAST releases the snapshot deltas together with itself
        let result = process_mbo_delta(
            test_delta(instrument_id, 2_000_000_000),
            128, // F_LAST
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().deltas.len(), 2);
        assert!(buffered.is_empty());
    }

    #[rstest]
    fn test_mbo_replay_buffers_until_past_start() {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let start_ns = 5_000_000_000u64;
        let mut buffering_start = Some(start_ns.into());
        let mut buffered = AHashMap::new();

        let result = process_mbo_delta(
            test_delta(instrument_id, 4_000_000_000),
            128, // F_LAST
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();
        assert!(result.is_none());

        // Delta exactly at start is still held back (boundary is inclusive)
        let result = process_mbo_delta(
            test_delta(instrument_id, 5_000_000_000),
            128,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();
        assert!(result.is_none());
        assert!(buffering_start.is_some());

        // Delta past start: emits every buffered delta and clears buffering_start
        let result = process_mbo_delta(
            test_delta(instrument_id, 6_000_000_000),
            128,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();
        assert!(result.is_some());
        assert!(buffering_start.is_none());
        assert_eq!(result.unwrap().deltas.len(), 3);
        assert!(buffered.is_empty());
    }

    #[rstest]
    fn test_mbo_multiple_deltas_accumulated() {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let mut buffering_start = None;
        let mut buffered = AHashMap::new();

        for i in 0..5 {
            process_mbo_delta(
                test_delta(instrument_id, 1_000_000_000 + i),
                0,
                &mut buffering_start,
                &mut buffered,
            )
            .unwrap();
        }

        let result = process_mbo_delta(
            test_delta(instrument_id, 2_000_000_000),
            128,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        assert!(result.is_some());
        assert_eq!(result.unwrap().deltas.len(), 6);
        assert!(buffered.is_empty());
    }

    #[rstest]
    fn test_mbo_multi_instrument_isolation() {
        let id_a = InstrumentId::from("ESM4.GLBX");
        let id_b = InstrumentId::from("NQM4.GLBX");
        let mut buffering_start = None;
        let mut buffered = AHashMap::new();

        process_mbo_delta(
            test_delta(id_a, 1_000_000_000),
            0,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();
        process_mbo_delta(
            test_delta(id_b, 1_000_000_000),
            0,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        // F_LAST for A: only A's deltas emitted, B remains
        let result = process_mbo_delta(
            test_delta(id_a, 2_000_000_000),
            128,
            &mut buffering_start,
            &mut buffered,
        )
        .unwrap();

        assert!(result.is_some());
        let emitted = result.unwrap();
        assert_eq!(emitted.instrument_id, id_a);
        assert_eq!(emitted.deltas.len(), 2);
        assert!(buffered.contains_key(&id_b));
        assert_eq!(buffered[&id_b].len(), 1);
        assert!(!buffered.contains_key(&id_a));
    }

    mod property_tests {
        use proptest::prelude::*;
        use rstest::rstest;

        use super::*;

        proptest! {
            #[rstest]
            fn mbo_buffering_conserves_deltas(
                num_non_last in 0usize..=20,
            ) {
                let instrument_id = InstrumentId::from("ESM4.GLBX");
                let mut buffering_start = None;
                let mut buffered = AHashMap::new();
                let total = num_non_last + 1;

                for i in 0..num_non_last {
                    let result = process_mbo_delta(
                        test_delta(instrument_id, 1_000_000_000 + i as u64),
                        0, // No F_LAST
                        &mut buffering_start,
                        &mut buffered,
                    ).unwrap();
                    prop_assert!(result.is_none());
                }

                let result = process_mbo_delta(
                    test_delta(instrument_id, 2_000_000_000),
                    128, // F_LAST
                    &mut buffering_start,
                    &mut buffered,
                ).unwrap();

                prop_assert!(result.is_some());
                let emitted = result.unwrap();
                prop_assert_eq!(emitted.deltas.len(), total);
                prop_assert!(buffered.is_empty());
            }

            #[rstest]
            fn mbo_snapshots_never_emit(
                num_snapshots in 1usize..=20,
            ) {
                let instrument_id = InstrumentId::from("ESM4.GLBX");
                let mut buffering_start = None;
                let mut buffered = AHashMap::new();

                for i in 0..num_snapshots {
                    let result = process_mbo_delta(
                        test_delta(instrument_id, 1_000_000_000 + i as u64),
                        128 | 32, // F_LAST | F_SNAPSHOT
                        &mut buffering_start,
                        &mut buffered,
                    ).unwrap();
                    prop_assert!(result.is_none());
                }

                prop_assert_eq!(buffered[&instrument_id].len(), num_snapshots);
            }

            #[rstest]
            fn mbo_replay_delays_emission(
                start_offset in 1u64..=100,
                num_before in 1usize..=10,
            ) {
                let instrument_id = InstrumentId::from("ESM4.GLBX");
                let start_ns = 1_000_000_000u64 * start_offset;
                let mut buffering_start = Some(start_ns.into());
                let mut buffered = AHashMap::new();

                for i in 0..num_before {
                    let ts = start_ns - (num_before as u64 - i as u64);
                    let result = process_mbo_delta(
                        test_delta(instrument_id, ts),
                        128, // F_LAST
                        &mut buffering_start,
                        &mut buffered,
                    ).unwrap();
                    prop_assert!(result.is_none());
                }

                let result = process_mbo_delta(
                    test_delta(instrument_id, start_ns + 1),
                    128,
                    &mut buffering_start,
                    &mut buffered,
                ).unwrap();

                prop_assert!(result.is_some());
                prop_assert!(buffering_start.is_none());
                let total = num_before + 1;
                prop_assert_eq!(result.unwrap().deltas.len(), total);
            }
        }
    }
}