Skip to content

Commit 2467c42

Browse files
committed
Use commit_id as segment id
1 parent e1a0069 commit 2467c42

7 files changed

+70
-170
lines changed

src/FileSegment.zig

+12-12
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ const SearchResults = common.SearchResults;
88
const SegmentId = common.SegmentId;
99
const KeepOrDelete = common.KeepOrDelete;
1010

11-
const MemorySegment = @import("MemorySegment.zig");
11+
const SegmentInfo = @import("segment.zig").SegmentInfo;
1212

1313
const filefmt = @import("filefmt.zig");
1414

@@ -20,8 +20,7 @@ pub const Options = struct {
2020

2121
allocator: std.mem.Allocator,
2222
dir: std.fs.Dir,
23-
id: SegmentId = .{ .version = 0, .included_merges = 0 },
24-
max_commit_id: u64 = 0,
23+
info: SegmentInfo = .{},
2524
docs: std.AutoHashMap(u32, bool),
2625
index: std.ArrayList(u32),
2726
block_size: usize = 0,
@@ -82,19 +81,19 @@ pub fn search(self: Self, sorted_hashes: []const u32, results: *SearchResults) !
8281
}
8382
const matches = std.sort.equalRange(Item, Item{ .hash = hash, .id = 0 }, block_items.items, {}, Item.cmpByHash);
8483
for (matches[0]..matches[1]) |i| {
85-
try results.incr(block_items.items[i].id, self.id.version);
84+
try results.incr(block_items.items[i].id, self.info.version);
8685
}
8786
}
8887
}
8988
}
9089

