Skip to main content

nautilus_serialization/arrow/display/
position.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Display-mode Arrow encoder for [`Position`].
17
18use std::sync::Arc;
19
20use arrow::{
21    array::{
22        BooleanBuilder, Float64Builder, StringBuilder, TimestampNanosecondBuilder, UInt8Builder,
23        UInt32Builder, UInt64Builder,
24    },
25    datatypes::Schema,
26    error::ArrowError,
27    record_batch::RecordBatch,
28};
29use nautilus_model::position::Position;
30
31use super::{
32    bool_field, float64_field, money_to_f64, quantity_to_f64, timestamp_field, uint8_field,
33    uint32_field, uint64_field, unix_nanos_to_i64, utf8_field,
34};
35
36/// Returns the display-mode Arrow schema for [`Position`].
37#[must_use]
38pub fn position_schema() -> Schema {
39    Schema::new(vec![
40        utf8_field("trader_id", false),
41        utf8_field("strategy_id", false),
42        utf8_field("instrument_id", false),
43        utf8_field("position_id", false),
44        utf8_field("account_id", false),
45        utf8_field("opening_order_id", false),
46        utf8_field("closing_order_id", true),
47        utf8_field("entry", false),
48        utf8_field("side", false),
49        float64_field("signed_qty", false),
50        float64_field("quantity", false),
51        float64_field("peak_qty", false),
52        uint8_field("price_precision", false),
53        uint8_field("size_precision", false),
54        float64_field("multiplier", false),
55        bool_field("is_inverse", false),
56        bool_field("is_currency_pair", false),
57        utf8_field("instrument_class", false),
58        utf8_field("base_currency", true),
59        utf8_field("quote_currency", false),
60        utf8_field("settlement_currency", false),
61        timestamp_field("ts_init", false),
62        timestamp_field("ts_opened", false),
63        timestamp_field("ts_last", false),
64        timestamp_field("ts_closed", true),
65        uint64_field("duration_ns", false),
66        float64_field("avg_px_open", false),
67        float64_field("avg_px_close", true),
68        float64_field("realized_return", false),
69        float64_field("realized_pnl_amount", true),
70        utf8_field("realized_pnl_currency", true),
71        utf8_field("trade_ids", false),
72        float64_field("buy_qty", false),
73        float64_field("sell_qty", false),
74        utf8_field("commissions", false),
75        uint32_field("event_count", false),
76        uint32_field("adjustment_count", false),
77    ])
78}
79
80fn trade_ids_to_json(position: &Position) -> String {
81    let mut trade_ids: Vec<String> = position.trade_ids.iter().map(ToString::to_string).collect();
82    trade_ids.sort();
83    serde_json::to_string(&trade_ids).unwrap_or_default()
84}
85
86fn commissions_to_json(position: &Position) -> String {
87    let mut commissions: Vec<(String, f64)> = position
88        .commissions
89        .iter()
90        .map(|(currency, money)| (currency.to_string(), money_to_f64(money)))
91        .collect();
92    commissions.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
93    serde_json::to_string(&commissions).unwrap_or_default()
94}
95
96/// Encodes positions as a display-friendly Arrow [`RecordBatch`].
97///
98/// Emits `Utf8` columns for identifiers and enums, `Float64` columns for
99/// quantities and PnL amounts, `Timestamp(Nanosecond)` columns for all time
100/// fields, and `Boolean` columns for `is_inverse` and `is_currency_pair`.
101/// The `trade_ids` and `commissions` columns carry deterministic JSON payloads
102/// (sorted by trade id and currency respectively) so that repeated encodings
103/// of the same position produce identical bytes.
104///
105/// Returns an empty [`RecordBatch`] with the correct schema when `data` is empty.
106///
107/// # Errors
108///
109/// Returns an [`ArrowError`] if the Arrow `RecordBatch` cannot be constructed.
110pub fn encode_positions(data: &[Position]) -> Result<RecordBatch, ArrowError> {
111    let mut trader_id = StringBuilder::new();
112    let mut strategy_id = StringBuilder::new();
113    let mut instrument_id = StringBuilder::new();
114    let mut position_id = StringBuilder::new();
115    let mut account_id = StringBuilder::new();
116    let mut opening_order_id = StringBuilder::new();
117    let mut closing_order_id = StringBuilder::new();
118    let mut entry = StringBuilder::new();
119    let mut side = StringBuilder::new();
120    let mut signed_qty = Float64Builder::with_capacity(data.len());
121    let mut quantity = Float64Builder::with_capacity(data.len());
122    let mut peak_qty = Float64Builder::with_capacity(data.len());
123    let mut price_precision = UInt8Builder::with_capacity(data.len());
124    let mut size_precision = UInt8Builder::with_capacity(data.len());
125    let mut multiplier = Float64Builder::with_capacity(data.len());
126    let mut is_inverse = BooleanBuilder::with_capacity(data.len());
127    let mut is_currency_pair = BooleanBuilder::with_capacity(data.len());
128    let mut instrument_class = StringBuilder::new();
129    let mut base_currency = StringBuilder::new();
130    let mut quote_currency = StringBuilder::new();
131    let mut settlement_currency = StringBuilder::new();
132    let mut ts_init = TimestampNanosecondBuilder::with_capacity(data.len());
133    let mut ts_opened = TimestampNanosecondBuilder::with_capacity(data.len());
134    let mut ts_last = TimestampNanosecondBuilder::with_capacity(data.len());
135    let mut ts_closed = TimestampNanosecondBuilder::with_capacity(data.len());
136    let mut duration_ns = UInt64Builder::with_capacity(data.len());
137    let mut avg_px_open = Float64Builder::with_capacity(data.len());
138    let mut avg_px_close = Float64Builder::with_capacity(data.len());
139    let mut realized_return = Float64Builder::with_capacity(data.len());
140    let mut realized_pnl_amount = Float64Builder::with_capacity(data.len());
141    let mut realized_pnl_currency = StringBuilder::new();
142    let mut trade_ids = StringBuilder::new();
143    let mut buy_qty = Float64Builder::with_capacity(data.len());
144    let mut sell_qty = Float64Builder::with_capacity(data.len());
145    let mut commissions = StringBuilder::new();
146    let mut event_count = UInt32Builder::with_capacity(data.len());
147    let mut adjustment_count = UInt32Builder::with_capacity(data.len());
148
149    for position in data {
150        trader_id.append_value(position.trader_id);
151        strategy_id.append_value(position.strategy_id);
152        instrument_id.append_value(position.instrument_id.to_string());
153        position_id.append_value(position.id);
154        account_id.append_value(position.account_id);
155        opening_order_id.append_value(position.opening_order_id);
156        closing_order_id.append_option(position.closing_order_id.map(|v| v.to_string()));
157        entry.append_value(format!("{}", position.entry));
158        side.append_value(format!("{}", position.side));
159        signed_qty.append_value(position.signed_qty);
160        quantity.append_value(quantity_to_f64(&position.quantity));
161        peak_qty.append_value(quantity_to_f64(&position.peak_qty));
162        price_precision.append_value(position.price_precision);
163        size_precision.append_value(position.size_precision);
164        multiplier.append_value(quantity_to_f64(&position.multiplier));
165        is_inverse.append_value(position.is_inverse);
166        is_currency_pair.append_value(position.is_currency_pair);
167        instrument_class.append_value(format!("{}", position.instrument_class));
168        base_currency.append_option(position.base_currency.map(|v| v.to_string()));
169        quote_currency.append_value(position.quote_currency.to_string());
170        settlement_currency.append_value(position.settlement_currency.to_string());
171        ts_init.append_value(unix_nanos_to_i64(position.ts_init.as_u64()));
172        ts_opened.append_value(unix_nanos_to_i64(position.ts_opened.as_u64()));
173        ts_last.append_value(unix_nanos_to_i64(position.ts_last.as_u64()));
174        ts_closed.append_option(position.ts_closed.map(|v| unix_nanos_to_i64(v.as_u64())));
175        duration_ns.append_value(position.duration_ns);
176        avg_px_open.append_value(position.avg_px_open);
177        avg_px_close.append_option(position.avg_px_close);
178        realized_return.append_value(position.realized_return);
179        realized_pnl_amount.append_option(position.realized_pnl.map(|v| money_to_f64(&v)));
180        realized_pnl_currency.append_option(position.realized_pnl.map(|v| v.currency.to_string()));
181        trade_ids.append_value(trade_ids_to_json(position));
182        buy_qty.append_value(quantity_to_f64(&position.buy_qty));
183        sell_qty.append_value(quantity_to_f64(&position.sell_qty));
184        commissions.append_value(commissions_to_json(position));
185        event_count.append_value(position.events.len() as u32);
186        adjustment_count.append_value(position.adjustments.len() as u32);
187    }
188
189    RecordBatch::try_new(
190        Arc::new(position_schema()),
191        vec![
192            Arc::new(trader_id.finish()),
193            Arc::new(strategy_id.finish()),
194            Arc::new(instrument_id.finish()),
195            Arc::new(position_id.finish()),
196            Arc::new(account_id.finish()),
197            Arc::new(opening_order_id.finish()),
198            Arc::new(closing_order_id.finish()),
199            Arc::new(entry.finish()),
200            Arc::new(side.finish()),
201            Arc::new(signed_qty.finish()),
202            Arc::new(quantity.finish()),
203            Arc::new(peak_qty.finish()),
204            Arc::new(price_precision.finish()),
205            Arc::new(size_precision.finish()),
206            Arc::new(multiplier.finish()),
207            Arc::new(is_inverse.finish()),
208            Arc::new(is_currency_pair.finish()),
209            Arc::new(instrument_class.finish()),
210            Arc::new(base_currency.finish()),
211            Arc::new(quote_currency.finish()),
212            Arc::new(settlement_currency.finish()),
213            Arc::new(ts_init.finish()),
214            Arc::new(ts_opened.finish()),
215            Arc::new(ts_last.finish()),
216            Arc::new(ts_closed.finish()),
217            Arc::new(duration_ns.finish()),
218            Arc::new(avg_px_open.finish()),
219            Arc::new(avg_px_close.finish()),
220            Arc::new(realized_return.finish()),
221            Arc::new(realized_pnl_amount.finish()),
222            Arc::new(realized_pnl_currency.finish()),
223            Arc::new(trade_ids.finish()),
224            Arc::new(buy_qty.finish()),
225            Arc::new(sell_qty.finish()),
226            Arc::new(commissions.finish()),
227            Arc::new(event_count.finish()),
228            Arc::new(adjustment_count.finish()),
229        ],
230    )
231}
232
233#[cfg(test)]
234mod tests {
235    use arrow::{
236        array::{
237            Array, BooleanArray, Float64Array, StringArray, TimestampNanosecondArray, UInt8Array,
238            UInt32Array, UInt64Array,
239        },
240        datatypes::{DataType, TimeUnit},
241    };
242    use nautilus_core::UUID4;
243    use nautilus_model::{
244        enums::{LiquiditySide, OrderSide, OrderType},
245        events::OrderFilled,
246        identifiers::{
247            AccountId, ClientOrderId, PositionId, StrategyId, TradeId, TraderId, VenueOrderId,
248        },
249        instruments::{CurrencyPair, InstrumentAny, stubs::currency_pair_btcusdt},
250        types::{Money, Price, Quantity},
251    };
252    use rstest::rstest;
253
254    use super::*;
255
256    #[expect(clippy::too_many_arguments)]
257    fn make_fill(
258        instrument: &CurrencyPair,
259        side: OrderSide,
260        qty: &str,
261        price: &str,
262        trade_id: &str,
263        order_id: &str,
264        ts: u64,
265        commission: Option<Money>,
266    ) -> OrderFilled {
267        OrderFilled::new(
268            TraderId::from("TRADER-001"),
269            StrategyId::from("S-001"),
270            instrument.id,
271            ClientOrderId::from(order_id),
272            VenueOrderId::from(order_id),
273            AccountId::from("SIM-001"),
274            TradeId::from(trade_id),
275            side,
276            OrderType::Market,
277            Quantity::from(qty),
278            Price::from(price),
279            instrument.quote_currency,
280            LiquiditySide::Taker,
281            UUID4::default(),
282            ts.into(),
283            (ts + 1).into(),
284            false,
285            Some(PositionId::from("P-001")),
286            commission,
287        )
288    }
289
290    fn make_position(ts: u64) -> Position {
291        let instrument = currency_pair_btcusdt();
292        let fill = make_fill(
293            &instrument,
294            OrderSide::Buy,
295            "1.0",
296            "50000.0",
297            "T-1",
298            "O-1",
299            ts,
300            None,
301        );
302        let any = InstrumentAny::CurrencyPair(instrument);
303        Position::new(&any, fill)
304    }
305
306    #[rstest]
307    fn test_encode_positions_schema() {
308        let batch = encode_positions(&[]).unwrap();
309        let schema = batch.schema();
310        let fields = schema.fields();
311        assert_eq!(fields.len(), 37);
312        assert_eq!(fields[0].name(), "trader_id");
313        assert_eq!(fields[0].data_type(), &DataType::Utf8);
314        assert_eq!(fields[9].name(), "signed_qty");
315        assert_eq!(fields[9].data_type(), &DataType::Float64);
316        assert_eq!(fields[12].name(), "price_precision");
317        assert_eq!(fields[12].data_type(), &DataType::UInt8);
318        assert_eq!(fields[15].name(), "is_inverse");
319        assert_eq!(fields[15].data_type(), &DataType::Boolean);
320        assert_eq!(fields[21].name(), "ts_init");
321        assert_eq!(
322            fields[21].data_type(),
323            &DataType::Timestamp(TimeUnit::Nanosecond, None)
324        );
325        assert_eq!(fields[25].name(), "duration_ns");
326        assert_eq!(fields[25].data_type(), &DataType::UInt64);
327        assert_eq!(fields[35].name(), "event_count");
328        assert_eq!(fields[35].data_type(), &DataType::UInt32);
329    }
330
331    #[rstest]
332    fn test_encode_positions_empty() {
333        let batch = encode_positions(&[]).unwrap();
334        assert_eq!(batch.num_rows(), 0);
335        assert_eq!(batch.schema().fields().len(), 37);
336    }
337
338    #[rstest]
339    fn test_encode_positions_values() {
340        let positions = vec![make_position(1_000_000)];
341        let batch = encode_positions(&positions).unwrap();
342
343        assert_eq!(batch.num_rows(), 1);
344
345        let trader_id_col = batch
346            .column(0)
347            .as_any()
348            .downcast_ref::<StringArray>()
349            .unwrap();
350        let quantity_col = batch
351            .column(10)
352            .as_any()
353            .downcast_ref::<Float64Array>()
354            .unwrap();
355        let price_precision_col = batch
356            .column(12)
357            .as_any()
358            .downcast_ref::<UInt8Array>()
359            .unwrap();
360        let is_currency_pair_col = batch
361            .column(16)
362            .as_any()
363            .downcast_ref::<BooleanArray>()
364            .unwrap();
365        let ts_opened_col = batch
366            .column(22)
367            .as_any()
368            .downcast_ref::<TimestampNanosecondArray>()
369            .unwrap();
370        let duration_col = batch
371            .column(25)
372            .as_any()
373            .downcast_ref::<UInt64Array>()
374            .unwrap();
375        let event_count_col = batch
376            .column(35)
377            .as_any()
378            .downcast_ref::<UInt32Array>()
379            .unwrap();
380
381        assert_eq!(trader_id_col.value(0), "TRADER-001");
382        assert!((quantity_col.value(0) - 1.0).abs() < 1e-9);
383        assert_eq!(price_precision_col.value(0), 2);
384        assert!(is_currency_pair_col.value(0));
385        assert_eq!(ts_opened_col.value(0), 1_000_000);
386        assert_eq!(duration_col.value(0), 0);
387        assert_eq!(event_count_col.value(0), 1);
388    }
389
390    #[rstest]
391    fn test_encode_positions_nullable_fields() {
392        let positions = vec![make_position(1_000)];
393        let batch = encode_positions(&positions).unwrap();
394
395        let closing_order_id_col = batch
396            .column(6)
397            .as_any()
398            .downcast_ref::<StringArray>()
399            .unwrap();
400        let ts_closed_col = batch
401            .column(24)
402            .as_any()
403            .downcast_ref::<TimestampNanosecondArray>()
404            .unwrap();
405        let avg_px_close_col = batch
406            .column(27)
407            .as_any()
408            .downcast_ref::<Float64Array>()
409            .unwrap();
410
411        assert!(closing_order_id_col.is_null(0));
412        assert!(ts_closed_col.is_null(0));
413        assert!(avg_px_close_col.is_null(0));
414    }
415
416    #[rstest]
417    fn test_encode_positions_trade_ids_sorted() {
418        let instrument = currency_pair_btcusdt();
419        let any = InstrumentAny::CurrencyPair(instrument.clone());
420        let open = make_fill(
421            &instrument,
422            OrderSide::Buy,
423            "1.0",
424            "50000.0",
425            "T-Z",
426            "O-1",
427            1_000,
428            None,
429        );
430        let add = make_fill(
431            &instrument,
432            OrderSide::Buy,
433            "1.0",
434            "50000.0",
435            "T-A",
436            "O-2",
437            2_000,
438            None,
439        );
440        let mut position = Position::new(&any, open);
441        position.apply(&add);
442
443        let batch = encode_positions(&[position]).unwrap();
444        let trade_ids_col = batch
445            .column(31)
446            .as_any()
447            .downcast_ref::<StringArray>()
448            .unwrap();
449
450        let parsed: Vec<String> = serde_json::from_str(trade_ids_col.value(0)).unwrap();
451        assert_eq!(parsed, vec!["T-A".to_string(), "T-Z".to_string()]);
452    }
453
454    #[rstest]
455    fn test_encode_positions_closed() {
456        let instrument = currency_pair_btcusdt();
457        let any = InstrumentAny::CurrencyPair(instrument.clone());
458        let open = make_fill(
459            &instrument,
460            OrderSide::Buy,
461            "1.0",
462            "50000.0",
463            "T-1",
464            "O-1",
465            1_000,
466            None,
467        );
468        let close = make_fill(
469            &instrument,
470            OrderSide::Sell,
471            "1.0",
472            "50500.0",
473            "T-2",
474            "O-2",
475            5_000,
476            None,
477        );
478        let mut position = Position::new(&any, open);
479        position.apply(&close);
480
481        let batch = encode_positions(&[position]).unwrap();
482        let closing_order_id_col = batch
483            .column(6)
484            .as_any()
485            .downcast_ref::<StringArray>()
486            .unwrap();
487        let ts_closed_col = batch
488            .column(24)
489            .as_any()
490            .downcast_ref::<TimestampNanosecondArray>()
491            .unwrap();
492        let duration_col = batch
493            .column(25)
494            .as_any()
495            .downcast_ref::<UInt64Array>()
496            .unwrap();
497        let avg_px_close_col = batch
498            .column(27)
499            .as_any()
500            .downcast_ref::<Float64Array>()
501            .unwrap();
502        let realized_pnl_amount_col = batch
503            .column(29)
504            .as_any()
505            .downcast_ref::<Float64Array>()
506            .unwrap();
507        let realized_pnl_currency_col = batch
508            .column(30)
509            .as_any()
510            .downcast_ref::<StringArray>()
511            .unwrap();
512        let event_count_col = batch
513            .column(35)
514            .as_any()
515            .downcast_ref::<UInt32Array>()
516            .unwrap();
517
518        assert_eq!(closing_order_id_col.value(0), "O-2");
519        assert!(!ts_closed_col.is_null(0));
520        assert_eq!(ts_closed_col.value(0), 5_000);
521        assert_eq!(duration_col.value(0), 4_000);
522        assert!((avg_px_close_col.value(0) - 50_500.0).abs() < 1e-9);
523        assert!(!realized_pnl_amount_col.is_null(0));
524        assert_eq!(realized_pnl_currency_col.value(0), "USDT");
525        assert_eq!(event_count_col.value(0), 2);
526    }
527
528    #[rstest]
529    fn test_encode_positions_commissions_sorted() {
530        let instrument = currency_pair_btcusdt();
531        let any = InstrumentAny::CurrencyPair(instrument.clone());
532        let usdt_fill = make_fill(
533            &instrument,
534            OrderSide::Buy,
535            "1.0",
536            "50000.0",
537            "T-1",
538            "O-1",
539            1_000,
540            Some(Money::from("0.50 USDT")),
541        );
542        let btc_fill = make_fill(
543            &instrument,
544            OrderSide::Buy,
545            "1.0",
546            "50000.0",
547            "T-2",
548            "O-2",
549            2_000,
550            Some(Money::from("0.00001 BTC")),
551        );
552        let mut position = Position::new(&any, usdt_fill);
553        position.apply(&btc_fill);
554
555        let batch = encode_positions(&[position]).unwrap();
556        let commissions_col = batch
557            .column(34)
558            .as_any()
559            .downcast_ref::<StringArray>()
560            .unwrap();
561
562        let parsed: Vec<(String, f64)> = serde_json::from_str(commissions_col.value(0)).unwrap();
563        let currencies: Vec<&str> = parsed.iter().map(|(c, _)| c.as_str()).collect();
564        assert_eq!(currencies, vec!["BTC", "USDT"]);
565    }
566}