Skip to content

Commit fd95de1

Browse files
committed
prepare for 1.0
1 parent a236fe5 commit fd95de1

15 files changed

+365
-218
lines changed

src/key_range.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use crate::{
2+
serde::{Deserializable, DeserializeError, Serializable, SerializeError},
3+
value::UserKey,
4+
};
5+
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
6+
use std::{
7+
io::{Read, Write},
8+
ops::Deref,
9+
sync::Arc,
10+
};
11+
12+
/// A key range in the format of [min, max] (inclusive on both sides)
13+
#[derive(Clone, Debug, PartialEq, Eq)]
14+
pub struct KeyRange((UserKey, UserKey));
15+
16+
impl std::ops::Deref for KeyRange {
17+
type Target = (UserKey, UserKey);
18+
19+
fn deref(&self) -> &Self::Target {
20+
&self.0
21+
}
22+
}
23+
24+
impl KeyRange {
25+
pub fn new(range: (UserKey, UserKey)) -> Self {
26+
Self(range)
27+
}
28+
}
29+
30+
impl Serializable for KeyRange {
31+
fn serialize<W: Write>(&self, writer: &mut W) -> Result<(), SerializeError> {
32+
// NOTE: Max key size = u16
33+
#[allow(clippy::cast_possible_truncation)]
34+
writer.write_u16::<BigEndian>(self.deref().0.len() as u16)?;
35+
writer.write_all(&self.deref().0)?;
36+
37+
// NOTE: Max key size = u16
38+
#[allow(clippy::cast_possible_truncation)]
39+
writer.write_u16::<BigEndian>(self.deref().1.len() as u16)?;
40+
writer.write_all(&self.deref().1)?;
41+
42+
Ok(())
43+
}
44+
}
45+
46+
impl Deserializable for KeyRange {
47+
fn deserialize<R: Read>(reader: &mut R) -> Result<Self, DeserializeError> {
48+
let key_min_len = reader.read_u16::<BigEndian>()?;
49+
let mut key_min = vec![0; key_min_len.into()];
50+
reader.read_exact(&mut key_min)?;
51+
let key_min: UserKey = Arc::from(key_min);
52+
53+
let key_max_len = reader.read_u16::<BigEndian>()?;
54+
let mut key_max = vec![0; key_max_len.into()];
55+
reader.read_exact(&mut key_max)?;
56+
let key_max: UserKey = Arc::from(key_max);
57+
58+
Ok(Self::new((key_min, key_max)))
59+
}
60+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ mod error;
9393
mod handle;
9494
mod id;
9595
mod index;
96+
mod key_range;
9697
mod manifest;
9798
mod mock;
9899
mod path;

src/manifest.rs

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
use crate::{
22
id::SegmentId,
3-
segment::stats::{SegmentFileTrailer, Stats},
3+
key_range::KeyRange,
4+
segment::{gc_stats::GcStats, meta::Metadata, trailer::SegmentFileTrailer},
45
Segment, SegmentWriter as MultiWriter,
56
};
67
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
78
use std::{
89
collections::HashMap,
910
io::{Cursor, Write},
1011
path::{Path, PathBuf},
11-
sync::{atomic::AtomicU64, Arc, RwLock},
12+
sync::{Arc, RwLock},
1213
};
1314

1415
pub const VLOG_MARKER: &str = ".vlog";
@@ -125,11 +126,8 @@ impl SegmentManifest {
125126
Arc::new(Segment {
126127
id,
127128
path,
128-
stats: Stats {
129-
persisted: trailer,
130-
stale_bytes: AtomicU64::default(),
131-
stale_items: AtomicU64::default(),
132-
},
129+
meta: trailer.metadata,
130+
gc_stats: GcStats::default(),
133131
}),
134132
);
135133
}
@@ -191,15 +189,22 @@ impl SegmentManifest {
191189
Arc::new(Segment {
192190
id: segment_id,
193191
path: writer.path,
194-
stats: Stats {
195-
persisted: SegmentFileTrailer {
196-
item_count: writer.item_count,
197-
total_bytes: writer.written_blob_bytes,
198-
total_uncompressed_bytes: writer.uncompressed_bytes,
199-
},
200-
stale_items: AtomicU64::default(),
201-
stale_bytes: AtomicU64::default(),
192+
meta: Metadata {
193+
item_count: writer.item_count,
194+
total_bytes: writer.written_blob_bytes,
195+
total_uncompressed_bytes: writer.uncompressed_bytes,
196+
key_range: KeyRange::new((
197+
writer
198+
.first_key
199+
.clone()
200+
.expect("should have written at least 1 item"),
201+
writer
202+
.last_key
203+
.clone()
204+
.expect("should have written at least 1 item"),
205+
)),
202206
},
207+
gc_stats: GcStats::default(),
203208
}),
204209
);
205210

@@ -276,7 +281,7 @@ impl SegmentManifest {
276281
.read()
277282
.expect("lock is poisoned")
278283
.values()
279-
.map(|x| x.stats.total_bytes)
284+
.map(|x| x.meta.total_bytes)
280285
.sum::<u64>()
281286
}
282287

@@ -287,7 +292,7 @@ impl SegmentManifest {
287292
.read()
288293
.expect("lock is poisoned")
289294
.values()
290-
.map(|x| x.stats.stale_items())
295+
.map(|x| x.meta.stale_items())
291296
.sum::<u64>()
292297
} */
293298

@@ -299,7 +304,7 @@ impl SegmentManifest {
299304
.read()
300305
.expect("lock is poisoned")
301306
.values()
302-
.map(|x| x.stats.total_uncompressed_bytes)
307+
.map(|x| x.meta.total_uncompressed_bytes)
303308
.sum::<u64>();
304309
if used_bytes == 0 {
305310
return 0.0;
@@ -310,7 +315,7 @@ impl SegmentManifest {
310315
.read()
311316
.expect("lock is poisoned")
312317
.values()
313-
.map(|x| x.stats.stale_bytes())
318+
.map(|x| x.gc_stats.stale_bytes())
314319
.sum::<u64>();
315320

316321
if stale_bytes == 0 {
@@ -330,7 +335,7 @@ impl SegmentManifest {
330335
.read()
331336
.expect("lock is poisoned")
332337
.values()
333-
.map(|x| x.stats.total_uncompressed_bytes)
338+
.map(|x| x.meta.total_uncompressed_bytes)
334339
.sum::<u64>();
335340

336341
if used_bytes == 0 {
@@ -342,7 +347,7 @@ impl SegmentManifest {
342347
.read()
343348
.expect("lock is poisoned")
344349
.values()
345-
.map(|x| x.stats.stale_bytes())
350+
.map(|x| x.gc_stats.stale_bytes())
346351
.sum::<u64>();
347352

348353
let alive_bytes = used_bytes - stale_bytes;

src/segment/gc_stats.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use std::sync::atomic::AtomicU64;
2+
3+
#[derive(Debug, Default)]
4+
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
5+
pub struct GcStats {
6+
pub(crate) stale_items: AtomicU64,
7+
pub(crate) stale_bytes: AtomicU64,
8+
}
9+
10+
impl GcStats {
11+
pub fn set_stale_items(&self, x: u64) {
12+
self.stale_items
13+
.store(x, std::sync::atomic::Ordering::Release)
14+
}
15+
16+
pub fn set_stale_bytes(&self, x: u64) {
17+
self.stale_bytes
18+
.store(x, std::sync::atomic::Ordering::Release)
19+
}
20+
21+
/// Returns the amount of dead items in the segment
22+
pub fn stale_items(&self) -> u64 {
23+
self.stale_items.load(std::sync::atomic::Ordering::Acquire)
24+
}
25+
26+
/// Returns the amount of dead bytes in the segment
27+
pub fn stale_bytes(&self) -> u64 {
28+
self.stale_bytes.load(std::sync::atomic::Ordering::Acquire)
29+
}
30+
}

src/segment/meta.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use crate::{
2+
key_range::KeyRange,
3+
serde::{Deserializable, DeserializeError, Serializable, SerializeError},
4+
};
5+
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
6+
use std::io::{Read, Write};
7+
8+
pub const METADATA_HEADER_MAGIC: &[u8] = &[b'F', b'J', b'L', b'L', b'S', b'M', b'D', b'1'];
9+
10+
#[derive(Debug)]
11+
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
12+
pub struct Metadata {
13+
/// Number of KV-pairs in the segment
14+
pub item_count: u64,
15+
16+
/// compressed size in bytes (on disk) (without the fixed size trailer)
17+
pub total_bytes: u64,
18+
19+
/// true size in bytes (if no compression were used)
20+
pub total_uncompressed_bytes: u64,
21+
22+
// TODO:
23+
///// What type of compression is used
24+
// pub compression: CompressionType,
25+
/// Key range
26+
pub key_range: KeyRange,
27+
}
28+
29+
impl Serializable for Metadata {
30+
fn serialize<W: Write>(&self, writer: &mut W) -> Result<(), SerializeError> {
31+
// Write header
32+
writer.write_all(METADATA_HEADER_MAGIC)?;
33+
34+
writer.write_u64::<BigEndian>(self.item_count)?;
35+
writer.write_u64::<BigEndian>(self.total_bytes)?;
36+
writer.write_u64::<BigEndian>(self.total_uncompressed_bytes)?;
37+
38+
self.key_range.serialize(writer)?;
39+
40+
Ok(())
41+
}
42+
}
43+
44+
impl Deserializable for Metadata {
45+
fn deserialize<R: Read>(reader: &mut R) -> Result<Self, DeserializeError> {
46+
// Check header
47+
let mut magic = [0u8; METADATA_HEADER_MAGIC.len()];
48+
reader.read_exact(&mut magic)?;
49+
50+
if magic != METADATA_HEADER_MAGIC {
51+
return Err(DeserializeError::InvalidHeader("SegmentMetadata"));
52+
}
53+
54+
let item_count = reader.read_u64::<BigEndian>()?;
55+
let total_bytes = reader.read_u64::<BigEndian>()?;
56+
let total_uncompressed_bytes = reader.read_u64::<BigEndian>()?;
57+
58+
let key_range = KeyRange::deserialize(reader)?;
59+
60+
Ok(Self {
61+
item_count,
62+
total_bytes,
63+
total_uncompressed_bytes,
64+
key_range,
65+
})
66+
}
67+
}

src/segment/mod.rs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
pub mod gc_stats;
12
pub mod merge;
3+
pub mod meta;
24
pub mod multi_writer;
35
pub mod reader;
4-
pub mod stats;
6+
pub mod trailer;
57
pub mod writer;
68

7-
use self::stats::Stats;
89
use crate::id::SegmentId;
10+
use gc_stats::GcStats;
11+
use meta::Metadata;
912
use std::path::PathBuf;
1013

1114
/// A disk segment is an immutable, sorted, contiguous file
@@ -19,26 +22,52 @@ pub struct Segment {
1922
pub path: PathBuf,
2023

2124
/// Segment statistics
22-
pub stats: Stats,
25+
pub meta: Metadata,
26+
27+
/// Runtime stats for garbage collection
28+
pub gc_stats: GcStats,
2329
}
2430

2531
impl Segment {
26-
/// Returns a scanner that can iterate through the segment
32+
/// Returns a scanner that can iterate through the segment.
2733
///
2834
/// # Errors
2935
///
3036
/// Will return `Err` if an IO error occurs.
3137
pub fn scan(&self) -> std::io::Result<reader::Reader> {
32-
reader::Reader::new(&self.path, self.id, self.stats.item_count)
38+
reader::Reader::new(&self.path, self.id, self.meta.item_count)
3339
}
3440

35-
/// Always returns `false`
41+
/// Always returns `false` because a segment is never empty.
3642
pub fn is_empty(&self) -> bool {
3743
false
3844
}
3945

40-
/// Returns the amount of items in the segment
46+
/// Returns the amount of items in the segment.
4147
pub fn len(&self) -> u64 {
42-
self.stats.item_count
48+
self.meta.item_count
49+
}
50+
51+
/// Marks the segment as fully stale.
52+
pub(crate) fn mark_as_stale(&self) {
53+
self.gc_stats.set_stale_items(self.meta.item_count);
54+
55+
self.gc_stats
56+
.set_stale_bytes(self.meta.total_uncompressed_bytes);
57+
}
58+
59+
/// Returns `true` if the segment is fully stale.
60+
pub fn is_stale(&self) -> bool {
61+
self.gc_stats.stale_items() == self.meta.item_count
62+
}
63+
64+
/// Returns the percent of dead items in the segment.
65+
pub fn stale_ratio(&self) -> f32 {
66+
let dead = self.gc_stats.stale_items() as f32;
67+
if dead == 0.0 {
68+
return 0.0;
69+
}
70+
71+
dead / self.meta.item_count as f32
4372
}
4473
}

0 commit comments

Comments
 (0)