Skip to content

Commit 3d78342

Browse files
committed
Avoid blocking updates during merges
1 parent 2e4987c commit 3d78342

File tree

2 files changed

+20
-14
lines changed

2 files changed

+20
-14
lines changed

src/Index.zig

+18-12
Original file line numberDiff line numberDiff line change
@@ -48,18 +48,20 @@ memory_segments: MemorySegmentList,
4848

4949
// RW lock used to control general mutations to either file_segments or memory_segments.
5050
// This lock needs to be held for any read/write operations on either list.
51+
// Once you hold this lock, you can be sure that no changes are happening to either list.
5152
segments_lock: std.Thread.RwLock = .{},
5253

53-
// Mutex used to control exclusivity during re-shuffling of segment lists during checkpoint/merges.
54-
// This lock by itself doesn't give access to either list, you need to hold the segments_lock as well.
54+
// These locks give partial access to the respective segments list.
55+
// 1) For memory_segments, new segment can be appended to the list without this lock.
56+
// 2) For file_segments, no write operation can happen without this lock.
57+
// These lock can be only acquired before segments_lock, never after, to avoid deadlock situatons.
58+
// They are mostly useful to allowing read access to segments during merge/checkpoint, without blocking real-time update.
5559
file_segments_lock: std.Thread.Mutex = .{},
60+
memory_segments_lock: std.Thread.Mutex = .{},
5661

5762
// Mutex used to control linearity of updates.
5863
update_lock: std.Thread.Mutex = .{},
5964

60-
// Mutex used to control merging of in-memory segments.
61-
memory_merge_lock: std.Thread.Mutex = .{},
62-
6365
checkpoint_mutex: std.Thread.Mutex = .{},
6466
checkpoint_condition: std.Thread.Condition = .{},
6567
checkpoint_stop: bool = false,
@@ -135,8 +137,8 @@ fn flattenMemorySegmentIds(self: *Self) void {
135137
}
136138

137139
fn prepareMemorySegmentMerge(self: *Self) !?MemorySegmentList.PreparedMerge {
138-
self.segments_lock.lockShared();
139-
defer self.segments_lock.unlockShared();
140+
self.memory_segments_lock.lock();
141+
defer self.memory_segments_lock.unlock();
140142

141143
var merge = try self.memory_segments.prepareMerge() orelse return null;
142144
defer merge.merger.deinit();
@@ -148,6 +150,9 @@ fn prepareMemorySegmentMerge(self: *Self) !?MemorySegmentList.PreparedMerge {
148150
}
149151

150152
fn finishMemorySegmentMerge(self: *Self, merge: MemorySegmentList.PreparedMerge) bool {
153+
self.memory_segments_lock.lock();
154+
defer self.memory_segments_lock.unlock();
155+
151156
defer self.memory_segments.cleanupAfterMerge(merge, .{});
152157

153158
self.segments_lock.lock();
@@ -166,9 +171,6 @@ fn finishMemorySegmentMerge(self: *Self, merge: MemorySegmentList.PreparedMerge)
166171

167172
// Perform partial compaction on the in-memory segments.
168173
fn maybeMergeMemorySegments(self: *Self) !bool {
169-
self.memory_merge_lock.lock();
170-
defer self.memory_merge_lock.unlock();
171-
172174
const merge = try self.prepareMemorySegmentMerge() orelse return false;
173175
return self.finishMemorySegmentMerge(merge);
174176
}
@@ -292,6 +294,10 @@ fn doCheckpoint(self: *Self) !bool {
292294

293295
try filefmt.writeIndexFile(self.data_dir, ids.items);
294296

297+
// we are about to remove segment from the memory_segments list
298+
self.memory_segments_lock.lock();
299+
defer self.memory_segments_lock.unlock();
300+
295301
self.segments_lock.lock();
296302
defer self.segments_lock.unlock();
297303

@@ -405,8 +411,8 @@ fn stopFileSegmentMergeThread(self: *Self) void {
405411
}
406412

407413
fn prepareFileSegmentMerge(self: *Self) !?FileSegmentList.PreparedMerge {
408-
self.segments_lock.lockShared();
409-
defer self.segments_lock.unlockShared();
414+
self.file_segments_lock.lock();
415+
defer self.file_segments_lock.unlock();
410416

411417
var merge = try self.file_segments.prepareMerge() orelse return null;
412418
defer merge.merger.deinit();

src/segment_list.zig

+2-2
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ pub fn SegmentList(Segment: type) type {
182182
@call(.auto, Segment.cleanup, .{&iter.data} ++ cleanup_args);
183183
self.destroySegment(iter);
184184
if (is_end) break;
185-
iter = next_node orelse break;
185+
iter = next_node orelse unreachable; // next_node being null implies a memory corruption
186186
}
187187
}
188188

@@ -194,7 +194,7 @@ pub fn SegmentList(Segment: type) type {
194194
const is_end = iter == merge.sources.end;
195195
self.segments.remove(iter);
196196
if (is_end) break;
197-
iter = next_node orelse break;
197+
iter = next_node orelse unreachable; // next_node being null implies a memory corruption
198198
}
199199
}
200200
};

0 commit comments

Comments
 (0)