
Commit 92d3ce1

Force checkpoint of a small segment in some cases
Fixes #46
1 parent c400190 commit 92d3ce1

6 files changed: +103, -33 lines

src/FileSegment.zig (+3, -1)

@@ -8,6 +8,7 @@ const KeepOrDelete = common.KeepOrDelete;
 
 const Item = @import("segment.zig").Item;
 const SegmentInfo = @import("segment.zig").SegmentInfo;
+const SegmentStatus = @import("segment.zig").SegmentStatus;
 
 const filefmt = @import("filefmt.zig");
 
@@ -20,6 +21,7 @@ pub const Options = struct {
 allocator: std.mem.Allocator,
 dir: std.fs.Dir,
 info: SegmentInfo = .{},
+status: SegmentStatus = .{},
 attributes: std.StringHashMapUnmanaged(u64) = .{},
 docs: std.AutoHashMapUnmanaged(u32, bool) = .{},
 min_doc_id: u32 = 0,
@@ -158,7 +160,7 @@ test "build" {
     defer source.deinit(.delete);
 
     source.info = .{ .version = 1 };
-    source.frozen = true;
+    source.status.frozen = true;
     try source.docs.put(source.allocator, 1, true);
     try source.items.append(source.allocator, .{ .id = 1, .hash = 1 });
     try source.items.append(source.allocator, .{ .id = 1, .hash = 2 });

src/Index.zig (+4, -21)

@@ -142,14 +142,9 @@ pub fn deinit(self: *Self) void {
     self.dir.close();
 }
 
-fn doCheckpoint(self: *Self) !bool {
-    var snapshot = try self.acquireReader();
-    defer self.releaseReader(&snapshot);
-
-    const source = snapshot.memory_segments.value.getFirst() orelse return false;
-    if (source.value.getSize() < self.options.min_segment_size) {
-        return false;
-    }
+fn checkpoint(self: *Self) !bool {
+    var source = self.memory_segments.prepareCheckpoint(self.allocator) orelse return false;
+    defer MemorySegmentList.destroySegment(self.allocator, &source);
 
     // build new file segment
 
@@ -207,7 +202,7 @@ fn updateDocsMetrics(self: *Self) void {
 }
 
 fn checkpointTask(self: *Self) void {
-    _ = self.doCheckpoint() catch |err| {
+    _ = self.checkpoint() catch |err| {
         log.err("checkpoint failed: {}", .{err});
     };
 }
@@ -377,18 +372,6 @@ fn maybeScheduleCheckpoint(self: *Self) void {
     }
 }
 
-fn readyForCheckpoint(self: *Self) ?MemorySegmentNode {
-    self.segments_lock.lockShared();
-    defer self.segments_lock.unlockShared();
-
-    if (self.segments.memory_segments.value.getFirstOrNull()) |first_node| {
-        if (first_node.value.getSize() > self.options.min_segment_size) {
-            return first_node.acquire();
-        }
-    }
-    return null;
-}
-
 pub fn waitForReady(self: *Self, timeout_ms: u32) !void {
     try self.is_ready.timedWait(timeout_ms * std.time.us_per_ms);
 }
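
Reduced to the two predicates involved, the change to Index.zig swaps a size-based gate for a status-based one. The sketch below is illustrative only; shouldCheckpointOld and shouldCheckpointNew are invented names and do not exist in the repository.

// Illustrative only, not code from the commit.
fn shouldCheckpointOld(size: usize, min_segment_size: usize) bool {
    // Old doCheckpoint: the first memory segment was skipped until it grew past
    // min_segment_size, so a small segment could sit in memory indefinitely.
    return size >= min_segment_size;
}

fn shouldCheckpointNew(frozen: bool) bool {
    // New checkpoint: the decision travels with the segment. Once something marks it
    // as frozen (see prepareCheckpoint/prepareMerge in segment_list.zig), it is
    // written out to a file segment regardless of its size.
    return frozen;
}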

src/MemorySegment.zig (+2, -1)

@@ -5,6 +5,7 @@ const common = @import("common.zig");
 const SearchResults = common.SearchResults;
 const KeepOrDelete = common.KeepOrDelete;
 const SegmentInfo = @import("segment.zig").SegmentInfo;
+const SegmentStatus = @import("segment.zig").SegmentStatus;
 const Item = @import("segment.zig").Item;
 
 const Change = @import("change.zig").Change;
@@ -19,12 +20,12 @@ pub const Options = struct {};
 
 allocator: std.mem.Allocator,
 info: SegmentInfo = .{},
+status: SegmentStatus = .{},
 attributes: std.StringHashMapUnmanaged(u64) = .{},
 docs: std.AutoHashMapUnmanaged(u32, bool) = .{},
 min_doc_id: u32 = 0,
 max_doc_id: u32 = 0,
 items: std.ArrayListUnmanaged(Item) = .{},
-frozen: bool = false,
 
 pub fn init(allocator: std.mem.Allocator, opts: Options) Self {
     _ = opts;

src/segment.zig (+4)

@@ -65,6 +65,10 @@ pub const Item = packed struct(u64) {
     }
 };
 
+pub const SegmentStatus = struct {
+    frozen: bool = false,
+};
+
 test "Item binary" {
     try std.testing.expectEqual(8, @sizeOf(Item));
     try std.testing.expectEqual(64, @bitSizeOf(Item));

src/segment_list.zig (+56, -7)

@@ -193,7 +193,7 @@ pub fn SegmentList(Segment: type) type {
     };
 }
 
-fn getSegmentSize(comptime T: type) fn (SharedPtr(T)) usize {
+fn getSizeFn(comptime T: type) fn (SharedPtr(T)) usize {
     const tmp = struct {
         fn getSize(segment: SharedPtr(T)) usize {
             return segment.value.getSize();
@@ -202,17 +202,27 @@ fn getSegmentSize(comptime T: type) fn (SharedPtr(T)) usize {
     return tmp.getSize;
 }
 
+fn isFrozenFn(comptime T: type) fn (SharedPtr(T)) bool {
+    const tmp = struct {
+        fn isFrozen(segment: SharedPtr(T)) bool {
+            return segment.value.status.frozen;
+        }
+    };
+    return tmp.isFrozen;
+}
+
 pub fn SegmentListManager(Segment: type) type {
     return struct {
         pub const Self = @This();
         pub const List = SegmentList(Segment);
-        pub const MergePolicy = TieredMergePolicy(List.Node, getSegmentSize(Segment));
+        pub const MergePolicy = TieredMergePolicy(List.Node, getSizeFn(Segment), isFrozenFn(Segment));
 
         options: Segment.Options,
         segments: SharedPtr(List),
         merge_policy: MergePolicy,
         num_allowed_segments: std.atomic.Value(usize),
         update_lock: std.Thread.Mutex,
+        status_update_lock: std.Thread.Mutex,
 
         pub fn init(allocator: Allocator, options: Segment.Options, merge_policy: MergePolicy) !Self {
            const segments = try SharedPtr(List).create(allocator, List.initEmpty());
@@ -222,6 +232,7 @@ pub fn SegmentListManager(Segment: type) type {
                 .merge_policy = merge_policy,
                 .num_allowed_segments = std.atomic.Value(usize).init(0),
                 .update_lock = .{},
+                .status_update_lock = .{},
             };
         }
 
@@ -249,16 +260,49 @@ pub fn SegmentListManager(Segment: type) type {
             return self.segments.value.nodes.items.len > self.num_allowed_segments.load(.acquire);
         }
 
-        pub fn prepareMerge(self: *Self, allocator: Allocator) !?Update {
+        pub fn prepareCheckpoint(self: *Self, allocator: Allocator) ?List.Node {
             var segments = self.acquireSegments();
             defer destroySegments(allocator, &segments);
 
-            self.num_allowed_segments.store(self.merge_policy.calculateBudget(segments.value.nodes.items), .release);
-            if (!self.needsMerge()) {
-                return null;
+            self.status_update_lock.lock();
+            defer self.status_update_lock.unlock();
+
+            if (segments.value.getFirst()) |node| {
+                if (node.value.status.frozen) {
+                    return node.acquire();
+                }
             }
+            return null;
+        }
 
-            const candidate = self.merge_policy.findSegmentsToMerge(segments.value.nodes.items) orelse return null;
+        pub fn prepareMerge(self: *Self, allocator: Allocator) !?Update {
+            var segments = self.acquireSegments();
+            defer destroySegments(allocator, &segments);
+
+            const candidate = blk: {
+                self.status_update_lock.lock();
+                defer self.status_update_lock.unlock();
+
+                // Check for a degenerate case:
+                // - we have a small segment in the front of the list and then an oversized one right next to it
+                // - such a segment could never be merged
+                // - but it would also never be considered for a checkpoint, so it would be stuck there, blocking checkpoints
+                if (segments.value.nodes.items.len >= 2) {
+                    const node0 = segments.value.nodes.items[0];
+                    const node1 = segments.value.nodes.items[1];
+
+                    if (node1.value.status.frozen and !node0.value.status.frozen) {
+                        node0.value.status.frozen = true;
+                    }
+                }
+
+                self.num_allowed_segments.store(self.merge_policy.calculateBudget(segments.value.nodes.items), .release);
+                if (!self.needsMerge()) {
+                    return null;
+                }
+
+                break :blk self.merge_policy.findSegmentsToMerge(segments.value.nodes.items) orelse return null;
+            };
 
            var target = try List.createSegment(allocator, self.options);
            defer List.destroySegment(allocator, &target);
@@ -274,6 +318,11 @@ pub fn SegmentListManager(Segment: type) type {
             try target.value.merge(&merger);
             errdefer target.value.cleanup();
 
+            if (target.value.getSize() > self.merge_policy.max_segment_size) {
+                // we can do this without a lock, because we are the only ones who know about this new segment
+                target.value.status.frozen = true;
+            }
+
             var update = try self.beginUpdate(allocator);
             update.replaceMergedSegment(target);
 
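
To make the new checkpoint/merge interaction concrete, here is a small self-contained sketch. It is not part of the commit: MockSegment, maybeFreezeHead and pickCheckpointCandidate are invented stand-ins for the real SegmentList machinery, modelling only the two rules added above, namely prepareMerge freezing a small unfrozen head segment that sits in front of a frozen one, and prepareCheckpoint handing out only a frozen head segment.

const std = @import("std");

const MockSegment = struct {
    size: usize,
    frozen: bool = false,
};

// Mirrors the degenerate-case handling in prepareMerge: a small unfrozen segment at
// the front of the list, sitting right before a frozen one, is frozen as well so
// that it stops blocking checkpoints.
fn maybeFreezeHead(segments: []MockSegment) void {
    if (segments.len >= 2 and segments[1].frozen and !segments[0].frozen) {
        segments[0].frozen = true;
    }
}

// Mirrors prepareCheckpoint: only a frozen head segment is offered for checkpointing.
fn pickCheckpointCandidate(segments: []const MockSegment) ?usize {
    if (segments.len > 0 and segments[0].frozen) {
        return 0;
    }
    return null;
}

test "small head segment in front of a frozen one eventually gets checkpointed" {
    var segments = [_]MockSegment{
        .{ .size = 10 }, // small; on its own it would never qualify for a checkpoint
        .{ .size = 1_000_000, .frozen = true }, // oversized result of an earlier merge
    };

    // Before the merge pass runs, nothing is eligible and the small segment is stuck.
    try std.testing.expect(pickCheckpointCandidate(&segments) == null);

    // The merge pass propagates the frozen flag to the stuck head segment...
    maybeFreezeHead(&segments);

    // ...and the next checkpoint pass picks it up despite its size.
    try std.testing.expect(pickCheckpointCandidate(&segments).? == 0);
}

In the real code both of these paths run under the new status_update_lock, which is what keeps the merge thread's status write and the checkpoint thread's status read from racing.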

src/segment_merge_policy.zig (+34, -3)

@@ -36,7 +36,15 @@ pub const MergeCandidate = struct {
     score: f64 = 0.0,
 };
 
-pub fn TieredMergePolicy(comptime Segment: type, comptime getSizeFn: anytype) type {
+pub fn GetSizeFn(comptime S: type) type {
+    return fn (S) usize;
+}
+
+pub fn IsFrozenFn(comptime S: type) type {
+    return fn (S) bool;
+}
+
+pub fn TieredMergePolicy(comptime Segment: type, comptime getSizeFn: GetSizeFn(Segment), comptime isFrozenFn: IsFrozenFn(Segment)) type {
     return struct {
         max_segments: ?usize = null,
 
@@ -53,6 +61,9 @@ pub fn TieredMergePolicy(comptime Segment: type, comptime getSizeFn: anytype) type {
             var num_oversized_segments: usize = 0;
 
             for (segments) |segment| {
+                if (isFrozenFn(segment)) {
+                    continue;
+                }
                 const size = getSizeFn(segment);
                 if (size > self.max_segment_size) {
                     num_oversized_segments += 1;
@@ -97,6 +108,10 @@ pub fn TieredMergePolicy(comptime Segment: type, comptime getSizeFn: anytype) type {
 
             var start: usize = 0;
             while (start + 1 < segments.len) : (start += 1) {
+                if (isFrozenFn(segments[start])) {
+                    // Skip frozen segments
+                    continue;
+                }
                 const start_size = getSizeFn(segments[start]);
                 if (start_size > self.max_segment_size) {
                     // Skip oversized segments that can't be further merged
@@ -110,10 +125,21 @@ pub fn TieredMergePolicy(comptime Segment: type, comptime getSizeFn: anytype) type {
                 };
 
                 while (candidate.end < segments.len) {
-                    candidate.size += getSizeFn(segments[candidate.end]);
+                    if (isFrozenFn(segments[candidate.end])) {
+                        // Can't include frozen segments
+                        break;
+                    }
+                    const size = getSizeFn(segments[candidate.end]);
+                    if (size > self.max_segment_size) {
+                        // Can't include oversized segments
+                        continue;
+                    }
+
+                    candidate.size += size;
                     candidate.end += 1;
 
                     if (candidate.end - candidate.start > self.segments_per_merge or candidate.size > max_merge_size) {
+                        // Too many segments to merge in one pass, or the result would be too big
                         break;
                     }
 
@@ -170,6 +196,11 @@ const MockSegment = struct {
     pub fn getSize(self: @This()) usize {
         return self.size;
    }
+
+    pub fn isFrozen(self: @This()) bool {
+        _ = self;
+        return false;
+    }
 };
 
 fn applyMerge(segments: *std.ArrayList(MockSegment), merge: MergeCandidate) !void {
@@ -183,7 +214,7 @@ test "TieredMergePolicy" {
     var segments = std.ArrayList(MockSegment).init(std.testing.allocator);
     defer segments.deinit();
 
-    const policy = TieredMergePolicy(MockSegment, MockSegment.getSize){
+    const policy = TieredMergePolicy(MockSegment, MockSegment.getSize, MockSegment.isFrozen){
         .min_segment_size = 100,
         .max_segment_size = 100000,
         .segments_per_merge = 10,
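
For code outside SegmentListManager that instantiates the policy directly, adapting to the new comptime signature looks roughly like the sketch below. It assumes a test file sitting next to src/segment_merge_policy.zig; MySegment, its methods, and MyPolicy are invented for illustration.

const std = @import("std");
const segment_merge_policy = @import("segment_merge_policy.zig");

const MySegment = struct {
    num_items: usize,
    frozen: bool = false,

    // Must match segment_merge_policy.GetSizeFn(MySegment), i.e. fn (MySegment) usize.
    fn getSize(self: MySegment) usize {
        return self.num_items;
    }

    // Must match segment_merge_policy.IsFrozenFn(MySegment), i.e. fn (MySegment) bool.
    fn isFrozen(self: MySegment) bool {
        return self.frozen;
    }
};

// Before this commit the policy took a single `anytype` accessor; it now takes both,
// with their signatures pinned by GetSizeFn/IsFrozenFn at the instantiation site.
const MyPolicy = segment_merge_policy.TieredMergePolicy(
    MySegment,
    MySegment.getSize,
    MySegment.isFrozen,
);

test "the adapted policy type resolves" {
    // max_segments is one of the policy's fields shown in the diff above.
    try std.testing.expect(@hasField(MyPolicy, "max_segments"));
}

The repository's own test, visible in the last hunk above, shows a full instantiation with explicit min_segment_size, max_segment_size, and segments_per_merge values.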
