Skip to main content

nautilus_serialization/arrow/display/
close.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Display-mode Arrow encoder for [`InstrumentClose`].
17
18use std::sync::Arc;
19
20use arrow::{
21    array::{Float64Builder, StringBuilder, TimestampNanosecondBuilder},
22    datatypes::Schema,
23    error::ArrowError,
24    record_batch::RecordBatch,
25};
26use nautilus_model::data::InstrumentClose;
27
28use super::{float64_field, price_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field};
29
30/// Returns the display-mode Arrow schema for [`InstrumentClose`].
31#[must_use]
32pub fn instrument_closes_schema() -> Schema {
33    Schema::new(vec![
34        utf8_field("instrument_id", false),
35        float64_field("close_price", false),
36        utf8_field("close_type", false),
37        timestamp_field("ts_event", false),
38        timestamp_field("ts_init", false),
39    ])
40}
41
42/// Encodes instrument closes as a display-friendly Arrow [`RecordBatch`].
43///
44/// Emits a `Float64` `close_price` column, `Utf8` columns for the instrument
45/// ID and close type, and `Timestamp(Nanosecond)` columns for event and init
46/// times. Mixed-instrument batches are supported. Precision is lost on the
47/// conversion to `f64`; use
48/// [`crate::arrow::instrument_closes_to_arrow_record_batch_bytes`] for catalog
49/// storage.
50///
51/// Returns an empty [`RecordBatch`] with the correct schema when `data` is empty.
52///
53/// # Errors
54///
55/// Returns an [`ArrowError`] if the Arrow `RecordBatch` cannot be constructed.
56pub fn encode_instrument_closes(data: &[InstrumentClose]) -> Result<RecordBatch, ArrowError> {
57    let mut instrument_id_builder = StringBuilder::new();
58    let mut close_price_builder = Float64Builder::with_capacity(data.len());
59    let mut close_type_builder = StringBuilder::new();
60    let mut ts_event_builder = TimestampNanosecondBuilder::with_capacity(data.len());
61    let mut ts_init_builder = TimestampNanosecondBuilder::with_capacity(data.len());
62
63    for close in data {
64        instrument_id_builder.append_value(close.instrument_id.to_string());
65        close_price_builder.append_value(price_to_f64(&close.close_price));
66        close_type_builder.append_value(format!("{}", close.close_type));
67        ts_event_builder.append_value(unix_nanos_to_i64(close.ts_event.as_u64()));
68        ts_init_builder.append_value(unix_nanos_to_i64(close.ts_init.as_u64()));
69    }
70
71    RecordBatch::try_new(
72        Arc::new(instrument_closes_schema()),
73        vec![
74            Arc::new(instrument_id_builder.finish()),
75            Arc::new(close_price_builder.finish()),
76            Arc::new(close_type_builder.finish()),
77            Arc::new(ts_event_builder.finish()),
78            Arc::new(ts_init_builder.finish()),
79        ],
80    )
81}
82
#[cfg(test)]
mod tests {
    use arrow::{
        array::{Array, Float64Array, StringArray, TimestampNanosecondArray},
        datatypes::{DataType, TimeUnit},
    };
    use nautilus_model::{enums::InstrumentCloseType, identifiers::InstrumentId, types::Price};
    use rstest::rstest;

    use super::*;

    /// Builds an [`InstrumentClose`] with `ts_event = ts` and `ts_init = ts + 1`,
    /// so the two timestamp columns can be told apart in assertions.
    fn make_close(
        instrument_id: &str,
        price: &str,
        close_type: InstrumentCloseType,
        ts: u64,
    ) -> InstrumentClose {
        InstrumentClose {
            instrument_id: InstrumentId::from(instrument_id),
            close_price: Price::from(price),
            close_type,
            ts_event: ts.into(),
            ts_init: (ts + 1).into(),
        }
    }

    /// Downcasts column `index` of `batch` to the concrete array type `T`,
    /// panicking if the column holds a different array type.
    fn typed_column<T: Array + 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .expect("column should have the expected Arrow array type")
    }

    #[rstest]
    fn test_encode_instrument_closes_schema() {
        let batch = encode_instrument_closes(&[]).unwrap();
        let schema = batch.schema();
        let fields = schema.fields();

        let timestamp_ns = DataType::Timestamp(TimeUnit::Nanosecond, None);
        let expected = [
            ("instrument_id", DataType::Utf8),
            ("close_price", DataType::Float64),
            ("close_type", DataType::Utf8),
            ("ts_event", timestamp_ns.clone()),
            ("ts_init", timestamp_ns),
        ];

        assert_eq!(fields.len(), expected.len());
        for (field, (name, data_type)) in fields.iter().zip(&expected) {
            assert_eq!(field.name(), *name);
            assert_eq!(field.data_type(), data_type);
        }
    }

    #[rstest]
    fn test_encode_instrument_closes_values() {
        let closes = vec![
            make_close(
                "AAPL.XNAS",
                "150.50",
                InstrumentCloseType::EndOfSession,
                1_000,
            ),
            make_close(
                "AAPL.XNAS",
                "151.25",
                InstrumentCloseType::ContractExpired,
                2_000,
            ),
        ];
        let batch = encode_instrument_closes(&closes).unwrap();

        assert_eq!(batch.num_rows(), 2);

        let instrument_id_col = typed_column::<StringArray>(&batch, 0);
        let close_price_col = typed_column::<Float64Array>(&batch, 1);
        let close_type_col = typed_column::<StringArray>(&batch, 2);
        let ts_event_col = typed_column::<TimestampNanosecondArray>(&batch, 3);
        let ts_init_col = typed_column::<TimestampNanosecondArray>(&batch, 4);

        assert_eq!(instrument_id_col.value(0), "AAPL.XNAS");
        assert_eq!(instrument_id_col.value(1), "AAPL.XNAS");

        assert!((close_price_col.value(0) - 150.50).abs() < 1e-9);
        assert!((close_price_col.value(1) - 151.25).abs() < 1e-9);

        assert_eq!(
            close_type_col.value(0),
            InstrumentCloseType::EndOfSession.to_string()
        );
        assert_eq!(
            close_type_col.value(1),
            InstrumentCloseType::ContractExpired.to_string()
        );

        assert_eq!(ts_event_col.value(0), 1_000);
        assert_eq!(ts_event_col.value(1), 2_000);
        // `make_close` sets `ts_init` to `ts + 1`, which catches swapped timestamp columns.
        assert_eq!(ts_init_col.value(0), 1_001);
        assert_eq!(ts_init_col.value(1), 2_001);
    }

    #[rstest]
    fn test_encode_instrument_closes_empty() {
        let batch = encode_instrument_closes(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        // An empty batch must still carry the full display schema.
        assert_eq!(*batch.schema(), instrument_closes_schema());
    }

    #[rstest]
    fn test_encode_instrument_closes_mixed_instruments() {
        let closes = vec![
            make_close("AAPL.XNAS", "150.50", InstrumentCloseType::EndOfSession, 1),
            make_close("MSFT.XNAS", "300.00", InstrumentCloseType::EndOfSession, 2),
        ];
        let batch = encode_instrument_closes(&closes).unwrap();

        assert_eq!(batch.num_rows(), 2);

        let instrument_id_col = typed_column::<StringArray>(&batch, 0);
        assert_eq!(instrument_id_col.value(0), "AAPL.XNAS");
        assert_eq!(instrument_id_col.value(1), "MSFT.XNAS");
    }
}