@@ -40,7 +40,14 @@ pub fn TieredMergePolicy(comptime T: type) type {
         const Self = @This();

-        pub fn calculateBudget(self: Self, segments: SegmentList) usize {
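+        /// Stats gathered in a single pass over the segment list;
+        /// calculateBudget now returns these alongside the segment budget so
+        /// callers can reuse them instead of recomputing.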
+        pub const CalculateBudgetResult = struct {
+            floor_level: usize,
+            top_level: usize,
+            total_size: usize,
+            num_allowed_segments: usize,
+        };
+
+        pub fn calculateBudget(self: Self, segments: SegmentList) CalculateBudgetResult {
             var total_size: usize = 0;
             var num_oversized_segments: usize = 0;
             var min_segment_size: usize = std.math.maxInt(usize);
@@ -57,10 +64,6 @@ pub fn TieredMergePolicy(comptime T: type) type {
                 min_segment_size = @min(min_segment_size, size);
             }

-            if (self.max_segments) |max_segments| {
-                return max_segments + num_oversized_segments;
-            }
-
             var floor_level = self.min_segment_size;
             var top_level = floor_level;
             const merge_factor = @max(2, @min(self.segments_per_merge, self.segments_per_level));
@@ -83,8 +86,13 @@ pub fn TieredMergePolicy(comptime T: type) type {
                 }
                 level_size = @min(self.max_segment_size, level_size * merge_factor);
             }
-            num_allowed_segments = (num_allowed_segments + 50) / 100;
-            return num_allowed_segments + num_oversized_segments;
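+            // An explicitly configured max_segments now overrides the computed
+            // budget. Note that "orelse" binds looser than "/", so this parses
+            // as: max_segments orelse ((num_allowed_segments + 50) / 100).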
+            num_allowed_segments = self.max_segments orelse (num_allowed_segments + 50) / 100;
+            return .{
+                .num_allowed_segments = num_allowed_segments + num_oversized_segments,
+                .floor_level = floor_level,
+                .top_level = top_level,
+                .total_size = total_size,
+            };
         }

         pub const FindSegmentsToMergeResult = struct {
@@ -93,24 +101,24 @@ pub fn TieredMergePolicy(comptime T: type) type {
         };

         pub fn findSegmentsToMerge(self: Self, segments: SegmentList) FindSegmentsToMergeResult {
-            const num_segments = segments.len;
-            const num_allowed_segments = self.calculateBudget(segments);
-            log.debug("budget: {} segments", .{num_allowed_segments});
+            const stats = self.calculateBudget(segments);
+            log.debug("budget: {} segments", .{stats.num_allowed_segments});

             var result = FindSegmentsToMergeResult{
-                .num_allowed_segments = num_allowed_segments,
+                .num_allowed_segments = stats.num_allowed_segments,
                 .candidate = null,
             };

-            if (num_allowed_segments >= segments.len) {
+            const num_segments = segments.len;
+            if (stats.num_allowed_segments >= num_segments) {
                 return result;
             }

-            const merge_factor = @max(2, @min(self.segments_per_merge, self.segments_per_level));
-            const log_merge_factor = @log2(@as(f64, @floatFromInt(merge_factor)));
-            const log_min_segment_size = @log2(@as(f64, @floatFromInt(self.min_segment_size)));
+            // const merge_factor = @max(2, @min(self.segments_per_merge, self.segments_per_level));
+            // const log_merge_factor = @log2(@as(f64, @floatFromInt(merge_factor)));
+            // const log_min_segment_size = @log2(@as(f64, @floatFromInt(self.min_segment_size)));

-            const tier_scaling_factor = @as(f64, @floatFromInt(num_allowed_segments)) / @as(f64, @floatFromInt(num_segments)) / @as(f64, @floatFromInt(self.segments_per_level));
+            const tier_scaling_factor = @as(f64, @floatFromInt(stats.num_allowed_segments)) / @as(f64, @floatFromInt(num_segments)) / @as(f64, @floatFromInt(self.segments_per_level));
             const top_tier = @as(f64, @floatFromInt(num_segments)) * tier_scaling_factor;
             var tier = top_tier;
@@ -131,13 +139,23 @@ pub fn TieredMergePolicy(comptime T: type) type {
                     continue;
                 }

-                // std.debug.print("evaluating segment {d} (size={d}, max_merge_size={}, tier={})\n", .{ current_node.data.id, current_node.data.getSize(), max_merge_size, tier });
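+                // If this tier's max merge size exceeds the per-segment cap,
+                // aim for segments_per_merge segments of roughly the current
+                // segment's size instead of one oversized merge.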
+                var target_merge_size = max_merge_size;
+
+                if (target_merge_size > self.max_segment_size) {
+                    target_merge_size = current_node.data.getSize() * self.segments_per_merge;
+                }
+
+                if (verbose) {
+                    std.debug.print("evaluating segment {d} (no={}, size={d}, target_merge_size={})\n", .{ current_node.data.id, segment_no, current_node.data.getSize(), target_merge_size });
+                }
+
+                const current_node_size = current_node.data.getSize();

                 var candidate = Candidate{
                     .start = current_node,
                     .end = current_node,
                     .num_segments = 1,
-                    .size = current_node.data.getSize(),
+                    .size = current_node_size,
                 };

                 while (candidate.num_segments < self.segments_per_merge) {
@@ -151,17 +169,69 @@ pub fn TieredMergePolicy(comptime T: type) type {
                        break;
                    }

-                    const log_size = @log2(@as(f64, @floatFromInt(candidate.size)));
-                    const candidate_tier = (log_size - log_min_segment_size) / log_merge_factor;
-                    var score = candidate_tier - tier;
+                    // Roughly measure "skew" of the merge, i.e. how
+                    // "balanced" the merge is (whether the segments are
+                    // about the same size), which can range from
+                    // 1.0/numSegsBeingMerged (good) to 1.0 (poor). Heavily
+                    // lopsided merges (skew near 1.0) are no good; they mean
+                    // O(N^2) merge cost over time:
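+                    // For example, merging ten equally sized segments gives a
+                    // skew of 0.1, while a window dominated by its first
+                    // segment scores close to 1.0.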
+                    const skew = @as(f64, @floatFromInt(current_node_size)) / @as(f64, @floatFromInt(candidate.size));
+
+                    // Strongly favor merges with less skew (smaller
+                    // score is better):
+                    var score = skew;

-                    const adjustment_factor: f64 = switch (self.strategy) {
-                        .balanced => 1.2,
-                        .aggressive => 1.8,
-                    };
+                    // Gently favor smaller merges over bigger ones. We
+                    // don't want to make this exponent too large else we
+                    // can end up doing poor merges of small segments in
+                    // order to avoid the large merges:
+                    score *= std.math.pow(f64, @floatFromInt(candidate.size), 0.05);
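+                    // The exponent keeps this factor gentle: 100^0.05 is ~1.26
+                    // and 1000^0.05 is ~1.41, so a 10x larger merge only costs
+                    // about 12% more score.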

-                    const adjustment = @as(f64, @floatFromInt(candidate.num_segments)) / @as(f64, @floatFromInt(self.segments_per_merge));
-                    score = score - adjustment_factor * adjustment;
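+                    // Commented-out debug output and earlier scoring
+                    // experiments, kept around for reference: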
+                    //std.debug.print("  candidate {}-{} (size={}, len={})\n", .{ candidate.start.data.id, candidate.end.data.id, candidate.size, candidate.num_segments });
+
+                    // const log_size = @log2(@as(f64, @floatFromInt(candidate.size)));
+                    // const candidate_tier = (log_size - log_min_segment_size) / log_merge_factor;
+                    // var score = candidate_tier - tier;
+
+                    // const adjustment_factor: f64 = switch (self.strategy) {
+                    //     .balanced => 1.2,
+                    //     .aggressive => 1.8,
+                    // };
+
+                    // const adjustment = @as(f64, @floatFromInt(candidate.num_segments)) / @as(f64, @floatFromInt(self.segments_per_merge));
+                    // score = score - adjustment_factor * adjustment;
+
+                    //const max_merge_size_f: f64 = @floatFromInt(max_merge_size);
+                    //const target_merge_size_f: f64 = @floatFromInt(target_merge_size);
+                    //const candidate_size_f: f64 = @floatFromInt(candidate.size);
+
+                    //const next_target_merge_size_f = target_merge_size_f / @as(f64, @floatFromInt(self.segments_per_merge));
+
+                    //const score_closer_to_current_target_merge_size = @abs(target_merge_size_f - candidate_size_f) / target_merge_size_f;
+                    //const score_closer_to_next_target_merge_size = @abs(next_target_merge_size_f - candidate_size_f) / target_merge_size_f;
+
+                    //const num_segments_f = @as(f64, @floatFromInt(candidate.num_segments));
+                    //const avg_segment_size_f: f64 = candidate_size_f / num_segments_f;
+                    //const first_segment_size_f: f64 = @as(f64, @floatFromInt(current_node.data.getSize()));
+                    //std.debug.print("  avg_segment_size={d}\n", .{avg_segment_size_f});
+                    //const distance_to_avg_segment_size = @abs(first_segment_size_f - avg_segment_size_f) / first_segment_size_f;
+                    //std.debug.print("  distance_to_avg_segment_size={d}\n", .{distance_to_avg_segment_size});
+
+                    //const distance_to_target_merge_size = score_closer_to_current_target_merge_size;
+                    //std.debug.print("  score_closer_to_target_merge_size={d}\n", .{distance_to_target_merge_size});
+
+                    //const score_bigger_merge = 1 - @as(f64, @floatFromInt(candidate.num_segments)) / @as(f64, @floatFromInt(self.segments_per_merge));
+                    // std.debug.print("  score_bigger_merge={d}\n", .{score_bigger_merge});
+
+                    //const score_smaller_segment_no = 1 - @as(f64, @floatFromInt(segment_no)) / @as(f64, @floatFromInt(num_segments));
+                    //std.debug.print("  score_smaller_segment_no={d}\n", .{score_smaller_segment_no});
+
+                    //const score_oversized = candidate_size_f / @as(f64, @floatFromInt(stats.total_size));
+                    // const score_oversized = if (candidate.size < self.max_segment_size) 0.0 else (max_merge_size_f - candidate_size_f) / max_merge_size_f;
+                    // std.debug.print("  score_oversized={d}\n", .{score_oversized});
+
+                    //const score = score_bigger_merge * 0.5 + score_oversized * 0.5;
+                    //score = score * 0.5 + score_oversized * 0.5;

                     // std.debug.print("candidate {}-{}: len={} size={} candidate_tier={}, score={d}\n", .{ candidate.start.data.id, candidate.end.data.id, candidate.num_segments, candidate.size, candidate_tier, score });
                     if (score < best_score or best_candidate == null) {
@@ -228,7 +298,15 @@ test "TieredMergePolicy" {

     var last_id: u64 = 1;

-    var prng = std.rand.DefaultPrng.init(0);
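+    // Seed the PRNG from the OS so each run exercises a different sequence;
+    // the seed is printed so a failing run can be reproduced by hard-coding it
+    // via the commented assignment below.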
+    var prng = std.rand.DefaultPrng.init(blk: {
+        var seed: u64 = undefined;
+        try std.posix.getrandom(std.mem.asBytes(&seed));
+        //seed = 16044660244849477186;
+        if (verbose) {
+            std.debug.print("seed={}\n", .{seed});
+        }
+        break :blk seed;
+    });
     const rand = prng.random();

     for (0..10) |_| {
@@ -241,14 +319,14 @@ test "TieredMergePolicy" {
     var total_merge_size: u64 = 0;
     var total_merge_count: u64 = 0;

-    for (0..1000) |_| {
+    for (0..1000) |i| {
         if (verbose) {
-            std.debug.print("---\n", .{});
+            std.debug.print("--- [{}] ---\n", .{i});
         }

-        if (rand.boolean()) {
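+        // Note: "or true" disables the coin flip, so a new segment is added on
+        // every iteration.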
+        if (rand.boolean() or true) {
             var segment = try std.testing.allocator.create(MockSegmentList.Node);
-            segment.data = .{ .id = last_id, .size = 100 + rand.intRangeAtMost(u16, 0, 200) };
+            segment.data = .{ .id = last_id, .size = rand.intRangeAtMost(u16, 100, 200) };
             segments.append(segment);
             last_id += 1;
         }
@@ -275,9 +353,10 @@ test "TieredMergePolicy" {
     }

     if (verbose) {
+        std.debug.print("num merges: {}\n", .{total_merge_count});
         std.debug.print("avg merge size: {}\n", .{total_merge_size / total_merge_count});
     }

-    const num_allowed_segmens = policy.calculateBudget(segments);
-    try std.testing.expect(num_allowed_segmens >= segments.len);
+    const s = policy.calculateBudget(segments);
+    try std.testing.expect(s.num_allowed_segments >= segments.len);
 }