Skip to content

Commit c400190

Browse files
committed
Fix delayed index load
1 parent f03252b commit c400190

File tree

6 files changed

+65
-28
lines changed

6 files changed

+65
-28
lines changed

build.zig

+3
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,11 @@ pub fn build(b: *std.Build) void {
7272
main_tests.root_module.addImport("msgpack", msgpack.module("msgpack"));
7373

7474
const run_unit_tests = b.addRunArtifact(main_tests);
75+
run_unit_tests.has_side_effects = true;
76+
7577
const run_integration_tests = b.addSystemCommand(&[_][]const u8{ "pytest", "-vv", "tests/" });
7678
run_integration_tests.step.dependOn(b.getInstallStep());
79+
run_integration_tests.has_side_effects = true;
7780

7881
var unit_tests_step = b.step("unit-tests", "Run unit tests");
7982
unit_tests_step.dependOn(&run_unit_tests.step);

src/Index.zig

+15-12
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ dir: std.fs.Dir,
5252
oplog: Oplog,
5353

5454
open_lock: std.Thread.Mutex = .{},
55-
is_ready: std.atomic.Value(bool),
55+
is_ready: std.Thread.ResetEvent = .{},
5656
load_task: ?Scheduler.Task = null,
5757

5858
segments_lock: std.Thread.RwLock = .{},
@@ -113,7 +113,6 @@ pub fn init(allocator: std.mem.Allocator, scheduler: *Scheduler, parent_dir: std
113113
.segments_lock = .{},
114114
.memory_segments = memory_segments,
115115
.file_segments = file_segments,
116-
.is_ready = std.atomic.Value(bool).init(false),
117116
};
118117
}
119118

@@ -287,13 +286,13 @@ fn maybeMergeMemorySegments(self: *Self) !bool {
287286
}
288287

