Skip to content

Commit ea7d3a4

Browse files
committed
More code cleanup
1 parent cdd3b20 commit ea7d3a4

File tree

2 files changed

+42
-76
lines changed

2 files changed

+42
-76
lines changed

src/Index2.zig

+26-52
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,8 @@ pub fn deinit(self: *Self) void {
136136
self.stopMemorySegmentMergeThread();
137137
self.stopFileSegmentMergeThread();
138138

139-
self.memory_segments.deinit(self.allocator);
140-
self.file_segments.deinit(self.allocator);
139+
self.memory_segments.deinit();
140+
self.file_segments.deinit();
141141

142142
self.oplog.deinit();
143143
self.oplog_dir.close();
@@ -245,7 +245,7 @@ fn loadSegments(self: *Self) !void {
245245
const segment_ids = filefmt.readIndexFile(self.data_dir, self.allocator) catch |err| {
246246
if (err == error.FileNotFound) {
247247
if (self.options.create) {
248-
try filefmt.writeIndexFile(self.data_dir, &[_]SegmentId{});
248+
try self.updateIndexFile(self.file_segments.segments.value);
249249
return;
250250
}
251251
return error.IndexNotFound;
@@ -254,15 +254,12 @@ fn loadSegments(self: *Self) !void {
254254
};
255255
defer self.allocator.free(segment_ids);
256256

257-
var segments = try FileSegmentList.createShared(self.allocator, segment_ids.len);
258-
defer segments.release(self.allocator, .{self.allocator});
257+
try self.file_segments.segments.value.nodes.ensureTotalCapacity(self.allocator, segment_ids.len);
259258

260259
for (segment_ids) |segment_id| {
261260
const node = try self.loadSegment(segment_id);
262-
segments.value.nodes.appendAssumeCapacity(node);
261+
self.file_segments.segments.value.nodes.appendAssumeCapacity(node);
263262
}
264-
265-
self.file_segments.segments.swap(&segments);
266263
}
267264

268265
fn doCheckpoint(self: *Self) !bool {
@@ -299,7 +296,7 @@ fn doCheckpoint(self: *Self) !bool {
299296

300297
file_segments_update.appendSegment(target);
301298

302-
try self.writeIndexFile(file_segments_update.segments.value);
299+
try self.updateIndexFile(file_segments_update.segments.value);
303300

304301
// commit updated lists
305302

@@ -343,16 +340,29 @@ fn stopCheckpointThread(self: *Self) void {
343340
self.checkpoint_thread = null;
344341
}
345342

346-
fn writeIndexFile(self: *Self, segments: *FileSegmentList) !void {
343+
fn updateIndexFile(self: *Self, segments: *FileSegmentList) !void {
347344
var ids = try segments.getIds(self.allocator);
348345
defer ids.deinit(self.allocator);
349346

350347
try filefmt.writeIndexFile(self.data_dir, ids.items);
351348
}
352349

350+
fn maybeMergeFileSegments(self: *Self) !bool {
351+
var upd = try self.file_segments.prepareMerge() orelse return false;
352+
defer self.file_segments.cleanupAfterUpdate(&upd);
353+
354+
try self.updateIndexFile(upd.segments.value);
355+
356+
self.segments_lock.lock();
357+
defer self.segments_lock.unlock();
358+
359+
self.file_segments.commitUpdate(&upd);
360+
return true;
361+
}
362+
353363
fn fileSegmentMergeThreadFn(self: *Self) void {
354364
while (!self.stopping.load(.acquire)) {
355-
if (self.file_segments.merge(&self.segments_lock, writeIndexFile, self)) |successful| {
365+
if (self.maybeMergeFileSegments()) |successful| {
356366
if (successful) {
357367
continue;
358368
}
@@ -380,56 +390,20 @@ fn stopFileSegmentMergeThread(self: *Self) void {
380390
self.file_segment_merge_thread = null;
381391
}
382392

383-
fn prepareFileSegmentMerge(self: *Self) !?FileSegmentList.PreparedMerge {
384-
self.segments_lock.lockShared();
385-
defer self.segments_lock.unlockShared();
393+
fn maybeMergeMemorySegments(self: *Self) !bool {
394+
var upd = try self.memory_segments.prepareMerge() orelse return false;
395+
defer self.memory_segments.cleanupAfterUpdate(&upd);
386396

387-
return try self.file_segments.prepareMerge();
388-
}
389-
390-
fn maybeMergeFileSegments(self: *Self) !bool {
391-
var merge = try self.prepareFileSegmentMerge() orelse return false;
392-
defer merge.merger.deinit();
393-
errdefer self.file_segments.destroySegment(merge.target);
394-
395-
// We are reading segment data without holding any lock here,
396-
// but it's OK, because are the only ones modifying segments.
397-
// The only other place with write access to the segment list is
398-
// the checkpoint thread, which is only ever adding new segments.
399-
try merge.target.data.build(self.data_dir, &merge.merger);
400-
errdefer merge.target.data.delete(self.data_dir);
401-
402-
// By acquiring file_segments_lock, we make sure that the file_segments list
403-
// can't be modified by other threads.
404-
self.file_segments_lock.lock();
405-
defer self.file_segments_lock.unlock();
406-
407-
var ids = try self.file_segments.getIdsAfterAppliedMerge(merge, self.allocator);
408-
defer ids.deinit();
409-
410-
try filefmt.writeIndexFile(self.data_dir, ids.items);
411-
412-
// We want to do this outside of segments_lock to avoid blocking searches more than necessary
413-
defer self.file_segments.cleanupAfterMerge(merge, .{self.data_dir});
414-
415-
// This lock allows to modify the file_segments list, it's blocking all other threads.
416397
self.segments_lock.lock();
417398
defer self.segments_lock.unlock();
418399

419-
self.file_segments.applyMerge(merge);
420-
421-
log.info("committed merge segment {}:{}", .{ merge.target.data.id.version, merge.target.data.id.included_merges });
400+
self.memory_segments.commitUpdate(&upd);
422401
return true;
423402
}
424403

425-
fn memorySegmentMergePreCommit(self: *Self, segments: *MemorySegmentList) !void {
426-
_ = self;
427-
_ = segments;
428-
}
429-
430404
fn memorySegmentMergeThreadFn(self: *Self) void {
431405
while (!self.stopping.load(.acquire)) {
432-
if (self.memory_segments.merge(&self.segments_lock, memorySegmentMergePreCommit, self)) |successful| {
406+
if (self.maybeMergeMemorySegments()) |successful| {
433407
if (successful) {
434408
self.checkpoint_event.set();
435409
continue;

src/segment_list2.zig

+16-24
Original file line numberDiff line numberDiff line change
@@ -80,20 +80,19 @@ pub fn SegmentList(Segment: type) type {
8080
}
8181
}
8282

83-
pub fn replaceMergedSegmentInto(self: *Self, copy: *Self, node: Node) Self {
83+
pub fn replaceMergedSegmentInto(self: *Self, copy: *Self, node: Node) void {
8484
copy.nodes.clearRetainingCapacity();
8585
var inserted_merged = false;
8686
for (self.nodes.items) |n| {
8787
if (node.value.id.contains(n.value.id)) {
8888
if (!inserted_merged) {
89-
copy.nodes.appendAssumeCapacity(node);
89+
copy.nodes.appendAssumeCapacity(node.acquire());
9090
inserted_merged = true;
9191
}
9292
} else {
9393
copy.nodes.appendAssumeCapacity(n.acquire());
9494
}
9595
}
96-
return copy;
9796
}
9897

9998
pub fn getIds(self: Self, allocator: Allocator) Allocator.Error!std.ArrayListUnmanaged(SegmentId) {
@@ -190,16 +189,16 @@ pub fn SegmentListManager(Segment: type) type {
190189
}
191190

192191
pub fn deinit(self: *Self) void {
193-
releaseSegments(&self.segments);
192+
self.releaseSegments(&self.segments);
194193
}
195194

196195
pub fn count(self: Self) usize {
197196
return self.segments.value.nodes.items.len;
198197
}
199198

200-
fn acquireSegments(self: Self, lock: *std.Thread.RwLock) SharedPtr(List) {
201-
lock.lockShared();
202-
defer lock.unlockShared();
199+
fn acquireSegments(self: *Self) SharedPtr(List) {
200+
self.update_lock.lock();
201+
defer self.update_lock.unlock();
203202

204203
return self.segments.acquire();
205204
}
@@ -212,19 +211,19 @@ pub fn SegmentListManager(Segment: type) type {
212211
return self.segments.value.nodes.items.len > self.num_allowed_segments.load(.acquire);
213212
}
214213

215-
pub fn merge(self: *Self, lock: *std.Thread.RwLock, preCommitFn: anytype, ctx: anytype) !bool {
216-
var segments = self.acquireSegments(lock);
214+
pub fn prepareMerge(self: *Self) !?Update {
215+
var segments = self.acquireSegments();
217216
defer self.releaseSegments(&segments);
218217

219218
self.num_allowed_segments.store(self.merge_policy.calculateBudget(segments.value.nodes.items), .release);
220219
if (!self.needsMerge()) {
221-
return false;
220+
return null;
222221
}
223222

224-
const candidate = self.merge_policy.findSegmentsToMerge(segments.value.nodes.items) orelse return false;
223+
const candidate = self.merge_policy.findSegmentsToMerge(segments.value.nodes.items) orelse return null;
225224

226-
var new_segment = try List.createSegment(self.allocator, self.options);
227-
defer List.destroySegment(self.allocator, &new_segment);
225+
var target = try List.createSegment(self.allocator, self.options);
226+
defer List.destroySegment(self.allocator, &target);
228227

229228
var merger = SegmentMerger(Segment).init(self.allocator, segments.value);
230229
defer merger.deinit();
@@ -234,20 +233,13 @@ pub fn SegmentListManager(Segment: type) type {
234233
}
235234
try merger.prepare();
236235

237-
try new_segment.value.merge(&merger);
238-
errdefer new_segment.value.cleanup();
236+
try target.value.merge(&merger);
237+
errdefer target.value.cleanup();
239238

240239
var update = try self.beginUpdate();
241-
defer self.cleanupAfterUpdate(&update);
242-
243-
try @call(.auto, preCommitFn, .{ ctx, update.segments.value });
244-
245-
lock.lock();
246-
defer lock.unlock();
247-
248-
self.commitUpdate(&update);
240+
update.replaceMergedSegment(target);
249241

250-
return true;
242+
return update;
251243
}
252244

253245
pub const Update = struct {

0 commit comments

Comments
 (0)