Skip to content

Commit 6e9a411

Browse files
committed
Use the new merge policy
1 parent 1355389 commit 6e9a411

File tree

4 files changed

+104
-122
lines changed

4 files changed

+104
-122
lines changed

src/Index.zig

+41-29
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,16 @@ const FileSegmentNode = FileSegment.List.List.Node;
2323

2424
const SegmentMerger = @import("segment_merger.zig").SegmentMerger;
2525

26+
const TieredMergePolicy = @import("segment_merge_policy.zig").TieredMergePolicy;
27+
2628
const filefmt = @import("filefmt.zig");
2729

2830
const Self = @This();
2931

3032
const Options = struct {
3133
create: bool = false,
3234
min_segment_size: usize = 1_000_000,
33-
max_segment_size: usize = 100_000_000,
35+
max_segment_size: usize = 1_000_000_000,
3436
};
3537

3638
options: Options,
@@ -58,8 +60,6 @@ update_lock: std.Thread.Mutex = .{},
5860
// Mutex used to control merging of in-memory segments.
5961
memory_merge_lock: std.Thread.Mutex = .{},
6062

61-
file_segments_to_delete: std.ArrayList(SegmentID),
62-
6363
checkpoint_mutex: std.Thread.Mutex = .{},
6464
checkpoint_condition: std.Thread.Condition = .{},
6565
checkpoint_stop: bool = false,
@@ -82,15 +82,28 @@ pub fn init(allocator: std.mem.Allocator, dir: std.fs.Dir, options: Options) !Se
8282
var oplog_dir = try dir.makeOpenPath("oplog", .{ .iterate = true });
8383
errdefer oplog_dir.close();
8484

85+
const file_segment_merge_policy = TieredMergePolicy(FileSegment){
86+
.min_segment_size = options.min_segment_size,
87+
.max_segment_size = options.max_segment_size,
88+
.segments_per_level = 10,
89+
.segments_per_merge = 2, // TODO increase to 10
90+
};
91+
92+
const memory_segment_merge_policy = TieredMergePolicy(MemorySegment){
93+
.min_segment_size = 100,
94+
.max_segment_size = options.min_segment_size,
95+
.segments_per_level = 10,
96+
.segments_per_merge = 2, // TODO increase to 5
97+
};
98+
8599
return .{
86100
.options = options,
87101
.allocator = allocator,
88102
.data_dir = data_dir,
89103
.oplog_dir = oplog_dir,
90104
.oplog = Oplog.init(allocator, oplog_dir),
91-
.file_segments = FileSegmentList.init(allocator),
92-
.file_segments_to_delete = std.ArrayList(SegmentID).init(allocator),
93-
.memory_segments = MemorySegmentList.init(allocator),
105+
.file_segments = FileSegmentList.init(allocator, file_segment_merge_policy),
106+
.memory_segments = MemorySegmentList.init(allocator, memory_segment_merge_policy),
94107
};
95108
}
96109

@@ -123,21 +136,19 @@ fn prepareMemorySegmentMerge(self: *Self) !?MemorySegmentList.PreparedMerge {
123136
self.segments_lock.lockShared();
124137
defer self.segments_lock.unlockShared();
125138

126-
const options = SegmentMergeOptions{
127-
.max_segment_size = self.options.min_segment_size,
128-
};
129-
130-
const merge = try self.memory_segments.prepareMerge(options) orelse return null;
139+
const merge = try self.memory_segments.prepareMerge() orelse return null;
131140
errdefer self.memory_segments.destroySegment(merge.target);
132141

133-
try merge.target.data.merge(&merge.sources.node1.data, &merge.sources.node2.data, &self.memory_segments);
142+
std.debug.assert(merge.sources.num_segments == 2);
143+
try merge.target.data.merge(&merge.sources.start.data, &merge.sources.end.data, &self.memory_segments);
134144

135145
return merge;
136146
}
137147

