Skip to content

Commit 348a7ba

Browse files
authored
Merge pull request #15 from fjall-rs/byteview
Byteview
2 parents bfcdfc9 + 02abbcf commit 348a7ba

File tree

6 files changed

+122
-101
lines changed

6 files changed

+122
-101
lines changed

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "value-log"
33
description = "Value log implementation for key-value separated LSM storage"
44
license = "MIT OR Apache-2.0"
5-
version = "1.4.1"
5+
version = "1.5.0"
66
edition = "2021"
77
rust-version = "1.74.0"
88
readme = "README.md"
@@ -22,10 +22,11 @@ serde = ["dep:serde"]
2222
bytes = ["dep:bytes"]
2323

2424
[dependencies]
25+
bytes = { version = "1", optional = true }
2526
byteorder = "1.5.0"
26-
bytes = { version = "1.8.0", optional = true }
27+
byteview = "0.4.0"
28+
interval-heap = "0.0.5"
2729
log = "0.4.22"
28-
min-max-heap = "1.3.0"
2930
path-absolutize = "3.1.1"
3031
quick_cache = { version = "0.6.5", default-features = false }
3132
rustc-hash = "2.0.0"

src/segment/merge.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,17 @@
33
// (found in the LICENSE-* files in the repository)
44

55
use crate::{id::SegmentId, value::UserKey, Compressor, SegmentReader, UserValue};
6+
use interval_heap::IntervalHeap;
67
use std::cmp::Reverse;
78

8-
// TODO: replace with MinHeap...
9-
use min_max_heap::MinMaxHeap;
9+
macro_rules! fail_iter {
10+
($e:expr) => {
11+
match $e {
12+
Ok(v) => v,
13+
Err(e) => return Some(Err(e.into())),
14+
}
15+
};
16+
}
1017

1118
type IteratorIndex = usize;
1219

