Skip to content

Commit 03656d1

Browse files
committed
prepare for v1
1 parent c09084f commit 03656d1

12 files changed

+42
-48
lines changed

benches/value_log.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ fn load_value(c: &mut Criterion) {
2929
let value_log = ValueLog::open(
3030
vl_path,
3131
Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
32-
index.clone(),
3332
)
3433
.unwrap();
3534

@@ -76,7 +75,6 @@ fn load_value(c: &mut Criterion) {
7675
vl_path,
7776
Config::default()
7877
.blob_cache(Arc::new(BlobCache::with_capacity_bytes(64 * 1_024 * 1_024))),
79-
index.clone(),
8078
)
8179
.unwrap();
8280

@@ -128,7 +126,6 @@ fn compression(c: &mut Criterion) {
128126
let value_log = ValueLog::open(
129127
vl_path,
130128
Config::default().blob_cache(Arc::new(BlobCache::with_capacity_bytes(0))),
131-
index.clone(),
132129
)
133130
.unwrap();
134131

src/manifest.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ fn rewrite_atomic<P: AsRef<Path>>(path: P, content: &[u8]) -> std::io::Result<()
3434
#[allow(clippy::module_name_repetitions)]
3535
pub struct SegmentManifestInner {
3636
path: PathBuf,
37-
pub(crate) segments: RwLock<HashMap<SegmentId, Arc<Segment>>>,
37+
pub segments: RwLock<HashMap<SegmentId, Arc<Segment>>>,
3838
}
3939

4040
#[allow(clippy::module_name_repetitions)]
@@ -141,12 +141,16 @@ impl SegmentManifest {
141141
}
142142

143143
pub fn drop_segments(&self, ids: &[u64]) -> crate::Result<()> {
144+
// TODO: atomic swap
145+
144146
let mut lock = self.segments.write().expect("lock is poisoned");
145147
lock.retain(|x, _| !ids.contains(x));
146148
Self::write_to_disk(&self.path, &lock.keys().copied().collect::<Vec<_>>())
147149
}
148150

149151
pub fn register(&self, writer: MultiWriter) -> crate::Result<()> {
152+
// TODO: atomic swap
153+
150154
let mut lock = self.segments.write().expect("lock is poisoned");
151155
let writers = writer.finish()?;
152156

@@ -162,6 +166,7 @@ impl SegmentManifest {
162166
stats: Stats {
163167
item_count: writer.item_count.into(),
164168
total_bytes: writer.written_blob_bytes.into(),
169+
total_uncompressed_bytes: writer.uncompressed_bytes.into(),
165170
stale_items: AtomicU64::default(),
166171
stale_bytes: AtomicU64::default(),
167172
},

src/segment/merge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{id::SegmentId, SegmentReader};
22
use std::cmp::Reverse;
33

4-
// TODO: replace with MinHeap
4+
// TODO: replace with MinHeap...
55
use min_max_heap::MinMaxHeap;
66

77
type IteratorIndex = usize;

src/segment/mod.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,6 @@ use std::path::PathBuf;
1010

1111
/// A disk segment is an immutable, sorted, contiguous file
1212
/// that contains key-value pairs.
13-
///
14-
/// ### File format
15-
///
16-
/// KV: \<key length: u16\> \<key: N\> \<crc hash: u32\> \<value length: u32\> \<value: N\>
17-
///
18-
/// Segment: { KV } +
1913
#[derive(Debug)]
2014
pub struct Segment {
2115
/// Segment ID

src/segment/stats.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ use std::sync::atomic::AtomicU64;
33
#[derive(Debug, Default)]
44
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
55
pub struct Stats {
6-
pub(crate) item_count: AtomicU64,
6+
pub(crate) item_count: AtomicU64, // TODO: u64?
77
pub(crate) stale_items: AtomicU64,
88

9-
pub total_bytes: AtomicU64,
9+
pub total_uncompressed_bytes: AtomicU64, // TODO: u64?
10+
pub total_bytes: AtomicU64, // TODO: u64?
1011
pub(crate) stale_bytes: AtomicU64,
1112
// TODO: key range
1213
}
@@ -24,6 +25,11 @@ impl Stats {
2425
self.item_count.load(std::sync::atomic::Ordering::Acquire)
2526
}
2627

28+
pub fn total_uncompressed_bytes(&self) -> u64 {
29+
self.total_uncompressed_bytes
30+
.load(std::sync::atomic::Ordering::Acquire)
31+
}
32+
2733
pub fn total_bytes(&self) -> u64 {
2834
self.total_bytes.load(std::sync::atomic::Ordering::Acquire)
2935
}

src/segment/writer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct Writer {
1717
pub(crate) item_count: u64,
1818

1919
pub(crate) written_blob_bytes: u64,
20+
pub(crate) uncompressed_bytes: u64,
2021
}
2122

2223
impl Writer {
@@ -40,6 +41,7 @@ impl Writer {
4041
offset: 0,
4142
item_count: 0,
4243
written_blob_bytes: 0,
44+
uncompressed_bytes: 0,
4345
})
4446
}
4547

@@ -71,6 +73,8 @@ impl Writer {
7173
assert!(key.len() <= u16::MAX.into());
7274
assert!(u32::try_from(value.len()).is_ok());
7375

76+
self.uncompressed_bytes += value.len() as u64;
77+
7478
#[cfg(feature = "lz4")]
7579
let value = lz4_flex::compress_prepend_size(value);
7680

src/value_log.rs

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
path::absolute_path,
77
segment::merge::MergeReader,
88
version::Version,
9-
Config, ExternalIndex, SegmentWriter, ValueHandle,
9+
Config, SegmentWriter, ValueHandle,
1010
};
1111
use byteorder::{BigEndian, ReadBytesExt};
1212
use std::{
@@ -18,24 +18,25 @@ use std::{
1818

1919
/// A disk-resident value log
2020
#[derive(Clone)]
21-
pub struct ValueLog<I: ExternalIndex + Clone + Send + Sync>(Arc<ValueLogInner<I>>);
21+
pub struct ValueLog(Arc<ValueLogInner>);
2222

23-
impl<I: ExternalIndex + Clone + Send + Sync> std::ops::Deref for ValueLog<I> {
24-
type Target = ValueLogInner<I>;
23+
impl std::ops::Deref for ValueLog {
24+
type Target = ValueLogInner;
2525

2626
fn deref(&self) -> &Self::Target {
2727
&self.0
2828
}
2929
}
3030

3131
#[allow(clippy::module_name_repetitions)]
32-
pub struct ValueLogInner<I: ExternalIndex + Clone + Send + Sync> {
32+
pub struct ValueLogInner {
3333
config: Config,
3434

3535
path: PathBuf,
3636

37+
// TODO: maybe not needed persistently...
3738
/// External index
38-
pub index: I,
39+
// pub index: I,
3940

4041
/// In-memory blob cache
4142
blob_cache: Arc<BlobCache>,
@@ -48,7 +49,7 @@ pub struct ValueLogInner<I: ExternalIndex + Clone + Send + Sync> {
4849
rollover_guard: Mutex<()>,
4950
}
5051

51-
impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
52+
impl ValueLog {
5253
/// Creates or recovers a value log in the given directory.
5354
///
5455
/// # Errors
@@ -57,23 +58,18 @@ impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
5758
pub fn open<P: Into<PathBuf>>(
5859
path: P, // TODO: move path into config?
5960
config: Config,
60-
index: I,
6161
) -> crate::Result<Self> {
6262
let path = path.into();
6363

6464
if path.join(VLOG_MARKER).try_exists()? {
65-
Self::recover(path, config, index)
65+
Self::recover(path, config)
6666
} else {
67-
Self::create_new(path, config, index)
67+
Self::create_new(path, config)
6868
}
6969
}
7070

7171
/// Creates a new empty value log in a directory.
72-
pub(crate) fn create_new<P: Into<PathBuf>>(
73-
path: P,
74-
config: Config,
75-
index: I,
76-
) -> crate::Result<Self> {
72+
pub(crate) fn create_new<P: Into<PathBuf>>(path: P, config: Config) -> crate::Result<Self> {
7773
let path = absolute_path(path.into());
7874
log::trace!("Creating value-log at {}", path.display());
7975

@@ -109,18 +105,13 @@ impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
109105
config,
110106
path,
111107
blob_cache,
112-
index,
113108
manifest,
114109
id_generator: IdGenerator::default(),
115110
rollover_guard: Mutex::new(()),
116111
})))
117112
}
118113

119-
pub(crate) fn recover<P: Into<PathBuf>>(
120-
path: P,
121-
config: Config,
122-
index: I,
123-
) -> crate::Result<Self> {
114+
pub(crate) fn recover<P: Into<PathBuf>>(path: P, config: Config) -> crate::Result<Self> {
124115
let path = path.into();
125116
log::info!("Recovering value-log at {}", path.display());
126117

@@ -143,7 +134,6 @@ impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
143134
config,
144135
path,
145136
blob_cache,
146-
index,
147137
manifest,
148138
// TODO: recover ID, test!!!, maybe store next ID in manifest as u64
149139
id_generator: IdGenerator::default(),
@@ -215,7 +205,7 @@ impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
215205
)?)
216206
}
217207

218-
/// Scans through a segment, refreshing its statistics
208+
/* /// Scans through a segment, refreshing its statistics
219209
///
220210
/// This function is blocking.
221211
///
@@ -277,7 +267,7 @@ impl<I: ExternalIndex + Clone + Send + Sync> ValueLog<I> {
277267
// TODO: changing stats doesn't happen **too** often, so the I/O is fine
278268
279269
Ok(())
280-
}
270+
} */
281271

282272
/// Drops stale segments
283273
pub fn drop_stale_segments(&self) -> crate::Result<()> {

tests/basic_gc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ fn basic_gc() -> value_log::Result<()> {
99

1010
let vl_path = folder.path();
1111
std::fs::create_dir_all(vl_path)?;
12-
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
12+
let value_log = ValueLog::open(vl_path, Config::default())?;
1313

1414
{
1515
let items = ["a", "b", "c", "d", "e"];

tests/basic_kv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ fn basic_kv() -> value_log::Result<()> {
99

1010
let vl_path = folder.path();
1111
std::fs::create_dir_all(vl_path)?;
12-
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
12+
let value_log = ValueLog::open(vl_path, Config::default())?;
1313

1414
let items = ["a", "b", "c", "d", "e"];
1515

tests/recovery.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ fn basic_recovery() -> value_log::Result<()> {
1313
let items = ["a", "b", "c", "d", "e"];
1414

1515
{
16-
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
16+
let value_log = ValueLog::open(vl_path, Config::default())?;
1717

1818
{
1919
let mut writer = value_log.get_writer()?;
@@ -47,7 +47,7 @@ fn basic_recovery() -> value_log::Result<()> {
4747
}
4848

4949
{
50-
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
50+
let value_log = ValueLog::open(vl_path, Config::default())?;
5151

5252
// TODO: should be recovered
5353
for id in value_log.manifest.list_segment_ids() {
@@ -76,8 +76,6 @@ fn basic_recovery() -> value_log::Result<()> {
7676
fn delete_unfinished_segment_folders() -> value_log::Result<()> {
7777
let folder = tempfile::tempdir()?;
7878

79-
let index = MockIndex::default();
80-
8179
let vl_path = folder.path();
8280
std::fs::create_dir_all(vl_path)?;
8381

@@ -86,12 +84,12 @@ fn delete_unfinished_segment_folders() -> value_log::Result<()> {
8684
assert!(mock_path.try_exists()?);
8785

8886
{
89-
let _value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
87+
let _value_log = ValueLog::open(vl_path, Config::default())?;
9088
assert!(mock_path.try_exists()?);
9189
}
9290

9391
{
94-
let _value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
92+
let _value_log = ValueLog::open(vl_path, Config::default())?;
9593
assert!(!mock_path.try_exists()?);
9694
}
9795

tests/rollover_index_fail_finish.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ fn rollover_index_fail_finish() -> value_log::Result<()> {
6161

6262
let vl_path = folder.path();
6363
std::fs::create_dir_all(vl_path)?;
64-
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
64+
let value_log = ValueLog::open(vl_path, Config::default())?;
6565

6666
let items = ["a", "b", "c", "d", "e"];
6767

tests/space_amp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ fn worst_case_space_amp() -> value_log::Result<()> {
99

1010
let vl_path = folder.path();
1111
std::fs::create_dir_all(vl_path)?;
12-
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
12+
let value_log = ValueLog::open(vl_path, Config::default())?;
1313

1414
assert_eq!(0.0, value_log.manifest.space_amp());
1515
assert_eq!(0.0, value_log.manifest.stale_ratio());
@@ -52,7 +52,7 @@ fn no_overlap_space_amp() -> value_log::Result<()> {
5252

5353
let vl_path = folder.path();
5454
std::fs::create_dir_all(vl_path)?;
55-
let value_log = ValueLog::open(vl_path, Config::default(), index.clone())?;
55+
let value_log = ValueLog::open(vl_path, Config::default())?;
5656

5757
assert_eq!(0.0, value_log.manifest.stale_ratio());
5858
assert_eq!(0.0, value_log.manifest.space_amp());

0 commit comments

Comments
 (0)