138148
fn finishMemorySegmentMerge(self: *Self, merge: MemorySegmentList.PreparedMerge) bool {
139-
defer self.memory_segments.destroySegment(merge.sources.node1);
140-
defer self.memory_segments.destroySegment(merge.sources.node2);
149+
std.debug.assert(merge.sources.num_segments == 2);
150+
defer self.memory_segments.destroySegment(merge.sources.start);
151+
defer self.memory_segments.destroySegment(merge.sources.end);
141152

142153
self.segments_lock.lock();
143154
defer self.segments_lock.unlock();
@@ -399,18 +410,18 @@ fn prepareFileSegmentMerge(self: *Self) !?FileSegmentList.PreparedMerge {
399410
self.segments_lock.lockShared();
400411
defer self.segments_lock.unlockShared();
401412

402-
const options = SegmentMergeOptions{
403-
.max_segment_size = self.options.max_segment_size,
404-
};
405-
406-
const merge = try self.file_segments.prepareMerge(options) orelse return null;
413+
const merge = try self.file_segments.prepareMerge() orelse return null;
407414
errdefer self.file_segments.destroySegment(merge.target);
408415

409416
var merger = SegmentMerger(FileSegment).init(self.allocator, &self.file_segments);
410417
defer merger.deinit();
411418

412-
try merger.addSource(&merge.sources.node1.data);
413-
try merger.addSource(&merge.sources.node2.data);
419+
var source_node = merge.sources.start;
420+
while (true) {
421+
try merger.addSource(&source_node.data);
422+
if (source_node == merge.sources.end) break;
423+
source_node = source_node.next orelse break;
424+
}
414425
try merger.prepare();
415426

416427
try merge.target.data.build(self.data_dir, &merger);
@@ -422,8 +433,6 @@ fn finishFileSegmentMerge(self: *Self, merge: FileSegmentList.PreparedMerge) !vo
422433
self.file_segments_lock.lock();
423434
defer self.file_segments_lock.unlock();
424435

425-
try self.file_segments_to_delete.ensureUnusedCapacity(3);
426-
427436
errdefer self.file_segments.destroySegment(merge.target);
428437
errdefer merge.target.data.delete(self.data_dir);
429438

@@ -433,12 +442,14 @@ fn finishFileSegmentMerge(self: *Self, merge: FileSegmentList.PreparedMerge) !vo
433442
var index1: usize = 0;
434443
var index2: usize = 0;
435444

445+
std.debug.assert(merge.sources.num_segments == 2);
446+
436447
var i: usize = 0;
437448
while (i < ids.items.len) : (i += 1) {
438-
if (SegmentID.eq(ids.items[i], merge.sources.node1.data.id)) {
449+
if (SegmentID.eq(ids.items[i], merge.sources.start.data.id)) {
439450
index1 = i;
440451
}
441-
if (SegmentID.eq(ids.items[i], merge.sources.node2.data.id)) {
452+
if (SegmentID.eq(ids.items[i], merge.sources.end.data.id)) {
442453
index2 = i;
443454
}
444455
}
@@ -451,11 +462,12 @@ fn finishFileSegmentMerge(self: *Self, merge: FileSegmentList.PreparedMerge) !vo
451462

452463
try filefmt.writeIndexFile(self.data_dir, ids.items);
453464

454-
defer self.file_segments.destroySegment(merge.sources.node1);
455-
defer self.file_segments.destroySegment(merge.sources.node2);
465+
std.debug.assert(merge.sources.num_segments == 2);
466+
defer self.file_segments.destroySegment(merge.sources.start);
467+
defer self.file_segments.destroySegment(merge.sources.end);
456468

457-
defer merge.sources.node1.data.delete(self.data_dir);
458-
defer merge.sources.node2.data.delete(self.data_dir);
469+
defer merge.sources.start.data.delete(self.data_dir);
470+
defer merge.sources.end.data.delete(self.data_dir);
459471

460472
self.segments_lock.lock();
461473
defer self.segments_lock.unlock();

src/segment_list.zig

+21-66
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,25 @@ const std = @import("std");
33
const common = @import("common.zig");
44
const SearchResults = common.SearchResults;
55

6+
const TieredMergePolicy = @import("segment_merge_policy.zig").TieredMergePolicy;
7+
68
const Deadline = @import("utils/Deadline.zig");
79

810
pub fn SegmentList(Segment: type) type {
911
return struct {
1012
pub const Self = @This();
1113
pub const List = std.DoublyLinkedList(Segment);
1214

15+
pub const MergePolicy = TieredMergePolicy(Segment);
16+
1317
allocator: std.mem.Allocator,
18+
merge_policy: MergePolicy,
1419
segments: List,
1520

16-
pub fn init(allocator: std.mem.Allocator) Self {
21+
pub fn init(allocator: std.mem.Allocator, merge_policy: MergePolicy) Self {
1722
return .{
1823
.allocator = allocator,
24+
.merge_policy = merge_policy,
1925
.segments = .{},
2026
};
2127
}
@@ -95,82 +101,31 @@ pub fn SegmentList(Segment: type) type {
95101
}
96102
}
97103

98-
pub const SegmentsToMerge = struct {
99-
node1: *List.Node,
100-
node2: *List.Node,
101-
};
102-
103-
pub fn findSegmentsToMerge(self: *Self, options: SegmentMergeOptions) ?SegmentsToMerge {
104-
var total_size: usize = 0;
105-
var max_size: usize = 0;
106-
var min_size: usize = std.math.maxInt(usize);
107-
var num_segments: usize = 0;
108-
var segments_iter = self.segments.first;
109-
while (segments_iter) |node| : (segments_iter = node.next) {
110-
if (!node.data.canBeMerged())
111-
continue;
112-
const size = node.data.getSize();
113-
if (size >= options.max_segment_size)
114-
continue;
115-
num_segments += 1;
116-
total_size += size;
117-
max_size = @max(max_size, size);
118-
min_size = @min(min_size, size);
119-
}
120-
121-
if (total_size == 0) {
122-
return null;
123-
}
124-
125-
const max_segments = options.getMaxSegments(total_size);
126-
if (num_segments < max_segments) {
127-
return null;
128-
}
129-
130-
var best_node: ?*List.Node = null;
131-
var best_score: f64 = std.math.inf(f64);
132-
segments_iter = self.segments.first;
133-
var level_size = @as(f64, @floatFromInt(total_size)) / 2;
134-
while (segments_iter) |node| : (segments_iter = node.next) {
135-
if (!node.data.canBeMerged())
136-
continue;
137-
const size = node.data.getSize();
138-
if (size >= options.max_segment_size)
139-
continue;
140-
if (node.next) |next_node| {
141-
const merge_size = size + next_node.data.getSize();
142-
const score = @as(f64, @floatFromInt(merge_size)) - level_size;
143-
if (score < best_score) {
144-
best_node = node;
145-
best_score = score;
146-
}
147-
}
148-
level_size /= 2;
149-
}
150-
151-
if (best_node) |node| {
152-
if (node.next) |next_node| {
153-
return .{ .node1 = node, .node2 = next_node };
154-
}
155-
}
156-
return null;
157-
}
104+
pub const SegmentsToMerge = MergePolicy.Candidate;
158105

159106
pub const PreparedMerge = struct {
160107
sources: SegmentsToMerge,
161108
target: *List.Node,
162109
};
163110

164-
pub fn prepareMerge(self: *Self, options: SegmentMergeOptions) !?PreparedMerge {
165-
const sources = self.findSegmentsToMerge(options) orelse return null;
111+
pub fn prepareMerge(self: *Self) !?PreparedMerge {
112+
const sources = self.merge_policy.findSegmentsToMerge(self.segments) orelse return null;
166113
const target = try self.createSegment();
167114
return .{ .sources = sources, .target = target };
168115
}
169116

170117
pub fn applyMerge(self: *Self, merge: PreparedMerge) void {
171-
self.segments.insertBefore(merge.sources.node1, merge.target);
172-
self.segments.remove(merge.sources.node1);
173-
self.segments.remove(merge.sources.node2);
118+
self.segments.insertBefore(merge.sources.start, merge.target);
119+
var iter = merge.sources.start;
120+
while (true) {
121+
const next_node = iter.next;
122+
self.segments.remove(iter);
123+
if (iter == merge.sources.end) {
124+
break;
125+
} else {
126+
iter = next_node orelse break;
127+
}
128+
}
174129
}
175130
};
176131
}

0 commit comments

Comments
 (0)