Skip to content

Commit dc4c9c6

Browse files
committed
segment list atomic swap
1 parent 2a36278 commit dc4c9c6

File tree

2 files changed

+74
-54
lines changed

2 files changed

+74
-54
lines changed

src/manifest.rs

Lines changed: 73 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -153,69 +153,89 @@ impl SegmentManifest {
153153
Ok(m)
154154
}
155155

156-
pub fn drop_segments(&self, ids: &[u64]) -> crate::Result<()> {
157-
// TODO: atomic swap
156+
/// Modifies the level manifest atomically.
157+
pub(crate) fn atomic_swap<F: Fn(&mut HashMap<SegmentId, Arc<Segment>>)>(
158+
&self,
159+
f: F,
160+
) -> crate::Result<()> {
161+
// NOTE: Create a copy of the levels we can operate on
162+
// without mutating the current level manifest
163+
// If persisting to disk fails, this way the level manifest
164+
// is unchanged
165+
let mut prev_segments = self.segments.write().expect("lock is poisoned");
166+
167+
let mut working_copy = prev_segments.clone();
168+
169+
f(&mut working_copy);
170+
171+
let ids = working_copy.keys().copied().collect::<Vec<_>>();
172+
173+
Self::write_to_disk(&self.path, &ids)?;
174+
*prev_segments = working_copy;
158175

159-
let mut lock = self.segments.write().expect("lock is poisoned");
160-
lock.retain(|x, _| !ids.contains(x));
161-
Self::write_to_disk(&self.path, &lock.keys().copied().collect::<Vec<_>>())
176+
log::trace!("Swapped vLog segment list to: {ids:?}");
177+
178+
Ok(())
162179
}
163180

164-
pub fn register(&self, writer: MultiWriter) -> crate::Result<()> {
165-
// TODO: atomic swap
181+
pub fn drop_segments(&self, ids: &[u64]) -> crate::Result<()> {
182+
self.atomic_swap(|recipe| {
183+
recipe.retain(|x, _| !ids.contains(x));
184+
})
185+
}
166186

167-
let mut lock = self.segments.write().expect("lock is poisoned");
187+
pub fn register(&self, writer: MultiWriter) -> crate::Result<()> {
168188
let writers = writer.finish()?;
169189

170-
for writer in writers {
171-
if writer.item_count == 0 {
172-
log::trace!(
173-
"Writer at {:?} has written no data, deleting empty vLog segment file",
174-
writer.path
175-
);
176-
if let Err(e) = std::fs::remove_file(&writer.path) {
177-
log::warn!(
178-
"Could not delete empty vLog segment file at {:?}: {e:?}",
190+
self.atomic_swap(move |recipe| {
191+
for writer in &writers {
192+
if writer.item_count == 0 {
193+
log::trace!(
194+
"Writer at {:?} has written no data, deleting empty vLog segment file",
179195
writer.path
180196
);
181-
};
182-
continue;
183-
}
197+
if let Err(e) = std::fs::remove_file(&writer.path) {
198+
log::warn!(
199+
"Could not delete empty vLog segment file at {:?}: {e:?}",
200+
writer.path
201+
);
202+
};
203+
continue;
204+
}
184205

185-
let segment_id = writer.segment_id;
186-
187-
lock.insert(
188-
segment_id,
189-
Arc::new(Segment {
190-
id: segment_id,
191-
path: writer.path,
192-
meta: Metadata {
193-
item_count: writer.item_count,
194-
compressed_bytes: writer.written_blob_bytes,
195-
total_uncompressed_bytes: writer.uncompressed_bytes,
196-
key_range: KeyRange::new((
197-
writer
198-
.first_key
199-
.clone()
200-
.expect("should have written at least 1 item"),
201-
writer
202-
.last_key
203-
.clone()
204-
.expect("should have written at least 1 item"),
205-
)),
206-
},
207-
gc_stats: GcStats::default(),
208-
}),
209-
);
210-
211-
log::debug!(
212-
"Created segment #{segment_id:?} ({} items, {} userdata bytes)",
213-
writer.item_count,
214-
writer.uncompressed_bytes,
215-
);
216-
}
206+
let segment_id = writer.segment_id;
217207

218-
Self::write_to_disk(&self.path, &lock.keys().copied().collect::<Vec<_>>())
208+
recipe.insert(
209+
segment_id,
210+
Arc::new(Segment {
211+
id: segment_id,
212+
path: writer.path.clone(),
213+
meta: Metadata {
214+
item_count: writer.item_count,
215+
compressed_bytes: writer.written_blob_bytes,
216+
total_uncompressed_bytes: writer.uncompressed_bytes,
217+
key_range: KeyRange::new((
218+
writer
219+
.first_key
220+
.clone()
221+
.expect("should have written at least 1 item"),
222+
writer
223+
.last_key
224+
.clone()
225+
.expect("should have written at least 1 item"),
226+
)),
227+
},
228+
gc_stats: GcStats::default(),
229+
}),
230+
);
231+
232+
log::debug!(
233+
"Created segment #{segment_id:?} ({} items, {} userdata bytes)",
234+
writer.item_count,
235+
writer.uncompressed_bytes,
236+
);
237+
}
238+
})
219239
}
220240

221241
fn write_to_disk<P: AsRef<Path>>(path: P, segment_ids: &[SegmentId]) -> crate::Result<()> {

src/segment/meta.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub struct Metadata {
1919
/// true size in bytes (if no compression were used)
2020
pub total_uncompressed_bytes: u64,
2121

22-
// TODO:
22+
// TODO: 1.0.0
2323
///// What type of compression is used
2424
// pub compression: CompressionType,
2525
/// Key range

0 commit comments

Comments
 (0)