
Commit 8f0ad66

committed
.
1 parent 0965077 commit 8f0ad66

File tree

1 file changed
+150 −5 lines changed


src/segment_merge_policy.zig

+150 −5
@@ -1,13 +1,22 @@
 const std = @import("std");
+const log = std.log.scoped(.segment_merge_policy);

 pub const LogMergePolicy = struct {
-    pub const default_merge_factor = 10;
+    // Defines the allowed range of log(size) for each
+    // level. A level is computed by taking the max segment
+    // log size, minus level_log_span, and finding all
+    // segments falling within that range.
+    const level_log_span: f64 = 0.75;
+
+    // Default merge factor, which is how many segments are
+    // merged at a time.
+    pub const default_merge_factor: usize = 10;

     // How many segments to merge at a time.
     merge_factor: usize = default_merge_factor,

     // Any segments whose size is smaller than this value
-    // will be rounded up to this value. This ensures that
+    // will be rounded up to this value. This ensures that
     // tiny segments are aggressively merged.
     min_merge_size: usize,

@@ -19,8 +28,144 @@ pub const LogMergePolicy = struct {
     // percentage of non-deleted documents.
     calibrate_size_by_deletes: bool = false,

-    pub fn findMerges(self: *LogMergePolicy, comptime T: type, segments: std.DoublyLinkedList(T)) !void {
-        _ = self;
-        _ = segments;
+    // Log details about the segment selection process.
+    verbose: bool = false,
+
+    fn logSize(size: usize) f64 {
+        return @log2(@as(f64, @floatFromInt(@max(1, size))));
+    }
+
+    pub fn findMerges(self: LogMergePolicy, comptime T: type, segments: std.DoublyLinkedList(T), allocator: std.mem.Allocator) !void {
+        const LevelAndSegment = struct {
+            level: f64,
+            segment: *const T,
+            merging: bool = false,
+        };
+
+        var levels = std.ArrayList(LevelAndSegment).init(allocator);
+        defer levels.deinit();
+
+        const norm = logSize(self.merge_factor);
+
+        std.debug.print("candidates:\n", .{});
+        var iter = segments.first;
+        while (iter) |node| : (iter = node.next) {
+            const segment = &node.data;
+            const level = logSize(segment.getSize()) / norm;
+            std.debug.print(" segment {}: {}\n", .{ segment.id, segment.getSize() });
+            try levels.append(.{ .level = level, .segment = segment });
+        }
+
+        const level_floor = logSize(self.min_merge_size) / norm;
+        std.debug.print("level_floor: {}\n", .{level_floor});
+
+        const num_mergeable_segments = levels.items.len;
+
+        var start: usize = 0;
+        while (start < num_mergeable_segments) {
+
+            // Find max level of all segments not already
+            // quantized.
+            var max_level: f64 = 0.0;
+            for (levels.items[start..]) |item| {
+                max_level = @max(max_level, item.level);
+            }
+
+            // Now search backwards for the rightmost segment that
+            // falls into this level:
+            var level_bottom: f64 = undefined;
+            if (max_level <= level_floor) {
+                // All remaining segments fall into the min level
+                level_bottom = -1.0;
+            } else {
+                level_bottom = max_level - level_log_span;
+
+                // Force a boundary at the level floor
+                if (level_bottom < level_floor and max_level >= level_floor) {
+                    level_bottom = level_floor;
+                }
+            }
+
+            std.debug.print("level_bottom: {}\n", .{level_bottom});
+
+            var upto: usize = num_mergeable_segments - 1;
+            while (upto >= start) {
+                if (levels.items[upto].level >= level_bottom) {
+                    break;
+                }
+                if (upto > start) {
+                    upto -= 1;
+                } else {
+                    break;
+                }
+            }
+
+            // Finally, record all merges that are viable at this level:
+            var end: usize = start + self.merge_factor;
+            while (end <= 1 + upto) {
+                var any_too_large = false;
+                var any_merging = false;
+
+                for (levels.items[start..end]) |item| {
+                    if (item.segment.getSize() > self.max_merge_size) {
+                        any_too_large = true;
+                    }
+                    if (item.merging) {
+                        any_merging = true;
+                        break;
+                    }
+                }
+
+                if (!any_too_large and !any_merging) {
+                    std.debug.print("merge:\n", .{});
+                    for (levels.items[start..end]) |*item| {
+                        std.debug.print(" segment {}: {}\n", .{ item.segment.id, item.segment.getSize() });
+                        item.merging = true;
+                    }
+                }
+
+                start = end;
+                end = start + self.merge_factor;
+            }
+
+            start = 1 + upto;
+        }
     }
 };
+
+test {
+    const MockSegment = struct {
+        id: u64,
+        size: usize,
+
+        pub fn getSize(self: @This()) usize {
+            return self.size;
+        }
+    };
+
+    const MockSegmentList = std.DoublyLinkedList(MockSegment);
+
+    var segments: std.DoublyLinkedList(MockSegment) = .{};
+
+    defer {
+        var iter = segments.first;
+        while (iter) |node| {
+            iter = node.next;
+            std.testing.allocator.destroy(node);
+        }
+    }
+
+    for (0..10) |i| {
+        var segment = try std.testing.allocator.create(MockSegmentList.Node);
+        segment.data = .{ .id = i, .size = 100 + (10 - i) * 10 };
+        segments.append(segment);
+    }
+
+    const policy = LogMergePolicy{
+        .min_merge_size = 1,
+        .max_merge_size = 1000,
+        .merge_factor = 3,
+    };
+
+    try policy.findMerges(MockSegment, segments, std.testing.allocator);
+}
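For a rough sense of the level arithmetic findMerges performs on the mock segments in the test above (sizes 200 down to 110, merge_factor = 3), here is a minimal standalone sketch; it is not part of the commit, and the expected values are simply worked out from logSize as defined above.

const std = @import("std");

test "level arithmetic sketch (not part of the commit)" {
    // norm = log2(merge_factor); each segment's level is log2(size) / norm.
    const norm = @log2(@as(f64, 3)); // ≈ 1.585
    const biggest = @log2(@as(f64, 200)) / norm; // ≈ 4.82
    const smallest = @log2(@as(f64, 110)) / norm; // ≈ 4.28
    // All ten mock segments fall within level_log_span (0.75) of the max level,
    // so a single pass records three merges of three segments each and leaves
    // the smallest segment unmerged.
    try std.testing.expect(biggest - smallest < 0.75);
}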

0 commit comments
