Skip to main content

nautilus_serialization/sbe/
market.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Hand-written SBE codecs for Nautilus market data types.
17
18mod bars;
19mod book;
20mod common;
21mod data_any;
22mod ticks;
23
24use nautilus_model::data::{
25    Bar, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus, MarkPriceUpdate,
26    OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
27};
28
29use self::common::{HEADER_LENGTH, decode_header, encode_header, validate_header};
30use super::{SbeCursor, SbeDecodeError, SbeEncodeError, SbeWriter};
31
32pub const MARKET_SCHEMA_ID: u16 = 1;
33pub const MARKET_SCHEMA_VERSION: u16 = 0;
34
35pub(super) mod data_any_variant {
36    pub const ORDER_BOOK_DELTA: u16 = 0;
37    pub const ORDER_BOOK_DELTAS: u16 = 1;
38    pub const ORDER_BOOK_DEPTH10: u16 = 2;
39    pub const QUOTE: u16 = 3;
40    pub const TRADE: u16 = 4;
41    pub const BAR: u16 = 5;
42    pub const MARK_PRICE: u16 = 6;
43    pub const INDEX_PRICE: u16 = 7;
44    pub const FUNDING_RATE: u16 = 8;
45    pub const INSTRUMENT_STATUS: u16 = 9;
46    pub const INSTRUMENT_CLOSE: u16 = 10;
47}
48
49pub(super) mod template_id {
50    pub const BOOK_ORDER: u16 = 30_001;
51    pub const ORDER_BOOK_DELTA: u16 = 30_002;
52    pub const ORDER_BOOK_DELTAS: u16 = 30_003;
53    pub const ORDER_BOOK_DEPTH10: u16 = 30_004;
54    pub const QUOTE_TICK: u16 = 30_005;
55    pub const TRADE_TICK: u16 = 30_006;
56    pub const BAR_TYPE: u16 = 30_007;
57    pub const BAR: u16 = 30_008;
58    pub const MARK_PRICE_UPDATE: u16 = 30_009;
59    pub const INDEX_PRICE_UPDATE: u16 = 30_010;
60    pub const FUNDING_RATE_UPDATE: u16 = 30_011;
61    pub const INSTRUMENT_STATUS: u16 = 30_012;
62    pub const INSTRUMENT_CLOSE: u16 = 30_013;
63    pub const DATA_ANY: u16 = 30_014;
64}
65
66#[expect(clippy::large_enum_variant)]
67#[derive(Debug, Clone, PartialEq)]
68pub enum DataAny {
69    OrderBookDelta(OrderBookDelta),
70    OrderBookDeltas(OrderBookDeltas),
71    OrderBookDepth10(OrderBookDepth10),
72    Quote(QuoteTick),
73    Trade(TradeTick),
74    Bar(Bar),
75    MarkPrice(MarkPriceUpdate),
76    IndexPrice(IndexPriceUpdate),
77    FundingRate(FundingRateUpdate),
78    InstrumentStatus(InstrumentStatus),
79    InstrumentClose(InstrumentClose),
80}
81
82pub trait ToSbe {
83    /// Encodes the value into an SBE message buffer.
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if any field cannot be encoded into the target SBE wire format.
88    fn to_sbe(&self) -> Result<Vec<u8>, SbeEncodeError>;
89
90    /// Encodes the value into the provided SBE message buffer.
91    ///
92    /// This method clears any existing bytes in `buf` before encoding.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if any field cannot be encoded into the target SBE wire format.
97    fn to_sbe_into(&self, buf: &mut Vec<u8>) -> Result<(), SbeEncodeError> {
98        let bytes = self.to_sbe()?;
99        buf.clear();
100        buf.extend_from_slice(&bytes);
101        Ok(())
102    }
103}
104
105pub trait FromSbe: Sized {
106    /// Decodes the value from an SBE message buffer.
107    ///
108    /// # Errors
109    ///
110    /// Returns an error if the header is invalid or the payload is malformed.
111    fn from_sbe(bytes: &[u8]) -> Result<Self, SbeDecodeError>;
112}
113
114/// Extension of [`FromSbe`] that reuses allocations between decodes.
115///
116/// Scalar messages decode without heap allocation, so they do not need this trait. Types with
117/// growable internal buffers (for example [`OrderBookDeltas`] with its `Vec<OrderBookDelta>`)
118/// implement it to let callers supply a pre-allocated scratch buffer and avoid per-message
119/// allocation in hot paths.
120pub trait FromSbeReuse: FromSbe {
121    /// Scratch buffer whose allocation is reused across decodes.
122    type Scratch;
123
124    /// Decodes a value from an SBE message buffer, reusing `scratch`'s allocation.
125    ///
126    /// On success, ownership of the allocation moves from `scratch` into the returned value and
127    /// `scratch` is left in its empty state. To continue reusing the allocation, move the buffer
128    /// back from the returned value (for example `scratch = std::mem::take(&mut result.deltas)`).
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if the header is invalid or the payload is malformed.
133    fn from_sbe_reuse(bytes: &[u8], scratch: &mut Self::Scratch) -> Result<Self, SbeDecodeError>;
134}
135
136pub(super) trait MarketSbeMessage: Sized {
137    const TEMPLATE_ID: u16;
138    const BLOCK_LENGTH: u16;
139
140    fn encode_body(&self, writer: &mut SbeWriter<'_>) -> Result<(), SbeEncodeError>;
141
142    fn decode_body(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError>;
143
144    fn encoded_body_size(&self) -> usize {
145        usize::from(Self::BLOCK_LENGTH)
146    }
147}
148
149impl<T> ToSbe for T
150where
151    T: MarketSbeMessage,
152{
153    #[inline]
154    fn to_sbe(&self) -> Result<Vec<u8>, SbeEncodeError> {
155        let encoded_size = HEADER_LENGTH + self.encoded_body_size();
156        let mut buf = Vec::with_capacity(encoded_size);
157        encode_into_uninit(self, &mut buf, encoded_size)?;
158        Ok(buf)
159    }
160
161    #[inline]
162    fn to_sbe_into(&self, buf: &mut Vec<u8>) -> Result<(), SbeEncodeError> {
163        let encoded_size = HEADER_LENGTH + self.encoded_body_size();
164        buf.clear();
165        buf.reserve(encoded_size);
166        encode_into_uninit(self, buf, encoded_size)
167    }
168}
169
170impl<T> FromSbe for T
171where
172    T: MarketSbeMessage,
173{
174    #[inline]
175    fn from_sbe(bytes: &[u8]) -> Result<Self, SbeDecodeError> {
176        let mut cursor = SbeCursor::new(bytes);
177        let header = decode_header(&mut cursor)?;
178        validate_header(header, T::TEMPLATE_ID, T::BLOCK_LENGTH)?;
179        T::decode_body(&mut cursor)
180    }
181}
182
183// Writes an SBE message into the spare capacity of `buf` without zero
184// initialization, then commits the length on success. Caller must ensure
185// `buf.len() == 0` and `buf.capacity() >= encoded_size`.
186#[inline]
187#[allow(
188    unsafe_code,
189    reason = "set_len commits writes the SbeWriter has already made into spare capacity"
190)]
191#[allow(
192    clippy::panic_in_result_fn,
193    reason = "load-bearing safety check for the unsafe set_len; panic is the right outcome"
194)]
195fn encode_into_uninit<T>(
196    value: &T,
197    buf: &mut Vec<u8>,
198    encoded_size: usize,
199) -> Result<(), SbeEncodeError>
200where
201    T: MarketSbeMessage,
202{
203    debug_assert_eq!(buf.len(), 0);
204    debug_assert!(buf.capacity() >= encoded_size);
205
206    let spare = &mut buf.spare_capacity_mut()[..encoded_size];
207    let mut writer = SbeWriter::new_uninit(spare);
208    encode_header(
209        &mut writer,
210        T::BLOCK_LENGTH,
211        T::TEMPLATE_ID,
212        MARKET_SCHEMA_ID,
213        MARKET_SCHEMA_VERSION,
214    );
215    value.encode_body(&mut writer)?;
216
217    // Load-bearing for the unsafe `set_len` below: this is the invariant that
218    // converts the writer's per-byte initialization into Vec-level safety. Run
219    // in release builds too so a future size mismatch panics rather than
220    // commits uninit bytes.
221    assert_eq!(
222        writer.pos(),
223        encoded_size,
224        "SBE encode_body wrote {} bytes but encoded_body_size reported {}",
225        writer.pos(),
226        encoded_size,
227    );
228
229    // SAFETY: the writer panics if it attempts to write past `encoded_size`,
230    // the assert above confirms it wrote exactly `encoded_size` bytes, and
231    // errors propagate before `set_len` runs. Reaching this line means the
232    // first `encoded_size` bytes of `buf` hold initialized u8 values.
233    unsafe {
234        buf.set_len(encoded_size);
235    }
236    Ok(())
237}