Skip to content

Commit 9053120

Browse files
committed
Refactor how updates work
1 parent 9906472 commit 9053120

File tree

3 files changed

+94
-92
lines changed

3 files changed

+94
-92
lines changed

src/Index2.zig

+28-33
Original file line numberDiff line numberDiff line change
@@ -266,54 +266,49 @@ fn loadSegments(self: *Self) !void {
266266
}
267267

268268
fn doCheckpoint(self: *Self) !bool {
269-
const start_time = std.time.milliTimestamp();
269+
var segments = self.acquireSegments();
270+
defer self.releaseSegments(&segments);
270271

271-
var src = self.readyForCheckpoint() orelse return false;
272+
const source = segments.memory_segments.value.getFirst() orelse return false;
273+
if (source.value.getSize() < self.options.min_segment_size) {
274+
return false;
275+
}
272276

273-
var src_reader = src.data.reader();
274-
defer src_reader.close();
277+
// build new file segment
275278

276-
var dest = try self.file_segments.createSegment(.{self.allocator});
277-
errdefer self.file_segments.destroySegment(dest);
279+
var target = try FileSegmentList.createSegment(self.allocator, .{ .dir = self.data_dir });
280+
errdefer FileSegmentList.destroySegment(self.allocator, &target);
278281

279-
try dest.data.build(self.data_dir, &src_reader);
282+
var reader = source.value.reader();
283+
defer reader.close();
280284

281-
errdefer dest.data.delete(self.data_dir);
285+
try target.value.build(&reader);
286+
errdefer target.value.cleanup();
282287

283-
self.file_segments_lock.lock();
284-
defer self.file_segments_lock.unlock();
288+
// update memory segments list
285289

286-
var ids = try self.file_segments.getIdsAfterAppend(dest, self.allocator);
287-
defer ids.deinit();
290+
var memory_segments_update = try self.memory_segments.beginUpdate();
291+
defer self.memory_segments.cleanupAfterUpdate(&memory_segments_update);
288292

289-
try filefmt.writeIndexFile(self.data_dir, ids.items);
293+
memory_segments_update.removeSegment(source);
290294

291-
// we are about to remove segment from the memory_segments list
292-
self.memory_segments_lock.lock();
293-
defer self.memory_segments_lock.unlock();
295+
// update file segments list
294296

295-
self.segments_lock.lock();
296-
defer self.segments_lock.unlock();
297+
var file_segments_update = try self.file_segments.beginUpdate();
298+
defer self.file_segments.cleanupAfterUpdate(&file_segments_update);
297299

298-
log.info("stage stats size={}, len={}", .{ self.memory_segments.getTotalSize(), self.memory_segments.segments.len });
300+
file_segments_update.appendSegment(target);
299301

300-
if (src != self.memory_segments.segments.first) {
301-
std.debug.panic("checkpoint node is not first in list", .{});
302-
}
302+
try self.writeIndexFile(file_segments_update.segments.value);
303303

304-
if (self.file_segments.segments.last) |last_file_segment| {
305-
if (last_file_segment.data.id.version >= dest.data.id.version) {
306-
std.debug.panic("inconsistent versions between memory and file segments", .{});
307-
}
308-
}
304+
// commit updated lists
309305

310-
self.file_segments.appendSegment(dest);
311-
self.memory_segments.removeAndDestroySegment(src);
306+
self.segments_lock.lock();
307+
defer self.segments_lock.unlock();
312308

313-
log.info("saved changes up to commit {} to disk", .{dest.data.max_commit_id});
309+
self.memory_segments.commitUpdate(&memory_segments_update);
310+
self.file_segments.commitUpdate(&file_segments_update);
314311

315-
const end_time = std.time.milliTimestamp();
316-
log.info("checkpoint took {} ms", .{end_time - start_time});
317312
return true;
318313
}
319314

@@ -336,7 +331,7 @@ fn startCheckpointThread(self: *Self) !void {
336331
if (self.checkpoint_thread != null) return;
337332

338333
log.info("starting checkpoint thread", .{});
339-
// self.checkpoint_thread = try std.Thread.spawn(.{}, checkpointThreadFn, .{self});
334+
self.checkpoint_thread = try std.Thread.spawn(.{}, checkpointThreadFn, .{self});
340335
}
341336

