Skip to content

Commit 3c3a916

Browse files
committed
Add pre-commit hook to merge()
1 parent f1e20b0 commit 3c3a916

File tree

2 files changed

+33
-10
lines changed

2 files changed

+33
-10
lines changed

src/Index2.zig

+14-2
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,16 @@ fn stopCheckpointThread(self: *Self) void {
338338
self.checkpoint_thread = null;
339339
}
340340

341+
fn writeIndexFile(self: *Self, segments: *MemorySegmentList) !void {
342+
var ids = try segments.getIds(self.allocator);
343+
defer ids.deinit(self.allocator);
344+
345+
try filefmt.writeIndexFile(self.data_dir, ids.items);
346+
}
347+
341348
fn fileSegmentMergeThreadFn(self: *Self) void {
342349
while (!self.stopping.load(.acquire)) {
343-
if (self.maybeMergeFileSegments()) |successful| {
350+
if (self.file_segments.merge(&self.segments_lock, writeIndexFile, self)) |successful| {
344351
if (successful) {
345352
continue;
346353
}
@@ -410,9 +417,14 @@ fn maybeMergeFileSegments(self: *Self) !bool {
410417
return true;
411418
}
412419

420+
fn memorySegmentMergePreCommit(self: *Self, segments: *MemorySegmentList) !void {
421+
_ = self;
422+
_ = segments;
423+
}
424+
413425
fn memorySegmentMergeThreadFn(self: *Self) void {
414426
while (!self.stopping.load(.acquire)) {
415-
if (self.memory_segments.merge(&self.segments_lock)) |successful| {
427+
if (self.memory_segments.merge(&self.segments_lock, memorySegmentMergePreCommit, self)) |successful| {
416428
if (successful) {
417429
self.checkpoint_event.set();
418430
continue;

src/segment_list2.zig

+19-8
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ pub fn SegmentListManager(Segment: type) type {
175175
segments: SharedPtr(List),
176176
merge_policy: MergePolicy,
177177
num_allowed_segments: std.atomic.Value(usize),
178+
update_lock: std.Thread.Mutex,
178179

179180
pub fn init(allocator: Allocator, merge_policy: MergePolicy) !Self {
180181
const segments = try SharedPtr(List).create(allocator, List.initEmpty());
@@ -183,6 +184,7 @@ pub fn SegmentListManager(Segment: type) type {
183184
.segments = segments,
184185
.merge_policy = merge_policy,
185186
.num_allowed_segments = std.atomic.Value(usize).init(0),
187+
.update_lock = .{},
186188
};
187189
}
188190

@@ -216,7 +218,7 @@ pub fn SegmentListManager(Segment: type) type {
216218
return self.segments.value.nodes.items.len > self.num_allowed_segments.load(.acquire);
217219
}
218220

219-
pub fn merge(self: *Self, lock: *std.Thread.RwLock) !bool {
221+
pub fn merge(self: *Self, lock: *std.Thread.RwLock, preCommitFn: anytype, ctx: anytype) !bool {
220222
var segments = self.acquireSegments(lock);
221223
defer self.releaseSegments(&segments);
222224

@@ -240,25 +242,34 @@ pub fn SegmentListManager(Segment: type) type {
240242

241243
try target.value.merge(&merger);
242244

243-
lock.lock();
244-
defer lock.unlock();
245+
self.update_lock.lock();
246+
defer self.update_lock.unlock();
245247

246-
var new_segments = try List.init(self.allocator, self.segments.value.nodes.items.len);
247-
errdefer new_segments.deinit(self.allocator);
248+
var new_segments = try SharedPtr(List).create(self.allocator, undefined);
249+
defer new_segments.release(self.allocator, .{self.allocator});
250+
251+
new_segments.value.* = try List.init(self.allocator, self.segments.value.nodes.items.len);
252+
defer new_segments.value.deinit(self.allocator);
248253

249254
var inserted_merged = false;
250255
for (self.segments.value.nodes.items) |node| {
251256
if (target.value.id.contains(node.value.id)) {
252257
if (!inserted_merged) {
253-
new_segments.nodes.appendAssumeCapacity(target);
258+
new_segments.value.nodes.appendAssumeCapacity(target);
254259
inserted_merged = true;
255260
}
256261
} else {
257-
new_segments.nodes.appendAssumeCapacity(node.acquire());
262+
new_segments.value.nodes.appendAssumeCapacity(node.acquire());
258263
}
259264
}
260265

261-
try self.swap(new_segments);
266+
try @call(.auto, preCommitFn, .{ ctx, new_segments.value });
267+
268+
lock.lock();
269+
defer lock.unlock();
270+
271+
self.segments.swap(&new_segments);
272+
262273
return true;
263274
}
264275
};

0 commit comments

Comments
 (0)