Skip to content

Commit 67793e8

Browse files
committed
Replace direct threads with a task scheduler
1 parent c2dbcd6 commit 67793e8

File tree

5 files changed

+279
-108
lines changed

5 files changed

+279
-108
lines changed

src/Index.zig

+42-98
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const log = std.log.scoped(.index);
55
const zul = @import("zul");
66

77
const Deadline = @import("utils/Deadline.zig");
8+
const Scheduler = @import("utils/Scheduler.zig");
89
const Change = @import("change.zig").Change;
910
const SearchResult = @import("common.zig").SearchResult;
1011
const SearchResults = @import("common.zig").SearchResults;
@@ -43,6 +44,7 @@ const Options = struct {
4344

4445
options: Options,
4546
allocator: std.mem.Allocator,
47+
scheduler: *Scheduler,
4648

4749
dir: std.fs.Dir,
4850

@@ -66,16 +68,9 @@ memory_segments_lock: std.Thread.Mutex = .{},
6668
// Mutex used to control linearity of updates.
6769
update_lock: std.Thread.Mutex = .{},
6870

69-
stopping: std.atomic.Value(bool),
70-
71-
checkpoint_event: std.Thread.ResetEvent = .{},
72-
checkpoint_thread: ?std.Thread = null,
73-
74-
file_segment_merge_event: std.Thread.ResetEvent = .{},
75-
file_segment_merge_thread: ?std.Thread = null,
76-
77-
memory_segment_merge_event: std.Thread.ResetEvent = .{},
78-
memory_segment_merge_thread: ?std.Thread = null,
71+
checkpoint_task: ?Scheduler.Task = null,
72+
file_segment_merge_task: ?Scheduler.Task = null,
73+
memory_segment_merge_task: ?Scheduler.Task = null,
7974

8075
fn getFileSegmentSize(segment: SharedPtr(FileSegment)) usize {
8176
return segment.value.getSize();
@@ -85,7 +80,7 @@ fn getMemorySegmentSize(segment: SharedPtr(MemorySegment)) usize {
8580
return segment.value.getSize();
8681
}
8782

88-
pub fn init(allocator: std.mem.Allocator, parent_dir: std.fs.Dir, path: []const u8, options: Options) !Self {
83+
pub fn init(allocator: std.mem.Allocator, scheduler: *Scheduler, parent_dir: std.fs.Dir, path: []const u8, options: Options) !Self {
8984
var dir = try parent_dir.makeOpenPath(path, .{ .iterate = true });
9085
errdefer dir.close();
9186

@@ -120,22 +115,29 @@ pub fn init(allocator: std.mem.Allocator, parent_dir: std.fs.Dir, path: []const
120115
return .{
121116
.options = options,
122117
.allocator = allocator,
118+
.scheduler = scheduler,
123119
.dir = dir,
124120
.oplog = oplog,
125121
.segments_lock = .{},
126122
.memory_segments = memory_segments,
127123
.file_segments = file_segments,
128-
.stopping = std.atomic.Value(bool).init(false),
129124
};
130125
}
131126

132127
pub fn deinit(self: *Self) void {
133128
log.info("closing index {}", .{@intFromPtr(self)});
134-
self.stopping.store(true, .release);
135129

136-
self.stopCheckpointThread();
137-
self.stopMemorySegmentMergeThread();
138-
self.stopFileSegmentMergeThread();
130+
if (self.checkpoint_task) |task| {
131+
self.scheduler.destroyTask(task);
132+
}
133+
134+
if (self.memory_segment_merge_task) |task| {
135+
self.scheduler.destroyTask(task);
136+
}
137+
138+
if (self.file_segment_merge_task) |task| {
139+
self.scheduler.destroyTask(task);
140+
}
139141

140142
self.memory_segments.deinit(self.allocator, .keep);
141143
self.file_segments.deinit(self.allocator, .keep);
@@ -221,45 +223,30 @@ fn doCheckpoint(self: *Self) !bool {
221223
self.file_segments.commitUpdate(&file_segments_update);
222224

223225
if (self.file_segments.needsMerge()) {
224-
self.file_segment_merge_event.set();
226+
if (self.file_segment_merge_task) |task| {
227+
self.scheduler.scheduleTask(task);
228+
}
225229
}
226230

227231
return true;
228232
}
229233

230-
fn checkpointThreadFn(self: *Self) void {
231-
log.debug("checkpoint thread started", .{});
232-
while (!self.stopping.load(.acquire)) {
233-
if (self.doCheckpoint()) |successful| {
234-
if (successful) {
235-
continue;
236-
}
237-
self.checkpoint_event.reset();
238-
} else |err| {
239-
log.err("checkpoint failed: {}", .{err});
240-
}
241-
log.debug("waiting for checkpoint event", .{});
242-
self.checkpoint_event.timedWait(std.time.ns_per_min) catch continue;
243-
}
244-
log.debug("checkpoint thread stopped", .{});
234+
fn checkpointTask(self: *Self) void {
235+
_ = self.doCheckpoint() catch |err| {
236+
log.err("checkpoint failed: {}", .{err});
237+
};
245238
}
246239

247-
fn startCheckpointThread(self: *Self) !void {
248-
if (self.checkpoint_thread != null) return;
249-
250-
log.info("starting checkpoint thread", .{});
251-
self.checkpoint_thread = try std.Thread.spawn(.{}, checkpointThreadFn, .{self});
240+
fn memorySegmentMergeTask(self: *Self) void {
241+
_ = self.maybeMergeMemorySegments() catch |err| {
242+
log.err("memory segment merge failed: {}", .{err});
243+
};
252244
}
253245

254-
fn stopCheckpointThread(self: *Self) void {
255-
log.info("stopping checkpoint thread", .{});
256-
if (self.checkpoint_thread) |thread| {
257-
self.checkpoint_event.set();
258-
log.debug("waiting for checkpoint thread to exit", .{});
259-
thread.join();
260-
}
261-
log.debug("checkpoint thread stopped", .{});
262-
self.checkpoint_thread = null;
246+
fn fileSegmentMergeTask(self: *Self) void {
247+
_ = self.maybeMergeFileSegments() catch |err| {
248+
log.err("file segment merge failed: {}", .{err});
249+
};
263250
}
264251

265252
fn updateManifestFile(self: *Self, segments: *FileSegmentList) !void {
@@ -303,22 +290,6 @@ fn fileSegmentMergeThreadFn(self: *Self) void {
303290
}
304291
}
305292

306-
fn startFileSegmentMergeThread(self: *Self) !void {
307-
if (self.file_segment_merge_thread != null) return;
308-
309-
log.info("starting file segment merge thread", .{});
310-
self.file_segment_merge_thread = try std.Thread.spawn(.{}, fileSegmentMergeThreadFn, .{self});
311-
}
312-
313-
fn stopFileSegmentMergeThread(self: *Self) void {
314-
log.info("stopping file segment merge thread", .{});
315-
if (self.file_segment_merge_thread) |thread| {
316-
self.file_segment_merge_event.set();
317-
thread.join();
318-
}
319-
self.file_segment_merge_thread = null;
320-
}
321-
322293
fn maybeMergeMemorySegments(self: *Self) !bool {
323294
var upd = try self.memory_segments.prepareMerge(self.allocator) orelse return false;
324295
defer self.memory_segments.cleanupAfterUpdate(self.allocator, &upd);
@@ -335,43 +306,12 @@ fn maybeMergeMemorySegments(self: *Self) !bool {
335306
return true;
336307
}
337308

338-
fn memorySegmentMergeThreadFn(self: *Self) void {
339-
while (!self.stopping.load(.acquire)) {
340-
if (self.maybeMergeMemorySegments()) |successful| {
341-
if (successful) {
342-
continue;
343-
}
344-
self.memory_segment_merge_event.reset();
345-
} else |err| {
346-
log.err("memory segment merge failed: {}", .{err});
347-
}
348-
self.memory_segment_merge_event.timedWait(std.time.ns_per_min) catch continue;
349-
}
350-
}
351-
352-
fn startMemorySegmentMergeThread(self: *Self) !void {
353-
if (self.memory_segment_merge_thread != null) return;
354-
355-
log.info("starting memory segment merge thread", .{});
356-
self.memory_segment_merge_thread = try std.Thread.spawn(.{}, memorySegmentMergeThreadFn, .{self});
357-
}
358-
359-
fn stopMemorySegmentMergeThread(self: *Self) void {
360-
log.info("stopping memory segment merge thread", .{});
361-
if (self.memory_segment_merge_thread) |thread| {
362-
self.memory_segment_merge_event.set();
363-
thread.join();
364-
}
365-
self.memory_segment_merge_thread = null;
366-
}
367-
368309
pub fn open(self: *Self, create: bool) !void {
369310
const last_commit_id = try self.loadSegments(create);
370311

371-
// start these threads after loading file segments, but before replaying oplog to memory segments
372-
try self.startFileSegmentMergeThread();
373-
try self.startMemorySegmentMergeThread();
374-
try self.startCheckpointThread();
312+
self.checkpoint_task = try self.scheduler.createTask(.medium, checkpointTask, self);
313+
self.memory_segment_merge_task = try self.scheduler.createTask(.high, memorySegmentMergeTask, self);
314+
self.file_segment_merge_task = try self.scheduler.createTask(.low, fileSegmentMergeTask, self);
375315

376316
try self.oplog.open(last_commit_id + 1, updateInternal, self);
377317

@@ -381,7 +321,9 @@ pub fn open(self: *Self, create: bool) !void {
381321
fn maybeScheduleCheckpoint(self: *Self) void {
382322
if (self.memory_segments.segments.value.getFirst()) |first_node| {
383323
if (first_node.value.getSize() >= self.options.min_segment_size) {
384-
self.checkpoint_event.set();
324+
if (self.checkpoint_task) |task| {
325+
self.scheduler.scheduleTask(task);
326+
}
385327
}
386328
}
387329
}
@@ -421,7 +363,9 @@ pub fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !vo
421363
self.memory_segments.commitUpdate(&upd);
422364

423365
if (self.memory_segments.needsMerge()) {
424-
self.memory_segment_merge_event.set();
366+
if (self.memory_segment_merge_task) |task| {
367+
self.scheduler.scheduleTask(task);
368+
}
425369
}
426370
}
427371

src/MultiIndex.zig

+5-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const log = std.log.scoped(.multi_index);
33
const assert = std.debug.assert;
44

55
const Index = @import("Index.zig");
6+
const Scheduler = @import("utils/Scheduler.zig");
67

78
const Self = @This();
89

@@ -44,6 +45,7 @@ pub const IndexRef = struct {
4445

4546
lock: std.Thread.Mutex = .{},
4647
allocator: std.mem.Allocator,
48+
scheduler: *Scheduler,
4749
dir: std.fs.Dir,
4850
indexes: std.StringHashMap(IndexRef),
4951

@@ -75,9 +77,10 @@ test "isValidName" {
7577
try std.testing.expect(!isValidName(".foo"));
7678
}
7779

78-
pub fn init(allocator: std.mem.Allocator, dir: std.fs.Dir) Self {
80+
pub fn init(allocator: std.mem.Allocator, scheduler: *Scheduler, dir: std.fs.Dir) Self {
7981
return .{
8082
.allocator = allocator,
83+
.scheduler = scheduler,
8184
.dir = dir,
8285
.indexes = std.StringHashMap(IndexRef).init(allocator),
8386
};
@@ -144,7 +147,7 @@ fn acquireIndex(self: *Self, name: []const u8) !*IndexRef {
144147
errdefer self.allocator.free(result.key_ptr.*);
145148

146149
result.value_ptr.* = .{
147-
.index = try Index.init(self.allocator, self.dir, name, .{}),
150+
.index = try Index.init(self.allocator, self.scheduler, self.dir, name, .{}),
148151
.name = result.key_ptr.*,
149152
};
150153
errdefer result.value_ptr.index.deinit();

src/index_tests.zig

+23-7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const std = @import("std");
33
const common = @import("common.zig");
44
const Change = @import("change.zig").Change;
55
const SearchResults = common.SearchResults;
6+
const Scheduler = @import("utils/Scheduler.zig");
67

78
const Index = @import("Index.zig");
89

@@ -19,7 +20,10 @@ test "index does not exist" {
1920
var tmp_dir = std.testing.tmpDir(.{});
2021
defer tmp_dir.cleanup();
2122

22-
var index = try Index.init(std.testing.allocator, tmp_dir.dir, "idx", .{});
23+
var scheduler = Scheduler.init(std.testing.allocator);
24+
defer scheduler.deinit();
25+
26+
var index = try Index.init(std.testing.allocator, &scheduler, tmp_dir.dir, "idx", .{});
2327
defer index.deinit();
2428

2529
const result = index.open(false);
@@ -30,7 +34,10 @@ test "index create, update and search" {
3034
var tmp_dir = std.testing.tmpDir(.{});
3135
defer tmp_dir.cleanup();
3236

33-
var index = try Index.init(std.testing.allocator, tmp_dir.dir, "idx", .{});
37+
var scheduler = Scheduler.init(std.testing.allocator);
38+
defer scheduler.deinit();
39+
40+
var index = try Index.init(std.testing.allocator, &scheduler, tmp_dir.dir, "idx", .{});
3441
defer index.deinit();
3542

3643
try index.open(true);
@@ -66,10 +73,13 @@ test "index create, update, reopen and search" {
6673
var tmp_dir = std.testing.tmpDir(.{});
6774
defer tmp_dir.cleanup();
6875

76+
var scheduler = Scheduler.init(std.testing.allocator);
77+
defer scheduler.deinit();
78+
6979
var hashes: [100]u32 = undefined;
7080

7181
{
72-
var index = try Index.init(std.testing.allocator, tmp_dir.dir, "idx", .{});
82+
var index = try Index.init(std.testing.allocator, &scheduler, tmp_dir.dir, "idx", .{});
7383
defer index.deinit();
7484

7585
try index.open(true);
@@ -81,7 +91,7 @@ test "index create, update, reopen and search" {
8191
}
8292

8393
{
84-
var index = try Index.init(std.testing.allocator, tmp_dir.dir, "idx", .{});
94+
var index = try Index.init(std.testing.allocator, &scheduler, tmp_dir.dir, "idx", .{});
8595
defer index.deinit();
8696

8797
try index.open(false);
@@ -102,10 +112,13 @@ test "index many updates" {
102112
var tmp_dir = std.testing.tmpDir(.{});
103113
defer tmp_dir.cleanup();
104114

115+
var scheduler = Scheduler.init(std.testing.allocator);
116+
defer scheduler.deinit();
117+
105118
var hashes: [100]u32 = undefined;
106119

107120
{
108-
var index = try Index.init(std.testing.allocator, tmp_dir.dir, "idx", .{});
121+
var index = try Index.init(std.testing.allocator, &scheduler, tmp_dir.dir, "idx", .{});
109122
defer index.deinit();
110123

111124
try index.open(true);
@@ -118,7 +131,7 @@ test "index many updates" {
118131
}
119132
}
120133

121-
var index = try Index.init(std.testing.allocator, tmp_dir.dir, "idx", .{});
134+
var index = try Index.init(std.testing.allocator, &scheduler, tmp_dir.dir, "idx", .{});
122135
defer index.deinit();
123136

124137
try index.open(false);
@@ -146,7 +159,10 @@ test "index, multiple fingerprints with the same hashes" {
146159
var tmp_dir = std.testing.tmpDir(.{});
147160
defer tmp_dir.cleanup();
148161

149-
var index = try Index.init(std.testing.allocator, tmp_dir.dir, "idx", .{});
162+
var scheduler = Scheduler.init(std.testing.allocator);
163+
defer scheduler.deinit();
164+
165+
var index = try Index.init(std.testing.allocator, &scheduler, tmp_dir.dir, "idx", .{});
150166
defer index.deinit();
151167

152168
try index.open(true);

src/main.zig

+7-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ const std = @import("std");
22
const log = std.log.scoped(.main);
33
const zul = @import("zul");
44

5+
const Scheduler = @import("utils/Scheduler.zig");
56
const MultiIndex = @import("MultiIndex.zig");
67
const server = @import("server.zig");
78
const metrics = @import("metrics.zig");
@@ -76,9 +77,14 @@ pub fn main() !void {
7677

7778
try metrics.initializeMetrics(.{ .prefix = "aindex_" });
7879

79-
var indexes = MultiIndex.init(allocator, dir);
80+
var scheduler = Scheduler.init(allocator);
81+
defer scheduler.deinit();
82+
83+
var indexes = MultiIndex.init(allocator, &scheduler, dir);
8084
defer indexes.deinit();
8185

86+
try scheduler.start(threads);
87+
8288
try server.run(allocator, &indexes, address, port, threads);
8389
}
8490

0 commit comments

Comments
 (0)