
Commit 2e4987c

Better merge performance
1 parent b3278d6 commit 2e4987c

4 files changed (+94 -23 lines)


src/Index.zig (+27 -13)

@@ -31,7 +31,7 @@ const Self = @This();
 
 const Options = struct {
     create: bool = false,
-    min_segment_size: usize = 1_000_000,
+    min_segment_size: usize = 250_000,
     max_segment_size: usize = 1_000_000_000,
 };
 
@@ -86,14 +86,16 @@ pub fn init(allocator: std.mem.Allocator, dir: std.fs.Dir, options: Options) !Se
         .min_segment_size = options.min_segment_size,
         .max_segment_size = options.max_segment_size,
         .segments_per_level = 10,
-        .segments_per_merge = 2, // TODO increase to 10
+        .segments_per_merge = 10,
+        .strategy = .balanced,
     };
 
     const memory_segment_merge_policy = TieredMergePolicy(MemorySegment){
         .min_segment_size = 100,
         .max_segment_size = options.min_segment_size,
         .segments_per_level = 10,
-        .segments_per_merge = 2, // TODO increase to 5
+        .segments_per_merge = 10,
+        .strategy = .aggressive,
     };
 
     return .{
@@ -270,7 +272,7 @@ fn loadSegments(self: *Self) !void {
 fn doCheckpoint(self: *Self) !bool {
     const start_time = std.time.milliTimestamp();
 
-    var src = try self.readyForCheckpoint() orelse return false;
+    var src = self.readyForCheckpoint() orelse return false;
 
     var src_reader = src.data.reader();
     defer src_reader.close();
@@ -322,7 +324,7 @@ fn checkpointThreadFn(self: *Self) void {
 
         if (self.doCheckpoint()) |successful| {
             if (successful) {
-                self.file_segment_merge_condition.signal();
+                self.signalFileSegmentMerge();
                 continue;
             }
         } else |err| {
@@ -502,7 +504,7 @@ const Checkpoint = struct {
     dest: ?*FileSegmentNode = null,
 };
 
-fn readyForCheckpoint(self: *Self) !?*MemorySegmentNode {
+fn readyForCheckpoint(self: *Self) ?*MemorySegmentNode {
     self.segments_lock.lockShared();
     defer self.segments_lock.unlockShared();
 
@@ -515,15 +517,27 @@ fn readyForCheckpoint(self: *Self) !?*MemorySegmentNode {
     return null;
 }
 
-pub fn update(self: *Self, changes: []const Change) !void {
-    //const t1 = std.time.milliTimestamp();
-    if (try self.maybeMergeMemorySegments()) {
-        self.checkpoint_condition.signal();
+fn signalMemorySegmentMerge(self: *Self) void {
+    self.segments_lock.lockShared();
+    defer self.segments_lock.unlockShared();
+
+    if (self.memory_segments.needsMerge()) {
+        self.memory_segment_merge_condition.signal();
     }
-    //const t2 = std.time.milliTimestamp();
+}
+
+fn signalFileSegmentMerge(self: *Self) void {
+    self.segments_lock.lockShared();
+    defer self.segments_lock.unlockShared();
+
+    if (self.file_segments.needsMerge()) {
+        self.file_segment_merge_condition.signal();
+    }
+}
+
+pub fn update(self: *Self, changes: []const Change) !void {
     try self.oplog.write(changes, Updater{ .index = self });
-    // const t3 = std.time.milliTimestamp();
-    //log.info("merge: {}ms, update: {}ms", .{ t2 - t1, t3 - t2 });
+    self.signalMemorySegmentMerge();
 }
 
 pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !SearchResults {
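
Note on the Index.zig change: update() no longer tries to merge in-memory segments inline. It only writes the changes to the oplog and then wakes the background merge thread, and the wake-up happens only when needsMerge() reports that the segment list has outgrown its budget, so the condition variable is not signalled on every update. The sketch below shows the general wake-up pattern this relies on (signal only when a cheap predicate is true, re-check the predicate after every wake-up); Worker and its fields are illustrative stand-ins, not the actual Index internals.

const std = @import("std");

// Illustrative stand-in for the Index: the producer signals only when a cheap
// predicate crosses a threshold, and the background thread re-checks that
// predicate after every wake-up, so spurious wake-ups are harmless.
const Worker = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    pending: usize = 0,
    threshold: usize = 5,
    done: bool = false,

    fn produce(self: *Worker) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.pending += 1;
        // analogous to signalMemorySegmentMerge(): signal only when needed
        if (self.pending >= self.threshold) self.cond.signal();
    }

    fn run(self: *Worker) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        while (!self.done) {
            while (self.pending >= self.threshold) {
                self.pending -= self.threshold; // stand-in for one merge step
            }
            self.cond.wait(&self.mutex);
        }
    }

    fn stop(self: *Worker) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.done = true;
        self.cond.broadcast();
    }
};

pub fn main() !void {
    var worker = Worker{};
    const thread = try std.Thread.spawn(.{}, Worker.run, .{&worker});
    for (0..20) |_| worker.produce();
    worker.stop();
    thread.join();
}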

src/filefmt.zig (+1 -1)

@@ -91,7 +91,7 @@ test "check writeVarint32" {
 }
 
 pub const max_file_name_size = 64;
-const segment_file_name_fmt = "segment-{d}-{d}.dat";
+const segment_file_name_fmt = "segment-{x:0>8}-{x:0>8}.dat";
 pub const index_file_name = "index.dat";
 
 pub fn buildSegmentFileName(buf: []u8, version: common.SegmentID) []u8 {
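
Note on the filefmt.zig change: the new format string renders both components of the segment ID as fixed-width, zero-padded lowercase hex, so a lexicographically sorted directory listing is also sorted by ID. A quick illustration of the format specifier follows; plain integers stand in for the two SegmentID fields, which is an assumption about what buildSegmentFileName passes in.

const std = @import("std");

pub fn main() !void {
    var buf: [64]u8 = undefined;
    // {x:0>8} = lowercase hex, padded with '0' to a width of 8 characters
    const name = try std.fmt.bufPrint(&buf, "segment-{x:0>8}-{x:0>8}.dat", .{ 1, 255 });
    std.debug.print("{s}\n", .{name}); // prints: segment-00000001-000000ff.dat
}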

src/segment_list.zig (+16 -2)

@@ -19,11 +19,14 @@ pub fn SegmentList(Segment: type) type {
         merge_policy: MergePolicy,
         segments: List,
 
+        num_allowed_segments: std.atomic.Value(usize),
+
         pub fn init(allocator: std.mem.Allocator, merge_policy: MergePolicy) Self {
             return .{
                 .allocator = allocator,
                 .merge_policy = merge_policy,
                 .segments = .{},
+                .num_allowed_segments = std.atomic.Value(usize).init(0),
             };
         }
 
@@ -134,6 +137,10 @@ pub fn SegmentList(Segment: type) type {
             }
         }
 
+        pub fn needsMerge(self: *Self) bool {
+            return self.segments.len > self.num_allowed_segments.load(.monotonic);
+        }
+
         pub const SegmentsToMerge = MergePolicy.Candidate;
 
         pub const PreparedMerge = struct {
@@ -143,7 +150,10 @@ pub fn SegmentList(Segment: type) type {
         };
 
         pub fn prepareMerge(self: *Self) !?PreparedMerge {
-            const sources = self.merge_policy.findSegmentsToMerge(self.segments) orelse return null;
+            const result = self.merge_policy.findSegmentsToMerge(self.segments);
+            self.num_allowed_segments.store(result.num_allowed_segments, .monotonic);
+
+            const sources = result.candidate orelse return null;
 
             var merger = SegmentMerger(Segment).init(self.allocator, self);
             errdefer merger.deinit();
@@ -157,7 +167,11 @@ pub fn SegmentList(Segment: type) type {
             try merger.prepare();
 
             const target = try self.createSegment();
-            return .{ .sources = sources, .merger = merger, .target = target };
+            return .{
+                .sources = sources,
+                .merger = merger,
+                .target = target,
+            };
         }
 
         pub fn cleanupAfterMerge(self: *Self, merge: PreparedMerge, cleanup_args: anytype) void {
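
Note on the segment_list.zig change: prepareMerge() now caches the budget computed by the merge policy in the num_allowed_segments atomic, so needsMerge() can be answered from the update path with a single comparison instead of re-running the policy. A minimal sketch of that caching idea follows; TinyList and its fields are hypothetical, only the atomic load/store pattern mirrors the code above.

const std = @import("std");

// Hypothetical miniature of the pattern: the slow path stores the budget,
// the hot path only compares the current length against it.
const TinyList = struct {
    len: usize = 0,
    num_allowed_segments: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),

    fn recomputeBudget(self: *TinyList, budget: usize) void {
        // called rarely, e.g. from prepareMerge()
        self.num_allowed_segments.store(budget, .monotonic);
    }

    fn needsMerge(self: *TinyList) bool {
        // cheap enough to call on every update()
        return self.len > self.num_allowed_segments.load(.monotonic);
    }
};

test "budget caching" {
    var list = TinyList{ .len = 12 };
    list.recomputeBudget(10);
    try std.testing.expect(list.needsMerge());
    list.recomputeBudget(16);
    try std.testing.expect(!list.needsMerge());
}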

src/segment_merge_policy.zig (+50 -7)

@@ -1,7 +1,7 @@
 const std = @import("std");
 const log = std.log.scoped(.segment_merge_policy);
 
-const verbose = false;
+const verbose = true;
 
 pub fn MergeCandidate(comptime Segment: type) type {
     return struct {
@@ -24,6 +24,13 @@ pub fn TieredMergePolicy(comptime T: type) type {
         segments_per_merge: u32 = 10,
         segments_per_level: u32 = 10,
 
+        strategy: Strategy = .balanced,
+
+        const Strategy = enum {
+            balanced,
+            aggressive,
+        };
+
         const SegmentList = std.DoublyLinkedList(T);
         const SegmentNode = SegmentList.Node;
 
@@ -74,21 +81,34 @@ pub fn TieredMergePolicy(comptime T: type) type {
             return num_allowed_segments + num_oversized_segments;
         }
 
-        pub fn findSegmentsToMerge(self: Self, segments: SegmentList) ?Candidate {
+        pub const FindSegmentsToMergeResult = struct {
+            num_allowed_segments: usize,
+            candidate: ?Candidate,
+        };
+
+        pub fn findSegmentsToMerge(self: Self, segments: SegmentList) FindSegmentsToMergeResult {
             const num_segments = segments.len;
             const num_allowed_segments = self.calculateBudget(segments);
             log.debug("budget: {} segments", .{num_allowed_segments});
 
+            var result = FindSegmentsToMergeResult{
+                .num_allowed_segments = num_allowed_segments,
+                .candidate = null,
+            };
+
             if (num_allowed_segments >= segments.len) {
-                return null;
+                return result;
             }
 
             const merge_factor = @min(self.segments_per_merge, self.segments_per_level);
             const log_merge_factor = @log2(@as(f64, @floatFromInt(merge_factor)));
             const log_min_segment_size = @log2(@as(f64, @floatFromInt(self.min_segment_size)));
 
             const tier_scaling_factor = @as(f64, @floatFromInt(num_allowed_segments)) / @as(f64, @floatFromInt(num_segments)) / @as(f64, @floatFromInt(self.segments_per_level));
-            var tier = @as(f64, @floatFromInt(num_segments - 1)) * tier_scaling_factor;
+            const top_tier = @as(f64, @floatFromInt(num_segments)) * tier_scaling_factor;
+            var tier = top_tier;
+
+            var segment_no: usize = 0;
 
             var best_candidate: ?Candidate = null;
             var best_score: f64 = 0.0;
@@ -98,6 +118,7 @@ pub fn TieredMergePolicy(comptime T: type) type {
             var iter = segments.first;
             while (iter) |current_node| : (iter = current_node.next) {
                 tier -= tier_scaling_factor;
+                segment_no += 1;
 
                 if (current_node.data.getSize() > self.max_segment_size) {
                     // skip oversized segments
@@ -126,7 +147,16 @@ pub fn TieredMergePolicy(comptime T: type) type {
 
                 const log_size = @log2(@as(f64, @floatFromInt(candidate.size)));
                 const candidate_tier = (log_size - log_min_segment_size) / log_merge_factor;
-                const score = candidate_tier - tier;
+                var score = candidate_tier - tier;
+
+                const adjustment_factor: f64 = switch (self.strategy) {
+                    .balanced => 1.2,
+                    .aggressive => 1.8,
+                };
+
+                const adjustment = @as(f64, @floatFromInt(candidate.num_segments)) / @as(f64, @floatFromInt(self.segments_per_merge));
+                score = score - adjustment_factor * adjustment;
+
                 // std.debug.print("candidate {}-{}: len={} size={} candidate_tier={}, score={d}\n", .{ candidate.start.data.id, candidate.end.data.id, candidate.num_segments, candidate.size, candidate_tier, score });
                 if (score < best_score or best_candidate == null) {
                     best_candidate = candidate;
@@ -142,7 +172,8 @@ pub fn TieredMergePolicy(comptime T: type) type {
                 max_merge_size = current_node.data.getSize();
             }
 
-            return best_candidate;
+            result.candidate = best_candidate;
+            return result;
         }
     };
 }
@@ -186,6 +217,7 @@ test "TieredMergePolicy" {
         .max_segment_size = 100000,
         .segments_per_merge = 10,
         .segments_per_level = 5,
+        .strategy = .aggressive,
     };
 
     var last_id: u64 = 1;
@@ -200,6 +232,9 @@ test "TieredMergePolicy" {
         last_id += 1;
    }
 
+    var total_merge_size: u64 = 0;
+    var total_merge_count: u64 = 0;
+
     for (0..1000) |_| {
         if (verbose) {
             std.debug.print("---\n", .{});
@@ -221,14 +256,22 @@ test "TieredMergePolicy" {
             }
         }
 
-        const candidate = policy.findSegmentsToMerge(segments) orelse continue;
+        const result = policy.findSegmentsToMerge(segments);
+        const candidate = result.candidate orelse continue;
+
+        total_merge_size += candidate.num_segments;
+        total_merge_count += 1;
 
         if (verbose) {
            std.debug.print("merging {}-{}\n", .{ candidate.start.data.id, candidate.end.data.id });
        }
         try applyMerge(MockSegment, &segments, candidate, std.testing.allocator);
     }
 
+    if (verbose) {
+        std.debug.print("avg merge size: {}\n", .{total_merge_size / total_merge_count});
+    }
+
     const num_allowed_segmens = policy.calculateBudget(segments);
     try std.testing.expect(num_allowed_segmens >= segments.len);
 }

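Note on the segment_merge_policy.zig change: one key behavioural change is the new score adjustment. A candidate's score (lower is better) is reduced in proportion to how many segments it would merge relative to segments_per_merge, and the aggressive strategy weights that bonus more heavily (1.8 vs 1.2), so both policies now prefer fewer, wider merges, with the in-memory policy pushed hardest. A stand-alone sketch of just that arithmetic follows; the candidate_tier and tier inputs are assumed to come from the surrounding findSegmentsToMerge loop.

const std = @import("std");

const Strategy = enum { balanced, aggressive };

// Stand-alone copy of the scoring adjustment; lower score = better candidate.
fn adjustedScore(
    candidate_tier: f64,
    tier: f64,
    candidate_num_segments: usize,
    segments_per_merge: u32,
    strategy: Strategy,
) f64 {
    const adjustment_factor: f64 = switch (strategy) {
        .balanced => 1.2,
        .aggressive => 1.8,
    };
    const adjustment = @as(f64, @floatFromInt(candidate_num_segments)) /
        @as(f64, @floatFromInt(segments_per_merge));
    return candidate_tier - tier - adjustment_factor * adjustment;
}

test "wider merges score better, especially with the aggressive strategy" {
    const narrow = adjustedScore(3.0, 3.0, 2, 10, .balanced); // -0.24
    const wide = adjustedScore(3.0, 3.0, 10, 10, .balanced); // -1.2
    const wide_aggressive = adjustedScore(3.0, 3.0, 10, 10, .aggressive); // -1.8
    try std.testing.expect(wide < narrow);
    try std.testing.expect(wide_aggressive < wide);
}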