
Commit f03252b

committed Dec 7, 2024
Open index in the background
1 parent 9ac89b2 commit f03252b

6 files changed: +96 -76 lines changed
 

‎src/Index.zig

+81 -54

@@ -51,23 +51,13 @@ dir: std.fs.Dir,

 oplog: Oplog,

-memory_segments: SegmentListManager(MemorySegment),
-file_segments: SegmentListManager(FileSegment),
+open_lock: std.Thread.Mutex = .{},
+is_ready: std.atomic.Value(bool),
+load_task: ?Scheduler.Task = null,

-// These segments are owned by the index and can't be accessed without acquiring segments_lock.
-// They can never be modified, only replaced.
 segments_lock: std.Thread.RwLock = .{},
-
-// These locks give partial access to the respective segments list.
-// 1) For memory_segments, new segment can be appended to the list without this lock.
-// 2) For file_segments, no write operation can happen without this lock.
-// These lock can be only acquired before segments_lock, never after, to avoid deadlock situatons.
-// They are mostly useful to allowing read access to segments during merge/checkpoint, without blocking real-time update.
-file_segments_lock: std.Thread.Mutex = .{},
-memory_segments_lock: std.Thread.Mutex = .{},
-
-// Mutex used to control linearity of updates.
-update_lock: std.Thread.Mutex = .{},
+memory_segments: SegmentListManager(MemorySegment),
+file_segments: SegmentListManager(FileSegment),

 checkpoint_task: ?Scheduler.Task = null,
 file_segment_merge_task: ?Scheduler.Task = null,

@@ -123,12 +113,17 @@ pub fn init(allocator: std.mem.Allocator, scheduler: *Scheduler, parent_dir: std
         .segments_lock = .{},
         .memory_segments = memory_segments,
         .file_segments = file_segments,
+        .is_ready = std.atomic.Value(bool).init(false),
     };
 }

 pub fn deinit(self: *Self) void {
     log.info("closing index {}", .{@intFromPtr(self)});

+    if (self.load_task) |task| {
+        self.scheduler.destroyTask(task);
+    }
+
     if (self.checkpoint_task) |task| {
         self.scheduler.destroyTask(task);
     }

@@ -148,36 +143,8 @@ pub fn deinit(self: *Self) void {
     self.dir.close();
 }

-fn loadSegments(self: *Self, create: bool) !u64 {
-    self.segments_lock.lock();
-    defer self.segments_lock.unlock();
-
-    const segment_ids = filefmt.readManifestFile(self.dir, self.allocator) catch |err| {
-        if (err == error.FileNotFound) {
-            if (create) {
-                try self.updateManifestFile(self.file_segments.segments.value);
-                return 0;
-            }
-            return error.IndexNotFound;
-        }
-        return err;
-    };
-    defer self.allocator.free(segment_ids);
-    log.info("found {} segments in manifest", .{segment_ids.len});
-
-    try self.file_segments.segments.value.nodes.ensureTotalCapacity(self.allocator, segment_ids.len);
-    var last_commit_id: u64 = 0;
-    for (segment_ids, 1..) |segment_id, i| {
-        const node = try FileSegmentList.loadSegment(self.allocator, segment_id, .{ .dir = self.dir });
-        self.file_segments.segments.value.nodes.appendAssumeCapacity(node);
-        last_commit_id = node.value.info.getLastCommitId();
-        log.info("loaded segment {} ({}/{})", .{ last_commit_id, i, segment_ids.len });
-    }
-    return last_commit_id;
-}
-
 fn doCheckpoint(self: *Self) !bool {
-    var snapshot = self.acquireReader();
+    var snapshot = try self.acquireReader();
     defer self.releaseReader(&snapshot);

     const source = snapshot.memory_segments.value.getFirst() orelse return false;

@@ -234,7 +201,7 @@ fn doCheckpoint(self: *Self) !bool {
 }

 fn updateDocsMetrics(self: *Self) void {
-    var snapshot = self.acquireReader();
+    var snapshot = self.acquireReader() catch return;
     defer self.releaseReader(&snapshot);

     metrics.docs(self.name, snapshot.getNumDocs());

@@ -320,30 +287,81 @@ fn maybeMergeMemorySegments(self: *Self) !bool {
 }

 pub fn open(self: *Self, create: bool) !void {
-    const last_commit_id = try self.loadSegments(create);
+    self.open_lock.lock();
+    defer self.open_lock.unlock();
+
+    if (self.is_ready.load(.monotonic)) {
+        return;
+    }
+
+    if (self.load_task != null) {
+        return error.AlreadyOpening;
+    }
+
+    const manifest = filefmt.readManifestFile(self.dir, self.allocator) catch |err| {
+        if (err == error.FileNotFound) {
+            if (create) {
+                try self.updateManifestFile(self.file_segments.segments.value);
+                try self.load(&.{});
+                return;
+            }
+            return error.IndexNotFound;
+        }
+        return err;
+    };
+    errdefer self.allocator.free(manifest);
+
+    self.load_task = try self.scheduler.createTask(.medium, loadTask, .{ self, manifest });
+    self.scheduler.scheduleTask(self.load_task.?);
+}
+
+fn load(self: *Self, manifest: []SegmentInfo) !void {
+    defer self.allocator.free(manifest);
+
+    log.info("found {} segments in manifest", .{manifest.len});
+
+    try self.file_segments.segments.value.nodes.ensureTotalCapacity(self.allocator, manifest.len);
+    var last_commit_id: u64 = 0;
+    for (manifest, 1..) |segment_id, i| {
+        const node = try FileSegmentList.loadSegment(self.allocator, segment_id, .{ .dir = self.dir });
+        self.file_segments.segments.value.nodes.appendAssumeCapacity(node);
+        last_commit_id = node.value.info.getLastCommitId();
+        log.info("loaded segment {} ({}/{})", .{ last_commit_id, i, manifest.len });
+    }

-    self.checkpoint_task = try self.scheduler.createTask(.medium, checkpointTask, .{self});
     self.memory_segment_merge_task = try self.scheduler.createTask(.high, memorySegmentMergeTask, .{self});
+    self.checkpoint_task = try self.scheduler.createTask(.medium, checkpointTask, .{self});
     self.file_segment_merge_task = try self.scheduler.createTask(.low, fileSegmentMergeTask, .{self});

-    try self.oplog.open(last_commit_id + 1, updateInternal, self);
+    try self.oplog.open(1, updateInternal, self);

     log.info("index loaded", .{});
+
+    self.is_ready.store(true, .monotonic);
+}
+
+fn loadTask(self: *Self, manifest: []SegmentInfo) void {
+    self.open_lock.lock();
+    defer self.open_lock.unlock();
+
+    self.load(manifest) catch |err| {
+        log.err("load failed: {}", .{err});
+    };
 }

 fn maybeScheduleMemorySegmentMerge(self: *Self) void {
     if (self.memory_segments.needsMerge()) {
-        log.debug("too many memory segments, scheduling merging", .{});
         if (self.memory_segment_merge_task) |task| {
+            log.debug("too many memory segments, scheduling merging", .{});
             self.scheduler.scheduleTask(task);
         }
     }
 }

 fn maybeScheduleFileSegmentMerge(self: *Self) void {
     if (self.file_segments.needsMerge()) {
-        log.debug("too many file segments, scheduling merging", .{});
         if (self.file_segment_merge_task) |task| {
+            log.debug("too many file segments, scheduling merging", .{});
             self.scheduler.scheduleTask(task);
         }
     }

@@ -352,8 +370,8 @@ fn maybeScheduleFileSegmentMerge(self: *Self) void {
 fn maybeScheduleCheckpoint(self: *Self) void {
     if (self.memory_segments.segments.value.getFirst()) |first_node| {
         if (first_node.value.getSize() >= self.options.min_segment_size) {
-            log.debug("the first memory segment is too big, scheduling checkpoint", .{});
             if (self.checkpoint_task) |task| {
+                log.debug("the first memory segment is too big, scheduling checkpoint", .{});
                 self.scheduler.scheduleTask(task);
             }
         }

@@ -372,11 +390,18 @@ fn readyForCheckpoint(self: *Self) ?MemorySegmentNode {
     return null;
 }

+fn checkIfReady(self: Self) !void {
+    if (!self.is_ready.load(.monotonic)) {
+        return error.IndexNotReady;
+    }
+}
+
 pub fn update(self: *Self, changes: []const Change) !void {
+    try self.checkIfReady();
     try self.updateInternal(changes, null);
 }

-pub fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !void {
+fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !void {
     var target = try MemorySegmentList.createSegment(self.allocator, .{});
     defer MemorySegmentList.destroySegment(self.allocator, &target);

@@ -400,7 +425,9 @@ pub fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !vo
     self.maybeScheduleCheckpoint();
 }

-pub fn acquireReader(self: *Self) IndexReader {
+pub fn acquireReader(self: *Self) !IndexReader {
+    try self.checkIfReady();
+
     self.segments_lock.lockShared();
     defer self.segments_lock.unlockShared();

@@ -416,7 +443,7 @@ pub fn releaseReader(self: *Self, reader: *IndexReader) void {
 }

 pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !SearchResults {
-    var reader = self.acquireReader();
+    var reader = try self.acquireReader();
     defer self.releaseReader(&reader);

     return reader.search(hashes, allocator, deadline);
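
The net effect of the Index.zig changes: open() now only reads the manifest and schedules a load task, segment loading runs on a scheduler thread, and the is_ready flag is flipped when loading finishes. Readers and writers are gated on that flag, so callers must expect error.IndexNotReady while the index is still loading. A minimal caller-side sketch of what that means in practice (a hypothetical helper reusing the repo's Index, Deadline and SearchResults types; not part of the commit):

// Hypothetical helper: returns null while the index is still loading in the
// background, instead of treating error.IndexNotReady as a hard failure.
fn searchIfReady(index: *Index, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !?SearchResults {
    var reader = index.acquireReader() catch |err| switch (err) {
        error.IndexNotReady => return null, // load task has not flipped is_ready yet
        else => |e| return e,
    };
    defer index.releaseReader(&reader);
    return try reader.search(hashes, allocator, deadline);
}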

‎src/MultiIndex.zig

+5 -19

@@ -12,8 +12,6 @@ pub const IndexRef = struct {
     name: []const u8,
     references: usize = 0,
     last_used_at: i64 = std.math.minInt(i64),
-    is_open: bool = false,
-    lock: std.Thread.Mutex = .{},

     pub fn deinit(self: *IndexRef, allocator: std.mem.Allocator) void {
         allocator.free(self.name);

@@ -31,16 +29,6 @@
         self.last_used_at = std.time.milliTimestamp();
         return self.references == 0;
     }
-
-    pub fn ensureOpen(self: *IndexRef, create: bool) !void {
-        self.lock.lock();
-        defer self.lock.unlock();
-
-        if (self.is_open) return;
-
-        try self.index.open(create);
-        self.is_open = true;
-    }
 };

 lock: std.Thread.Mutex = .{},

@@ -128,7 +116,7 @@ pub fn releaseIndex(self: *Self, index: *Index) void {
     self.releaseIndexRef(@fieldParentPtr("index", index));
 }

-fn acquireIndex(self: *Self, name: []const u8) !*IndexRef {
+fn acquireIndex(self: *Self, name: []const u8, create: bool) !*IndexRef {
     if (!isValidName(name)) {
         return error.InvalidIndexName;
     }

@@ -152,26 +140,24 @@ fn acquireIndex(self: *Self, name: []const u8) !*IndexRef {
     };
     errdefer result.value_ptr.index.deinit();

+    try result.value_ptr.index.open(create);
+
     result.value_ptr.incRef();
     return result.value_ptr;
 }

 pub fn getIndex(self: *Self, name: []const u8) !*Index {
-    const index_ref = try self.acquireIndex(name);
+    const index_ref = try self.acquireIndex(name, false);
     errdefer self.releaseIndexRef(index_ref);

-    try index_ref.ensureOpen(false);
-
     return &index_ref.index;
 }

 pub fn createIndex(self: *Self, name: []const u8) !void {
     log.info("creating index {s}", .{name});

-    const index_ref = try self.acquireIndex(name);
+    const index_ref = try self.acquireIndex(name, true);
     defer self.releaseIndexRef(index_ref);
-
-    try index_ref.ensureOpen(true);
 }

 pub fn deleteIndex(self: *Self, name: []const u8) !void {
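
Because Index.open() no longer blocks on segment loading, MultiIndex can call it directly from acquireIndex() and drop the separate ensureOpen()/is_open bookkeeping; the create flag decides whether a missing manifest becomes error.IndexNotFound or a fresh empty index. A rough usage sketch (the MultiIndex/Index type names, the multi_index parameter and the index name "main" are assumed for illustration):

// Illustrative only: both calls return as soon as the manifest has been read
// and the load task is scheduled; segment loading continues in the background.
fn openOrCreate(multi_index: *MultiIndex) !*Index {
    // create = true: writes an empty manifest if the index does not exist yet
    try multi_index.createIndex("main");

    // create = false: would fail with error.IndexNotFound if the index were missing;
    // the caller is responsible for calling multi_index.releaseIndex() later
    return multi_index.getIndex("main");
}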

‎src/main.zig

+1

@@ -76,6 +76,7 @@ pub fn main() !void {
     log.info("using {} threads", .{threads});

     try metrics.initializeMetrics(allocator, .{ .prefix = "aindex_" });
+    defer metrics.deinitMetrics();

     var scheduler = Scheduler.init(allocator);
     defer scheduler.deinit();

‎src/metrics.zig

+4

@@ -49,6 +49,10 @@ pub fn initializeMetrics(allocator: std.mem.Allocator, comptime opts: m.Registry
     };
 }

+pub fn deinitMetrics() void {
+    metrics.docs.deinit();
+}
+
 pub fn writeMetrics(writer: anytype) !void {
     return m.write(&metrics, writer);
 }

‎src/server.zig

+3 -3

@@ -330,7 +330,7 @@ fn handleHeadFingerprint(ctx: *Context, req: *httpz.Request, res: *httpz.Respons
     const index = try getIndex(ctx, req, res, false) orelse return;
     defer releaseIndex(ctx, index);

-    var index_reader = index.acquireReader();
+    var index_reader = try index.acquireReader();
     defer index.releaseReader(&index_reader);

     const id = try getId(req, res, false) orelse return;

@@ -351,7 +351,7 @@ fn handleGetFingerprint(ctx: *Context, req: *httpz.Request, res: *httpz.Response
     const index = try getIndex(ctx, req, res, true) orelse return;
     defer releaseIndex(ctx, index);

-    var index_reader = index.acquireReader();
+    var index_reader = try index.acquireReader();
     defer index.releaseReader(&index_reader);

     const id = try getId(req, res, true) orelse return;

@@ -443,7 +443,7 @@ fn handleGetIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !voi
     const index = try getIndex(ctx, req, res, true) orelse return;
     defer releaseIndex(ctx, index);

-    var index_reader = index.acquireReader();
+    var index_reader = try index.acquireReader();
     defer index.releaseReader(&index_reader);

     const response = GetIndexResponse{
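
In server.zig the handlers simply try the now-failable acquireReader(), so error.IndexNotReady propagates out of the handler. If a friendlier response were wanted, a handler could translate it into a retryable status along these lines (a sketch only; the helper name and status code are assumptions, not part of the commit):

// Sketch: fills in the response and returns null while the index is still
// loading, so the calling handler can bail out with an orelse return.
fn acquireReaderOr503(index: *Index, res: *httpz.Response) !?IndexReader {
    return index.acquireReader() catch |err| switch (err) {
        error.IndexNotReady => {
            res.status = 503;
            res.body = "index is still loading";
            return null;
        },
        else => |e| return e,
    };
}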

‎src/utils/Scheduler.zig

+2

@@ -71,6 +71,8 @@ pub fn createTask(self: *Self, priority: Priority, comptime func: anytype, args:
     const closure = try self.allocator.create(Closure);
     errdefer self.allocator.destroy(closure);

+    closure.arguments = args;
+
     task.* = .{
         .data = .{
             .priority = priority,
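
The Scheduler.zig change fixes createTask(): the closure comes from allocator.create(), whose fields start out undefined, so the task arguments have to be stored explicitly before the task ever runs. The repo's actual Closure type is not shown in this diff, but the general pattern looks roughly like this (a hypothetical sketch, not the real implementation):

// Hypothetical sketch of a comptime-generated closure: it owns a copy of the
// argument tuple and forwards it to the task function when the scheduler runs
// the task. Assigning the arguments field before scheduling is what the fix
// guarantees.
fn Closure(comptime func: anytype, comptime Args: type) type {
    return struct {
        arguments: Args,

        fn run(self: *@This()) void {
            @call(.auto, func, self.arguments);
        }
    };
}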
