Skip to main content

nautilus_serialization/arrow/display/
quote.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Display-mode Arrow encoder for [`QuoteTick`].
17
18use std::sync::Arc;
19
20use arrow::{
21    array::{Float64Builder, StringBuilder, TimestampNanosecondBuilder},
22    datatypes::Schema,
23    error::ArrowError,
24    record_batch::RecordBatch,
25};
26use nautilus_model::data::QuoteTick;
27
28use super::{
29    float64_field, price_to_f64, quantity_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field,
30};
31
32/// Returns the display-mode Arrow schema for [`QuoteTick`].
33#[must_use]
34pub fn quotes_schema() -> Schema {
35    Schema::new(vec![
36        utf8_field("instrument_id", false),
37        float64_field("bid_price", false),
38        float64_field("ask_price", false),
39        float64_field("bid_size", false),
40        float64_field("ask_size", false),
41        timestamp_field("ts_event", false),
42        timestamp_field("ts_init", false),
43    ])
44}
45
46/// Encodes quotes as a display-friendly Arrow [`RecordBatch`].
47///
48/// Emits `Float64` columns for prices and sizes, a `Utf8` `instrument_id`
49/// column, and `Timestamp(Nanosecond)` columns for event and init times.
50/// Mixed-instrument batches are supported. Precision is lost in the
51/// conversion to `f64`; use [`crate::arrow::quotes_to_arrow_record_batch_bytes`]
52/// for catalog storage.
53///
54/// Returns an empty [`RecordBatch`] with the correct schema when `data` is empty.
55///
56/// # Errors
57///
58/// Returns an [`ArrowError`] if the Arrow `RecordBatch` cannot be constructed.
59pub fn encode_quotes(data: &[QuoteTick]) -> Result<RecordBatch, ArrowError> {
60    let mut instrument_id_builder = StringBuilder::new();
61    let mut bid_price_builder = Float64Builder::with_capacity(data.len());
62    let mut ask_price_builder = Float64Builder::with_capacity(data.len());
63    let mut bid_size_builder = Float64Builder::with_capacity(data.len());
64    let mut ask_size_builder = Float64Builder::with_capacity(data.len());
65    let mut ts_event_builder = TimestampNanosecondBuilder::with_capacity(data.len());
66    let mut ts_init_builder = TimestampNanosecondBuilder::with_capacity(data.len());
67
68    for quote in data {
69        instrument_id_builder.append_value(quote.instrument_id.to_string());
70        bid_price_builder.append_value(price_to_f64(&quote.bid_price));
71        ask_price_builder.append_value(price_to_f64(&quote.ask_price));
72        bid_size_builder.append_value(quantity_to_f64(&quote.bid_size));
73        ask_size_builder.append_value(quantity_to_f64(&quote.ask_size));
74        ts_event_builder.append_value(unix_nanos_to_i64(quote.ts_event.as_u64()));
75        ts_init_builder.append_value(unix_nanos_to_i64(quote.ts_init.as_u64()));
76    }
77
78    RecordBatch::try_new(
79        Arc::new(quotes_schema()),
80        vec![
81            Arc::new(instrument_id_builder.finish()),
82            Arc::new(bid_price_builder.finish()),
83            Arc::new(ask_price_builder.finish()),
84            Arc::new(bid_size_builder.finish()),
85            Arc::new(ask_size_builder.finish()),
86            Arc::new(ts_event_builder.finish()),
87            Arc::new(ts_init_builder.finish()),
88        ],
89    )
90}
91
#[cfg(test)]
mod tests {
    use arrow::{
        array::{Array, Float64Array, StringArray, TimestampNanosecondArray},
        datatypes::{DataType, TimeUnit},
    };
    use nautilus_model::{
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Absolute tolerance used when comparing `f64` columns to decimal literals.
    const EPSILON: f64 = 1e-9;

    /// Builds a quote with bid size 1_000, ask size 500 and `ts_init = ts_event + 1`.
    fn make_quote(instrument_id: &str, bid: &str, ask: &str, ts: u64) -> QuoteTick {
        let ts_event = ts;
        let ts_init = ts + 1;
        QuoteTick {
            instrument_id: InstrumentId::from(instrument_id),
            bid_price: Price::from(bid),
            ask_price: Price::from(ask),
            bid_size: Quantity::from(1_000),
            ask_size: Quantity::from(500),
            ts_event: ts_event.into(),
            ts_init: ts_init.into(),
        }
    }

    /// Downcasts column `index` of `batch` to the concrete array type `T`.
    ///
    /// Panics if the column is not a `T`, which fails the calling test.
    fn column<T: 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .unwrap()
    }

