Skip to content

Commit cdf129f

Browse files
committed
Fix file segments not being deleted on merge
1 parent 17fbb7c commit cdf129f

7 files changed

+40
-32
lines changed

src/FileSegment.zig

+9-3
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ const common = @import("common.zig");
66
const Item = common.Item;
77
const SearchResults = common.SearchResults;
88
const SegmentId = common.SegmentId;
9+
const KeepOrDelete = common.KeepOrDelete;
910

1011
const MemorySegment = @import("MemorySegment.zig");
1112

@@ -27,6 +28,7 @@ block_size: usize = 0,
2728
blocks: []const u8,
2829
merged: u32 = 0,
2930
num_items: usize = 0,
31+
delete_in_deinit: bool = false,
3032

3133
raw_data: ?[]align(std.mem.page_size) u8 = null,
3234

@@ -40,14 +42,18 @@ pub fn init(allocator: std.mem.Allocator, options: Options) Self {
4042
};
4143
}
4244

43-
pub fn deinit(self: *Self) void {
45+
pub fn deinit(self: *Self, delete_file: KeepOrDelete) void {
4446
self.docs.deinit();
4547
self.index.deinit();
4648

4749
if (self.raw_data) |data| {
4850
std.posix.munmap(data);
4951
self.raw_data = null;
5052
}
53+
54+
if (delete_file == .delete) {
55+
self.delete();
56+
}
5157
}
5258

5359
pub fn getBlockData(self: Self, block: usize) []const u8 {
@@ -126,7 +132,7 @@ test "build" {
126132
defer data_dir.close();
127133

128134
var source = MemorySegment.init(std.testing.allocator, .{});
129-
defer source.deinit();
135+
defer source.deinit(.delete);
130136

131137
source.id.version = 1;
132138
source.frozen = true;
@@ -138,7 +144,7 @@ test "build" {
138144
defer source_reader.close();
139145

140146
var segment = Self.init(std.testing.allocator, .{ .dir = tmp_dir.dir });
141-
defer segment.deinit();
147+
defer segment.deinit(.delete);
142148

143149
try segment.build(&source_reader);
144150

src/Index.zig

+5-3
Original file line number | Diff line number | Diff line change
@@ -134,8 +134,8 @@ pub fn deinit(self: *Self) void {
134134
self.stopMemorySegmentMergeThread();
135135
self.stopFileSegmentMergeThread();
136136

137-
self.memory_segments.deinit();
138-
self.file_segments.deinit();
137+
self.memory_segments.deinit(.keep);
138+
self.file_segments.deinit(.keep);
139139

140140
self.oplog.deinit();
141141
self.dir.close();
@@ -255,7 +255,7 @@ fn doCheckpoint(self: *Self) !bool {
255255
// build new file segment
256256

257257
var target = try FileSegmentList.createSegment(self.allocator, .{ .dir = self.dir });
258-
errdefer FileSegmentList.destroySegment(self.allocator, &target);
258+
defer FileSegmentList.destroySegment(self.allocator, &target);
259259

260260
var reader = source.value.reader();
261261
defer reader.close();
@@ -341,6 +341,7 @@ fn maybeMergeFileSegments(self: *Self) !bool {
341341
defer self.segments_lock.unlock();
342342

343343
self.file_segments.commitUpdate(&upd);
344+
log.debug("committed file segments merge", .{});
344345

345346
return true;
346347
}
@@ -383,6 +384,7 @@ fn maybeMergeMemorySegments(self: *Self) !bool {
383384
defer self.segments_lock.unlock();
384385

385386
self.memory_segments.commitUpdate(&upd);
387+
log.debug("committed memory segments merge", .{});
386388

387389
self.maybeScheduleCheckpoint();
388390

src/MemorySegment.zig

+4-1
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ const common = @import("common.zig");
55
const Item = common.Item;
66
const SearchResults = common.SearchResults;
77
const SegmentId = common.SegmentId;
8+
const KeepOrDelete = common.KeepOrDelete;
89

910
const Change = @import("change.zig").Change;
1011

@@ -32,9 +33,11 @@ pub fn init(allocator: std.mem.Allocator, opts: Options) Self {
3233
};
3334
}
3435

35-
pub fn deinit(self: *Self) void {
36+
pub fn deinit(self: *Self, delete_file: KeepOrDelete) void {
37+
_ = delete_file;
3638
self.docs.deinit();
3739
self.items.deinit();
40+
log.debug("deinit memory segment {}:{}", .{ self.id.version, self.id.included_merges });
3841
}
3942

4043
pub fn search(self: Self, sorted_hashes: []const u32, results: *SearchResults) !void {

src/common.zig

+5
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,11 @@ const testing = std.testing;
33

44
const msgpack = @import("msgpack");
55

6+
pub const KeepOrDelete = enum {
7+
keep,
8+
delete,
9+
};
10+
611
pub const Item = packed struct(u64) {
712
id: u32,
813
hash: u32,

src/filefmt.zig

+3-3
Original file line number | Diff line number | Diff line change
@@ -199,7 +199,7 @@ pub fn encodeBlock(data: []u8, reader: anytype) !u16 {
199199

200200
test "writeBlock/readBlock/readFirstItemFromBlock" {
201201
var segment = MemorySegment.init(std.testing.allocator, .{});
202-
defer segment.deinit();
202+
defer segment.deinit(.delete);
203203

204204
try segment.items.append(.{ .hash = 1, .id = 1 });
205205
try segment.items.append(.{ .hash = 2, .id = 1 });
@@ -457,7 +457,7 @@ test "writeFile/readFile" {
457457

458458
{
459459
var in_memory_segment = MemorySegment.init(testing.allocator, .{});
460-
defer in_memory_segment.deinit();
460+
defer in_memory_segment.deinit(.delete);
461461

462462
in_memory_segment.id = version;
463463

@@ -473,7 +473,7 @@ test "writeFile/readFile" {
473473

474474
{
475475
var segment = FileSegment.init(testing.allocator, .{ .dir = tmp.dir });
476-
defer segment.deinit();
476+
defer segment.deinit(.delete);
477477

478478
try readSegmentFile(tmp.dir, version, &segment);
479479

src/segment_list.zig

+13-21
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ const SearchResults = @import("common.zig").SearchResults;
55

66
const Change = @import("change.zig").Change;
77
const SegmentId = @import("common.zig").SegmentId;
8+
const KeepOrDelete = @import("common.zig").KeepOrDelete;
89

910
const Deadline = @import("utils/Deadline.zig");
1011

@@ -48,9 +49,9 @@ pub fn SegmentList(Segment: type) type {
4849
return try SharedPtr(Self).create(allocator, self);
4950
}
5051

51-
pub fn deinit(self: *Self, allocator: Allocator) void {
52+
pub fn deinit(self: *Self, allocator: Allocator, delete_files: KeepOrDelete) void {
5253
for (self.nodes.items) |*node| {
53-
node.release(allocator, .{});
54+
node.release(allocator, .{delete_files});
5455
}
5556
self.nodes.deinit(allocator);
5657
}
@@ -60,25 +61,11 @@ pub fn SegmentList(Segment: type) type {
6061
}
6162

6263
pub fn destroySegment(allocator: Allocator, segment: *Node) void {
63-
segment.releaseWithCleanup(allocator, destroySegmentCallback, .{});
64+
segment.release(allocator, .{.delete});
6465
}
6566

6667
pub fn destroySegments(allocator: Allocator, segments: *SharedPtr(Self)) void {
67-
// we also call cleanup on these segments, to ensure that unused segments will get deleted from disk
68-
segments.releaseWithCleanup(allocator, destroySegmentListCallback, .{allocator});
69-
}
70-
71-
fn destroySegmentListCallback(segments: *Self, allocator: Allocator) void {
72-
while (segments.nodes.items.len > 0) {
73-
var node = segments.nodes.pop();
74-
destroySegment(allocator, &node);
75-
}
76-
segments.deinit(allocator);
77-
}
78-
79-
fn destroySegmentCallback(segment: *Segment) void {
80-
segment.cleanup();
81-
segment.deinit();
68+
segments.release(allocator, .{ allocator, .delete });
8269
}
8370

8471
pub fn appendSegmentInto(self: Self, copy: *Self, node: Node) void {
@@ -87,13 +74,16 @@ pub fn SegmentList(Segment: type) type {
8774
copy.nodes.appendAssumeCapacity(n.acquire());
8875
}
8976
copy.nodes.appendAssumeCapacity(node.acquire());
77+
std.log.debug("adding {s} {}:{}", .{ @typeName(Segment), node.value.id.version, node.value.id.included_merges });
9078
}
9179

9280
pub fn removeSegmentInto(self: Self, copy: *Self, node: Node) void {
9381
copy.nodes.clearRetainingCapacity();
9482
for (self.nodes.items) |n| {
9583
if (n.value != node.value) {
9684
copy.nodes.appendAssumeCapacity(n.acquire());
85+
} else {
86+
std.log.debug("removing {s} {}:{}", .{ @typeName(Segment), n.value.id.version, n.value.id.included_merges });
9787
}
9888
}
9989
}
@@ -103,9 +93,11 @@ pub fn SegmentList(Segment: type) type {
10393
var inserted_merged = false;
10494
for (self.nodes.items) |n| {
10595
if (node.value.id.contains(n.value.id)) {
96+
std.log.debug("removing {s} {}:{}", .{ @typeName(Segment), n.value.id.version, n.value.id.included_merges });
10697
if (!inserted_merged) {
10798
copy.nodes.appendAssumeCapacity(node.acquire());
10899
inserted_merged = true;
100+
std.log.debug("adding {s} {}:{}", .{ @typeName(Segment), node.value.id.version, node.value.id.included_merges });
109101
}
110102
} else {
111103
copy.nodes.appendAssumeCapacity(n.acquire());
@@ -207,8 +199,8 @@ pub fn SegmentListManager(Segment: type) type {
207199
};
208200
}
209201

210-
pub fn deinit(self: *Self) void {
211-
self.segments.release(self.allocator, .{self.allocator});
202+
pub fn deinit(self: *Self, delete_files: KeepOrDelete) void {
203+
self.segments.release(self.allocator, .{ self.allocator, delete_files });
212204
}
213205

214206
pub fn count(self: Self) usize {
@@ -284,7 +276,7 @@ pub fn SegmentListManager(Segment: type) type {
284276
errdefer self.update_lock.unlock();
285277

286278
var segments = try SharedPtr(List).create(self.allocator, List.initEmpty());
287-
errdefer self.releaseSegments(&segments);
279+
errdefer self.destroySegments(&segments);
288280

289281
// allocate memory for one extra segment, if it's going to be unused, it's going to be unused, but we need to have it ready
290282
try segments.value.nodes.ensureTotalCapacity(self.allocator, self.count() + 1);

src/segment_merger.zig

+1-1
Original file line number | Diff line number | Diff line change
@@ -149,7 +149,7 @@ test "merge segments" {
149149
const MemorySegment = @import("MemorySegment.zig");
150150

151151
var collection = try SegmentList(MemorySegment).init(std.testing.allocator, 3);
152-
defer collection.deinit(std.testing.allocator);
152+
defer collection.deinit(std.testing.allocator, .delete);
153153

154154
var merger = SegmentMerger(MemorySegment).init(std.testing.allocator, &collection);
155155
defer merger.deinit();

0 commit comments

Comments
 (0)