Skip to content

Commit 576c843

Browse files
committed
fix tests
1 parent f42efa6 commit 576c843

20 files changed

+83
-47
lines changed

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
//!
5858
//! let key = key.as_bytes();
5959
//!
60-
//! let handle = writer.get_next_value_handle(key);
60+
//! let handle = writer.get_next_value_handle();
6161
//! index_writer.insert_indirect(key, handle, value.len() as u32)?;
6262
//!
6363
//! writer.write(key, value)?;

src/segment/merge.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ struct IteratorValue {
1212
key: Arc<[u8]>,
1313
value: Arc<[u8]>,
1414
segment_id: SegmentId,
15+
crc: u32,
1516
}
1617

1718
impl PartialEq for IteratorValue {
@@ -53,14 +54,15 @@ impl MergeReader {
5354
let reader = self.readers.get_mut(idx).expect("iter should exist");
5455

5556
if let Some(value) = reader.next() {
56-
let (k, v) = value?;
57+
let (k, v, crc) = value?;
5758
let segment_id = reader.segment_id;
5859

5960
self.heap.push(IteratorValue {
6061
index: idx,
6162
key: k,
6263
value: v,
6364
segment_id,
65+
crc,
6466
});
6567
}
6668

@@ -77,7 +79,7 @@ impl MergeReader {
7779
}
7880

7981
impl Iterator for MergeReader {
80-
type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>, SegmentId)>;
82+
type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>, SegmentId, u32)>;
8183

8284
fn next(&mut self) -> Option<Self::Item> {
8385
if self.heap.is_empty() {
@@ -105,7 +107,7 @@ impl Iterator for MergeReader {
105107
}
106108
}
107109

108-
return Some(Ok((head.key, head.value, head.segment_id)));
110+
return Some(Ok((head.key, head.value, head.segment_id, head.crc)));
109111
}
110112

111113
None

src/segment/multi_writer.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::writer::{Writer, BLOB_HEADER_MAGIC};
1+
use super::writer::Writer;
22
use crate::{
33
compression::Compressor,
44
id::{IdGenerator, SegmentId},
@@ -75,22 +75,16 @@ impl MultiWriter {
7575
///
7676
/// This can be used to index an item into an external `Index`.
7777
#[must_use]
78-
pub fn get_next_value_handle(&self, key: &[u8]) -> ValueHandle {
78+
pub fn get_next_value_handle(&self) -> ValueHandle {
7979
ValueHandle {
80-
offset: self.offset(key),
80+
offset: self.offset(),
8181
segment_id: self.segment_id(),
8282
}
8383
}
8484

8585
#[must_use]
86-
fn offset(&self, key: &[u8]) -> u64 {
86+
fn offset(&self) -> u64 {
8787
self.get_active_writer().offset()
88-
// NOTE: Point to the value record, not the key
89-
// The key is not really needed when dereferencing a value handle
90-
+ (BLOB_HEADER_MAGIC.len()
91-
+ std::mem::size_of::<u16>()
92-
+ key.len()
93-
) as u64
9488
}
9589

9690
#[must_use]

src/segment/reader.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl Reader {
3434
}
3535

3636
impl Iterator for Reader {
37-
type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>)>;
37+
type Item = crate::Result<(Arc<[u8]>, Arc<[u8]>, u32)>;
3838

3939
fn next(&mut self) -> Option<Self::Item> {
4040
if self.is_terminated {
@@ -60,7 +60,7 @@ impl Iterator for Reader {
6060
}
6161
}
6262

63-
let key_len = match self.inner.read_u16::<BigEndian>() {
63+
let crc = match self.inner.read_u32::<BigEndian>() {
6464
Ok(v) => v,
6565
Err(e) => {
6666
if e.kind() == std::io::ErrorKind::UnexpectedEof {
@@ -70,13 +70,7 @@ impl Iterator for Reader {
7070
}
7171
};
7272

73-
let mut key = vec![0; key_len.into()];
74-
if let Err(e) = self.inner.read_exact(&mut key) {
75-
return Some(Err(e.into()));
76-
};
77-
78-
// TODO: handle crc
79-
let _crc = match self.inner.read_u32::<BigEndian>() {
73+
let key_len = match self.inner.read_u16::<BigEndian>() {
8074
Ok(v) => v,
8175
Err(e) => {
8276
if e.kind() == std::io::ErrorKind::UnexpectedEof {
@@ -86,6 +80,11 @@ impl Iterator for Reader {
8680
}
8781
};
8882

83+
let mut key = vec![0; key_len.into()];
84+
if let Err(e) = self.inner.read_exact(&mut key) {
85+
return Some(Err(e.into()));
86+
};
87+
8988
let val_len = match self.inner.read_u32::<BigEndian>() {
9089
Ok(v) => v,
9190
Err(e) => {
@@ -101,6 +100,6 @@ impl Iterator for Reader {
101100
return Some(Err(e.into()));
102101
};
103102

104-
Some(Ok((key.into(), val.into())))
103+
Some(Ok((key.into(), val.into(), crc)))
105104
}
106105
}

src/segment/writer.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,16 @@ impl Writer {
107107
};
108108

109109
let mut hasher = crc32fast::Hasher::new();
110+
hasher.update(key);
110111
hasher.update(&value);
111112
let crc = hasher.finalize();
112113

113114
// Write header
114115
self.active_writer.write_all(BLOB_HEADER_MAGIC)?;
115116

117+
// Write CRC
118+
self.active_writer.write_u32::<BigEndian>(crc)?;
119+
116120
// Write key
117121

118122
// NOTE: Truncation is okay and actually needed
@@ -121,9 +125,6 @@ impl Writer {
121125
.write_u16::<BigEndian>(key.len() as u16)?;
122126
self.active_writer.write_all(key)?;
123127

124-
// Write CRC
125-
self.active_writer.write_u32::<BigEndian>(crc)?;
126-
127128
// Write value
128129

129130
// NOTE: Truncation is okay and actually needed

src/value_log.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER},
66
path::absolute_path,
77
scanner::{Scanner, SizeMap},
8-
segment::merge::MergeReader,
8+
segment::{merge::MergeReader, writer::BLOB_HEADER_MAGIC},
99
value::UserValue,
1010
version::Version,
1111
Config, IndexReader, SegmentWriter, ValueHandle,
@@ -89,7 +89,21 @@ impl ValueLog {
8989
pub fn verify(&self) -> crate::Result<usize> {
9090
let _lock = self.rollover_guard.lock().expect("lock is poisoned");
9191

92-
Ok(0)
92+
let mut sum = 0;
93+
94+
for item in self.get_reader()? {
95+
let (k, v, _, crc) = item?;
96+
97+
let mut hasher = crc32fast::Hasher::new();
98+
hasher.update(&k);
99+
hasher.update(&v);
100+
101+
if hasher.finalize() != crc {
102+
sum += 1;
103+
}
104+
}
105+
106+
Ok(sum)
93107
}
94108

95109
/// Creates a new empty value log in a directory.
@@ -208,19 +222,23 @@ impl ValueLog {
208222
}
209223

210224
let mut reader = BufReader::new(File::open(&segment.path)?);
211-
reader.seek(std::io::SeekFrom::Start(handle.offset))?;
225+
reader.seek(std::io::SeekFrom::Start(
226+
handle.offset + BLOB_HEADER_MAGIC.len() as u64,
227+
))?;
212228

229+
// TODO: handle CRC
213230
let _crc = reader.read_u32::<BigEndian>()?;
214231

232+
let key_len = reader.read_u16::<BigEndian>()?;
233+
reader.seek_relative(key_len.into())?;
234+
215235
let val_len = reader.read_u32::<BigEndian>()?;
216236

217237
let mut value = vec![0; val_len as usize];
218238
reader.read_exact(&mut value)?;
219239

220240
let value = self.config.compression.decompress(&value)?;
221241

222-
// TODO: handle CRC
223-
224242
let val: UserValue = value.into();
225243

226244
self.blob_cache
@@ -497,7 +515,7 @@ impl ValueLog {
497515
.use_compression(self.config.compression.clone());
498516

499517
for item in reader {
500-
let (k, v, segment_id) = item?;
518+
let (k, v, segment_id, _) = item?;
501519

502520
match index_reader.get(&k)? {
503521
// If this value is in an older segment, we can discard it
@@ -506,7 +524,7 @@ impl ValueLog {
506524
_ => {}
507525
}
508526

509-
let vhandle = writer.get_next_value_handle(&k);
527+
let vhandle = writer.get_next_value_handle();
510528
index_writer.insert_indirect(&k, vhandle, v.len() as u32)?;
511529

512530
writer.write(&k, &v)?;

test_fixture/v1_vlog/segments/0

370 Bytes
Binary file not shown.

test_fixture/v1_vlog/segments/1

32 Bytes
Binary file not shown.

test_fixture/v1_vlog/vlog_manifest

8 Bytes
Binary file not shown.
370 Bytes
Binary file not shown.
32 Bytes
Binary file not shown.
8 Bytes
Binary file not shown.

tests/accidental_drop_rc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ fn accidental_drop_rc() -> value_log::Result<()> {
2222
let mut index_writer = MockIndexWriter(index.clone());
2323
let mut writer = value_log.get_writer()?;
2424

25-
let handle = writer.get_next_value_handle(key.as_bytes());
25+
let handle = writer.get_next_value_handle();
2626
index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?;
2727

2828
writer.write(key.as_bytes(), value.as_bytes())?;
@@ -53,7 +53,7 @@ fn accidental_drop_rc() -> value_log::Result<()> {
5353
let mut index_writer = MockIndexWriter(index.clone());
5454
let mut writer = value_log.get_writer()?;
5555

56-
let handle = writer.get_next_value_handle(key.as_bytes());
56+
let handle = writer.get_next_value_handle();
5757
index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?;
5858

5959
writer.write(key.as_bytes(), value.as_bytes())?;

tests/basic_gc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ fn basic_gc() -> value_log::Result<()> {
2222

2323
let key = key.as_bytes();
2424

25-
let handle = writer.get_next_value_handle(key);
25+
let handle = writer.get_next_value_handle();
2626
index_writer.insert_indirect(key, handle, value.len() as u32)?;
2727

2828
writer.write(key, value)?;
@@ -57,7 +57,7 @@ fn basic_gc() -> value_log::Result<()> {
5757

5858
let key = key.as_bytes();
5959

60-
let handle = writer.get_next_value_handle(key);
60+
let handle = writer.get_next_value_handle();
6161
index_writer.insert_indirect(key, handle, value.len() as u32)?;
6262

6363
writer.write(key, value)?;

tests/basic_kv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ fn basic_kv() -> value_log::Result<()> {
2323

2424
let key = key.as_bytes();
2525

26-
let handle = writer.get_next_value_handle(key);
26+
let handle = writer.get_next_value_handle();
2727
index_writer.insert_indirect(key, handle, value.len() as u32)?;
2828

2929
writer.write(key, value)?;

tests/gc_space_amp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ fn gc_space_amp_target_1() -> value_log::Result<()> {
2222
let mut index_writer = MockIndexWriter(index.clone());
2323
let mut writer = value_log.get_writer()?;
2424

25-
let handle = writer.get_next_value_handle(key);
25+
let handle = writer.get_next_value_handle();
2626
index_writer.insert_indirect(key, handle, value.len() as u32)?;
2727

2828
writer.write(key, value.as_bytes())?;
@@ -33,7 +33,7 @@ fn gc_space_amp_target_1() -> value_log::Result<()> {
3333

3434
let key = key.as_bytes();
3535

36-
let handle = writer.get_next_value_handle(key);
36+
let handle = writer.get_next_value_handle();
3737
index_writer.insert_indirect(key, handle, value.len() as u32)?;
3838

3939
writer.write(key, value.as_bytes())?;

tests/recovery.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ fn basic_recovery() -> value_log::Result<()> {
2323

2424
let key = key.as_bytes();
2525

26-
let handle = writer.get_next_value_handle(key);
26+
let handle = writer.get_next_value_handle();
2727
index_writer.insert_indirect(key, handle, value.len() as u32)?;
2828

2929
writer.write(key, value)?;

tests/rollover_index_fail_finish.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ fn rollover_index_fail_finish() -> value_log::Result<()> {
3333
let value = key.repeat(10_000);
3434
let value = value.as_bytes();
3535

36-
let handle = writer.get_next_value_handle(key.as_bytes());
36+
let handle = writer.get_next_value_handle();
3737
index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?;
3838

3939
writer.write(key.as_bytes(), value)?;

tests/space_amp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ fn worst_case_space_amp() -> value_log::Result<()> {
2222
let mut index_writer = MockIndexWriter(index.clone());
2323
let mut writer = value_log.get_writer()?;
2424

25-
let handle = writer.get_next_value_handle(key);
25+
let handle = writer.get_next_value_handle();
2626
index_writer.insert_indirect(key, handle, value.len() as u32)?;
2727

2828
writer.write(key, value.as_bytes())?;
@@ -61,7 +61,7 @@ fn no_overlap_space_amp() -> value_log::Result<()> {
6161
let mut index_writer = MockIndexWriter(index.clone());
6262
let mut writer = value_log.get_writer()?;
6363

64-
let handle = writer.get_next_value_handle(key.as_bytes());
64+
let handle = writer.get_next_value_handle();
6565
index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?;
6666

6767
writer.write(key.as_bytes(), value.as_bytes())?;

tests/vlog_load_fixture.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,31 @@ fn vlog_load_v1() -> value_log::Result<()> {
77

88
let value_log = ValueLog::open(path, Config::default())?;
99

10-
assert_eq!(4, value_log.get_reader()?.count());
10+
let count = {
11+
let mut count = 0;
12+
13+
for kv in value_log.get_reader()? {
14+
let _ = kv?;
15+
count += 1;
16+
}
17+
18+
count
19+
};
20+
21+
assert_eq!(4, count);
22+
assert_eq!(2, value_log.segment_count());
23+
assert_eq!(0, value_log.verify()?);
1124

1225
Ok(())
1326
}
1427

15-
// TODO: corrupt
28+
#[test]
29+
fn vlog_load_v1_corrupt() -> value_log::Result<()> {
30+
let path = std::path::Path::new("test_fixture/v1_vlog_corrupt");
31+
32+
let value_log = ValueLog::open(path, Config::default())?;
33+
34+
assert_eq!(2, value_log.verify()?);
35+
36+
Ok(())
37+
}

0 commit comments

Comments
 (0)