
Commit 655ce10

fix: prevent dropping of new blob files
1 parent a4560f5 commit 655ce10

5 files changed (+174, -59 lines)

Cargo.toml

Lines changed: 2 additions & 2 deletions
@@ -2,14 +2,14 @@
 name = "value-log"
 description = "Value log implementation for key-value separated LSM storage"
 license = "MIT OR Apache-2.0"
-version = "1.0.0-pre.2"
+version = "1.0.0-pre.3"
 edition = "2021"
 rust-version = "1.74.0"
 readme = "README.md"
 include = ["src/**/*", "LICENSE-APACHE", "LICENSE-MIT", "README.md"]
 repository = "https://github.com/fjall-rs/value-log"
 homepage = "https://github.com/fjall-rs/value-log"
-keywords = ["database", "lsmt", "lsm", "wisckey", "key-value"]
+keywords = ["database", "blobdb", "lsm", "wisckey", "key-value"]
 categories = ["data-structures", "database-implementations", "algorithms"]
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

src/lib.rs

Lines changed: 4 additions & 0 deletions
@@ -94,6 +94,10 @@ mod key_range;
 mod manifest;
 mod mock;
 mod path;
+
+#[doc(hidden)]
+pub mod scanner;
+
 mod segment;
 mod serde;
 mod value;

src/scanner.rs

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+use crate::{id::SegmentId, ValueHandle, ValueLog};
+use std::{collections::BTreeMap, sync::MutexGuard};
+
+#[derive(Debug, Default)]
+pub struct SegmentCounter {
+    pub size: u64,
+    pub item_count: u64,
+}
+
+pub type SizeMap = BTreeMap<SegmentId, SegmentCounter>;
+
+pub struct Scanner<'a, I: Iterator<Item = std::io::Result<(ValueHandle, u32)>>> {
+    iter: I,
+
+    #[allow(unused)]
+    lock_guard: MutexGuard<'a, ()>,
+
+    size_map: SizeMap,
+}
+
+impl<'a, I: Iterator<Item = std::io::Result<(ValueHandle, u32)>>> Scanner<'a, I> {
+    pub fn new(vlog: &'a ValueLog, iter: I) -> Self {
+        Self {
+            iter,
+            lock_guard: vlog.rollover_guard.lock().expect("lock is poisoned"),
+            size_map: BTreeMap::default(),
+        }
+    }
+
+    pub fn finish(self) -> SizeMap {
+        self.size_map
+    }
+
+    pub fn scan(&mut self) -> crate::Result<()> {
+        for handle in self.iter.by_ref() {
+            let (handle, size) = handle.map_err(|_| {
+                crate::Error::Io(std::io::Error::new(
+                    std::io::ErrorKind::Other,
+                    "Index returned error",
+                ))
+            })?;
+            let size = u64::from(size);
+
+            self.size_map
+                .entry(handle.segment_id)
+                .and_modify(|x| {
+                    x.item_count += 1;
+                    x.size += size;
+                })
+                .or_insert_with(|| SegmentCounter {
+                    size,
+                    item_count: 1,
+                });
+        }
+
+        Ok(())
+    }
+}
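
For reference, the new scanner module splits GC statistics collection into two phases: scan while holding the rollover guard, then hand the collected SizeMap together with a list of segment IDs captured up front to consume_scan_result. The sketch below mirrors the reworked scan_for_stats in src/value_log.rs further down; the helper name collect_gc_stats is illustrative only, and it assumes a caller that already has a &ValueLog and an index iterator like the one built in the test file of this commit.

use value_log::scanner::Scanner;
use value_log::{ValueHandle, ValueLog};

// Illustrative helper, not part of this commit.
fn collect_gc_stats(
    vlog: &ValueLog,
    index_iter: impl Iterator<Item = std::io::Result<(ValueHandle, u32)>>,
) -> value_log::Result<()> {
    // Constructing the scanner acquires the rollover guard, so no rollover/GC
    // and no writer registration can happen while the scan is running.
    let mut scanner = Scanner::new(vlog, index_iter);

    // Snapshot the blob files that exist before the scan; files registered
    // afterwards must not be considered by this GC round.
    let segment_ids = vlog.manifest.list_segment_ids();

    scanner.scan()?;
    let size_map = scanner.finish(); // consumes the scanner and releases the guard

    // Only the snapshotted segments can be marked stale.
    vlog.consume_scan_result(&segment_ids, &size_map);
    Ok(())
}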

src/value_log.rs

Lines changed: 34 additions & 57 deletions
@@ -4,14 +4,14 @@ use crate::{
     index::Writer as IndexWriter,
     manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER},
     path::absolute_path,
+    scanner::{Scanner, SizeMap},
     segment::merge::MergeReader,
     value::UserValue,
     version::Version,
     Config, IndexReader, SegmentWriter, ValueHandle,
 };
 use byteorder::{BigEndian, ReadBytesExt};
 use std::{
-    collections::BTreeMap,
     fs::File,
     io::{BufReader, Read, Seek},
     path::PathBuf,
@@ -63,7 +63,7 @@ pub struct ValueLogInner {
 
     /// Guards the rollover (compaction) process to only
    /// allow one to happen at a time
-    rollover_guard: Mutex<()>,
+    pub(crate) rollover_guard: Mutex<()>,
 }
 
 impl ValueLog {
@@ -174,6 +174,8 @@ impl ValueLog {
     ///
     /// Will return `Err` if an IO error occurs.
     pub fn register_writer(&self, writer: SegmentWriter) -> crate::Result<()> {
+        let _lock = self.rollover_guard.lock().expect("lock is poisoned");
+
         self.manifest.register(writer)?;
         Ok(())
     }
@@ -349,51 +351,9 @@
         self.manifest.space_amp()
     }
 
-    /// Scans the given index and collecting GC statistics.
-    ///
-    /// # Errors
-    ///
-    /// Will return `Err` if an IO error occurs.
-    #[allow(clippy::result_unit_err)]
-    pub fn scan_for_stats(
-        &self,
-        iter: impl Iterator<Item = std::io::Result<(ValueHandle, u32)>>,
-    ) -> crate::Result<()> {
-        #[derive(Debug)]
-        struct SegmentCounter {
-            size: u64,
-            item_count: u64,
-        }
-
-        // IMPORTANT: Only allow 1 rollover or GC at any given time
-        let _guard = self.rollover_guard.lock().expect("lock is poisoned");
-
-        log::info!("--- GC report for vLog @ {:?} ---", self.path);
-
-        let mut size_map = BTreeMap::<SegmentId, SegmentCounter>::new();
-
-        for handle in iter {
-            let (handle, size) = handle.map_err(|_| {
-                crate::Error::Io(std::io::Error::new(
-                    std::io::ErrorKind::Other,
-                    "Index returned error",
-                ))
-            })?;
-            let size = u64::from(size);
-
-            size_map
-                .entry(handle.segment_id)
-                .and_modify(|x| {
-                    x.item_count += 1;
-                    x.size += size;
-                })
-                .or_insert_with(|| SegmentCounter {
-                    size,
-                    item_count: 1,
-                });
-        }
-
-        for (&id, counter) in &size_map {
+    #[doc(hidden)]
+    pub fn consume_scan_result(&self, segment_ids: &[u64], size_map: &SizeMap) {
+        for (&id, counter) in size_map {
             let used_size = counter.size;
             let alive_item_count = counter.item_count;
 
@@ -412,20 +372,14 @@
                 "Blob file #{id} has {}/{} stale MiB ({:.1}% stale, {stale_items}/{total_items} items) - space amp: {space_amp})",
                 stale_bytes / 1_024 / 1_024,
                 total_bytes / 1_024 / 1_024,
-                stale_ratio * 100.0
+                stale_ratio * 100.0,
             );
 
             segment.gc_stats.set_stale_bytes(stale_bytes);
             segment.gc_stats.set_stale_items(stale_items);
         }
 
-        for id in self
-            .manifest
-            .segments
-            .read()
-            .expect("lock is poisoned")
-            .keys()
-        {
+        for id in segment_ids {
             let segment = self
                 .manifest
                 .get_segment(*id)
@@ -440,13 +394,36 @@
                 self.mark_as_stale(&[*id]);
            }
        }
+    }
 
+    /// Scans the given index and collecting GC statistics.
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
+    #[allow(clippy::result_unit_err)]
+    pub fn scan_for_stats(
+        &self,
+        iter: impl Iterator<Item = std::io::Result<(ValueHandle, u32)>>,
+    ) -> crate::Result<()> {
+        let mut scanner = Scanner::new(self, iter);
+
+        let segment_ids = self.manifest.list_segment_ids();
+
+        scanner.scan()?;
+        let size_map = scanner.finish();
+        self.consume_scan_result(&segment_ids, &size_map);
+
+        Ok(())
+    }
+
+    /// Prints GC report.
+    pub fn print_gc_report(&self) {
+        log::info!("--- GC report for vLog @ {:?} ---", self.path);
         log::info!("Total bytes: {}", self.manifest.total_bytes());
         log::info!("Stale bytes: {}", self.manifest.stale_bytes());
         log::info!("Space amp: {}", self.space_amp());
         log::info!("--- GC report done ---");
-
-        Ok(())
     }
 
     #[doc(hidden)]

tests/accidental_drop_rc.rs

Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
+// This test guards against a race condition that could cause new blob files
+// to be accidentally dropped during GC.
+//
+// When a blob file is registered after a `scan_for_stats`, it has a reference
+// count of 0, so it would be dropped even though it was just created.
+
+use test_log::test;
+use value_log::{Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog};
+
+#[test]
+fn accidental_drop_rc() -> value_log::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let vl_path = folder.path();
+
+    let index = MockIndex::default();
+
+    let value_log = ValueLog::open(vl_path, Config::default())?;
+
+    for key in ["a", "b"] {
+        let value = &key;
+
+        let mut index_writer = MockIndexWriter(index.clone());
+        let mut writer = value_log.get_writer()?;
+
+        let handle = writer.get_next_value_handle(key.as_bytes());
+        index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?;
+
+        writer.write(key.as_bytes(), value.as_bytes())?;
+        value_log.register_writer(writer)?;
+    }
+
+    assert_eq!(2, value_log.segment_count());
+
+    value_log.scan_for_stats(index.read().unwrap().values().cloned().map(Ok))?;
+    value_log.drop_stale_segments()?;
+    assert_eq!(2, value_log.segment_count());
+
+    let segment_ids = value_log.manifest.list_segment_ids();
+
+    // NOTE: Now start a new GC scan
+    let index_lock = index.read().unwrap();
+    let mut scanner =
+        value_log::scanner::Scanner::new(&value_log, index_lock.values().cloned().map(Ok));
+    scanner.scan()?;
+    let scan_result = scanner.finish();
+    drop(index_lock);
+
+    // NOTE: Now we create a new blob file that won't be referenced in the GC report
+    {
+        let key = "c";
+        let value = &key;
+
+        let mut index_writer = MockIndexWriter(index.clone());
+        let mut writer = value_log.get_writer()?;
+
+        let handle = writer.get_next_value_handle(key.as_bytes());
+        index_writer.insert_indirect(key.as_bytes(), handle, value.len() as u32)?;
+
+        writer.write(key.as_bytes(), value.as_bytes())?;
+        value_log.register_writer(writer)?;
+    }
+    assert_eq!(3, value_log.segment_count());
+
+    // NOTE: Now we can consume the scan result, which, in a bad implementation,
+    // would cause the new blob file to be marked as stale.
+    //
+    // But we are forced to pass the list of segment IDs we saw before starting the
+    // scan, which prevents marking segments as stale that were created later.
+    value_log.consume_scan_result(&segment_ids, &scan_result);
+
+    // IMPORTANT: The new blob file should not be dropped
+    value_log.drop_stale_segments()?;
+    assert_eq!(3, value_log.segment_count());
+
+    Ok(())
+}
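
As a side note, callers that do not need the low-level scanner can stick to the public flow this commit leaves in place: scan_for_stats, the new print_gc_report, and drop_stale_segments. A minimal sketch, assuming the same MockIndex / MockIndexWriter helpers and the tempfile crate used by the test above; gc_roundtrip is an illustrative name, not part of the crate.

use value_log::{Config, IndexWriter, MockIndex, MockIndexWriter, ValueLog};

// Illustrative example, not part of this commit.
fn gc_roundtrip() -> value_log::Result<()> {
    let folder = tempfile::tempdir()?;
    let index = MockIndex::default();
    let value_log = ValueLog::open(folder.path(), Config::default())?;

    // Write a single value through the mock index, as in the test above.
    let key = "a";
    let mut index_writer = MockIndexWriter(index.clone());
    let mut writer = value_log.get_writer()?;
    let handle = writer.get_next_value_handle(key.as_bytes());
    index_writer.insert_indirect(key.as_bytes(), handle, key.len() as u32)?;
    writer.write(key.as_bytes(), key.as_bytes())?;
    value_log.register_writer(writer)?;

    // Collect GC statistics, print the report (print_gc_report is new in this
    // commit), then drop whatever became fully stale.
    value_log.scan_for_stats(index.read().unwrap().values().cloned().map(Ok))?;
    value_log.print_gc_report();
    value_log.drop_stale_segments()?;

    Ok(())
}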
