Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug fix and small refactoring #378

Merged
merged 3 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions source/vibe/core/sync.d
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,13 @@ struct ManualEvent {

@disable this(this);

private bool canBeFreed()
shared nothrow {
import core.memory : GC;
if (GC.inFinalizer) return true;
return m_waiters.lock.active.empty;
}

private void initialize()
shared nothrow {
m_waiters.initialize(new shared Mutex);
Expand Down Expand Up @@ -1934,7 +1941,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
shared(bool) m_locked = false;
shared(uint) m_waiters = 0;
shared(ManualEvent) m_signal;
debug Task m_owner;
debug align(16) shared Task m_owner;
}

shared:
Expand All @@ -1947,7 +1954,10 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
@trusted bool tryLock()
nothrow {
if (cas(&m_locked, false, true)) {
debug m_owner = Task.getThis();
debug {
auto swapped = cas(&m_owner, Task.init, Task.getThis());
assert(swapped);
}
debug(VibeMutexLog) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters));
return true;
}
Expand All @@ -1957,7 +1967,11 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
@trusted bool lock(Duration timeout = Duration.max)
{
if (tryLock()) return true;
debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock.");
debug {
auto thist = Task.getThis();
auto owner = atomicLoad(m_owner);
assert(owner == Task.init || owner != thist, "Recursive mutex lock.");
}
atomicOp!"+="(m_waiters, 1);
debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
scope(exit) atomicOp!"-="(m_waiters, 1);
Expand All @@ -1979,8 +1993,9 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
{
assert(m_locked);
debug {
assert(m_owner == Task.getThis());
m_owner = Task();
auto thist = Task.getThis();
auto swapped = cas(&m_owner, thist, Task());
assert(swapped);
}
atomicStore!(MemoryOrder.rel)(m_locked, false);
debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
Expand Down Expand Up @@ -2086,6 +2101,12 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {

@disable this(this);

~this()
nothrow {
assert(m_signal.canBeFreed());
destroy(m_signal);
}

shared:

void setup(shared(LOCKABLE) mtx)
Expand Down
187 changes: 95 additions & 92 deletions source/vibe/core/task.d
Original file line number Diff line number Diff line change
Expand Up @@ -403,126 +403,131 @@ final package class TaskFiber : Fiber {

private void run()
nothrow {
import std.algorithm.mutation : swap;
import std.encoding : sanitize;
import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock;

version (VibeDebugCatchAll) alias UncaughtException = Throwable;
else alias UncaughtException = Exception;
try {
// force creation of a message box/Tid
try thisTid;
catch (Exception e) assert(false, e.msg);

while (true) {
while (!m_taskFunc.func) {
try {
debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task...");
static if (__VERSION__ >= 2090) {
import core.memory : GC;
import core.stdc.stdlib : abort;
import std.stdio : stderr;
if (GC.inFinalizer) {
stderr.writeln("Yiedling within finalizer - this would most likely result in a dead-lock!");
abort();
}
}
Fiber.yield();
} catch (Exception e) {
e.logException!(LogLevel.warn)(
"CoreTaskFiber was resumed with exception but without active task");
}
if (m_shutdown) return;
}
handleTasks();
} catch (Throwable th) {
import std.stdio : stderr, writeln;
import core.stdc.stdlib : abort;
try {
stderr.writeln("TaskFiber getting terminated due to an uncaught ", th.classinfo.name);
stderr.writeln(th);
} catch (Exception e) {
try stderr.writeln(th.msg);
catch (Exception e) {}
}
abort();
}
}

debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");
private void handleTasks()
nothrow {
import std.algorithm.mutation : swap;
import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock;

TaskFuncInfo task;
swap(task, m_taskFunc);
m_dynamicPriority = m_staticPriority = task.settings.priority;
Task handle = this.task;
while (true) {
while (!m_taskFunc.func) {
try {
atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running
scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized

debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle);
if (!isEventLoopRunning) {
debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding.");
taskScheduler.yieldUninterruptible();
debug (VibeTaskLog) logTrace("Initial resume of task.");
debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task...");
static if (__VERSION__ >= 2090) {
import core.memory : GC;
import core.stdc.stdlib : abort;
import std.stdio : stderr;
if (GC.inFinalizer) {
stderr.writeln("Yiedling within finalizer - this would most likely result in a dead-lock!");
abort();
}
}
Fiber.yield();
} catch (Exception e) {
e.logException!(LogLevel.warn)(
"CoreTaskFiber was resumed with exception but without active task");
}
if (m_shutdown) return;
}

debug (VibeRunningTasks) s_runningTasks[this] = "initial";
debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");

TaskFuncInfo task;
swap(task, m_taskFunc);
m_dynamicPriority = m_staticPriority = task.settings.priority;
Task handle = this.task;
try {
atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running
scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized

debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle);
if (!isEventLoopRunning) {
debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding.");
taskScheduler.yieldUninterruptible();
debug (VibeTaskLog) logTrace("Initial resume of task.");
}

task.call();
debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle);
debug (VibeRunningTasks) s_runningTasks[this] = "initial";

debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt)
logDebugV("Task exited while an interrupt was in flight.");
} catch (Exception e) {
debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle);
e.logException!(LogLevel.critical)("Task terminated with uncaught exception");
}
task.call();
debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle);

debug (VibeRunningTasks) s_runningTasks.remove(this);
debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt)
logDebugV("Task exited while an interrupt was in flight.");
} catch (Exception e) {
debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle);
e.logException!(LogLevel.critical)("Task terminated with uncaught exception");
}

