@@ -151,15 +151,15 @@ fn loadSegment(self: *Self, segment_id: SegmentId) !FileSegmentNode {
     return node;
 }
 
-fn loadSegments(self: *Self) !void {
+fn loadSegments(self: *Self) !u64 {
     self.segments_lock.lock();
     defer self.segments_lock.unlock();
 
     const segment_ids = filefmt.readIndexFile(self.dir, self.allocator) catch |err| {
         if (err == error.FileNotFound) {
             if (self.options.create) {
                 try self.updateIndexFile(self.file_segments.segments.value);
-                return;
+                return 0;
             }
             return error.IndexNotFound;
         }
@@ -169,10 +169,13 @@ fn loadSegments(self: *Self) !void {
 
     try self.file_segments.segments.value.nodes.ensureTotalCapacity(self.allocator, segment_ids.len);
 
+    var max_commit_id: u64 = 0;
     for (segment_ids) |segment_id| {
         const node = try self.loadSegment(segment_id);
         self.file_segments.segments.value.nodes.appendAssumeCapacity(node);
+        max_commit_id = @max(max_commit_id, node.value.max_commit_id);
     }
+    return max_commit_id;
 }
 
 fn doCheckpoint(self: *Self) !bool {
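Note: with this change loadSegments reports the highest commit id found in the file segments it just loaded (0 for a freshly created index). Below is a minimal standalone sketch of the same accumulation pattern; the names are illustrative only and not taken from this file:

    const std = @import("std");

    // Stand-in for a loaded file segment; only the field this sketch needs.
    const Segment = struct {
        max_commit_id: u64,
    };

    // Fold over the loaded segments, keeping the largest commit id seen.
    fn maxCommitId(segments: []const Segment) u64 {
        var max: u64 = 0;
        for (segments) |segment| {
            max = @max(max, segment.max_commit_id);
        }
        return max;
    }

    test "empty index yields 0, otherwise the highest id wins" {
        try std.testing.expectEqual(@as(u64, 0), maxCommitId(&.{}));
        try std.testing.expectEqual(@as(u64, 42), maxCommitId(&.{
            .{ .max_commit_id = 7 },
            .{ .max_commit_id = 42 },
        }));
    }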
@@ -356,23 +359,18 @@ fn stopMemorySegmentMergeThread(self: *Self) void {
 }
 
 pub fn open(self: *Self) !void {
-    try self.loadSegments();
+    const max_commit_id = try self.loadSegments();
 
     // start these threads after loading file segments, but before replaying oplog to memory segments
     try self.startFileSegmentMergeThread();
     try self.startMemorySegmentMergeThread();
     try self.startCheckpointThread();
 
-    try self.oplog.open(self.getMaxCommitId(), updateInternal, self);
+    try self.oplog.open(max_commit_id + 1, updateInternal, self);
 
     log.info("index loaded", .{});
 }
 
-const Checkpoint = struct {
-    src: *MemorySegmentNode,
-    dest: ?*FileSegmentNode = null,
-};
-
 fn maybeScheduleCheckpoint(self: *Self) void {
     if (self.memory_segments.segments.value.getFirst()) |first_node| {
         if (first_node.value.getSize() >= self.options.min_segment_size) {
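Note: open() now seeds oplog replay directly from the value returned by loadSegments, so only operations with a commit id strictly greater than what the file segments already contain get re-applied. A rough sketch of that replay cutoff, assuming a hypothetical Op record and apply callback (neither is part of this repo):

    const std = @import("std");

    // Hypothetical operation record; only the commit id matters here.
    const Op = struct {
        commit_id: u64,
    };

    // Apply only operations newer than the data already persisted in file
    // segments; ids up to and including max_commit_id are skipped.
    fn replayNewerThan(ops: []const Op, max_commit_id: u64, apply: *const fn (Op) void) void {
        const first_needed = max_commit_id + 1;
        for (ops) |op| {
            if (op.commit_id >= first_needed) apply(op);
        }
    }

    var applied: usize = 0;

    fn countOp(_: Op) void {
        applied += 1;
    }

    test "only ops after the persisted max commit id are replayed" {
        const ops = [_]Op{ .{ .commit_id = 1 }, .{ .commit_id = 2 }, .{ .commit_id = 3 } };
        replayNewerThan(&ops, 2, &countOp);
        try std.testing.expectEqual(@as(usize, 1), applied);
    }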
@@ -462,24 +460,17 @@ pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, de
     var results = SearchResults.init(allocator);
     errdefer results.deinit();
 
-    var segments = self.acquireSegments();
-    defer self.releaseSegments(&segments);
+    var snapshot = self.acquireSegments();
+    defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread
 
-    try segments.file_segments.value.search(sorted_hashes, &results, deadline);
-    try segments.memory_segments.value.search(sorted_hashes, &results, deadline);
+    try snapshot.file_segments.value.search(sorted_hashes, &results, deadline);
+    try snapshot.memory_segments.value.search(sorted_hashes, &results, deadline);
 
     results.sort();
 
     return results;
 }
 
-pub fn getMaxCommitId(self: *Self) u64 {
-    var segments = self.acquireSegments();
-    defer self.releaseSegments(&segments);
-
-    return @max(segments.file_segments.value.getMaxCommitId(), segments.memory_segments.value.getMaxCommitId());
-}
-
 test {
     _ = @import("index_tests.zig");
 }