Skip to content

Commit

Permalink
Separate TaskFiber's task handling loop into a separate method.
Browse files Browse the repository at this point in the history
Also removes the VibeDebugCatchAll version statement, as tasks may not throw exceptions anymore anyway.
  • Loading branch information
s-ludwig committed Jan 8, 2024
1 parent f4c6b75 commit def5e7b
Showing 1 changed file with 95 additions and 92 deletions.
187 changes: 95 additions & 92 deletions source/vibe/core/task.d
Original file line number Diff line number Diff line change
Expand Up @@ -403,126 +403,131 @@ final package class TaskFiber : Fiber {

private void run()
nothrow {
import std.algorithm.mutation : swap;
import std.encoding : sanitize;
import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock;

version (VibeDebugCatchAll) alias UncaughtException = Throwable;
else alias UncaughtException = Exception;
try {
// force creation of a message box/Tid
try thisTid;
catch (Exception e) assert(false, e.msg);

while (true) {
while (!m_taskFunc.func) {
try {
debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task...");
static if (__VERSION__ >= 2090) {
import core.memory : GC;
import core.stdc.stdlib : abort;
import std.stdio : stderr;
if (GC.inFinalizer) {
stderr.writeln("Yiedling within finalizer - this would most likely result in a dead-lock!");
abort();
}
}
Fiber.yield();
} catch (Exception e) {
e.logException!(LogLevel.warn)(
"CoreTaskFiber was resumed with exception but without active task");
}
if (m_shutdown) return;
}
handleTasks();
} catch (Throwable th) {
import std.stdio : stderr, writeln;
import core.stdc.stdlib : abort;
try {
stderr.writeln("TaskFiber getting terminated due to an uncaught ", th.classinfo.name);
stderr.writeln(th);
} catch (Exception e) {
try stderr.writeln(th.msg);
catch (Exception e) {}
}
abort();
}
}

debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");
private void handleTasks()
nothrow {
import std.algorithm.mutation : swap;
import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock;

TaskFuncInfo task;
swap(task, m_taskFunc);
m_dynamicPriority = m_staticPriority = task.settings.priority;
Task handle = this.task;
while (true) {
while (!m_taskFunc.func) {
try {
atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running
scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized

debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle);
if (!isEventLoopRunning) {
debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding.");
taskScheduler.yieldUninterruptible();
debug (VibeTaskLog) logTrace("Initial resume of task.");
debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task...");
static if (__VERSION__ >= 2090) {
import core.memory : GC;
import core.stdc.stdlib : abort;
import std.stdio : stderr;
if (GC.inFinalizer) {
stderr.writeln("Yiedling within finalizer - this would most likely result in a dead-lock!");
abort();
}
}
Fiber.yield();
} catch (Exception e) {
e.logException!(LogLevel.warn)(
"CoreTaskFiber was resumed with exception but without active task");
}
if (m_shutdown) return;
}

debug (VibeRunningTasks) s_runningTasks[this] = "initial";
debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");

TaskFuncInfo task;
swap(task, m_taskFunc);
m_dynamicPriority = m_staticPriority = task.settings.priority;
Task handle = this.task;
try {
atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running
scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized

debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle);
if (!isEventLoopRunning) {
debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding.");
taskScheduler.yieldUninterruptible();
debug (VibeTaskLog) logTrace("Initial resume of task.");
}

task.call();
debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle);
debug (VibeRunningTasks) s_runningTasks[this] = "initial";

debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt)
logDebugV("Task exited while an interrupt was in flight.");
} catch (Exception e) {
debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle);
e.logException!(LogLevel.critical)("Task terminated with uncaught exception");
}
task.call();
debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle);

debug (VibeRunningTasks) s_runningTasks.remove(this);
debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt)
logDebugV("Task exited while an interrupt was in flight.");
} catch (Exception e) {
debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle);
e.logException!(LogLevel.critical)("Task terminated with uncaught exception");
}

debug assert(Thread.getThis() is m_thread, "Fiber moved?");
debug (VibeRunningTasks) s_runningTasks.remove(this);

// re-create message box if it has been used to guarantee a
// unique Tid per task
if (atomicLoad(m_tidUsed)) {
m_tidInfo.cleanup();
m_tidInfo.ident = Tid.init;
try thisTid;
catch (Exception e) assert(false, e.msg);
atomicStore(m_tidUsed, false);
}
debug assert(Thread.getThis() is m_thread, "Fiber moved?");

debug (VibeTaskLog) logTrace("Notifying joining tasks.");
// re-create message box if it has been used to guarantee a
// unique Tid per task
if (atomicLoad(m_tidUsed)) {
try m_tidInfo.cleanup();
catch (Exception e) logException(e, "Failed to clean up TID after task finished");
m_tidInfo.ident = Tid.init;
try thisTid;
catch (Exception e) assert(false, e.msg);
atomicStore(m_tidUsed, false);
}

// Issue #161: This fiber won't be resumed before the next task
// is assigned, because it is already marked as de-initialized.
// Since ManualEvent.emit() will need to switch tasks, this
// would mean that only the first waiter is notified before
// this fiber gets a new task assigned.
// Using a yield lock forces all corresponding tasks to be
// enqueued into the schedule queue and resumed in sequence
// at the end of the scope.
auto l = yieldLock();
debug (VibeTaskLog) logTrace("Notifying joining tasks.");

m_onExit.emit();
// Issue #161: This fiber won't be resumed before the next task
// is assigned, because it is already marked as de-initialized.
// Since ManualEvent.emit() will need to switch tasks, this
// would mean that only the first waiter is notified before
// this fiber gets a new task assigned.
// Using a yield lock forces all corresponding tasks to be
// enqueued into the schedule queue and resumed in sequence
// at the end of the scope.
auto l = yieldLock();

// make sure that the task does not get left behind in the yielder queue if terminated during yield()
if (m_queue) m_queue.remove(this);
m_onExit.emit();

// zero the fls initialization ByteArray for memory safety
// make sure that the task does not get left behind in the yielder queue if terminated during yield()
if (m_queue) m_queue.remove(this);

// zero the fls initialization ByteArray for memory safety
try {
foreach (size_t i, ref bool b; m_flsInit) {
if (b) {
if (ms_flsInfo !is null && ms_flsInfo.length >= i && ms_flsInfo[i] != FLSInfo.init)
ms_flsInfo[i].destroy(m_fls);
b = false;
}
}
} catch (Exception e) logException(e, "Failed to clean up FLS storage after task finished");

assert(!m_queue, "Fiber done but still scheduled to be resumed!?");
assert(!m_queue, "Fiber done but still scheduled to be resumed!?");

debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");
debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");

// make the fiber available for the next task
if (!recycleFiber(this))
return;
}
} catch (UncaughtException th) {
th.logException("CoreTaskFiber was terminated unexpectedly");
} catch (Throwable th) {
import std.stdio : stderr, writeln;
import core.stdc.stdlib : abort;
try stderr.writeln(th);
catch (Exception e) {
try stderr.writeln(th.msg);
catch (Exception e) {}
}
abort();
// make the fiber available for the next task
if (!recycleFiber(this))
return;
}
}

Expand Down Expand Up @@ -1087,8 +1092,6 @@ package struct TaskScheduler {
/// Resumes execution of a yielded task.
private void resumeTask(Task t)
nothrow {
import std.encoding : sanitize;

assert(t != Task.init, "Resuming null task");

debug (VibeTaskLog) logTrace("task fiber resume");
Expand Down

0 comments on commit def5e7b

Please sign in to comment.