debug assert(Thread.getThis() is m_thread, "Fiber moved?");
debug (VibeRunningTasks) s_runningTasks.remove(this);

// re-create message box if it has been used to guarantee a
// unique Tid per task
if (atomicLoad(m_tidUsed)) {
m_tidInfo.cleanup();
m_tidInfo.ident = Tid.init;
try thisTid;
catch (Exception e) assert(false, e.msg);
atomicStore(m_tidUsed, false);
}
debug assert(Thread.getThis() is m_thread, "Fiber moved?");

debug (VibeTaskLog) logTrace("Notifying joining tasks.");
// re-create message box if it has been used to guarantee a
// unique Tid per task
if (atomicLoad(m_tidUsed)) {
try m_tidInfo.cleanup();
catch (Exception e) logException(e, "Failed to clean up TID after task finished");
m_tidInfo.ident = Tid.init;
try thisTid;
catch (Exception e) assert(false, e.msg);
atomicStore(m_tidUsed, false);
}

// Issue #161: This fiber won't be resumed before the next task
// is assigned, because it is already marked as de-initialized.
// Since ManualEvent.emit() will need to switch tasks, this
// would mean that only the first waiter is notified before
// this fiber gets a new task assigned.
// Using a yield lock forces all corresponding tasks to be
// enqueued into the schedule queue and resumed in sequence
// at the end of the scope.
auto l = yieldLock();
debug (VibeTaskLog) logTrace("Notifying joining tasks.");

m_onExit.emit();
// Issue #161: This fiber won't be resumed before the next task
// is assigned, because it is already marked as de-initialized.
// Since ManualEvent.emit() will need to switch tasks, this
// would mean that only the first waiter is notified before
// this fiber gets a new task assigned.
// Using a yield lock forces all corresponding tasks to be
// enqueued into the schedule queue and resumed in sequence
// at the end of the scope.
auto l = yieldLock();

// make sure that the task does not get left behind in the yielder queue if terminated during yield()
if (m_queue) m_queue.remove(this);
m_onExit.emit();

// zero the fls initialization ByteArray for memory safety
// make sure that the task does not get left behind in the yielder queue if terminated during yield()
if (m_queue) m_queue.remove(this);

// zero the fls initialization ByteArray for memory safety
try {
foreach (size_t i, ref bool b; m_flsInit) {
if (b) {
if (ms_flsInfo !is null && ms_flsInfo.length >= i && ms_flsInfo[i] != FLSInfo.init)
ms_flsInfo[i].destroy(m_fls);
b = false;
}
}
} catch (Exception e) logException(e, "Failed to clean up FLS storage after task finished");

assert(!m_queue, "Fiber done but still scheduled to be resumed!?");
assert(!m_queue, "Fiber done but still scheduled to be resumed!?");

debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");
debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");

// make the fiber available for the next task
if (!recycleFiber(this))
return;
}
} catch (UncaughtException th) {
th.logException("CoreTaskFiber was terminated unexpectedly");
} catch (Throwable th) {
import std.stdio : stderr, writeln;
import core.stdc.stdlib : abort;
try stderr.writeln(th);
catch (Exception e) {
try stderr.writeln(th.msg);
catch (Exception e) {}
}
abort();
// make the fiber available for the next task
if (!recycleFiber(this))
return;
}
}

Expand Down Expand Up @@ -1087,8 +1092,6 @@ package struct TaskScheduler {
/// Resumes execution of a yielded task.
private void resumeTask(Task t)
nothrow {
import std.encoding : sanitize;

assert(t != Task.init, "Resuming null task");

debug (VibeTaskLog) logTrace("task fiber resume");
Expand Down