1use std::sync::Arc;
32
33use arrow::{
34 array::{
35 BooleanBuilder, Float64Builder, StringBuilder, TimestampNanosecondBuilder, UInt8Builder,
36 },
37 datatypes::Schema,
38 error::ArrowError,
39 record_batch::RecordBatch,
40};
41use nautilus_model::instruments::{Instrument, InstrumentAny};
42use rust_decimal::prelude::ToPrimitive;
43
44use super::{
45 bool_field, float64_field, money_to_f64, price_to_f64, quantity_to_f64, timestamp_field,
46 uint8_field, unix_nanos_to_i64, utf8_field,
47};
48
49#[must_use]
51pub fn instrument_schema() -> Schema {
52 Schema::new(vec![
53 utf8_field("instrument_id", false),
54 utf8_field("symbol", false),
55 utf8_field("venue", false),
56 utf8_field("instrument_type", false),
57 utf8_field("raw_symbol", false),
58 utf8_field("asset_class", false),
59 utf8_field("instrument_class", false),
60 utf8_field("underlying", true),
61 utf8_field("base_currency", true),
62 utf8_field("quote_currency", false),
63 utf8_field("settlement_currency", false),
64 utf8_field("isin", true),
65 utf8_field("option_kind", true),
66 utf8_field("exchange", true),
67 float64_field("strike_price", true),
68 timestamp_field("activation_ns", true),
69 timestamp_field("expiration_ns", true),
70 bool_field("is_inverse", false),
71 bool_field("is_quanto", false),
72 uint8_field("price_precision", false),
73 uint8_field("size_precision", false),
74 float64_field("price_increment", false),
75 float64_field("size_increment", false),
76 float64_field("multiplier", false),
77 float64_field("lot_size", true),
78 float64_field("max_quantity", true),
79 float64_field("min_quantity", true),
80 float64_field("max_notional_amount", true),
81 utf8_field("max_notional_currency", true),
82 float64_field("min_notional_amount", true),
83 utf8_field("min_notional_currency", true),
84 float64_field("max_price", true),
85 float64_field("min_price", true),
86 float64_field("margin_init", false),
87 float64_field("margin_maint", false),
88 float64_field("maker_fee", false),
89 float64_field("taker_fee", false),
90 timestamp_field("ts_event", false),
91 timestamp_field("ts_init", false),
92 ])
93}
94
95fn instrument_type_name(instrument: &InstrumentAny) -> &'static str {
97 match instrument {
98 InstrumentAny::Betting(_) => "BettingInstrument",
99 InstrumentAny::BinaryOption(_) => "BinaryOption",
100 InstrumentAny::Cfd(_) => "Cfd",
101 InstrumentAny::Commodity(_) => "Commodity",
102 InstrumentAny::CryptoFuture(_) => "CryptoFuture",
103 InstrumentAny::CryptoOption(_) => "CryptoOption",
104 InstrumentAny::CryptoPerpetual(_) => "CryptoPerpetual",
105 InstrumentAny::CurrencyPair(_) => "CurrencyPair",
106 InstrumentAny::Equity(_) => "Equity",
107 InstrumentAny::FuturesContract(_) => "FuturesContract",
108 InstrumentAny::FuturesSpread(_) => "FuturesSpread",
109 InstrumentAny::IndexInstrument(_) => "IndexInstrument",
110 InstrumentAny::OptionContract(_) => "OptionContract",
111 InstrumentAny::OptionSpread(_) => "OptionSpread",
112 InstrumentAny::PerpetualContract(_) => "PerpetualContract",
113 InstrumentAny::TokenizedAsset(_) => "TokenizedAsset",
114 }
115}
116
117pub fn encode_instruments(data: &[InstrumentAny]) -> Result<RecordBatch, ArrowError> {
134 let mut instrument_id = StringBuilder::new();
135 let mut symbol = StringBuilder::new();
136 let mut venue = StringBuilder::new();
137 let mut instrument_type = StringBuilder::new();
138 let mut raw_symbol = StringBuilder::new();
139 let mut asset_class = StringBuilder::new();
140 let mut instrument_class = StringBuilder::new();
141 let mut underlying = StringBuilder::new();
142 let mut base_currency = StringBuilder::new();
143 let mut quote_currency = StringBuilder::new();
144 let mut settlement_currency = StringBuilder::new();
145 let mut isin = StringBuilder::new();
146 let mut option_kind = StringBuilder::new();
147 let mut exchange = StringBuilder::new();
148 let mut strike_price = Float64Builder::with_capacity(data.len());
149 let mut activation_ns = TimestampNanosecondBuilder::with_capacity(data.len());
150 let mut expiration_ns = TimestampNanosecondBuilder::with_capacity(data.len());
151 let mut is_inverse = BooleanBuilder::with_capacity(data.len());
152 let mut is_quanto = BooleanBuilder::with_capacity(data.len());
153 let mut price_precision = UInt8Builder::with_capacity(data.len());
154 let mut size_precision = UInt8Builder::with_capacity(data.len());
155 let mut price_increment = Float64Builder::with_capacity(data.len());
156 let mut size_increment = Float64Builder::with_capacity(data.len());
157 let mut multiplier = Float64Builder::with_capacity(data.len());
158 let mut lot_size = Float64Builder::with_capacity(data.len());
159 let mut max_quantity = Float64Builder::with_capacity(data.len());
160 let mut min_quantity = Float64Builder::with_capacity(data.len());
161 let mut max_notional_amount = Float64Builder::with_capacity(data.len());
162 let mut max_notional_currency = StringBuilder::new();
163 let mut min_notional_amount = Float64Builder::with_capacity(data.len());
164 let mut min_notional_currency = StringBuilder::new();
165 let mut max_price = Float64Builder::with_capacity(data.len());
166 let mut min_price = Float64Builder::with_capacity(data.len());
167 let mut margin_init = Float64Builder::with_capacity(data.len());
168 let mut margin_maint = Float64Builder::with_capacity(data.len());
169 let mut maker_fee = Float64Builder::with_capacity(data.len());
170 let mut taker_fee = Float64Builder::with_capacity(data.len());
171 let mut ts_event = TimestampNanosecondBuilder::with_capacity(data.len());
172 let mut ts_init = TimestampNanosecondBuilder::with_capacity(data.len());
173
174 for instrument in data {
175 instrument_id.append_value(instrument.id().to_string());
176 symbol.append_value(instrument.symbol());
177 venue.append_value(instrument.venue());
178 instrument_type.append_value(instrument_type_name(instrument));
179 raw_symbol.append_value(instrument.raw_symbol());
180 asset_class.append_value(format!("{}", instrument.asset_class()));
181 instrument_class.append_value(format!("{}", instrument.instrument_class()));
182 underlying.append_option(instrument.underlying().map(|v| v.to_string()));
183 base_currency.append_option(instrument.base_currency().map(|v| v.to_string()));
184 quote_currency.append_value(instrument.quote_currency().to_string());
185 settlement_currency.append_value(instrument.settlement_currency().to_string());
186 isin.append_option(instrument.isin().map(|v| v.to_string()));
187 option_kind.append_option(instrument.option_kind().map(|v| format!("{v}")));
188 exchange.append_option(instrument.exchange().map(|v| v.to_string()));
189 strike_price.append_option(instrument.strike_price().map(|v| price_to_f64(&v)));
190 activation_ns.append_option(
191 instrument
192 .activation_ns()
193 .map(|v| unix_nanos_to_i64(v.as_u64())),
194 );
195 expiration_ns.append_option(
196 instrument
197 .expiration_ns()
198 .map(|v| unix_nanos_to_i64(v.as_u64())),
199 );
200 is_inverse.append_value(instrument.is_inverse());
201 is_quanto.append_value(instrument.is_quanto());
202 price_precision.append_value(instrument.price_precision());
203 size_precision.append_value(instrument.size_precision());
204 price_increment.append_value(price_to_f64(&instrument.price_increment()));
205 size_increment.append_value(quantity_to_f64(&instrument.size_increment()));
206 multiplier.append_value(quantity_to_f64(&instrument.multiplier()));
207 lot_size.append_option(instrument.lot_size().map(|v| quantity_to_f64(&v)));
208 max_quantity.append_option(instrument.max_quantity().map(|v| quantity_to_f64(&v)));
209 min_quantity.append_option(instrument.min_quantity().map(|v| quantity_to_f64(&v)));
210 max_notional_amount.append_option(instrument.max_notional().map(|v| money_to_f64(&v)));
211 max_notional_currency
212 .append_option(instrument.max_notional().map(|v| v.currency.to_string()));
213 min_notional_amount.append_option(instrument.min_notional().map(|v| money_to_f64(&v)));
214 min_notional_currency
215 .append_option(instrument.min_notional().map(|v| v.currency.to_string()));
216 max_price.append_option(instrument.max_price().map(|v| price_to_f64(&v)));
217 min_price.append_option(instrument.min_price().map(|v| price_to_f64(&v)));
218 margin_init.append_value(instrument.margin_init().to_f64().unwrap_or(f64::NAN));
219 margin_maint.append_value(instrument.margin_maint().to_f64().unwrap_or(f64::NAN));
220 maker_fee.append_value(instrument.maker_fee().to_f64().unwrap_or(f64::NAN));
221 taker_fee.append_value(instrument.taker_fee().to_f64().unwrap_or(f64::NAN));
222 ts_event.append_value(unix_nanos_to_i64(instrument.ts_event().as_u64()));
223 ts_init.append_value(unix_nanos_to_i64(instrument.ts_init().as_u64()));
224 }
225
226 RecordBatch::try_new(
227 Arc::new(instrument_schema()),
228 vec![
229 Arc::new(instrument_id.finish()),
230 Arc::new(symbol.finish()),
231 Arc::new(venue.finish()),
232 Arc::new(instrument_type.finish()),
233 Arc::new(raw_symbol.finish()),
234 Arc::new(asset_class.finish()),
235 Arc::new(instrument_class.finish()),
236 Arc::new(underlying.finish()),
237 Arc::new(base_currency.finish()),
238 Arc::new(quote_currency.finish()),
239 Arc::new(settlement_currency.finish()),
240 Arc::new(isin.finish()),
241 Arc::new(option_kind.finish()),
242 Arc::new(exchange.finish()),
243 Arc::new(strike_price.finish()),
244 Arc::new(activation_ns.finish()),
245 Arc::new(expiration_ns.finish()),
246 Arc::new(is_inverse.finish()),
247 Arc::new(is_quanto.finish()),
248 Arc::new(price_precision.finish()),
249 Arc::new(size_precision.finish()),
250 Arc::new(price_increment.finish()),
251 Arc::new(size_increment.finish()),
252 Arc::new(multiplier.finish()),
253 Arc::new(lot_size.finish()),
254 Arc::new(max_quantity.finish()),
255 Arc::new(min_quantity.finish()),
256 Arc::new(max_notional_amount.finish()),
257 Arc::new(max_notional_currency.finish()),
258 Arc::new(min_notional_amount.finish()),
259 Arc::new(min_notional_currency.finish()),
260 Arc::new(max_price.finish()),
261 Arc::new(min_price.finish()),
262 Arc::new(margin_init.finish()),
263 Arc::new(margin_maint.finish()),
264 Arc::new(maker_fee.finish()),
265 Arc::new(taker_fee.finish()),
266 Arc::new(ts_event.finish()),
267 Arc::new(ts_init.finish()),
268 ],
269 )
270}
271
#[cfg(test)]
mod tests {
    use arrow::{
        array::{Array, BooleanArray, Float64Array, StringArray, TimestampNanosecondArray},
        datatypes::{DataType, TimeUnit},
    };
    use nautilus_model::{
        instruments::{
            InstrumentAny,
            stubs::{
                betting, binary_option, cfd_gold, commodity_gold, crypto_future_btcusdt,
                crypto_option_btc_deribit, crypto_perpetual_ethusdt, currency_pair_btcusdt,
                equity_aapl, futures_contract_es, futures_spread_es, index_instrument_spx,
                option_contract_appl, option_spread, perpetual_contract_eurusd,
                tokenized_asset_aaplx, xbtusd_bitmex,
            },
        },
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Absolute tolerance used when comparing decoded `f64` column values.
    const TOLERANCE: f64 = 1e-9;

    /// A spot currency pair used as the baseline instrument in several tests.
    fn spot() -> InstrumentAny {
        InstrumentAny::CurrencyPair(currency_pair_btcusdt())
    }

    /// Returns column `index` of `batch` downcast to the concrete array type `T`.
    ///
    /// Panics if the column does not have type `T`.
    fn column_as<T: 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .unwrap()
    }

