nautilus_serialization/arrow/
instrument_status.rs1use std::collections::HashMap;
17
18use arrow::{datatypes::Schema, error::ArrowError, record_batch::RecordBatch};
19use nautilus_model::data::{Data, InstrumentStatus};
20
21use super::{
22 ArrowSchemaProvider, DecodeDataFromRecordBatch, DecodeTypedFromRecordBatch,
23 EncodeToRecordBatch, EncodingError, KEY_INSTRUMENT_ID,
24 json::{JsonFieldSpec, decode_batch, encode_batch, metadata_for_type, schema_for_type},
25};
26
27const INSTRUMENT_STATUS_FIELDS: &[JsonFieldSpec] = &[
28 JsonFieldSpec::utf8("instrument_id", false),
29 JsonFieldSpec::utf8("action", false),
30 JsonFieldSpec::u64("ts_event", false),
31 JsonFieldSpec::u64("ts_init", false),
32 JsonFieldSpec::utf8("reason", true),
33 JsonFieldSpec::utf8("trading_event", true),
34 JsonFieldSpec::boolean("is_trading", true),
35 JsonFieldSpec::boolean("is_quoting", true),
36 JsonFieldSpec::boolean("is_short_sell_restricted", true),
37];
38
39impl ArrowSchemaProvider for InstrumentStatus {
40 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
41 schema_for_type("InstrumentStatus", metadata, INSTRUMENT_STATUS_FIELDS)
42 }
43}
44
45impl EncodeToRecordBatch for InstrumentStatus {
46 fn encode_batch(
47 metadata: &HashMap<String, String>,
48 data: &[Self],
49 ) -> Result<RecordBatch, ArrowError> {
50 encode_batch("InstrumentStatus", metadata, data, INSTRUMENT_STATUS_FIELDS)
51 }
52
53 fn metadata(&self) -> HashMap<String, String> {
54 let mut metadata = metadata_for_type("InstrumentStatus");
55 metadata.insert(
56 KEY_INSTRUMENT_ID.to_string(),
57 self.instrument_id.to_string(),
58 );
59 metadata
60 }
61}
62
63impl DecodeTypedFromRecordBatch for InstrumentStatus {
64 fn decode_typed_batch(
65 metadata: &HashMap<String, String>,
66 record_batch: RecordBatch,
67 ) -> Result<Vec<Self>, EncodingError> {
68 decode_batch(
69 metadata,
70 &record_batch,
71 INSTRUMENT_STATUS_FIELDS,
72 Some("InstrumentStatus"),
73 )
74 }
75}
76
77impl DecodeDataFromRecordBatch for InstrumentStatus {
78 fn decode_data_batch(
79 metadata: &HashMap<String, String>,
80 record_batch: RecordBatch,
81 ) -> Result<Vec<Data>, EncodingError> {
82 let items: Vec<Self> = Self::decode_typed_batch(metadata, record_batch)?;
83 Ok(items.into_iter().map(Data::from).collect())
84 }
85}
86
87#[cfg(test)]
88mod tests {
89 use nautilus_model::{enums::MarketStatusAction, identifiers::InstrumentId};
90 use rstest::rstest;
91 use ustr::Ustr;
92
93 use super::*;
94
95 #[rstest]
96 fn test_encode_decode_round_trip() {
97 let instrument_id = InstrumentId::from("AAPL.XNAS");
98 let metadata = HashMap::from([(KEY_INSTRUMENT_ID.to_string(), instrument_id.to_string())]);
99
100 let status1 = InstrumentStatus::new(
101 instrument_id,
102 MarketStatusAction::Trading,
103 1_000_000_000.into(),
104 1_000_000_001.into(),
105 Some(Ustr::from("Normal trading")),
106 Some(Ustr::from("MARKET_OPEN")),
107 Some(true),
108 Some(true),
109 Some(false),
110 );
111
112 let status2 = InstrumentStatus::new(
113 instrument_id,
114 MarketStatusAction::Halt,
115 2_000_000_000.into(),
116 2_000_000_001.into(),
117 None,
118 None,
119 None,
120 None,
121 None,
122 );
123
124 let original = vec![status1, status2];
125 let record_batch = InstrumentStatus::encode_batch(&metadata, &original).unwrap();
126 let decoded: Vec<Data> =
127 InstrumentStatus::decode_data_batch(&metadata, record_batch).unwrap();
128
129 assert_eq!(decoded.len(), original.len());
130 for (orig, dec) in original.iter().zip(decoded.iter()) {
131 match dec {
132 Data::InstrumentStatus(s) => assert_eq!(s, orig),
133 other => panic!("expected Data::InstrumentStatus, was {other:?}"),
134 }
135 }
136 }
137}