    /// Asserts that `actual` lies within `EPSILON` of `expected`.
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < EPSILON,
            "{actual} is not within {EPSILON} of {expected}"
        );
    }

    #[rstest]
    fn test_encode_quotes_schema() {
        let quotes = vec![make_quote("AAPL.XNAS", "100.10", "100.20", 1)];
        let batch = encode_quotes(&quotes).unwrap();

        let nanos = DataType::Timestamp(TimeUnit::Nanosecond, None);
        let expected = [
            ("instrument_id", DataType::Utf8),
            ("bid_price", DataType::Float64),
            ("ask_price", DataType::Float64),
            ("bid_size", DataType::Float64),
            ("ask_size", DataType::Float64),
            ("ts_event", nanos.clone()),
            ("ts_init", nanos),
        ];

        let schema = batch.schema();
        let fields = schema.fields();
        assert_eq!(fields.len(), expected.len());
        for (field, (name, data_type)) in fields.iter().zip(expected.iter()) {
            assert_eq!(field.name(), *name);
            assert_eq!(field.data_type(), data_type);
        }
    }

    #[rstest]
    fn test_encode_quotes_values() {
        let quotes = vec![
            make_quote("AAPL.XNAS", "100.10", "100.20", 1_000_000_000),
            make_quote("AAPL.XNAS", "100.15", "100.25", 2_000_000_000),
        ];
        let batch = encode_quotes(&quotes).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let instrument_ids = column::<StringArray>(&batch, 0);
        let bid_prices = column::<Float64Array>(&batch, 1);
        let ask_prices = column::<Float64Array>(&batch, 2);
        let bid_sizes = column::<Float64Array>(&batch, 3);
        let ask_sizes = column::<Float64Array>(&batch, 4);
        let ts_events = column::<TimestampNanosecondArray>(&batch, 5);
        let ts_inits = column::<TimestampNanosecondArray>(&batch, 6);

        assert_eq!(instrument_ids.value(0), "AAPL.XNAS");
        assert_eq!(instrument_ids.value(1), "AAPL.XNAS");

        assert_close(bid_prices.value(0), 100.10);
        assert_close(bid_prices.value(1), 100.15);
        assert_close(ask_prices.value(0), 100.20);
        assert_close(ask_prices.value(1), 100.25);
        assert_close(bid_sizes.value(0), 1_000.0);
        assert_close(ask_sizes.value(0), 500.0);

        assert_eq!(ts_events.value(0), 1_000_000_000);
        assert_eq!(ts_events.value(1), 2_000_000_000);
        assert_eq!(ts_inits.value(0), 1_000_000_001);
        assert_eq!(ts_inits.value(1), 2_000_000_001);
    }

    #[rstest]
    fn test_encode_quotes_empty() {
        let batch = encode_quotes(&[]).unwrap();
        let schema = batch.schema();

        assert_eq!(batch.num_rows(), 0);
        assert_eq!(schema.fields().len(), 7);
    }

    #[rstest]
    fn test_encode_quotes_mixed_instruments() {
        let quotes = vec![
            make_quote("AAPL.XNAS", "100.10", "100.20", 1),
            make_quote("MSFT.XNAS", "250.00", "250.05", 2),
        ];
        let batch = encode_quotes(&quotes).unwrap();

        let instrument_ids = column::<StringArray>(&batch, 0);
        assert_eq!(instrument_ids.value(0), "AAPL.XNAS");
        assert_eq!(instrument_ids.value(1), "MSFT.XNAS");
    }
}