289288
pub fn open(self: *Self, create: bool) !void {
290-
self.open_lock.lock();
291-
defer self.open_lock.unlock();
292-
293-
if (self.is_ready.load(.monotonic)) {
289+
if (self.is_ready.isSet()) {
294290
return;
295291
}
296292

293+
self.open_lock.lock();
294+
defer self.open_lock.unlock();
295+
297296
if (self.load_task != null) {
298297
return error.AlreadyOpening;
299298
}
@@ -333,11 +332,11 @@ fn load(self: *Self, manifest: []SegmentInfo) !void {
333332
self.checkpoint_task = try self.scheduler.createTask(.medium, checkpointTask, .{self});
334333
self.file_segment_merge_task = try self.scheduler.createTask(.low, fileSegmentMergeTask, .{self});
335334

336-
try self.oplog.open(1, updateInternal, self);
335+
try self.oplog.open(last_commit_id + 1, updateInternal, self);
337336

338337
log.info("index loaded", .{});
339338

340-
self.is_ready.store(true, .monotonic);
339+
self.is_ready.set();
341340
}
342341

343342
fn loadTask(self: *Self, manifest: []SegmentInfo) void {
@@ -390,14 +389,18 @@ fn readyForCheckpoint(self: *Self) ?MemorySegmentNode {
390389
return null;
391390
}
392391

393-
fn checkIfReady(self: Self) !void {
394-
if (!self.is_ready.load(.monotonic)) {
392+
/// Blocks the calling thread until the `is_ready` event is set or
/// `timeout_ms` milliseconds have elapsed.
///
/// Propagates the error returned by `std.Thread.ResetEvent.timedWait`
/// when the event was not set within the timeout.
pub fn waitForReady(self: *Self, timeout_ms: u32) !void {
    // `timedWait` expects nanoseconds. Widen to u64 before multiplying so
    // that large millisecond values cannot overflow the u32 arithmetic.
    const timeout_ns = @as(u64, timeout_ms) * std.time.ns_per_ms;
    try self.is_ready.timedWait(timeout_ns);
}
395+
396+
/// Fails with `error.IndexNotReady` unless the `is_ready` event has
/// already been set. Only queries the event; it does not wait for it.
pub fn checkReady(self: *Self) !void {
    if (self.is_ready.isSet()) return;
    return error.IndexNotReady;
}
398401

399402
pub fn update(self: *Self, changes: []const Change) !void {
400-
try self.checkIfReady();
403+
try self.checkReady();
401404
try self.updateInternal(changes, null);
402405
}
403406

@@ -426,7 +429,7 @@ fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !void {
426429
}
427430

428431
pub fn acquireReader(self: *Self) !IndexReader {
429-
try self.checkIfReady();
432+
try self.checkReady();
430433

431434
self.segments_lock.lockShared();
432435
defer self.segments_lock.unlockShared();

src/index_tests.zig

+7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const common = @import("common.zig");
44
const Change = @import("change.zig").Change;
55
const SearchResults = common.SearchResults;
66
const Scheduler = @import("utils/Scheduler.zig");
7+
const Deadline = @import("utils/Deadline.zig");
78

89
const Index = @import("Index.zig");
910

@@ -76,6 +77,8 @@ test "index create, update, reopen and search" {
7677
var scheduler = Scheduler.init(std.testing.allocator);
7778
defer scheduler.deinit();
7879

80+
try scheduler.start(2);
81+
7982
var hashes: [100]u32 = undefined;
8083

8184
{
@@ -95,6 +98,7 @@ test "index create, update, reopen and search" {
9598
defer index.deinit();
9699

97100
try index.open(false);
101+
try index.waitForReady(10000);
98102

99103
var results = try index.search(generateRandomHashes(&hashes, 1), std.testing.allocator, .{});
100104
defer results.deinit();
@@ -115,6 +119,8 @@ test "index many updates" {
115119
var scheduler = Scheduler.init(std.testing.allocator);
116120
defer scheduler.deinit();
117121

122+
try scheduler.start(2);
123+
118124
var hashes: [100]u32 = undefined;
119125

120126
{
@@ -135,6 +141,7 @@ test "index many updates" {
135141
defer index.deinit();
136142

137143
try index.open(false);
144+
try index.waitForReady(100000);
138145

139146
{
140147
var results = try index.search(generateRandomHashes(&hashes, 0), std.testing.allocator, .{});

src/segment_list.zig

+1
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ pub fn SegmentListManager(Segment: type) type {
230230
}
231231

232232
/// Returns the length of `segments.value.nodes.items`.
pub fn count(self: Self) usize {
    const items = self.segments.value.nodes.items;
    return items.len;
}
235236

src/server.zig

+17-5
Original file line numberDiff line numberDiff line change
@@ -233,11 +233,21 @@ fn handleNotFound(_: *Context, req: *httpz.Request, res: *httpz.Response) !void
233233
}
234234

235235
fn handleError(_: *Context, req: *httpz.Request, res: *httpz.Response, err: anyerror) void {
236-
log.err("unhandled error in {s}: {any}", .{ req.url.raw, err });
237-
writeErrorResponse(500, err, req, res) catch {
238-
res.status = 500;
239-
res.body = "internal error";
240-
};
236+
switch (err) {
237+
error.IndexNotReady => {
238+
writeErrorResponse(503, err, req, res) catch {
239+
res.status = 503;
240+
res.body = "not ready yet";
241+
};
242+
},
243+
else => {
244+
log.err("unhandled error in {s}: {any}", .{ req.url.raw, err });
245+
writeErrorResponse(500, err, req, res) catch {
246+
res.status = 500;
247+
res.body = "internal error";
248+
};
249+
},
250+
}
241251
}
242252

243253
fn writeErrorResponse(status: u16, err: anyerror, req: *httpz.Request, res: *httpz.Response) !void {
@@ -484,6 +494,8 @@ fn handleIndexHealth(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !
484494
const index = try getIndex(ctx, req, res, false) orelse return;
485495
defer releaseIndex(ctx, index);
486496

497+
try index.checkReady();
498+
487499
try res.writer().writeAll("OK\n");
488500
}
489501

src/utils/Scheduler.zig

+22-11
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const Priority = enum(u8) {
1111
const TaskStatus = struct {
1212
reschedule: usize = 0,
1313
scheduled: bool = false,
14+
running: bool = false,
1415
done: std.Thread.ResetEvent = .{},
1516
priority: Priority,
1617
ctx: *anyopaque,
@@ -88,12 +89,22 @@ pub fn createTask(self: *Self, priority: Priority, comptime func: anytype, args:
8889
return task;
8990
}
9091

91-
pub fn destroyTask(self: *Self, task: Task) void {
92+
fn dequeue(self: *Self, task: Task) void {
9293
self.queue_mutex.lock();
94+
defer self.queue_mutex.unlock();
95+
9396
if (task.data.scheduled) {
9497
self.queue.remove(task);
98+
task.next = null;
99+
task.prev = null;
100+
task.data.scheduled = false;
95101
}
96-
self.queue_mutex.unlock();
102+
103+
task.data.reschedule = 0;
104+
}
105+
106+
pub fn destroyTask(self: *Self, task: Task) void {
107+
self.dequeue(task);
97108

98109
task.data.done.wait();
99110

@@ -108,15 +119,14 @@ pub fn scheduleTask(self: *Self, task: Task) void {
108119
self.queue_mutex.lock();
109120
defer self.queue_mutex.unlock();
110121

111-
if (task.data.scheduled) {
122+
if (task.data.scheduled or task.data.running) {
112123
task.data.reschedule += 1;
113-
return;
124+
} else {
125+
self.enqueue(task);
114126
}
115-
116-
self.addToQueue(task);
117127
}
118128

119-
fn addToQueue(self: *Self, task: *Queue.Node) void {
129+
fn enqueue(self: *Self, task: *Queue.Node) void {
120130
task.data.scheduled = true;
121131
self.queue.prepend(task);
122132
self.queue_not_empty.signal();
@@ -133,6 +143,8 @@ fn getTaskToRun(self: *Self) ?*Queue.Node {
133143
};
134144
task.prev = null;
135145
task.next = null;
146+
task.data.scheduled = false;
147+
task.data.running = true;
136148
task.data.done.reset();
137149
return task;
138150
}
@@ -145,11 +157,10 @@ fn markAsDone(self: *Self, task: *Queue.Node) void {
145157

146158
if (task.data.reschedule > 0) {
147159
task.data.reschedule -= 1;
148-
self.addToQueue(task);
149-
} else {
150-
task.data.scheduled = false;
160+
self.enqueue(task);
151161
}
152162

163+
task.data.running = false;
153164
task.data.done.set();
154165
}
155166

@@ -207,7 +218,7 @@ test "Scheduler: smoke test" {
207218
};
208219
var counter: Counter = .{};
209220

210-
const task = try scheduler.createTask(.high, Counter.incr, &counter);
221+
const task = try scheduler.createTask(.high, Counter.incr, .{&counter});
211222
defer scheduler.destroyTask(task);
212223

213224
for (0..3) |_| {

0 commit comments

Comments
 (0)