nautilus_serialization/sbe/
cursor.rs1use std::str;
19
20use super::error::{MAX_GROUP_SIZE, SbeDecodeError};
21
/// Read cursor over a borrowed byte buffer, used when decoding SBE data.
///
/// The cursor never copies the underlying buffer; it only tracks how far
/// into it decoding has progressed.
#[derive(Clone, Debug)]
pub struct SbeCursor<'a> {
    /// The complete buffer being decoded.
    buf: &'a [u8],
    /// Byte offset into `buf` of the next unread byte.
    pos: usize,
}
31
32impl<'a> SbeCursor<'a> {
33 #[must_use]
35 pub const fn new(buf: &'a [u8]) -> Self {
36 Self { buf, pos: 0 }
37 }
38
39 #[must_use]
41 pub const fn new_at(buf: &'a [u8], pos: usize) -> Self {
42 Self { buf, pos }
43 }
44
45 #[must_use]
47 pub const fn pos(&self) -> usize {
48 self.pos
49 }
50
51 #[must_use]
53 pub const fn remaining(&self) -> usize {
54 self.buf.len().saturating_sub(self.pos)
55 }
56
57 #[must_use]
59 pub const fn buffer(&self) -> &'a [u8] {
60 self.buf
61 }
62
63 #[must_use]
65 pub fn peek(&self) -> &'a [u8] {
66 &self.buf[self.pos..]
67 }
68
69 #[inline]
75 pub fn require(&self, n: usize) -> Result<(), SbeDecodeError> {
76 if self.remaining() < n {
77 return Err(SbeDecodeError::BufferTooShort {
78 expected: self.pos + n,
79 actual: self.buf.len(),
80 });
81 }
82 Ok(())
83 }
84
85 #[inline]
91 pub fn advance(&mut self, n: usize) -> Result<(), SbeDecodeError> {
92 self.require(n)?;
93 self.pos += n;
94 Ok(())
95 }
96
97 #[inline]
101 pub fn skip(&mut self, n: usize) {
102 self.pos += n;
103 }
104
105 pub fn reset(&mut self) {
107 self.pos = 0;
108 }
109
110 pub fn set_pos(&mut self, pos: usize) {
112 self.pos = pos;
113 }
114
115 #[inline]
121 pub fn read_u8(&mut self) -> Result<u8, SbeDecodeError> {
122 self.require(1)?;
123 let value = self.buf[self.pos];
124 self.pos += 1;
125 Ok(value)
126 }
127
128 #[inline]
134 pub fn read_i8(&mut self) -> Result<i8, SbeDecodeError> {
135 self.require(1)?;
136 let value = self.buf[self.pos] as i8;
137 self.pos += 1;
138 Ok(value)
139 }
140
141 #[inline]
147 pub fn read_u16_le(&mut self) -> Result<u16, SbeDecodeError> {
148 Ok(u16::from_le_bytes(self.read_array::<2>()?))
149 }
150
151 #[inline]
157 pub fn read_i16_le(&mut self) -> Result<i16, SbeDecodeError> {
158 Ok(i16::from_le_bytes(self.read_array::<2>()?))
159 }
160
161 #[inline]
167 pub fn read_u32_le(&mut self) -> Result<u32, SbeDecodeError> {
168 Ok(u32::from_le_bytes(self.read_array::<4>()?))
169 }
170
171 #[inline]
177 pub fn read_i32_le(&mut self) -> Result<i32, SbeDecodeError> {
178 Ok(i32::from_le_bytes(self.read_array::<4>()?))
179 }
180
181 #[inline]
187 pub fn read_u64_le(&mut self) -> Result<u64, SbeDecodeError> {
188 Ok(u64::from_le_bytes(self.read_array::<8>()?))
189 }
190
191 #[inline]
197 pub fn read_i64_le(&mut self) -> Result<i64, SbeDecodeError> {
198 Ok(i64::from_le_bytes(self.read_array::<8>()?))
199 }
200
201 #[inline]
207 pub fn read_u128_le(&mut self) -> Result<u128, SbeDecodeError> {
208 Ok(u128::from_le_bytes(self.read_array::<16>()?))
209 }
210
211 #[inline]
217 pub fn read_i128_le(&mut self) -> Result<i128, SbeDecodeError> {
218 Ok(i128::from_le_bytes(self.read_array::<16>()?))
219 }
220
221 #[inline]
227 pub fn read_optional_i64_le(&mut self) -> Result<Option<i64>, SbeDecodeError> {
228 let value = self.read_i64_le()?;
229 Ok(if value == i64::MIN { None } else { Some(value) })
230 }
231
232 #[inline]
238 pub fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], SbeDecodeError> {
239 self.require(n)?;
240 let slice = &self.buf[self.pos..self.pos + n];
241 self.pos += n;
242 Ok(slice)
243 }
244
245 #[inline]
249 fn read_array<const N: usize>(&mut self) -> Result<[u8; N], SbeDecodeError> {
250 self.require(N)?;
251 let bytes: [u8; N] = self.buf[self.pos..self.pos + N]
252 .try_into()
253 .expect("slice length matches N");
254 self.pos += N;
255 Ok(bytes)
256 }
257
258 #[inline]
267 pub fn read_var_string8(&mut self) -> Result<String, SbeDecodeError> {
268 let len = self.read_u8()? as usize;
269 if len == 0 {
270 return Ok(String::new());
271 }
272 self.require(len)?;
273 let s = str::from_utf8(&self.buf[self.pos..self.pos + len])
274 .map_err(|_| SbeDecodeError::InvalidUtf8)?
275 .to_string();
276 self.pos += len;
277 Ok(s)
278 }
279
280 #[inline]
287 pub fn read_var_string8_ref(&mut self) -> Result<&'a str, SbeDecodeError> {
288 let len = self.read_u8()? as usize;
289 if len == 0 {
290 return Ok("");
291 }
292 self.require(len)?;
293 let s = str::from_utf8(&self.buf[self.pos..self.pos + len])
294 .map_err(|_| SbeDecodeError::InvalidUtf8)?;
295 self.pos += len;
296 Ok(s)
297 }
298
299 #[inline]
308 pub fn read_var_string16(&mut self) -> Result<String, SbeDecodeError> {
309 let len = usize::from(self.read_u16_le()?);
310 if len == 0 {
311 return Ok(String::new());
312 }
313 self.require(len)?;
314 let s = str::from_utf8(&self.buf[self.pos..self.pos + len])
315 .map_err(|_| SbeDecodeError::InvalidUtf8)?
316 .to_string();
317 self.pos += len;
318 Ok(s)
319 }
320
321 #[inline]
328 pub fn read_var_string16_ref(&mut self) -> Result<&'a str, SbeDecodeError> {
329 let len = usize::from(self.read_u16_le()?);
330 if len == 0 {
331 return Ok("");
332 }
333 self.require(len)?;
334 let s = str::from_utf8(&self.buf[self.pos..self.pos + len])
335 .map_err(|_| SbeDecodeError::InvalidUtf8)?;
336 self.pos += len;
337 Ok(s)
338 }
339
340 pub fn skip_var_data8(&mut self) -> Result<(), SbeDecodeError> {
348 let len = self.read_u8()? as usize;
349 if len > 0 {
350 self.advance(len)?;
351 }
352 Ok(())
353 }
354
355 pub fn read_var_bytes8(&mut self) -> Result<Vec<u8>, SbeDecodeError> {
363 let len = self.read_u8()? as usize;
364 if len == 0 {
365 return Ok(Vec::new());
366 }
367 self.require(len)?;
368 let bytes = self.buf[self.pos..self.pos + len].to_vec();
369 self.pos += len;
370 Ok(bytes)
371 }
372
373 pub fn skip_var_data16(&mut self) -> Result<(), SbeDecodeError> {
379 let len = usize::from(self.read_u16_le()?);
380 if len > 0 {
381 self.advance(len)?;
382 }
383 Ok(())
384 }
385
386 pub fn read_var_bytes16(&mut self) -> Result<Vec<u8>, SbeDecodeError> {
394 let len = usize::from(self.read_u16_le()?);
395 if len == 0 {
396 return Ok(Vec::new());
397 }
398 self.require(len)?;
399 let bytes = self.buf[self.pos..self.pos + len].to_vec();
400 self.pos += len;
401 Ok(bytes)
402 }
403
404 #[inline]
413 pub fn read_group_header(&mut self) -> Result<(u16, u32), SbeDecodeError> {
414 let block_length = self.read_u16_le()?;
415 let num_in_group = self.read_u32_le()?;
416
417 if num_in_group > MAX_GROUP_SIZE {
418 return Err(SbeDecodeError::GroupSizeTooLarge {
419 count: num_in_group,
420 max: MAX_GROUP_SIZE,
421 });
422 }
423
424 Ok((block_length, num_in_group))
425 }
426
427 #[inline]
436 pub fn read_group_header_16(&mut self) -> Result<(u16, u16), SbeDecodeError> {
437 let block_length = self.read_u16_le()?;
438 let num_in_group = self.read_u16_le()?;
439
440 if u32::from(num_in_group) > MAX_GROUP_SIZE {
441 return Err(SbeDecodeError::GroupSizeTooLarge {
442 count: u32::from(num_in_group),
443 max: MAX_GROUP_SIZE,
444 });
445 }
446
447 Ok((block_length, num_in_group))
448 }
449
450 pub fn read_group<T, F>(
460 &mut self,
461 block_length: u16,
462 num_in_group: u32,
463 mut decode_item: F,
464 ) -> Result<Vec<T>, SbeDecodeError>
465 where
466 F: FnMut(&mut Self) -> Result<T, SbeDecodeError>,
467 {
468 let block_len = block_length as usize;
469 let count = num_in_group as usize;
470
471 self.require(count * block_len)?;
473
474 let mut items = Vec::with_capacity(count);
475 for _ in 0..count {
476 let item_start = self.pos;
477 let item = decode_item(self)?;
478 items.push(item);
479
480 self.pos = item_start + block_len;
482 }
483
484 Ok(items)
485 }
486}
487
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_new_starts_at_zero() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new(&data);

        assert_eq!(cur.pos(), 0);
        assert_eq!(cur.remaining(), 4);
    }

    #[rstest]
    fn test_new_at_starts_at_offset() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new_at(&data, 2);

        assert_eq!(cur.pos(), 2);
        assert_eq!(cur.remaining(), 2);
    }

    #[rstest]
    fn test_read_u8() {
        let data = [0x42, 0xFF];
        let mut cur = SbeCursor::new(&data);

        // First byte.
        assert_eq!(cur.read_u8().unwrap(), 0x42);
        assert_eq!(cur.pos(), 1);

        // Second byte.
        assert_eq!(cur.read_u8().unwrap(), 0xFF);
        assert_eq!(cur.pos(), 2);

        // Buffer is now exhausted.
        assert!(cur.read_u8().is_err());
    }

    #[rstest]
    fn test_read_i8() {
        // 0x7F is the largest positive i8, 0x80 the most negative.
        let data = [0x7F, 0x80];
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_i8().unwrap(), 127);
        assert_eq!(cur.read_i8().unwrap(), -128);
    }

    #[rstest]
    fn test_read_u16_le() {
        // Low byte first.
        let data = [0x34, 0x12];
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_u16_le().unwrap(), 0x1234);
        assert_eq!(cur.pos(), 2);
    }

    #[rstest]
    fn test_read_i64_le() {
        let expected: i64 = -1_234_567_890_123_456_789;
        let data = expected.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_i64_le().unwrap(), expected);
        assert_eq!(cur.pos(), 8);
    }

    #[rstest]
    #[case::u16(&[0x34][..], 2)]
    #[case::u32(&[0x34, 0x12, 0x00][..], 4)]
    #[case::u64(&[0; 7][..], 8)]
    #[case::u128(&[0; 15][..], 16)]
    fn test_multi_byte_reads_buffer_too_short(#[case] buf: &[u8], #[case] needed: usize) {
        let mut cur = SbeCursor::new(buf);

        // Each case supplies one byte fewer than the width being read.
        let outcome: Result<(), SbeDecodeError> = match needed {
            2 => cur.read_u16_le().map(drop),
            4 => cur.read_u32_le().map(drop),
            8 => cur.read_u64_le().map(drop),
            16 => cur.read_u128_le().map(drop),
            _ => unreachable!(),
        };
        let err = outcome.unwrap_err();

        assert!(matches!(err, SbeDecodeError::BufferTooShort { .. }));
        assert_eq!(cur.pos(), 0, "position must not advance on error");
    }

    #[rstest]
    fn test_read_optional_i64_null() {
        let data = i64::MIN.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_optional_i64_le().unwrap(), None);
    }

    #[rstest]
    fn test_read_optional_i64_present() {
        let data = 12345_i64.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_optional_i64_le().unwrap(), Some(12345));
    }

    #[rstest]
    fn test_read_var_string8() {
        // One-byte length prefix (5) followed by the payload.
        let mut data = vec![5];
        data.extend_from_slice(b"hello");
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_var_string8().unwrap(), "hello");
        // Prefix byte plus five payload bytes.
        assert_eq!(cur.pos(), 6);
    }

    #[rstest]
    fn test_read_var_string8_empty() {
        // Length prefix of zero, no payload.
        let data = [0];
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_var_string8().unwrap(), "");
        assert_eq!(cur.pos(), 1);
    }

    #[rstest]
    fn test_read_var_string8_invalid_utf8() {
        // Length 2, then two bytes that are not valid UTF-8.
        let data = [2, 0xFF, 0xFE];
        let mut cur = SbeCursor::new(&data);

        let outcome = cur.read_var_string8();
        assert!(matches!(outcome, Err(SbeDecodeError::InvalidUtf8)));
    }

    #[rstest]
    fn test_read_group_header() {
        // block_length = 24 (u16 LE), num_in_group = 3 (u32 LE).
        let data = [24, 0, 3, 0, 0, 0];
        let mut cur = SbeCursor::new(&data);

        let (block_len, count) = cur.read_group_header().unwrap();

        assert_eq!(block_len, 24);
        assert_eq!(count, 3);
        assert_eq!(cur.pos(), 6);
    }

    #[rstest]
    fn test_read_group_header_too_large() {
        let oversized = MAX_GROUP_SIZE + 1;
        // block_length = 24, followed by a count just above the limit.
        let mut data = vec![24, 0];
        data.extend_from_slice(&oversized.to_le_bytes());
        let mut cur = SbeCursor::new(&data);

        let outcome = cur.read_group_header();
        assert!(matches!(
            outcome,
            Err(SbeDecodeError::GroupSizeTooLarge { .. })
        ));
    }

    #[rstest]
    fn test_read_group() {
        // Two tightly packed 4-byte entries.
        let mut data = Vec::new();
        data.extend_from_slice(&100u32.to_le_bytes());
        data.extend_from_slice(&200u32.to_le_bytes());
        let mut cur = SbeCursor::new(&data);

        let decoded: Vec<u32> = cur
            .read_group(4, 2, super::SbeCursor::read_u32_le)
            .unwrap();

        assert_eq!(decoded, vec![100, 200]);
        assert_eq!(cur.pos(), 8);
    }

    #[rstest]
    fn test_read_group_respects_block_length() {
        // Two 8-byte blocks: a 4-byte value followed by 4 bytes the decoder
        // does not read.
        let mut data = Vec::new();
        data.extend_from_slice(&100u32.to_le_bytes());
        data.extend_from_slice(&[0, 0, 0, 0]);
        data.extend_from_slice(&200u32.to_le_bytes());
        data.extend_from_slice(&[0, 0, 0, 0]);
        let mut cur = SbeCursor::new(&data);

        let decoded: Vec<u32> = cur
            .read_group(8, 2, super::SbeCursor::read_u32_le)
            .unwrap();

        assert_eq!(decoded, vec![100, 200]);
        // Both full blocks are consumed, including the unread tails.
        assert_eq!(cur.pos(), 16);
    }

    #[rstest]
    fn test_require_success() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new(&data);

        assert!(cur.require(4).is_ok());
        assert!(cur.require(3).is_ok());
    }

    #[rstest]
    fn test_require_failure() {
        let data = [1, 2];
        let cur = SbeCursor::new(&data);

        let err = cur.require(3).unwrap_err();

        let expected_err = SbeDecodeError::BufferTooShort {
            expected: 3,
            actual: 2,
        };
        assert_eq!(err, expected_err);
    }

    #[rstest]
    fn test_advance() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        cur.advance(2).unwrap();
        assert_eq!(cur.pos(), 2);
        assert_eq!(cur.remaining(), 2);

        // Only two bytes are left, so advancing by three must fail.
        assert!(cur.advance(3).is_err());
    }

    #[rstest]
    fn test_peek() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.peek(), &[1, 2, 3, 4]);
        cur.advance(2).unwrap();
        assert_eq!(cur.peek(), &[3, 4]);
    }

    #[rstest]
    fn test_reset() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        cur.advance(3).unwrap();
        assert_eq!(cur.pos(), 3);

        cur.reset();
        assert_eq!(cur.pos(), 0);
        assert_eq!(cur.remaining(), 4);
    }
}