Skip to content

Commit f42efa6

Browse files
committed
add blob header
1 parent 655ce10 commit f42efa6

File tree

7 files changed

+62
-31
lines changed

7 files changed

+62
-31
lines changed

src/segment/merge.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{id::SegmentId, SegmentReader};
2-
use std::cmp::Reverse;
2+
use std::{cmp::Reverse, sync::Arc};
33

44
// TODO: replace with MinHeap...
55
use min_max_heap::MinMaxHeap;
@@ -9,8 +9,8 @@ type IteratorIndex = usize;
99
#[derive(Debug)]
1010
struct IteratorValue {
1111
index: IteratorIndex,
12-
key: Vec<u8>,
13-
value: Vec<u8>,
12+
key: Arc<[u8]>,
13+
value: Arc<[u8]>,
1414
segment_id: SegmentId,
1515
}
1616

@@ -77,7 +77,7 @@ impl MergeReader {
7777
}
7878

7979
impl Iterator for MergeReader {
80-
type Item = crate::Result<(Vec<u8>, Vec<u8>, SegmentId)>;
80+
type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>, SegmentId)>;
8181

8282
fn next(&mut self) -> Option<Self::Item> {
8383
if self.heap.is_empty() {

src/segment/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ impl Segment {
3434
/// # Errors
3535
///
3636
/// Will return `Err` if an IO error occurs.
37-
pub fn scan(&self) -> std::io::Result<reader::Reader> {
38-
reader::Reader::new(&self.path, self.id, self.meta.item_count)
37+
pub fn scan(&self) -> crate::Result<reader::Reader> {
38+
reader::Reader::new(&self.path, self.id)
3939
}
4040

4141
/// Always returns `false` because a segment is never empty.

src/segment/multi_writer.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::writer::Writer;
1+
use super::writer::{Writer, BLOB_HEADER_MAGIC};
22
use crate::{
33
compression::Compressor,
44
id::{IdGenerator, SegmentId},
@@ -87,7 +87,10 @@ impl MultiWriter {
8787
self.get_active_writer().offset()
8888
// NOTE: Point to the value record, not the key
8989
// The key is not really needed when dereferencing a value handle
90-
+ std::mem::size_of::<u16>() as u64 + key.len() as u64
90+
+ (BLOB_HEADER_MAGIC.len()
91+
+ std::mem::size_of::<u16>()
92+
+ key.len()
93+
) as u64
9194
}
9295

9396
#[must_use]

src/segment/reader.rs

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1+
use super::{meta::METADATA_HEADER_MAGIC, writer::BLOB_HEADER_MAGIC};
12
use crate::id::SegmentId;
23
use byteorder::{BigEndian, ReadBytesExt};
34
use std::{
45
fs::File,
56
io::{BufReader, Read},
67
path::PathBuf,
8+
sync::Arc,
79
};
810

911
/// Reads through a segment in order.
1012
pub struct Reader {
1113
pub(crate) segment_id: SegmentId,
1214
inner: BufReader<File>,
13-
item_count: u64,
15+
is_terminated: bool,
1416
}
1517

1618
impl Reader {
@@ -19,43 +21,58 @@ impl Reader {
1921
/// # Errors
2022
///
2123
/// Will return `Err` if an IO error occurs.
22-
pub fn new<P: Into<PathBuf>>(
23-
path: P,
24-
segment_id: SegmentId,
25-
item_count: u64,
26-
) -> std::io::Result<Self> {
24+
pub fn new<P: Into<PathBuf>>(path: P, segment_id: SegmentId) -> crate::Result<Self> {
2725
let path = path.into();
2826
let file_reader = BufReader::new(File::open(path)?);
2927

3028
Ok(Self {
3129
segment_id,
3230
inner: file_reader,
33-
item_count,
31+
is_terminated: false,
3432
})
3533
}
3634
}
3735

3836
impl Iterator for Reader {
39-
type Item = std::io::Result<(Vec<u8>, Vec<u8>)>;
37+
type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>)>;
4038

4139
fn next(&mut self) -> Option<Self::Item> {
42-
if self.item_count == 0 {
40+
if self.is_terminated {
4341
return None;
4442
}
4543

44+
{
45+
let mut buf = [0; BLOB_HEADER_MAGIC.len()];
46+
47+
if let Err(e) = self.inner.read_exact(&mut buf) {
48+
return Some(Err(e.into()));
49+
};
50+
51+
if buf == METADATA_HEADER_MAGIC {
52+
self.is_terminated = true;
53+
return None;
54+
}
55+
56+
if buf != BLOB_HEADER_MAGIC {
57+
return Some(Err(crate::Error::Deserialize(
58+
crate::serde::DeserializeError::InvalidHeader("Blob"),
59+
)));
60+
}
61+
}
62+
4663
let key_len = match self.inner.read_u16::<BigEndian>() {
4764
Ok(v) => v,
4865
Err(e) => {
4966
if e.kind() == std::io::ErrorKind::UnexpectedEof {
5067
return None;
5168
}
52-
return Some(Err(e));
69+
return Some(Err(e.into()));
5370
}
5471
};
5572

5673
let mut key = vec![0; key_len.into()];
5774
if let Err(e) = self.inner.read_exact(&mut key) {
58-
return Some(Err(e));
75+
return Some(Err(e.into()));
5976
};
6077

6178
// TODO: handle crc
@@ -65,7 +82,7 @@ impl Iterator for Reader {
6582
if e.kind() == std::io::ErrorKind::UnexpectedEof {
6683
return None;
6784
}
68-
return Some(Err(e));
85+
return Some(Err(e.into()));
6986
}
7087
};
7188

@@ -75,17 +92,15 @@ impl Iterator for Reader {
7592
if e.kind() == std::io::ErrorKind::UnexpectedEof {
7693
return None;
7794
}
78-
return Some(Err(e));
95+
return Some(Err(e.into()));
7996
}
8097
};
8198

8299
let mut val = vec![0; val_len as usize];
83100
if let Err(e) = self.inner.read_exact(&mut val) {
84-
return Some(Err(e));
101+
return Some(Err(e.into()));
85102
};
86103

87-
self.item_count -= 1;
88-
89-
Some(Ok((key, val)))
104+
Some(Ok((key.into(), val.into())))
90105
}
91106
}

src/segment/trailer.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ impl SegmentFileTrailer {
2626
// Get metadata ptr
2727
let metadata_ptr = reader.read_u64::<BigEndian>()?;
2828

29-
eprintln!("load metadata @ {metadata_ptr}");
30-
3129
// IMPORTANT: Subtract sizeof(meta_ptr) ------v
3230
let remaining_padding = TRAILER_SIZE - std::mem::size_of::<u64>() - TRAILER_MAGIC.len();
3331
reader.seek_relative(remaining_padding as i64)?;

src/segment/writer.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use std::{
1111
sync::Arc,
1212
};
1313

14+
pub const BLOB_HEADER_MAGIC: &[u8] = &[b'V', b'L', b'G', b'B', b'L', b'O', b'B', b'1'];
15+
1416
/// Segment writer
1517
pub struct Writer {
1618
pub path: PathBuf,
@@ -104,11 +106,13 @@ impl Writer {
104106
None => value.to_vec(),
105107
};
106108

107-
// Write CRC
108109
let mut hasher = crc32fast::Hasher::new();
109110
hasher.update(&value);
110111
let crc = hasher.finalize();
111112

113+
// Write header
114+
self.active_writer.write_all(BLOB_HEADER_MAGIC)?;
115+
112116
// Write key
113117

114118
// NOTE: Truncation is okay and actually needed
@@ -117,6 +121,7 @@ impl Writer {
117121
.write_u16::<BigEndian>(key.len() as u16)?;
118122
self.active_writer.write_all(key)?;
119123

124+
// Write CRC
120125
self.active_writer.write_u32::<BigEndian>(crc)?;
121126

122127
// Write value
@@ -127,6 +132,9 @@ impl Writer {
127132
.write_u32::<BigEndian>(value.len() as u32)?;
128133
self.active_writer.write_all(&value)?;
129134

135+
// Header
136+
self.offset += BLOB_HEADER_MAGIC.len() as u64;
137+
130138
// CRC
131139
self.offset += std::mem::size_of::<u32>() as u64;
132140

src/value_log.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,13 @@ impl ValueLog {
8585
}
8686
}
8787

88+
#[doc(hidden)]
89+
pub fn verify(&self) -> crate::Result<usize> {
90+
let _lock = self.rollover_guard.lock().expect("lock is poisoned");
91+
92+
Ok(0)
93+
}
94+
8895
/// Creates a new empty value log in a directory.
8996
pub(crate) fn create_new<P: Into<PathBuf>>(path: P, config: Config) -> crate::Result<Self> {
9097
let path = absolute_path(path.into());
@@ -427,13 +434,13 @@ impl ValueLog {
427434
}
428435

429436
#[doc(hidden)]
430-
pub fn get_reader(&self) -> std::io::Result<MergeReader> {
437+
pub fn get_reader(&self) -> crate::Result<MergeReader> {
431438
let segments = self.manifest.segments.read().expect("lock is poisoned");
432439

433440
let readers = segments
434441
.values()
435442
.map(|x| x.scan())
436-
.collect::<std::io::Result<Vec<_>>>()?;
443+
.collect::<crate::Result<Vec<_>>>()?;
437444

438445
Ok(MergeReader::new(readers))
439446
}
@@ -481,7 +488,7 @@ impl ValueLog {
481488
let readers = segments
482489
.into_iter()
483490
.map(|x| x.scan())
484-
.collect::<std::io::Result<Vec<_>>>()?;
491+
.collect::<crate::Result<Vec<_>>>()?;
485492

486493
let reader = MergeReader::new(readers);
487494

0 commit comments

Comments
 (0)