Skip to content

Commit 9ac89b2

Browse files
committed
Tasks with multiple arguments
1 parent 6549d44 commit 9ac89b2

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

src/Index.zig

+3-3
Original file line numberDiff line numberDiff line change
@@ -322,9 +322,9 @@ fn maybeMergeMemorySegments(self: *Self) !bool {
322322
pub fn open(self: *Self, create: bool) !void {
323323
const last_commit_id = try self.loadSegments(create);
324324

325-
self.checkpoint_task = try self.scheduler.createTask(.medium, checkpointTask, self);
326-
self.memory_segment_merge_task = try self.scheduler.createTask(.high, memorySegmentMergeTask, self);
327-
self.file_segment_merge_task = try self.scheduler.createTask(.low, fileSegmentMergeTask, self);
325+
self.checkpoint_task = try self.scheduler.createTask(.medium, checkpointTask, .{self});
326+
self.memory_segment_merge_task = try self.scheduler.createTask(.high, memorySegmentMergeTask, .{self});
327+
self.file_segment_merge_task = try self.scheduler.createTask(.low, fileSegmentMergeTask, .{self});
328328

329329
try self.oplog.open(last_commit_id + 1, updateInternal, self);
330330

src/utils/Scheduler.zig

+23-8
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ const TaskStatus = struct {
1414
done: std.Thread.ResetEvent = .{},
1515
priority: Priority,
1616
ctx: *anyopaque,
17-
func: *const fn (ctx: *anyopaque) void,
17+
runFn: *const fn (ctx: *anyopaque) void,
18+
deinitFn: *const fn (ctx: *anyopaque, allocator: std.mem.Allocator) void,
1819
};
1920

2021
const Queue = std.DoublyLinkedList(TaskStatus);
@@ -45,24 +46,37 @@ pub fn deinit(self: *Self) void {
4546
std.debug.assert(self.num_tasks == 0);
4647
}
4748

48-
pub fn createTask(self: *Self, priority: Priority, func: anytype, ctx: anytype) !Task {
49+
pub fn createTask(self: *Self, priority: Priority, comptime func: anytype, args: anytype) !Task {
4950
self.queue_mutex.lock();
5051
defer self.queue_mutex.unlock();
5152

5253
const task = try self.allocator.create(Queue.Node);
5354
errdefer self.allocator.destroy(task);
5455

55-
const Wrapper = struct {
56-
pub fn run(c: *anyopaque) void {
57-
@call(.auto, func, .{@as(@TypeOf(ctx), @ptrCast(@alignCast(c)))});
56+
const Args = @TypeOf(args);
57+
const Closure = struct {
58+
arguments: Args,
59+
60+
fn deinit(ctx: *anyopaque, allocator: std.mem.Allocator) void {
61+
const closure: *@This() = @ptrCast(@alignCast(ctx));
62+
allocator.destroy(closure);
63+
}
64+
65+
fn run(ctx: *anyopaque) void {
66+
const closure: *@This() = @ptrCast(@alignCast(ctx));
67+
@call(.auto, func, closure.arguments);
5868
}
5969
};
6070

71+
const closure = try self.allocator.create(Closure);
72+
errdefer self.allocator.destroy(closure);
73+
6174
task.* = .{
6275
.data = .{
6376
.priority = priority,
64-
.ctx = ctx,
65-
.func = Wrapper.run,
77+
.ctx = closure,
78+
.runFn = Closure.run,
79+
.deinitFn = Closure.deinit,
6680
},
6781
};
6882
task.data.done.set();
@@ -81,6 +95,7 @@ pub fn destroyTask(self: *Self, task: Task) void {
8195

8296
task.data.done.wait();
8397

98+
task.data.deinitFn(task.data.ctx, self.allocator);
8499
self.allocator.destroy(task);
85100

86101
std.debug.assert(self.num_tasks > 0);
@@ -141,7 +156,7 @@ fn workerThreadFunc(self: *Self) void {
141156
const task = self.getTaskToRun() orelse break;
142157
defer self.markAsDone(task);
143158

144-
task.data.func(task.data.ctx);
159+
task.data.runFn(task.data.ctx);
145160
}
146161
}
147162

0 commit comments

Comments
 (0)