@@ -6,7 +6,7 @@ pub fn TieredMergePolicy(comptime T: type) type {
6
6
max_segment_size : usize ,
7
7
min_segment_size : usize ,
8
8
9
- max_merge_size : u32 = 10 ,
9
+ segments_per_merge : u32 = 10 ,
10
10
segments_per_level : u32 = 10 ,
11
11
12
12
const SegmentList = std .DoublyLinkedList (T );
@@ -17,11 +17,12 @@ pub fn TieredMergePolicy(comptime T: type) type {
17
17
end : * SegmentNode ,
18
18
num_segments : usize = 0 ,
19
19
size : usize = 0 ,
20
- level_size : usize ,
21
- level_no : usize ,
20
+ score : f64 = 0.0 ,
22
21
};
23
22
24
- pub fn calculateBudget (self : TieredMergePolicy , segments : SegmentList ) usize {
23
+ const Self = @This ();
24
+
25
+ pub fn calculateBudget (self : Self , segments : SegmentList ) usize {
25
26
var total_size : usize = 0 ;
26
27
var num_oversized_segments : usize = 0 ;
27
28
var min_segment_size : usize = std .math .maxInt (usize );
@@ -40,7 +41,7 @@ pub fn TieredMergePolicy(comptime T: type) type {
40
41
41
42
var floor_level = self .min_segment_size ;
42
43
var top_level = floor_level ;
43
- const merge_factor = @min (self .max_merge_size , self .segments_per_level );
44
+ const merge_factor = @min (self .segments_per_merge , self .segments_per_level );
44
45
45
46
var num_allowed_segments : usize = 0 ;
46
47
var level_size = floor_level ;
@@ -64,18 +65,75 @@ pub fn TieredMergePolicy(comptime T: type) type {
64
65
return num_allowed_segments + num_oversized_segments ;
65
66
}
66
67
67
- pub fn findMerges (self : TieredMergePolicy , segments : std .DoublyLinkedList (T ), allocator : std .mem .Allocator ) ! std .ArrayList (Candidate ) {
68
+ pub fn findSegmentsToMerge (self : Self , segments : std .DoublyLinkedList (T )) ? Candidate {
69
+ const num_segments = segments .len ;
68
70
const num_allowed_segments = self .calculateBudget (segments );
69
71
log .debug ("budget: {} segments" , .{num_allowed_segments });
70
72
71
- var candidates = std .ArrayList (Candidate ).init (allocator );
72
- errdefer candidates .deinit ();
73
-
74
73
if (num_allowed_segments >= segments .len ) {
75
- return candidates ;
74
+ return null ;
76
75
}
77
76
78
- const merge_factor = @min (self .max_merge_size , self .segments_per_level );
77
+ const merge_factor = @min (self .segments_per_merge , self .segments_per_level );
78
+ const log_merge_factor = @log2 (@as (f64 , @floatFromInt (merge_factor )));
79
+ const log_min_segment_size = @log2 (@as (f64 , @floatFromInt (self .min_segment_size )));
80
+
81
+ const tier_scaling_factor = @as (f64 , @floatFromInt (num_allowed_segments )) / @as (f64 , @floatFromInt (num_segments )) / @as (f64 , @floatFromInt (self .segments_per_level ));
82
+ var tier = @as (f64 , @floatFromInt (num_segments - 1 )) * tier_scaling_factor ;
83
+
84
+ var best_candidate : ? Candidate = null ;
85
+ var best_score : f64 = 0.0 ;
86
+
87
+ var max_merge_size : usize = self .max_segment_size * 2 ;
88
+
89
+ var iter = segments .first ;
90
+ while (iter ) | current_node | : (iter = current_node .next ) {
91
+ tier -= tier_scaling_factor ;
92
+
93
+ if (current_node .data .getSize () > self .max_segment_size ) {
94
+ // skip oversized segments
95
+ continue ;
96
+ }
97
+
98
+ // std.debug.print("evaluating segment {d} (size={d}, max_merge_size={}, tier={})\n", .{ current_node.data.id, current_node.data.getSize(), max_merge_size, tier });
99
+
100
+ var candidate = Candidate {
101
+ .start = current_node ,
102
+ .end = current_node ,
103
+ .num_segments = 1 ,
104
+ .size = current_node .data .getSize (),
105
+ };
106
+
107
+ while (candidate .num_segments < self .segments_per_merge ) {
108
+ const next_node = candidate .end .next orelse break ;
109
+ const next_size = next_node .data .getSize ();
110
+ candidate .end = next_node ;
111
+ candidate .num_segments += 1 ;
112
+ candidate .size += next_size ;
113
+
114
+ if (candidate .size > max_merge_size ) {
115
+ break ;
116
+ }
117
+
118
+ const log_size = @log2 (@as (f64 , @floatFromInt (candidate .size )));
119
+ const candidate_tier = (log_size - log_min_segment_size ) / log_merge_factor ;
120
+ const score = candidate_tier - tier ;
121
+ // std.debug.print("candidate {}-{}: len={} size={} candidate_tier={}, score={d}\n", .{ candidate.start.data.id, candidate.end.data.id, candidate.num_segments, candidate.size, candidate_tier, score });
122
+ if (score < best_score or best_candidate == null ) {
123
+ best_candidate = candidate ;
124
+ best_score = score ;
125
+ }
126
+
127
+ if (candidate .size > self .max_segment_size ) {
128
+ // if we are over the max_segment_size setting, don't try to add more segments to the merge
129
+ break ;
130
+ }
131
+ }
132
+
133
+ max_merge_size = current_node .data .getSize ();
134
+ }
135
+
136
+ return best_candidate ;
79
137
}
80
138
};
81
139
}
@@ -123,11 +181,11 @@ test {
123
181
});
124
182
const rand = prng .random ();
125
183
126
- const policy = TieredMergePolicy {
184
+ const policy = TieredMergePolicy ( MockSegment ) {
127
185
.min_segment_size = 100 ,
128
186
.max_segment_size = 100000 ,
129
- .max_merge_size = 3 ,
130
- .segments_per_level = 3 ,
187
+ .segments_per_merge = 10 ,
188
+ .segments_per_level = 5 ,
131
189
};
132
190
133
191
for (0.. 10) | _ | {
@@ -137,20 +195,28 @@ test {
137
195
last_id += 1 ;
138
196
}
139
197
140
- for (0.. 1) | _ | {
141
- if (rand .boolean ()) {
198
+ for (0.. 10000) | _ | {
199
+ std .debug .print ("---\n " , .{});
200
+
201
+ if (rand .boolean () or true ) {
142
202
var segment = try std .testing .allocator .create (MockSegmentList .Node );
143
- segment .data = .{ .id = last_id , .size = 100 + rand .intRangeAtMost (u8 , 0 , 50 ) };
203
+ segment .data = .{ .id = last_id , .size = 100 + rand .intRangeAtMost (u16 , 0 , 200 ) };
144
204
segments .append (segment );
145
205
last_id += 1 ;
146
206
}
147
207
148
- var candidates = try policy .findMerges (MockSegment , segments , std .testing .allocator );
149
- defer candidates .deinit ();
150
-
151
- if (candidates .items .len > 0 ) {
152
- try applyMerge (MockSegment , & segments , candidates .items [0 ], std .testing .allocator );
208
+ {
209
+ std .debug .print ("segments:\n " , .{});
210
+ var iter = segments .first ;
211
+ while (iter ) | node | {
212
+ std .debug .print (" {}: {}\n " , .{ node .data .id , node .data .size });
213
+ iter = node .next ;
214
+ }
153
215
}
154
- std .debug .print ("---\n " , .{});
216
+
217
+ const candidate = policy .findSegmentsToMerge (segments ) orelse continue ;
218
+
219
+ std .debug .print ("merging {}-{}\n " , .{ candidate .start .data .id , candidate .end .data .id });
220
+ try applyMerge (MockSegment , & segments , candidate , std .testing .allocator );
155
221
}
156
222
}
0 commit comments