@@ -42,16 +49,14 @@ impl Ord for IteratorValue {
4249
#[allow(clippy::module_name_repetitions)]
4350
pub struct MergeReader<C: Compressor + Clone> {
4451
readers: Vec<SegmentReader<C>>,
45-
heap: MinMaxHeap<IteratorValue>,
52+
heap: IntervalHeap<IteratorValue>,
4653
}
4754

4855
impl<C: Compressor + Clone> MergeReader<C> {
4956
/// Initializes a new merging reader
5057
pub fn new(readers: Vec<SegmentReader<C>>) -> Self {
51-
Self {
52-
readers,
53-
heap: MinMaxHeap::new(),
54-
}
58+
let heap = IntervalHeap::with_capacity(readers.len());
59+
Self { readers, heap }
5560
}
5661

5762
fn advance_reader(&mut self, idx: usize) -> crate::Result<()> {
@@ -87,22 +92,16 @@ impl<C: Compressor + Clone> Iterator for MergeReader<C> {
8792

8893
fn next(&mut self) -> Option<Self::Item> {
8994
if self.heap.is_empty() {
90-
if let Err(e) = self.push_next() {
91-
return Some(Err(e));
92-
};
95+
fail_iter!(self.push_next());
9396
}
9497

9598
if let Some(head) = self.heap.pop_min() {
96-
if let Err(e) = self.advance_reader(head.index) {
97-
return Some(Err(e));
98-
}
99+
fail_iter!(self.advance_reader(head.index));
99100

100101
// Discard old items
101102
while let Some(next) = self.heap.pop_min() {
102103
if next.key == head.key {
103-
if let Err(e) = self.advance_reader(next.index) {
104-
return Some(Err(e));
105-
}
104+
fail_iter!(self.advance_reader(next.index));
106105
} else {
107106
// Reached next user key now
108107
// Push back non-conflicting item and exit

src/segment/reader.rs

Lines changed: 27 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,23 @@
33
// (found in the LICENSE-* files in the repository)
44

55
use super::{meta::METADATA_HEADER_MAGIC, writer::BLOB_HEADER_MAGIC};
6-
use crate::{coding::DecodeError, id::SegmentId, value::UserKey, Compressor, UserValue};
6+
use crate::{coding::DecodeError, id::SegmentId, value::UserKey, Compressor, Slice, UserValue};
77
use byteorder::{BigEndian, ReadBytesExt};
88
use std::{
99
fs::File,
1010
io::{BufReader, Read, Seek},
1111
path::Path,
1212
};
1313

14+
macro_rules! fail_iter {
15+
($e:expr) => {
16+
match $e {
17+
Ok(v) => v,
18+
Err(e) => return Some(Err(e.into())),
19+
}
20+
};
21+
}
22+
1423
/// Reads through a segment in order.
1524
pub struct Reader<C: Compressor + Clone> {
1625
pub(crate) segment_id: SegmentId,
@@ -62,10 +71,7 @@ impl<C: Compressor + Clone> Iterator for Reader<C> {
6271

6372
{
6473
let mut buf = [0; BLOB_HEADER_MAGIC.len()];
65-
66-
if let Err(e) = self.inner.read_exact(&mut buf) {
67-
return Some(Err(e.into()));
68-
};
74+
fail_iter!(self.inner.read_exact(&mut buf));
6975

7076
if buf == METADATA_HEADER_MAGIC {
7177
self.is_terminated = true;
@@ -79,54 +85,26 @@ impl<C: Compressor + Clone> Iterator for Reader<C> {
7985
}
8086
}
8187

82-
let checksum = match self.inner.read_u64::<BigEndian>() {
83-
Ok(v) => v,
84-
Err(e) => {
85-
if e.kind() == std::io::ErrorKind::UnexpectedEof {
86-
return None;
87-
}
88-
return Some(Err(e.into()));
89-
}
90-
};
88+
let checksum = fail_iter!(self.inner.read_u64::<BigEndian>());
9189

92-
let key_len = match self.inner.read_u16::<BigEndian>() {
93-
Ok(v) => v,
94-
Err(e) => {
95-
if e.kind() == std::io::ErrorKind::UnexpectedEof {
96-
return None;
97-
}
98-
return Some(Err(e.into()));
99-
}
100-
};
101-
102-
let mut key = vec![0; key_len.into()];
103-
if let Err(e) = self.inner.read_exact(&mut key) {
104-
return Some(Err(e.into()));
105-
};
106-
107-
let val_len = match self.inner.read_u32::<BigEndian>() {
108-
Ok(v) => v,
109-
Err(e) => {
110-
if e.kind() == std::io::ErrorKind::UnexpectedEof {
111-
return None;
112-
}
113-
return Some(Err(e.into()));
114-
}
115-
};
116-
117-
let mut val = vec![0; val_len as usize];
118-
if let Err(e) = self.inner.read_exact(&mut val) {
119-
return Some(Err(e.into()));
120-
};
90+
let key_len = fail_iter!(self.inner.read_u16::<BigEndian>());
91+
let key = fail_iter!(Slice::from_reader(&mut self.inner, key_len as usize));
12192

93+
let val_len = fail_iter!(self.inner.read_u32::<BigEndian>());
12294
let val = match &self.compression {
123-
Some(compressor) => match compressor.decompress(&val) {
124-
Ok(val) => val,
125-
Err(e) => return Some(Err(e)),
126-
},
127-
None => val,
95+
Some(compressor) => {
96+
// TODO: https://github.com/PSeitz/lz4_flex/issues/166
97+
let mut val = vec![0; val_len as usize];
98+
fail_iter!(self.inner.read_exact(&mut val));
99+
Slice::from(fail_iter!(compressor.decompress(&val)))
100+
}
101+
None => {
102+
// NOTE: When not using compression, we can skip
103+
// the intermediary heap allocation and read directly into a Slice
104+
fail_iter!(Slice::from_reader(&mut self.inner, val_len as usize))
105+
}
128106
};
129107

130-
Some(Ok((key.into(), val.into(), checksum)))
108+
Some(Ok((key, val, checksum)))
131109
}
132110
}

src/slice.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
// (found in the LICENSE-* files in the repository)
44

55
#[cfg(not(feature = "bytes"))]
6-
mod slice_arc;
6+
mod slice_default;
77

88
#[cfg(feature = "bytes")]
99
mod slice_bytes;
@@ -13,10 +13,10 @@ use std::{
1313
sync::Arc,
1414
};
1515

16-
#[cfg(not(feature = "bytes"))]
17-
pub use slice_arc::Slice;
1816
#[cfg(feature = "bytes")]
1917
pub use slice_bytes::Slice;
18+
#[cfg(not(feature = "bytes"))]
19+
pub use slice_default::Slice;
2020

2121
impl AsRef<[u8]> for Slice {
2222
fn as_ref(&self) -> &[u8] {
@@ -26,7 +26,21 @@ impl AsRef<[u8]> for Slice {
2626

2727
impl From<&[u8]> for Slice {
2828
fn from(value: &[u8]) -> Self {
29-
Self::new(value)
29+
#[cfg(not(feature = "bytes"))]
30+
{
31+
Self(byteview::ByteView::new(value))
32+
}
33+
34+
#[cfg(feature = "bytes")]
35+
{
36+
Self(bytes::Bytes::from(value.to_vec()))
37+
}
38+
}
39+
}
40+
41+
impl From<Arc<[u8]>> for Slice {
42+
fn from(value: Arc<[u8]>) -> Self {
43+
Self::from(&*value)
3044
}
3145
}
3246

@@ -180,6 +194,7 @@ mod serde {
180194
mod tests {
181195
use super::Slice;
182196
use std::{fmt::Debug, sync::Arc};
197+
use test_log::test;
183198

184199
fn assert_slice_handles<T>(v: T)
185200
where
@@ -192,6 +207,17 @@ mod tests {
192207
assert!(slice >= v, "slice_arc: {slice:?}, v: {v:?}");
193208
}
194209

210+
#[test]
211+
fn slice_empty() {
212+
assert_eq!(Slice::empty(), []);
213+
}
214+
215+
#[test]
216+
fn slice_with_size() {
217+
assert_eq!(Slice::with_size(5), [0, 0, 0, 0, 0]);
218+
assert_eq!(Slice::with_size(50), [0; 50]);
219+
}
220+
195221
/// This test verifies that we can create a `Slice` from various types and compare a `Slice` with them.
196222
#[test]
197223
fn test_slice_instantiation() {

src/slice/slice_bytes.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
// This source code is licensed under both the Apache 2.0 and MIT License
33
// (found in the LICENSE-* files in the repository)
44

5-
use std::sync::Arc;
6-
75
use bytes::{Bytes, BytesMut};
86

97
/// An immutable byte slice that can be cloned without additional heap allocation
8+
///
9+
/// There is no guarantee of any sort of alignment for zero-copy (de)serialization.
1010
#[derive(Debug, Clone, Eq, Hash, Ord)]
1111
pub struct Slice(pub(super) Bytes);
1212

@@ -17,6 +17,26 @@ impl Slice {
1717
Self(Bytes::copy_from_slice(bytes))
1818
}
1919

20+
#[doc(hidden)]
21+
#[must_use]
22+
pub fn empty() -> Self {
23+
Self(Bytes::from_static(&[]))
24+
}
25+
26+
#[doc(hidden)]
27+
#[must_use]
28+
pub fn slice(&self, range: impl std::ops::RangeBounds<usize>) -> Self {
29+
Self(self.0.slice(range))
30+
}
31+
32+
#[must_use]
33+
#[doc(hidden)]
34+
pub fn with_size(len: usize) -> Self {
35+
let bytes = vec![0; len];
36+
Self(Bytes::from(bytes))
37+
}
38+
39+
/// Constructs a [`Slice`] from an I/O reader by pulling in `len` bytes.
2040
#[doc(hidden)]
2141
pub fn from_reader<R: std::io::Read>(reader: &mut R, len: usize) -> std::io::Result<Self> {
2242
let mut builder = BytesMut::zeroed(len);
@@ -50,10 +70,3 @@ impl From<String> for Slice {
5070
Self(Bytes::from(value))
5171
}
5272
}
53-
54-
// Needed because slice_arc specializes this impl
55-
impl From<Arc<[u8]>> for Slice {
56-
fn from(value: Arc<[u8]>) -> Self {
57-
Self::new(value.as_ref())
58-
}
59-
}

src/slice/slice_arc.rs renamed to src/slice/slice_default.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
// This source code is licensed under both the Apache 2.0 and MIT License
33
// (found in the LICENSE-* files in the repository)
44

5-
use std::sync::Arc;
5+
use byteview::ByteView;
66

77
/// An immutable byte slice that can be cloned without additional heap allocation
8+
///
9+
/// There is no guarantee of any sort of alignment for zero-copy (de)serialization.
810
#[derive(Debug, Clone, Eq, Hash, Ord)]
9-
pub struct Slice(pub(super) Arc<[u8]>);
11+
pub struct Slice(pub(super) ByteView);
1012

1113
impl Slice {
1214
/// Construct a [`Slice`] from a byte slice.
@@ -15,40 +17,42 @@ impl Slice {
1517
Self(bytes.into())
1618
}
1719

20+
#[doc(hidden)]
21+
#[must_use]
22+
pub fn empty() -> Self {
23+
Self(ByteView::new(&[]))
24+
}
25+
26+
#[doc(hidden)]
27+
#[must_use]
28+
pub fn slice(&self, range: impl std::ops::RangeBounds<usize>) -> Self {
29+
Self(self.0.slice(range))
30+
}
31+
1832
#[must_use]
1933
#[doc(hidden)]
2034
pub fn with_size(len: usize) -> Self {
21-
// TODO: optimize this with byteview to remove the reallocation
22-
let v = vec![0; len];
23-
Self(v.into())
35+
Self(ByteView::with_size(len))
2436
}
2537

38+
/// Constructs a [`Slice`] from an I/O reader by pulling in `len` bytes.
2639
#[doc(hidden)]
2740
pub fn from_reader<R: std::io::Read>(reader: &mut R, len: usize) -> std::io::Result<Self> {
28-
let mut view = Self::with_size(len);
29-
let builder = Arc::get_mut(&mut view.0).expect("we are the owner");
30-
reader.read_exact(builder)?;
31-
Ok(view)
41+
let view = ByteView::from_reader(reader, len)?;
42+
Ok(Self(view))
3243
}
3344
}
3445

35-
// Arc::from<Vec<T>> is specialized
46+
// Builds the Slice from an owned Vec<u8> via ByteView::from
3647
impl From<Vec<u8>> for Slice {
3748
fn from(value: Vec<u8>) -> Self {
38-
Self(Arc::from(value))
49+
Self(ByteView::from(value))
3950
}
4051
}
4152

42-
// Arc::from<Vec<T>> is specialized
53+
// Builds the Slice from an owned String via its bytes (String::into_bytes)
4354
impl From<String> for Slice {
4455
fn from(value: String) -> Self {
45-
Self(Arc::from(value.into_bytes()))
46-
}
47-
}
48-
49-
// direct conversion
50-
impl From<Arc<[u8]>> for Slice {
51-
fn from(value: Arc<[u8]>) -> Self {
52-
Self(value)
56+
Self(ByteView::from(value.into_bytes()))
5357
}
5458
}

0 commit comments

Comments
 (0)