1use std::sync::Arc;
19
20use arrow::{
21 array::{
22 BooleanBuilder, Float64Builder, StringBuilder, TimestampNanosecondBuilder, UInt8Builder,
23 UInt32Builder, UInt64Builder,
24 },
25 datatypes::Schema,
26 error::ArrowError,
27 record_batch::RecordBatch,
28};
29use nautilus_model::position::Position;
30
31use super::{
32 bool_field, float64_field, money_to_f64, quantity_to_f64, timestamp_field, uint8_field,
33 uint32_field, uint64_field, unix_nanos_to_i64, utf8_field,
34};
35
36#[must_use]
38pub fn position_schema() -> Schema {
39 Schema::new(vec![
40 utf8_field("trader_id", false),
41 utf8_field("strategy_id", false),
42 utf8_field("instrument_id", false),
43 utf8_field("position_id", false),
44 utf8_field("account_id", false),
45 utf8_field("opening_order_id", false),
46 utf8_field("closing_order_id", true),
47 utf8_field("entry", false),
48 utf8_field("side", false),
49 float64_field("signed_qty", false),
50 float64_field("quantity", false),
51 float64_field("peak_qty", false),
52 uint8_field("price_precision", false),
53 uint8_field("size_precision", false),
54 float64_field("multiplier", false),
55 bool_field("is_inverse", false),
56 bool_field("is_currency_pair", false),
57 utf8_field("instrument_class", false),
58 utf8_field("base_currency", true),
59 utf8_field("quote_currency", false),
60 utf8_field("settlement_currency", false),
61 timestamp_field("ts_init", false),
62 timestamp_field("ts_opened", false),
63 timestamp_field("ts_last", false),
64 timestamp_field("ts_closed", true),
65 uint64_field("duration_ns", false),
66 float64_field("avg_px_open", false),
67 float64_field("avg_px_close", true),
68 float64_field("realized_return", false),
69 float64_field("realized_pnl_amount", true),
70 utf8_field("realized_pnl_currency", true),
71 utf8_field("trade_ids", false),
72 float64_field("buy_qty", false),
73 float64_field("sell_qty", false),
74 utf8_field("commissions", false),
75 uint32_field("event_count", false),
76 uint32_field("adjustment_count", false),
77 ])
78}
79
80fn trade_ids_to_json(position: &Position) -> String {
81 let mut trade_ids: Vec<String> = position.trade_ids.iter().map(ToString::to_string).collect();
82 trade_ids.sort();
83 serde_json::to_string(&trade_ids).unwrap_or_default()
84}
85
86fn commissions_to_json(position: &Position) -> String {
87 let mut commissions: Vec<(String, f64)> = position
88 .commissions
89 .iter()
90 .map(|(currency, money)| (currency.to_string(), money_to_f64(money)))
91 .collect();
92 commissions.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
93 serde_json::to_string(&commissions).unwrap_or_default()
94}
95
96pub fn encode_positions(data: &[Position]) -> Result<RecordBatch, ArrowError> {
111 let mut trader_id = StringBuilder::new();
112 let mut strategy_id = StringBuilder::new();
113 let mut instrument_id = StringBuilder::new();
114 let mut position_id = StringBuilder::new();
115 let mut account_id = StringBuilder::new();
116 let mut opening_order_id = StringBuilder::new();
117 let mut closing_order_id = StringBuilder::new();
118 let mut entry = StringBuilder::new();
119 let mut side = StringBuilder::new();
120 let mut signed_qty = Float64Builder::with_capacity(data.len());
121 let mut quantity = Float64Builder::with_capacity(data.len());
122 let mut peak_qty = Float64Builder::with_capacity(data.len());
123 let mut price_precision = UInt8Builder::with_capacity(data.len());
124 let mut size_precision = UInt8Builder::with_capacity(data.len());
125 let mut multiplier = Float64Builder::with_capacity(data.len());
126 let mut is_inverse = BooleanBuilder::with_capacity(data.len());
127 let mut is_currency_pair = BooleanBuilder::with_capacity(data.len());
128 let mut instrument_class = StringBuilder::new();
129 let mut base_currency = StringBuilder::new();
130 let mut quote_currency = StringBuilder::new();
131 let mut settlement_currency = StringBuilder::new();
132 let mut ts_init = TimestampNanosecondBuilder::with_capacity(data.len());
133 let mut ts_opened = TimestampNanosecondBuilder::with_capacity(data.len());
134 let mut ts_last = TimestampNanosecondBuilder::with_capacity(data.len());
135 let mut ts_closed = TimestampNanosecondBuilder::with_capacity(data.len());
136 let mut duration_ns = UInt64Builder::with_capacity(data.len());
137 let mut avg_px_open = Float64Builder::with_capacity(data.len());
138 let mut avg_px_close = Float64Builder::with_capacity(data.len());
139 let mut realized_return = Float64Builder::with_capacity(data.len());
140 let mut realized_pnl_amount = Float64Builder::with_capacity(data.len());
141 let mut realized_pnl_currency = StringBuilder::new();
142 let mut trade_ids = StringBuilder::new();
143 let mut buy_qty = Float64Builder::with_capacity(data.len());
144 let mut sell_qty = Float64Builder::with_capacity(data.len());
145 let mut commissions = StringBuilder::new();
146 let mut event_count = UInt32Builder::with_capacity(data.len());
147 let mut adjustment_count = UInt32Builder::with_capacity(data.len());
148
149 for position in data {
150 trader_id.append_value(position.trader_id);
151 strategy_id.append_value(position.strategy_id);
152 instrument_id.append_value(position.instrument_id.to_string());
153 position_id.append_value(position.id);
154 account_id.append_value(position.account_id);
155 opening_order_id.append_value(position.opening_order_id);
156 closing_order_id.append_option(position.closing_order_id.map(|v| v.to_string()));
157 entry.append_value(format!("{}", position.entry));
158 side.append_value(format!("{}", position.side));
159 signed_qty.append_value(position.signed_qty);
160 quantity.append_value(quantity_to_f64(&position.quantity));
161 peak_qty.append_value(quantity_to_f64(&position.peak_qty));
162 price_precision.append_value(position.price_precision);
163 size_precision.append_value(position.size_precision);
164 multiplier.append_value(quantity_to_f64(&position.multiplier));
165 is_inverse.append_value(position.is_inverse);
166 is_currency_pair.append_value(position.is_currency_pair);
167 instrument_class.append_value(format!("{}", position.instrument_class));
168 base_currency.append_option(position.base_currency.map(|v| v.to_string()));
169 quote_currency.append_value(position.quote_currency.to_string());
170 settlement_currency.append_value(position.settlement_currency.to_string());
171 ts_init.append_value(unix_nanos_to_i64(position.ts_init.as_u64()));
172 ts_opened.append_value(unix_nanos_to_i64(position.ts_opened.as_u64()));
173 ts_last.append_value(unix_nanos_to_i64(position.ts_last.as_u64()));
174 ts_closed.append_option(position.ts_closed.map(|v| unix_nanos_to_i64(v.as_u64())));
175 duration_ns.append_value(position.duration_ns);
176 avg_px_open.append_value(position.avg_px_open);
177 avg_px_close.append_option(position.avg_px_close);
178 realized_return.append_value(position.realized_return);
179 realized_pnl_amount.append_option(position.realized_pnl.map(|v| money_to_f64(&v)));
180 realized_pnl_currency.append_option(position.realized_pnl.map(|v| v.currency.to_string()));
181 trade_ids.append_value(trade_ids_to_json(position));
182 buy_qty.append_value(quantity_to_f64(&position.buy_qty));
183 sell_qty.append_value(quantity_to_f64(&position.sell_qty));
184 commissions.append_value(commissions_to_json(position));
185 event_count.append_value(position.events.len() as u32);
186 adjustment_count.append_value(position.adjustments.len() as u32);
187 }
188
189 RecordBatch::try_new(
190 Arc::new(position_schema()),
191 vec![
192 Arc::new(trader_id.finish()),
193 Arc::new(strategy_id.finish()),
194 Arc::new(instrument_id.finish()),
195 Arc::new(position_id.finish()),
196 Arc::new(account_id.finish()),
197 Arc::new(opening_order_id.finish()),
198 Arc::new(closing_order_id.finish()),
199 Arc::new(entry.finish()),
200 Arc::new(side.finish()),
201 Arc::new(signed_qty.finish()),
202 Arc::new(quantity.finish()),
203 Arc::new(peak_qty.finish()),
204 Arc::new(price_precision.finish()),
205 Arc::new(size_precision.finish()),
206 Arc::new(multiplier.finish()),
207 Arc::new(is_inverse.finish()),
208 Arc::new(is_currency_pair.finish()),
209 Arc::new(instrument_class.finish()),
210 Arc::new(base_currency.finish()),
211 Arc::new(quote_currency.finish()),
212 Arc::new(settlement_currency.finish()),
213 Arc::new(ts_init.finish()),
214 Arc::new(ts_opened.finish()),
215 Arc::new(ts_last.finish()),
216 Arc::new(ts_closed.finish()),
217 Arc::new(duration_ns.finish()),
218 Arc::new(avg_px_open.finish()),
219 Arc::new(avg_px_close.finish()),
220 Arc::new(realized_return.finish()),
221 Arc::new(realized_pnl_amount.finish()),
222 Arc::new(realized_pnl_currency.finish()),
223 Arc::new(trade_ids.finish()),
224 Arc::new(buy_qty.finish()),
225 Arc::new(sell_qty.finish()),
226 Arc::new(commissions.finish()),
227 Arc::new(event_count.finish()),
228 Arc::new(adjustment_count.finish()),
229 ],
230 )
231}
232
#[cfg(test)]
mod tests {
    use arrow::{
        array::{
            Array, BooleanArray, Float64Array, StringArray, TimestampNanosecondArray, UInt8Array,
            UInt32Array, UInt64Array,
        },
        datatypes::{DataType, TimeUnit},
    };
    use nautilus_core::UUID4;
    use nautilus_model::{
        enums::{LiquiditySide, OrderSide, OrderType},
        events::OrderFilled,
        identifiers::{
            AccountId, ClientOrderId, PositionId, StrategyId, TradeId, TraderId, VenueOrderId,
        },
        instruments::{CurrencyPair, InstrumentAny, stubs::currency_pair_btcusdt},
        types::{Money, Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Borrows column `index` of `batch` downcast to the concrete array type `T`.
    ///
    /// Panics if the index is out of range or the column is not a `T`.
    fn typed_column<T: 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .unwrap()
    }

    /// Builds a taker market fill for position `P-001`; `order_id` is used for
    /// both the client and venue order IDs, and `ts + 1` for the second timestamp.
    #[expect(clippy::too_many_arguments)]
    fn make_fill(
        instrument: &CurrencyPair,
        side: OrderSide,
        qty: &str,
        price: &str,
        trade_id: &str,
        order_id: &str,
        ts: u64,
        commission: Option<Money>,
    ) -> OrderFilled {
        OrderFilled::new(
            TraderId::from("TRADER-001"),
            StrategyId::from("S-001"),
            instrument.id,
            ClientOrderId::from(order_id),
            VenueOrderId::from(order_id),
            AccountId::from("SIM-001"),
            TradeId::from(trade_id),
            side,
            OrderType::Market,
            Quantity::from(qty),
            Price::from(price),
            instrument.quote_currency,
            LiquiditySide::Taker,
            UUID4::default(),
            ts.into(),
            (ts + 1).into(),
            false,
            Some(PositionId::from("P-001")),
            commission,
        )
    }

    /// Opens a position from a single 1.0 buy fill at 50000.0 with event time `ts`.
    fn make_position(ts: u64) -> Position {
        let instrument = currency_pair_btcusdt();
        let opening_fill = make_fill(
            &instrument,
            OrderSide::Buy,
            "1.0",
            "50000.0",
            "T-1",
            "O-1",
            ts,
            None,
        );
        Position::new(&InstrumentAny::CurrencyPair(instrument), opening_fill)
    }

    #[rstest]
    fn test_encode_positions_schema() {
        let batch = encode_positions(&[]).unwrap();
        let schema = batch.schema();
        let fields = schema.fields();
        assert_eq!(fields.len(), 37);

        // (column index, expected name, expected Arrow type)
        let expected = [
            (0, "trader_id", DataType::Utf8),
            (9, "signed_qty", DataType::Float64),
            (12, "price_precision", DataType::UInt8),
            (15, "is_inverse", DataType::Boolean),
            (
                21,
                "ts_init",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
            ),
            (25, "duration_ns", DataType::UInt64),
            (35, "event_count", DataType::UInt32),
        ];

        for (index, name, data_type) in expected {
            assert_eq!(fields[index].name(), name);
            assert_eq!(fields[index].data_type(), &data_type);
        }
    }

    #[rstest]
    fn test_encode_positions_empty() {
        let batch = encode_positions(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema().fields().len(), 37);
    }

    #[rstest]
    fn test_encode_positions_values() {
        let batch = encode_positions(&[make_position(1_000_000)]).unwrap();

        assert_eq!(batch.num_rows(), 1);

        let trader_ids = typed_column::<StringArray>(&batch, 0);
        let quantities = typed_column::<Float64Array>(&batch, 10);
        let price_precisions = typed_column::<UInt8Array>(&batch, 12);
        let currency_pair_flags = typed_column::<BooleanArray>(&batch, 16);
        let opened_at = typed_column::<TimestampNanosecondArray>(&batch, 22);
        let durations = typed_column::<UInt64Array>(&batch, 25);
        let event_counts = typed_column::<UInt32Array>(&batch, 35);

        assert_eq!(trader_ids.value(0), "TRADER-001");
        assert!((quantities.value(0) - 1.0).abs() < 1e-9);
        assert_eq!(price_precisions.value(0), 2);
        assert!(currency_pair_flags.value(0));
        assert_eq!(opened_at.value(0), 1_000_000);
        assert_eq!(durations.value(0), 0);
        assert_eq!(event_counts.value(0), 1);
    }

    #[rstest]
    fn test_encode_positions_nullable_fields() {
        let batch = encode_positions(&[make_position(1_000)]).unwrap();

        let closing_order_ids = typed_column::<StringArray>(&batch, 6);
        let closed_at = typed_column::<TimestampNanosecondArray>(&batch, 24);
        let close_prices = typed_column::<Float64Array>(&batch, 27);

        // A position that is still open has no closing data.
        assert!(closing_order_ids.is_null(0));
        assert!(closed_at.is_null(0));
        assert!(close_prices.is_null(0));
    }

    #[rstest]
    fn test_encode_positions_trade_ids_sorted() {
        let instrument = currency_pair_btcusdt();
        let any = InstrumentAny::CurrencyPair(instrument.clone());
        // Trade IDs are applied in descending order on purpose.
        let first_fill = make_fill(
            &instrument,
            OrderSide::Buy,
            "1.0",
            "50000.0",
            "T-Z",
            "O-1",
            1_000,
            None,
        );
        let second_fill = make_fill(
            &instrument,
            OrderSide::Buy,
            "1.0",
            "50000.0",
            "T-A",
            "O-2",
            2_000,
            None,
        );
        let mut position = Position::new(&any, first_fill);
        position.apply(&second_fill);

        let batch = encode_positions(&[position]).unwrap();
        let trade_id_json = typed_column::<StringArray>(&batch, 31).value(0);

        let parsed: Vec<String> = serde_json::from_str(trade_id_json).unwrap();
        assert_eq!(parsed, vec!["T-A".to_string(), "T-Z".to_string()]);
    }

    #[rstest]
    fn test_encode_positions_closed() {
        let instrument = currency_pair_btcusdt();
        let any = InstrumentAny::CurrencyPair(instrument.clone());
        let opening_fill = make_fill(
            &instrument,
            OrderSide::Buy,
            "1.0",
            "50000.0",
            "T-1",
            "O-1",
            1_000,
            None,
        );
        let closing_fill = make_fill(
            &instrument,
            OrderSide::Sell,
            "1.0",
            "50500.0",
            "T-2",
            "O-2",
            5_000,
            None,
        );
        let mut position = Position::new(&any, opening_fill);
        position.apply(&closing_fill);

        let batch = encode_positions(&[position]).unwrap();
        let closing_order_ids = typed_column::<StringArray>(&batch, 6);
        let closed_at = typed_column::<TimestampNanosecondArray>(&batch, 24);
        let durations = typed_column::<UInt64Array>(&batch, 25);
        let close_prices = typed_column::<Float64Array>(&batch, 27);
        let pnl_amounts = typed_column::<Float64Array>(&batch, 29);
        let pnl_currencies = typed_column::<StringArray>(&batch, 30);
        let event_counts = typed_column::<UInt32Array>(&batch, 35);

        assert_eq!(closing_order_ids.value(0), "O-2");
        assert!(!closed_at.is_null(0));
        assert_eq!(closed_at.value(0), 5_000);
        assert_eq!(durations.value(0), 4_000);
        assert!((close_prices.value(0) - 50_500.0).abs() < 1e-9);
        assert!(!pnl_amounts.is_null(0));
        assert_eq!(pnl_currencies.value(0), "USDT");
        assert_eq!(event_counts.value(0), 2);
    }

    #[rstest]
    fn test_encode_positions_commissions_sorted() {
        let instrument = currency_pair_btcusdt();
        let any = InstrumentAny::CurrencyPair(instrument.clone());
        // The USDT commission is applied first, the BTC commission second.
        let usdt_fill = make_fill(
            &instrument,
            OrderSide::Buy,
            "1.0",
            "50000.0",
            "T-1",
            "O-1",
            1_000,
            Some(Money::from("0.50 USDT")),
        );
        let btc_fill = make_fill(
            &instrument,
            OrderSide::Buy,
            "1.0",
            "50000.0",
            "T-2",
            "O-2",
            2_000,
            Some(Money::from("0.00001 BTC")),
        );
        let mut position = Position::new(&any, usdt_fill);
        position.apply(&btc_fill);

        let batch = encode_positions(&[position]).unwrap();
        let commissions_json = typed_column::<StringArray>(&batch, 34).value(0);

        let parsed: Vec<(String, f64)> = serde_json::from_str(commissions_json).unwrap();
        let currencies: Vec<&str> = parsed.iter().map(|(code, _)| code.as_str()).collect();
        assert_eq!(currencies, vec!["BTC", "USDT"]);
    }
}