    /// Asserts that `actual` is within [`TOLERANCE`] of `expected`.
    fn assert_close(actual: f64, expected: f64) {
        assert!((actual - expected).abs() < TOLERANCE);
    }

    /// One stub per `InstrumentAny` variant, paired with its expected
    /// `instrument_type` label.
    fn all_variants() -> Vec<(InstrumentAny, &'static str)> {
        let crypto_future = InstrumentAny::CryptoFuture(crypto_future_btcusdt(
            2,
            6,
            Price::from("0.01"),
            Quantity::from("0.000001"),
        ));
        let crypto_option = InstrumentAny::CryptoOption(crypto_option_btc_deribit(
            3,
            1,
            Price::from("0.001"),
            Quantity::from("0.1"),
        ));

        vec![
            (InstrumentAny::Betting(betting()), "BettingInstrument"),
            (InstrumentAny::BinaryOption(binary_option()), "BinaryOption"),
            (InstrumentAny::Cfd(cfd_gold()), "Cfd"),
            (InstrumentAny::Commodity(commodity_gold()), "Commodity"),
            (crypto_future, "CryptoFuture"),
            (crypto_option, "CryptoOption"),
            (
                InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt()),
                "CryptoPerpetual",
            ),
            (
                InstrumentAny::CurrencyPair(currency_pair_btcusdt()),
                "CurrencyPair",
            ),
            (InstrumentAny::Equity(equity_aapl()), "Equity"),
            (
                InstrumentAny::FuturesContract(futures_contract_es(None, None)),
                "FuturesContract",
            ),
            (
                InstrumentAny::FuturesSpread(futures_spread_es()),
                "FuturesSpread",
            ),
            (
                InstrumentAny::IndexInstrument(index_instrument_spx()),
                "IndexInstrument",
            ),
            (
                InstrumentAny::OptionContract(option_contract_appl()),
                "OptionContract",
            ),
            (InstrumentAny::OptionSpread(option_spread()), "OptionSpread"),
            (
                InstrumentAny::PerpetualContract(perpetual_contract_eurusd()),
                "PerpetualContract",
            ),
            (
                InstrumentAny::TokenizedAsset(tokenized_asset_aaplx()),
                "TokenizedAsset",
            ),
        ]
    }

    #[rstest]
    fn test_encode_instruments_schema() {
        let batch = encode_instruments(&[]).unwrap();
        let schema = batch.schema();
        let fields = schema.fields();

        assert_eq!(fields.len(), 39);

        // Spot-check name and type at one position per Arrow data type in use.
        let expected = [
            (0, "instrument_id", DataType::Utf8),
            (14, "strike_price", DataType::Float64),
            (17, "is_inverse", DataType::Boolean),
            (19, "price_precision", DataType::UInt8),
            (33, "margin_init", DataType::Float64),
            (
                37,
                "ts_event",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
            ),
        ];
        for (index, name, data_type) in expected {
            assert_eq!(fields[index].name(), name);
            assert_eq!(fields[index].data_type(), &data_type);
        }

        assert_eq!(fields[36].name(), "taker_fee");
    }

    #[rstest]
    fn test_encode_instruments_empty() {
        let batch = encode_instruments(&[]).unwrap();

        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema().fields().len(), 39);
    }

    #[rstest]
    fn test_encode_instruments_spot_values() {
        let batch = encode_instruments(&[spot()]).unwrap();

        assert_eq!(batch.num_rows(), 1);

        let instrument_type = column_as::<StringArray>(&batch, 3);
        let strike_price = column_as::<Float64Array>(&batch, 14);
        let activation = column_as::<TimestampNanosecondArray>(&batch, 15);
        let expiration = column_as::<TimestampNanosecondArray>(&batch, 16);
        let is_inverse = column_as::<BooleanArray>(&batch, 17);
        let price_increment = column_as::<Float64Array>(&batch, 21);

        assert_eq!(instrument_type.value(0), "CurrencyPair");
        assert!(strike_price.is_null(0));
        assert!(activation.is_null(0));
        assert!(expiration.is_null(0));
        assert!(!is_inverse.value(0));
        assert!(price_increment.value(0) > 0.0);
    }

    #[rstest]
    fn test_encode_instruments_mixed_variants_preserves_per_row_nulls() {
        let instruments = [
            spot(),
            InstrumentAny::Equity(equity_aapl()),
            InstrumentAny::OptionContract(option_contract_appl()),
        ];
        let batch = encode_instruments(&instruments).unwrap();

        assert_eq!(batch.num_rows(), 3);

        let instrument_type = column_as::<StringArray>(&batch, 3);
        let base_currency = column_as::<StringArray>(&batch, 8);
        let strike_price = column_as::<Float64Array>(&batch, 14);
        let expiration = column_as::<TimestampNanosecondArray>(&batch, 16);

        assert_eq!(instrument_type.value(0), "CurrencyPair");
        assert_eq!(instrument_type.value(1), "Equity");
        assert_eq!(instrument_type.value(2), "OptionContract");

        // Only the option contract row has a strike price and an expiration.
        assert!(strike_price.is_null(0));
        assert!(strike_price.is_null(1));
        assert!(!strike_price.is_null(2));
        assert!(expiration.is_null(0));
        assert!(expiration.is_null(1));
        assert!(!expiration.is_null(2));

        // The currency pair has a base currency; the equity row does not.
        assert!(!base_currency.is_null(0));
        assert!(base_currency.is_null(1));
    }

    #[rstest]
    fn test_encode_instruments_shared_schema_across_batches() {
        let spot_batch = encode_instruments(&[spot()]).unwrap();
        let equity_batch = encode_instruments(&[InstrumentAny::Equity(equity_aapl())]).unwrap();

        assert_eq!(spot_batch.schema(), equity_batch.schema());
    }

    #[rstest]
    fn test_encode_instruments_all_variant_names() {
        let variants = all_variants();
        assert_eq!(variants.len(), 16, "all InstrumentAny variants covered");

        let (instruments, expected_names): (Vec<InstrumentAny>, Vec<&'static str>) =
            variants.into_iter().unzip();
        let batch = encode_instruments(&instruments).unwrap();
        let instrument_type = column_as::<StringArray>(&batch, 3);

        for (row, expected) in expected_names.iter().enumerate() {
            assert_eq!(instrument_type.value(row), *expected);
        }
    }

    #[rstest]
    fn test_encode_instruments_inverse_perpetual() {
        let batch = encode_instruments(&[InstrumentAny::CryptoPerpetual(xbtusd_bitmex())]).unwrap();

        let instrument_type = column_as::<StringArray>(&batch, 3);
        let settlement_currency = column_as::<StringArray>(&batch, 10);
        let is_inverse = column_as::<BooleanArray>(&batch, 17);
        let max_notional_amount = column_as::<Float64Array>(&batch, 27);
        let max_notional_currency = column_as::<StringArray>(&batch, 28);
        let min_notional_amount = column_as::<Float64Array>(&batch, 29);
        let min_notional_currency = column_as::<StringArray>(&batch, 30);

        assert_eq!(instrument_type.value(0), "CryptoPerpetual");
        assert_eq!(settlement_currency.value(0), "BTC");
        assert!(is_inverse.value(0));
        assert_close(max_notional_amount.value(0), 10_000_000.0);
        assert_eq!(max_notional_currency.value(0), "USD");
        assert_close(min_notional_amount.value(0), 1.0);
        assert_eq!(min_notional_currency.value(0), "USD");

        let margin_init = column_as::<Float64Array>(&batch, 33);
        let margin_maint = column_as::<Float64Array>(&batch, 34);
        let maker_fee = column_as::<Float64Array>(&batch, 35);
        let taker_fee = column_as::<Float64Array>(&batch, 36);

        assert_close(margin_init.value(0), 0.01);
        assert_close(margin_maint.value(0), 0.0035);
        assert_close(maker_fee.value(0), -0.00025);
        assert_close(taker_fee.value(0), 0.00075);
    }
}