342337
fn stopCheckpointThread(self: *Self) void {

src/segment_list2.zig

+65-58
Original file line numberDiff line numberDiff line change
@@ -64,35 +64,33 @@ pub fn SegmentList(Segment: type) type {
6464
}
6565

6666
pub fn appendSegmentInto(self: Self, copy: *Self, node: Node) void {
67+
copy.nodes.clearRetainingCapacity();
6768
for (self.nodes.items) |n| {
6869
copy.nodes.appendAssumeCapacity(n.acquire());
6970
}
7071
copy.nodes.appendAssumeCapacity(node.acquire());
7172
}
7273

73-
pub fn appendSegment(self: *Self, allocator: Allocator, node: Node) Allocator.Error!Self {
74-
var copy = try Self.init(allocator, self.nodes.items.len + 1);
75-
self.appendSegmentInto(&copy, node);
76-
return copy;
77-
}
78-
79-
pub fn removeSegment(self: *Self, allocator: Allocator, idx: usize) Allocator.Error!Self {
80-
var copy = try Self.init(allocator, self.nodes.items.len - 1);
81-
for (self.nodes.items, 0..) |n, i| {
82-
if (i != idx) {
74+
pub fn removeSegmentInto(self: Self, copy: *Self, node: Node) void {
75+
copy.nodes.clearRetainingCapacity();
76+
for (self.nodes.items) |n| {
77+
if (n.value != node.value) {
8378
copy.nodes.appendAssumeCapacity(n.acquire());
8479
}
8580
}
86-
return copy;
8781
}
8882

89-
pub fn replaceSegments(self: *Self, allocator: Allocator, node: Node, start_idx: usize, end_idx: usize) Allocator.Error!Self {
90-
var copy = try Self.init(allocator, self.nodes.items.len + 1 - (end_idx - start_idx));
91-
for (self.nodes.items, 0..) |n, i| {
92-
if (i < start_idx or i >= end_idx) {
83+
pub fn replaceMergedSegmentInto(self: *Self, copy: *Self, node: Node) Self {
84+
copy.nodes.clearRetainingCapacity();
85+
var inserted_merged = false;
86+
for (self.nodes.items) |n| {
87+
if (node.value.id.contains(n.value.id)) {
88+
if (!inserted_merged) {
89+
copy.nodes.appendAssumeCapacity(node);
90+
inserted_merged = true;
91+
}
92+
} else {
9393
copy.nodes.appendAssumeCapacity(n.acquire());
94-
} else if (i == start_idx) {
95-
copy.nodes.appendAssumeCapacity(node.acquire());
9694
}
9795
}
9896
return copy;
@@ -232,8 +230,8 @@ pub fn SegmentListManager(Segment: type) type {
232230

233231
const candidate = self.merge_policy.findSegmentsToMerge(segments.value.nodes.items) orelse return false;
234232

235-
var target = try List.createSegment(self.allocator, self.options);
236-
errdefer List.destroySegment(self.allocator, &target);
233+
var new_segment = try List.createSegment(self.allocator, self.options);
234+
defer List.destroySegment(self.allocator, &new_segment);
237235

238236
var merger = SegmentMerger(Segment).init(self.allocator, segments.value);
239237
defer merger.deinit();
@@ -243,64 +241,73 @@ pub fn SegmentListManager(Segment: type) type {
243241
}
244242
try merger.prepare();
245243

246-
try target.value.merge(&merger);
247-
errdefer target.value.cleanup();
248-
249-
self.update_lock.lock();
250-
defer self.update_lock.unlock();
251-
252-
var new_segments = try SharedPtr(List).create(self.allocator, undefined);
253-
defer new_segments.release(self.allocator, .{self.allocator});
254-
255-
new_segments.value.* = try List.init(self.allocator, self.segments.value.nodes.items.len);
256-
defer new_segments.value.deinit(self.allocator);
244+
try new_segment.value.merge(&merger);
245+
errdefer new_segment.value.cleanup();
257246

258-
var inserted_merged = false;
259-
for (self.segments.value.nodes.items) |node| {
260-
if (target.value.id.contains(node.value.id)) {
261-
if (!inserted_merged) {
262-
new_segments.value.nodes.appendAssumeCapacity(target);
263-
inserted_merged = true;
264-
}
265-
} else {
266-
new_segments.value.nodes.appendAssumeCapacity(node.acquire());
267-
}
268-
}
247+
var update = try self.beginUpdate();
248+
defer self.cleanupAfterUpdate(&update);
269249

270-
try @call(.auto, preCommitFn, .{ ctx, new_segments.value });
250+
try @call(.auto, preCommitFn, .{ ctx, update.segments.value });
271251

272252
lock.lock();
273253
defer lock.unlock();
274254

275-
self.segments.swap(&new_segments);
255+
self.commitUpdate(&update);
276256

277257
return true;
278258
}
279259

280-
pub fn update(self: *Self, changes: []const Change, lock: *std.Thread.RwLock, preCommitFn: anytype, ctx: anytype) !bool {
281-
var new_segment = try List.createSegment(self.allocator, self.options);
282-
errdefer List.destroySegment(self.allocator, &new_segment);
260+
pub const Update = struct {
261+
manager: *Self,
262+
segments: SharedPtr(List),
263+
committed: bool = false,
283264

284-
try new_segment.value.build(changes);
285-
errdefer new_segment.value.cleanup();
265+
pub fn removeSegment(self: *@This(), node: List.Node) void {
266+
self.manager.segments.value.removeSegmentInto(self.segments.value, node);
267+
}
286268

269+
pub fn appendSegment(self: *@This(), node: List.Node) void {
270+
self.manager.segments.value.appendSegmentInto(self.segments.value, node);
271+
}
272+
273+
pub fn replaceMergedSegment(self: *@This(), node: List.Node) void {
274+
self.manager.segments.value.replaceMergedSegmentInto(self.segments.value, node);
275+
}
276+
};
277+
278+
pub fn beginUpdate(self: *Self) !Update {
287279
self.update_lock.lock();
288-
defer self.update_lock.unlock();
280+
errdefer self.update_lock.unlock();
289281

290-
var new_segments_list = try self.segments.value.appendSegment(self.allocator, new_segment);
291-
errdefer new_segments_list.deinit(self.allocator);
282+
var segments = try SharedPtr(List).create(self.allocator, List.initEmpty());
283+
errdefer self.releaseSegments(&segments);
292284

293-
var new_segments = try SharedPtr(List).create(self.allocator, new_segments_list);
294-
defer new_segments.release(self.allocator, .{self.allocator});
285+
try segments.value.nodes.ensureTotalCapacity(self.allocator, self.segments.value.nodes.items.len + 1);
295286

296-
try @call(.auto, preCommitFn, .{ ctx, new_segments.value });
287+
return .{
288+
.manager = self,
289+
.segments = segments,
290+
};
291+
}
297292

298-
lock.lock();
299-
defer lock.unlock();
293+
pub fn commitUpdate(self: *Self, update: *Update) void {
294+
self.segments.swap(&update.segments);
295+
self.update_lock.unlock();
296+
update.committed = true;
297+
}
300298

301-
self.segments.swap(&new_segments);
299+
pub fn cleanupAfterUpdate(self: *Self, update: *Update) void {
300+
if (!update.committed) {
301+
self.update_lock.unlock();
302+
}
303+
update.segments.releaseWithCleanup(self.allocator, destroySegmentList, .{self});
304+
}
302305

303-
return true;
306+
fn destroySegmentList(segments: *List, self: *Self) void {
307+
for (segments.nodes.items) |node| {
308+
node.value.cleanup();
309+
}
310+
segments.deinit(self.allocator);
304311
}
305312
};
306313
}

src/utils/smartptr.zig

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ pub fn SharedPtr(comptime T: type) type {
7171
}
7272
}
7373

74-
pub fn releaseWithCleanup(self: *Self, allocator: Allocator, cleanupFn: fn (*T) void, extra_args: anytype) void {
74+
pub fn releaseWithCleanup(self: *Self, allocator: Allocator, cleanupFn: anytype, extra_args: anytype) void {
7575
const inner_ptr = self.getInnerPtr();
7676
if (inner_ptr.refs.decr()) {
7777
@call(.auto, cleanupFn, .{&inner_ptr.value} ++ extra_args);

0 commit comments

Comments
 (0)