Skip to content

Commit 9c69152

Browse files
committed
WIP
1 parent 0fe8bf6 commit 9c69152

File tree

1 file changed

+39
-117
lines changed

1 file changed

+39
-117
lines changed

src/segment_merge_policy.zig

+39-117
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,48 @@
11
const std = @import("std");
22
const log = std.log.scoped(.segment_merge_policy);
33

4-
pub fn MergeCandidate(comptime T: type) type {
4+
pub fn TieredMergePolicy(comptime T: type) type {
55
return struct {
6-
start: *std.DoublyLinkedList(T).Node,
7-
end: *std.DoublyLinkedList(T).Node,
8-
num_segments: usize = 0,
9-
size: usize = 0,
10-
level_size: usize,
11-
level_no: usize,
12-
};
13-
}
6+
max_segment_size: usize,
7+
min_segment_size: usize,
148

15-
pub const TieredMergePolicy = struct {
16-
max_segment_size: usize,
17-
min_segment_size: usize,
9+
max_merge_size: u32 = 10,
10+
segments_per_level: u32 = 10,
1811

19-
max_merge_size: u32 = 10,
20-
segments_per_level: u32 = 10,
12+
const SegmentList = std.DoublyLinkedList(T);
13+
const SegmentNode = SegmentList.Node;
2114

22-
pub fn findMerges(self: TieredMergePolicy, comptime T: type, segments: std.DoublyLinkedList(T), allocator: std.mem.Allocator) !std.ArrayList(MergeCandidate(T)) {
23-
const Candidate = MergeCandidate(T);
15+
const Candidate = struct {
16+
start: *SegmentNode,
17+
end: *SegmentNode,
18+
num_segments: usize = 0,
19+
size: usize = 0,
20+
level_size: usize,
21+
level_no: usize,
22+
};
2423

25-
var candidates = std.ArrayList(Candidate).init(allocator);
26-
errdefer candidates.deinit();
24+
pub fn calculateBudget(self: TieredMergePolicy, segments: SegmentList) usize {
25+
var total_size: usize = 0;
26+
var num_oversized_segments: usize = 0;
27+
var min_segment_size: usize = std.math.maxInt(usize);
2728

28-
var total_size: usize = 0;
29-
var num_mergeable_segments: usize = 0;
30-
var min_segment_size: usize = std.math.maxInt(usize);
31-
32-
{
33-
std.debug.print("segments:\n", .{});
3429
var iter = segments.first;
3530
while (iter) |node| : (iter = node.next) {
3631
const segment = &node.data;
3732
const size = segment.getSize();
3833
if (size > self.max_segment_size) {
39-
std.debug.print(" segment {}: {} (too large)\n", .{ segment.id, size });
34+
num_oversized_segments += 1;
4035
continue;
4136
}
4237
total_size += size;
43-
num_mergeable_segments += 1;
4438
min_segment_size = @min(min_segment_size, size);
45-
std.debug.print(" segment {}: {}\n", .{ segment.id, size });
4639
}
47-
}
4840

49-
std.debug.print("total size: {}\n", .{total_size});
50-
std.debug.print("num mergeable segments: {}\n", .{num_mergeable_segments});
41+
var floor_level = self.min_segment_size;
42+
var top_level = floor_level;
43+
const merge_factor = @min(self.max_merge_size, self.segments_per_level);
5144

52-
var floor_level = self.min_segment_size;
53-
var top_level = floor_level;
54-
const merge_factor = @min(self.max_merge_size, self.segments_per_level);
55-
56-
var allowed_segment_count: usize = 0;
57-
{
45+
var num_allowed_segments: usize = 0;
5846
var level_size = floor_level;
5947
var remaining_size = total_size;
6048
while (true) {
@@ -63,100 +51,34 @@ pub const TieredMergePolicy = struct {
6351
} else {
6452
const segments_per_level = remaining_size * 100 / level_size;
6553
if (segments_per_level < self.segments_per_level * 100 or level_size >= self.max_segment_size) {
66-
allowed_segment_count += segments_per_level;
54+
num_allowed_segments += segments_per_level;
6755
top_level = level_size;
6856
break;
6957
}
70-
allowed_segment_count += self.segments_per_level * 100;
58+
num_allowed_segments += self.segments_per_level * 100;
7159
remaining_size -= self.segments_per_level * level_size;
7260
}
7361
level_size = @min(self.max_segment_size, level_size * merge_factor);
7462
}
75-
allowed_segment_count = (allowed_segment_count + 50) / 100;
76-
std.debug.print("allowed segment count: {}\n", .{allowed_segment_count});
77-
}
78-
79-
std.debug.print("floor level: {}\n", .{floor_level});
80-
std.debug.print("top level: {}\n", .{top_level});
81-
82-
if (allowed_segment_count >= num_mergeable_segments) {
83-
return candidates;
63+
num_allowed_segments = (num_allowed_segments + 50) / 100;
64+
return num_allowed_segments + num_oversized_segments;
8465
}
8566

86-
{
87-
var level_size = floor_level;
88-
var level_boundary = level_size * merge_factor * 2 / 4;
89-
90-
var level_no: usize = 0;
91-
var end_node = segments.last orelse return candidates;
92-
while (true) {
93-
if (end_node.data.getSize() > self.max_segment_size) {
94-
end_node = end_node.prev orelse break;
95-
continue;
96-
}
97-
98-
const next_level_size = level_size * merge_factor;
99-
const next_level_boundary = next_level_size * merge_factor * 2 / 4;
100-
101-
var start_node = end_node;
102-
while (true) {
103-
if (start_node.prev) |prev_node| {
104-
if (prev_node.data.getSize() <= level_boundary) {
105-
start_node = prev_node;
106-
continue;
107-
}
108-
}
109-
break;
110-
}
111-
112-
std.debug.print("level={} segments={}-{}\n", .{ level_size, start_node.data.id, end_node.data.id });
113-
114-
var candidate = Candidate{
115-
.start = start_node,
116-
.end = start_node,
117-
.num_segments = 0,
118-
.size = 0,
119-
.level_size = level_size,
120-
.level_no = level_no,
121-
};
67+
pub fn findMerges(self: TieredMergePolicy, segments: std.DoublyLinkedList(T), allocator: std.mem.Allocator) !std.ArrayList(Candidate) {
68+
const num_allowed_segments = self.calculateBudget(segments);
69+
log.debug("budget: {} segments", .{num_allowed_segments});
12270

123-
var iter = start_node;
124-
while (true) {
125-
if (candidate.num_segments >= self.max_merge_size or candidate.size >= self.max_segment_size or candidate.size >= level_boundary) {
126-
break;
127-
}
128-
candidate.end = iter;
129-
candidate.num_segments += 1;
130-
candidate.size += iter.data.getSize();
131-
if (iter == end_node) break;
132-
iter = iter.next orelse break;
133-
}
71+
var candidates = std.ArrayList(Candidate).init(allocator);
72+
errdefer candidates.deinit();
13473

135-
if (candidate.num_segments > 1) {
136-
if (candidate.size >= level_boundary or candidate.size < next_level_boundary or true) {
137-
const prev_size: usize = if (candidate.start.prev) |prev_node| prev_node.data.getSize() else std.math.maxInt(usize);
138-
if (prev_size > candidate.size * 75 / 100) {
139-
try candidates.append(candidate);
140-
} else {
141-
std.debug.print("skipping candidate {}-{}, because size={} and prev_size={}\n", .{ candidate.start.data.id, candidate.end.data.id, candidate.size, prev_size });
142-
}
143-
}
144-
}
145-
146-
level_size = next_level_size;
147-
level_boundary = next_level_boundary;
148-
level_no += 1;
149-
end_node = start_node.prev orelse break;
74+
if (num_allowed_segments >= segments.len) {
75+
return candidates;
15076
}
151-
}
15277

153-
std.debug.print("candidate:\n", .{});
154-
for (candidates.items) |c| {
155-
std.debug.print(" {}-{}: {} {} level_size={}\n", .{ c.start.data.id, c.end.data.id, c.size, c.num_segments, c.level_size });
78+
const merge_factor = @min(self.max_merge_size, self.segments_per_level);
15679
}
157-
return candidates;
158-
}
159-
};
80+
};
81+
}
16082

16183
const MockSegment = struct {
16284
id: u64,
@@ -169,7 +91,7 @@ const MockSegment = struct {
16991

17092
const MockSegmentList = std.DoublyLinkedList(MockSegment);
17193

172-
fn applyMerge(comptime T: type, segments: *std.DoublyLinkedList(T), merge: MergeCandidate(T), allocator: std.mem.Allocator) !void {
94+
fn applyMerge(comptime T: type, segments: *std.DoublyLinkedList(T), merge: TieredMergePolicy(T).Candidate, allocator: std.mem.Allocator) !void {
17395
var iter = merge.start.next;
17496
while (iter) |node| {
17597
const next_node = node.next;

0 commit comments

Comments
 (0)