nautilus_serialization/arrow/display/
close.rs1use std::sync::Arc;
19
20use arrow::{
21 array::{Float64Builder, StringBuilder, TimestampNanosecondBuilder},
22 datatypes::Schema,
23 error::ArrowError,
24 record_batch::RecordBatch,
25};
26use nautilus_model::data::InstrumentClose;
27
28use super::{float64_field, price_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field};
29
30#[must_use]
32pub fn instrument_closes_schema() -> Schema {
33 Schema::new(vec![
34 utf8_field("instrument_id", false),
35 float64_field("close_price", false),
36 utf8_field("close_type", false),
37 timestamp_field("ts_event", false),
38 timestamp_field("ts_init", false),
39 ])
40}
41
42pub fn encode_instrument_closes(data: &[InstrumentClose]) -> Result<RecordBatch, ArrowError> {
57 let mut instrument_id_builder = StringBuilder::new();
58 let mut close_price_builder = Float64Builder::with_capacity(data.len());
59 let mut close_type_builder = StringBuilder::new();
60 let mut ts_event_builder = TimestampNanosecondBuilder::with_capacity(data.len());
61 let mut ts_init_builder = TimestampNanosecondBuilder::with_capacity(data.len());
62
63 for close in data {
64 instrument_id_builder.append_value(close.instrument_id.to_string());
65 close_price_builder.append_value(price_to_f64(&close.close_price));
66 close_type_builder.append_value(format!("{}", close.close_type));
67 ts_event_builder.append_value(unix_nanos_to_i64(close.ts_event.as_u64()));
68 ts_init_builder.append_value(unix_nanos_to_i64(close.ts_init.as_u64()));
69 }
70
71 RecordBatch::try_new(
72 Arc::new(instrument_closes_schema()),
73 vec![
74 Arc::new(instrument_id_builder.finish()),
75 Arc::new(close_price_builder.finish()),
76 Arc::new(close_type_builder.finish()),
77 Arc::new(ts_event_builder.finish()),
78 Arc::new(ts_init_builder.finish()),
79 ],
80 )
81}
82
#[cfg(test)]
mod tests {
    use arrow::{
        array::{Array, Float64Array, StringArray, TimestampNanosecondArray},
        datatypes::{DataType, TimeUnit},
    };
    use nautilus_model::{enums::InstrumentCloseType, identifiers::InstrumentId, types::Price};
    use rstest::rstest;

    use super::*;

    /// Builds an [`InstrumentClose`] fixture from string inputs.
    ///
    /// `ts_event` is set to `ts` and `ts_init` to `ts + 1`, so the two timestamp
    /// columns can be told apart in assertions.
    fn make_close(
        instrument_id: &str,
        price: &str,
        close_type: InstrumentCloseType,
        ts: u64,
    ) -> InstrumentClose {
        InstrumentClose {
            instrument_id: InstrumentId::from(instrument_id),
            close_price: Price::from(price),
            close_type,
            ts_event: ts.into(),
            ts_init: (ts + 1).into(),
        }
    }

    /// Returns column `index` of `batch` downcast to the concrete array type `T`.
    ///
    /// Panics if the column is not a `T`, which fails the calling test.
    fn column_as<T: 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .expect("column has an unexpected Arrow array type")
    }

    #[rstest]
    fn test_encode_instrument_closes_schema() {
        let batch = encode_instrument_closes(&[]).unwrap();
        let fields = batch.schema().fields().clone();
        let timestamp_ns = DataType::Timestamp(TimeUnit::Nanosecond, None);

        assert_eq!(fields.len(), 5);
        assert_eq!(fields[0].name(), "instrument_id");
        assert_eq!(fields[0].data_type(), &DataType::Utf8);
        assert_eq!(fields[1].name(), "close_price");
        assert_eq!(fields[1].data_type(), &DataType::Float64);
        assert_eq!(fields[2].name(), "close_type");
        assert_eq!(fields[2].data_type(), &DataType::Utf8);
        assert_eq!(fields[3].name(), "ts_event");
        assert_eq!(fields[3].data_type(), &timestamp_ns);
        assert_eq!(fields[4].name(), "ts_init");
        // Both timestamp columns come from the same field helper, so they must share a type.
        assert_eq!(fields[4].data_type(), &timestamp_ns);
    }

    #[rstest]
    fn test_encode_instrument_closes_values() {
        let closes = vec![
            make_close(
                "AAPL.XNAS",
                "150.50",
                InstrumentCloseType::EndOfSession,
                1_000,
            ),
            make_close(
                "AAPL.XNAS",
                "151.25",
                InstrumentCloseType::ContractExpired,
                2_000,
            ),
        ];
        let batch = encode_instrument_closes(&closes).unwrap();

        assert_eq!(batch.num_rows(), 2);

        let close_price_col = column_as::<Float64Array>(&batch, 1);
        let close_type_col = column_as::<StringArray>(&batch, 2);
        let ts_event_col = column_as::<TimestampNanosecondArray>(&batch, 3);
        let ts_init_col = column_as::<TimestampNanosecondArray>(&batch, 4);

        assert!((close_price_col.value(0) - 150.50).abs() < 1e-9);
        assert!((close_price_col.value(1) - 151.25).abs() < 1e-9);
        assert_eq!(
            close_type_col.value(0),
            InstrumentCloseType::EndOfSession.to_string()
        );
        assert_eq!(
            close_type_col.value(1),
            InstrumentCloseType::ContractExpired.to_string()
        );
        assert_eq!(ts_event_col.value(0), 1_000);
        assert_eq!(ts_event_col.value(1), 2_000);
        // `make_close` sets `ts_init` to `ts + 1`.
        assert_eq!(ts_init_col.value(0), 1_001);
        assert_eq!(ts_init_col.value(1), 2_001);
    }

    #[rstest]
    fn test_encode_instrument_closes_empty() {
        let batch = encode_instrument_closes(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        // An empty batch still carries every column of the schema.
        assert_eq!(batch.num_columns(), 5);
    }

    #[rstest]
    fn test_encode_instrument_closes_mixed_instruments() {
        let closes = vec![
            make_close("AAPL.XNAS", "150.50", InstrumentCloseType::EndOfSession, 1),
            make_close("MSFT.XNAS", "300.00", InstrumentCloseType::EndOfSession, 2),
        ];
        let batch = encode_instrument_closes(&closes).unwrap();
        let instrument_id_col = column_as::<StringArray>(&batch, 0);

        assert_eq!(instrument_id_col.value(0), "AAPL.XNAS");
        assert_eq!(instrument_id_col.value(1), "MSFT.XNAS");
    }
}