91-
pub fn open(self: *Self, id: SegmentId) !void {
92-
try filefmt.readSegmentFile(self.dir, id, self);
90+
pub fn open(self: *Self, info: SegmentInfo) !void {
91+
try filefmt.readSegmentFile(self.dir, info, self);
9392
}
9493

9594
pub fn delete(self: *Self) void {
9695
var file_name_buf: [filefmt.max_file_name_size]u8 = undefined;
97-
const file_name = filefmt.buildSegmentFileName(&file_name_buf, self.id);
96+
const file_name = filefmt.buildSegmentFileName(&file_name_buf, self.info);
9897

9998
log.info("deleting segment file {s}", .{file_name});
10099

@@ -113,18 +112,20 @@ pub fn merge(self: *Self, source: anytype) !void {
113112

114113
pub fn build(self: *Self, source: anytype) !void {
115114
var file_name_buf: [filefmt.max_file_name_size]u8 = undefined;
116-
const file_name = filefmt.buildSegmentFileName(&file_name_buf, source.segment.id);
115+
const file_name = filefmt.buildSegmentFileName(&file_name_buf, source.segment.info);
117116

118117
try filefmt.writeSegmentFile(self.dir, source);
119118

120119
errdefer self.dir.deleteFile(file_name) catch |err| {
121120
log.err("failed to clean up segment file {s}: {}", .{ file_name, err });
122121
};
123122

124-
try filefmt.readSegmentFile(self.dir, source.segment.id, self);
123+
try filefmt.readSegmentFile(self.dir, source.segment.info, self);
125124
}
126125

127126
test "build" {
127+
const MemorySegment = @import("MemorySegment.zig");
128+
128129
var tmp_dir = std.testing.tmpDir(.{});
129130
defer tmp_dir.cleanup();
130131

@@ -134,7 +135,7 @@ test "build" {
134135
var source = MemorySegment.init(std.testing.allocator, .{});
135136
defer source.deinit(.delete);
136137

137-
source.id.version = 1;
138+
source.info = .{ .version = 1 };
138139
source.frozen = true;
139140
try source.docs.put(1, true);
140141
try source.items.append(.{ .id = 1, .hash = 1 });
@@ -148,8 +149,7 @@ test "build" {
148149

149150
try segment.build(&source_reader);
150151

151-
try std.testing.expectEqual(1, segment.id.version);
152-
try std.testing.expectEqual(0, segment.id.included_merges);
152+
try std.testing.expectEqualDeep(SegmentInfo{ .version = 1, .merges = 0 }, segment.info);
153153
try std.testing.expectEqual(1, segment.docs.count());
154154
try std.testing.expectEqual(1, segment.index.items.len);
155155
}

src/Index.zig

+10-20
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ const Deadline = @import("utils/Deadline.zig");
88
const Change = @import("change.zig").Change;
99
const SearchResult = @import("common.zig").SearchResult;
1010
const SearchResults = @import("common.zig").SearchResults;
11-
const SegmentId = @import("common.zig").SegmentId;
11+
const SegmentInfo = @import("segment.zig").SegmentInfo;
1212

1313
const Oplog = @import("Oplog.zig");
1414

@@ -142,11 +142,11 @@ pub fn deinit(self: *Self) void {
142142
self.dir.close();
143143
}
144144

145-
fn loadSegment(self: *Self, segment_id: SegmentId) !FileSegmentNode {
145+
fn loadSegment(self: *Self, info: SegmentInfo) !FileSegmentNode {
146146
var node = try FileSegmentList.createSegment(self.allocator, .{ .dir = self.dir });
147147
errdefer FileSegmentList.destroySegment(self.allocator, &node);
148148

149-
try node.value.open(segment_id);
149+
try node.value.open(info);
150150

151151
return node;
152152
}
@@ -169,13 +169,13 @@ fn loadSegments(self: *Self) !u64 {
169169

170170
try self.file_segments.segments.value.nodes.ensureTotalCapacity(self.allocator, segment_ids.len);
171171

172-
var max_commit_id: u64 = 0;
172+
var last_commit_id: u64 = 0;
173173
for (segment_ids) |segment_id| {
174174
const node = try self.loadSegment(segment_id);
175175
self.file_segments.segments.value.nodes.appendAssumeCapacity(node);
176-
max_commit_id = @max(max_commit_id, node.value.max_commit_id);
176+
last_commit_id = node.value.info.getLastCommitId();
177177
}
178-
return max_commit_id;
178+
return last_commit_id;
179179
}
180180

181181
fn doCheckpoint(self: *Self) !bool {
@@ -260,7 +260,7 @@ fn stopCheckpointThread(self: *Self) void {
260260
}
261261

262262
fn updateIndexFile(self: *Self, segments: *FileSegmentList) !void {
263-
var ids = try segments.getIds(self.allocator);
263+
var ids = try segments.getInfos(self.allocator);
264264
defer ids.deinit(self.allocator);
265265

266266
try filefmt.writeIndexFile(self.dir, ids.items);
@@ -359,14 +359,14 @@ fn stopMemorySegmentMergeThread(self: *Self) void {
359359
}
360360

361361
pub fn open(self: *Self) !void {
362-
const max_commit_id = try self.loadSegments();
362+
const last_commit_id = try self.loadSegments();
363363

364364
// start these threads after loading file segments, but before replaying oplog to memory segments
365365
try self.startFileSegmentMergeThread();
366366
try self.startMemorySegmentMergeThread();
367367
try self.startCheckpointThread();
368368

369-
try self.oplog.open(max_commit_id + 1, updateInternal, self);
369+
try self.oplog.open(last_commit_id + 1, updateInternal, self);
370370

371371
log.info("index loaded", .{});
372372
}
@@ -406,21 +406,11 @@ pub fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !vo
406406
var upd = try self.memory_segments.beginUpdate(self.allocator);
407407
defer self.memory_segments.cleanupAfterUpdate(self.allocator, &upd);
408408

409-
target.value.max_commit_id = commit_id orelse try self.oplog.write(changes);
409+
target.value.info.version = commit_id orelse try self.oplog.write(changes);
410410

411411
self.segments_lock.lock();
412412
defer self.segments_lock.unlock();
413413

414-
target.value.id = blk: {
415-
if (self.memory_segments.segments.value.getLast()) |n| {
416-
break :blk n.value.id.next();
417-
} else if (self.file_segments.segments.value.getLast()) |n| {
418-
break :blk n.value.id.next();
419-
} else {
420-
break :blk SegmentId.first();
421-
}
422-
};
423-
424414
upd.appendSegment(target);
425415

426416
self.memory_segments.commitUpdate(&upd);

src/MemorySegment.zig

+4-6
Original file line number | Diff line number | Diff line change
@@ -4,8 +4,8 @@ const log = std.log;
44
const common = @import("common.zig");
55
const Item = common.Item;
66
const SearchResults = common.SearchResults;
7-
const SegmentId = common.SegmentId;
87
const KeepOrDelete = common.KeepOrDelete;
8+
const SegmentInfo = @import("segment.zig").SegmentInfo;
99

1010
const Change = @import("change.zig").Change;
1111

@@ -18,8 +18,7 @@ const Self = @This();
1818
pub const Options = struct {};
1919

2020
allocator: std.mem.Allocator,
21-
id: SegmentId = .{ .version = 0, .included_merges = 0 },
22-
max_commit_id: u64 = 0,
21+
info: SegmentInfo = .{},
2322
docs: std.AutoHashMap(u32, bool),
2423
items: std.ArrayList(Item),
2524
frozen: bool = false,
@@ -44,7 +43,7 @@ pub fn search(self: Self, sorted_hashes: []const u32, results: *SearchResults) !
4443
for (sorted_hashes) |hash| {
4544
const matches = std.sort.equalRange(Item, Item{ .hash = hash, .id = 0 }, items, {}, Item.cmpByHash);
4645
for (matches[0]..matches[1]) |i| {
47-
try results.incr(items[i].id, self.id.version);
46+
try results.incr(items[i].id, self.info.version);
4847
}
4948
items = items[matches[1]..];
5049
}
@@ -112,8 +111,7 @@ pub fn cleanup(self: *Self) void {
112111
}
113112

114113
pub fn merge(self: *Self, merger: *SegmentMerger(Self)) !void {
115-
self.id = merger.segment.id;
116-
self.max_commit_id = merger.segment.max_commit_id;
114+
self.info = merger.segment.info;
117115

118116
self.docs.deinit();
119117
self.docs = merger.segment.docs.move();

src/common.zig

+3-72
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ const std = @import("std");
22
const testing = std.testing;
33

44
const msgpack = @import("msgpack");
5+
const SegmentInfo = @import("segment.zig").SegmentInfo;
56

67
pub const KeepOrDelete = enum {
78
keep,
@@ -59,7 +60,7 @@ test "Item array sort" {
5960
pub const SearchResult = struct {
6061
id: u32,
6162
score: u32,
62-
version: u32,
63+
version: u64,
6364

6465
pub fn cmp(_: void, a: SearchResult, b: SearchResult) bool {
6566
return a.score > b.score or (a.score == b.score and a.id > b.id);
@@ -81,7 +82,7 @@ pub const SearchResults = struct {
8182
self.results.deinit();
8283
}
8384

84-
pub fn incr(self: *SearchResults, id: u32, version: u32) !void {
85+
pub fn incr(self: *SearchResults, id: u32, version: u64) !void {
8586
const r = try self.results.getOrPut(id);
8687
if (!r.found_existing or r.value_ptr.version < version) {
8788
r.value_ptr.id = id;
@@ -140,73 +141,3 @@ test "sort search results" {
140141
SearchResult{ .id = 1, .score = 1, .version = 1 },
141142
}, results.values());
142143
}
143-
144-
pub const SegmentId = packed struct(u64) {
145-
version: u32,
146-
included_merges: u32 = 0,
147-
148-
pub fn cmp(_: void, a: SegmentId, b: SegmentId) bool {
149-
const xa: u64 = @bitCast(a);
150-
const xb: u64 = @bitCast(b);
151-
return xa < xb;
152-
}
153-
154-
pub fn eq(a: SegmentId, b: SegmentId) bool {
155-
const xa: u64 = @bitCast(a);
156-
const xb: u64 = @bitCast(b);
157-
return xa == xb;
158-
}
159-
160-
pub fn contains(self: SegmentId, other: SegmentId) bool {
161-
const start = self.version;
162-
const end = self.version + self.included_merges;
163-
164-
const other_start = other.version;
165-
const other_end = other.version + other.included_merges;
166-
167-
return other_start >= start and other_end <= end;
168-
}
169-
170-
pub fn first() SegmentId {
171-
return .{
172-
.version = 1,
173-
.included_merges = 0,
174-
};
175-
}
176-
177-
pub fn next(a: SegmentId) SegmentId {
178-
return .{
179-
.version = a.version + a.included_merges + 1,
180-
.included_merges = 0,
181-
};
182-
}
183-
184-
pub fn merge(a: SegmentId, b: SegmentId) SegmentId {
185-
return .{
186-
.version = @min(a.version, b.version),
187-
.included_merges = 1 + a.included_merges + b.included_merges,
188-
};
189-
}
190-
191-
pub fn msgpackFormat() msgpack.StructFormat {
192-
return .{ .as_array = .{} };
193-
}
194-
};
195-
196-
test "SegmentId.contains" {
197-
const a = SegmentId{ .version = 1, .included_merges = 0 };
198-
const b = SegmentId{ .version = 2, .included_merges = 0 };
199-
const c = SegmentId{ .version = 1, .included_merges = 1 };
200-
201-
try std.testing.expect(a.contains(a));
202-
try std.testing.expect(!a.contains(b));
203-
try std.testing.expect(!a.contains(c));
204-
205-
try std.testing.expect(!b.contains(a));
206-
try std.testing.expect(b.contains(b));
207-
try std.testing.expect(!b.contains(c));
208-
209-
try std.testing.expect(c.contains(a));
210-
try std.testing.expect(c.contains(b));
211-
try std.testing.expect(c.contains(c));
212-
}

0 commit comments

Comments
 (0)