Skip to main content

nautilus_serialization/arrow/display/
index_price.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Display-mode Arrow encoder for [`IndexPriceUpdate`].
17
18use std::sync::Arc;
19
20use arrow::{
21    array::{Float64Builder, StringBuilder, TimestampNanosecondBuilder},
22    datatypes::Schema,
23    error::ArrowError,
24    record_batch::RecordBatch,
25};
26use nautilus_model::data::IndexPriceUpdate;
27
28use super::{float64_field, price_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field};
29
30/// Returns the display-mode Arrow schema for [`IndexPriceUpdate`].
31#[must_use]
32pub fn index_prices_schema() -> Schema {
33    Schema::new(vec![
34        utf8_field("instrument_id", false),
35        float64_field("value", false),
36        timestamp_field("ts_event", false),
37        timestamp_field("ts_init", false),
38    ])
39}
40
41/// Encodes index price updates as a display-friendly Arrow [`RecordBatch`].
42///
43/// Emits a `Float64` `value` column, a `Utf8` `instrument_id` column, and
44/// `Timestamp(Nanosecond)` columns for event and init times. Mixed-instrument
45/// batches are supported. Precision is lost on the conversion to `f64`; use
46/// [`crate::arrow::index_prices_to_arrow_record_batch_bytes`] for catalog storage.
47///
48/// Returns an empty [`RecordBatch`] with the correct schema when `data` is empty.
49///
50/// # Errors
51///
52/// Returns an [`ArrowError`] if the Arrow `RecordBatch` cannot be constructed.
53pub fn encode_index_prices(data: &[IndexPriceUpdate]) -> Result<RecordBatch, ArrowError> {
54    let mut instrument_id_builder = StringBuilder::new();
55    let mut value_builder = Float64Builder::with_capacity(data.len());
56    let mut ts_event_builder = TimestampNanosecondBuilder::with_capacity(data.len());
57    let mut ts_init_builder = TimestampNanosecondBuilder::with_capacity(data.len());
58
59    for update in data {
60        instrument_id_builder.append_value(update.instrument_id.to_string());
61        value_builder.append_value(price_to_f64(&update.value));
62        ts_event_builder.append_value(unix_nanos_to_i64(update.ts_event.as_u64()));
63        ts_init_builder.append_value(unix_nanos_to_i64(update.ts_init.as_u64()));
64    }
65
66    RecordBatch::try_new(
67        Arc::new(index_prices_schema()),
68        vec![
69            Arc::new(instrument_id_builder.finish()),
70            Arc::new(value_builder.finish()),
71            Arc::new(ts_event_builder.finish()),
72            Arc::new(ts_init_builder.finish()),
73        ],
74    )
75}
76
#[cfg(test)]
mod tests {
    use arrow::{
        array::{Array, Float64Array, StringArray, TimestampNanosecondArray},
        datatypes::{DataType, TimeUnit},
    };
    use nautilus_model::{identifiers::InstrumentId, types::Price};
    use rstest::rstest;

    use super::*;

    /// Builds an [`IndexPriceUpdate`] with `ts_event = ts` and `ts_init = ts + 1`.
    fn make_update(instrument_id: &str, value: &str, ts: u64) -> IndexPriceUpdate {
        IndexPriceUpdate {
            instrument_id: InstrumentId::from(instrument_id),
            value: Price::from(value),
            ts_event: ts.into(),
            ts_init: (ts + 1).into(),
        }
    }

    #[rstest]
    fn test_encode_index_prices_schema() {
        let batch = encode_index_prices(&[]).unwrap();

        // The encoded batch must carry exactly the public display schema.
        assert_eq!(batch.schema().as_ref(), &index_prices_schema());

        let fields = batch.schema().fields().clone();
        let nanos = DataType::Timestamp(TimeUnit::Nanosecond, None);
        assert_eq!(fields.len(), 4);
        assert_eq!(fields[0].name(), "instrument_id");
        assert_eq!(fields[0].data_type(), &DataType::Utf8);
        assert_eq!(fields[1].name(), "value");
        assert_eq!(fields[1].data_type(), &DataType::Float64);
        assert_eq!(fields[2].name(), "ts_event");
        assert_eq!(fields[2].data_type(), &nanos);
        assert_eq!(fields[3].name(), "ts_init");
        assert_eq!(fields[3].data_type(), &nanos);
    }

    #[rstest]
    fn test_encode_index_prices_values() {
        let updates = [
            make_update("BTC-USDT.BINANCE", "50200.00", 1_000),
            make_update("BTC-USDT.BINANCE", "50300.00", 2_000),
        ];
        let batch = encode_index_prices(&updates).unwrap();

        assert_eq!(batch.num_rows(), 2);

        let value_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let ts_event_col = batch
            .column(2)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        let ts_init_col = batch
            .column(3)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();

        assert_eq!(value_col.null_count(), 0);
        assert!((value_col.value(0) - 50_200.00).abs() < 1e-9);
        assert!((value_col.value(1) - 50_300.00).abs() < 1e-9);
        assert_eq!(ts_event_col.value(0), 1_000);
        assert_eq!(ts_event_col.value(1), 2_000);
        // `make_update` sets `ts_init` one nanosecond after `ts_event`.
        assert_eq!(ts_init_col.value(0), 1_001);
        assert_eq!(ts_init_col.value(1), 2_001);
    }

    #[rstest]
    fn test_encode_index_prices_empty() {
        let batch = encode_index_prices(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 4);
    }

    #[rstest]
    fn test_encode_index_prices_mixed_instruments() {
        let updates = [
            make_update("BTC-USDT.BINANCE", "50200.00", 1),
            make_update("ETH-USDT.BINANCE", "2500.00", 2),
        ];
        let batch = encode_index_prices(&updates).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let instrument_id_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let value_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();

        assert_eq!(instrument_id_col.value(0), "BTC-USDT.BINANCE");
        assert_eq!(instrument_id_col.value(1), "ETH-USDT.BINANCE");
        assert!((value_col.value(0) - 50_200.00).abs() < 1e-9);
        assert!((value_col.value(1) - 2_500.00).abs() < 1e-9);
    }
}