From ff77167ca28cb6f68dcf0defdabc86809dbf1d3c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6nke=20Ludwig?=
Date: Mon, 8 Jan 2024 13:53:15 +0100
Subject: [PATCH 1/3] Separate TaskFiber's task handling loop into a separate method.

Also removes the VibeDebugCatchAll version statement, as tasks may not throw
exceptions anymore anyway.
---
 source/vibe/core/task.d | 187 ++++++++++++++++++++--------------------
 1 file changed, 95 insertions(+), 92 deletions(-)

diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d
index 1c252bb6..6212b1df 100644
--- a/source/vibe/core/task.d
+++ b/source/vibe/core/task.d
@@ -403,99 +403,115 @@ final package class TaskFiber : Fiber {
 
 	private void run()
 	nothrow {
-		import std.algorithm.mutation : swap;
-		import std.encoding : sanitize;
-		import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock;
-
-		version (VibeDebugCatchAll) alias UncaughtException = Throwable;
-		else alias UncaughtException = Exception;
 		try {
 			// force creation of a message box/Tid
 			try thisTid;
 			catch (Exception e) assert(false, e.msg);
 
-			while (true) {
-				while (!m_taskFunc.func) {
-					try {
-						debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task...");
-						static if (__VERSION__ >= 2090) {
-							import core.memory : GC;
-							import core.stdc.stdlib : abort;
-							import std.stdio : stderr;
-							if (GC.inFinalizer) {
-								stderr.writeln("Yiedling within finalizer - this would most likely result in a dead-lock!");
-								abort();
-							}
-						}
-						Fiber.yield();
-					} catch (Exception e) {
-						e.logException!(LogLevel.warn)(
-							"CoreTaskFiber was resumed with exception but without active task");
-					}
-					if (m_shutdown) return;
-				}
+			handleTasks();
+		} catch (Throwable th) {
+			import std.stdio : stderr, writeln;
+			import core.stdc.stdlib : abort;
+			try {
+				stderr.writeln("TaskFiber getting terminated due to an uncaught ", th.classinfo.name);
+				stderr.writeln(th);
+			} catch (Exception e) {
+				try stderr.writeln(th.msg);
+				catch (Exception e) {}
+			}
+			abort();
+		}
+	}
 
-				debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");
+	private void handleTasks()
+	nothrow {
+		import std.algorithm.mutation : swap;
+		import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock;
 
-				TaskFuncInfo task;
-				swap(task, m_taskFunc);
-				m_dynamicPriority = m_staticPriority = task.settings.priority;
-				Task handle = this.task;
+		while (true) {
+			while (!m_taskFunc.func) {
 				try {
-					atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running
-					scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized
-
-					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle);
-					if (!isEventLoopRunning) {
-						debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding.");
-						taskScheduler.yieldUninterruptible();
-						debug (VibeTaskLog) logTrace("Initial resume of task.");
+					debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task...");
+					static if (__VERSION__ >= 2090) {
+						import core.memory : GC;
+						import core.stdc.stdlib : abort;
+						import std.stdio : stderr;
+						if (GC.inFinalizer) {
+							stderr.writeln("Yielding within finalizer - this would most likely result in a dead-lock!");
+							abort();
+						}
 					}
+					Fiber.yield();
+				} catch (Exception e) {
+					e.logException!(LogLevel.warn)(
+						"CoreTaskFiber was resumed with exception but without active task");
+				}
+				if (m_shutdown) return;
+			}
 
-				debug (VibeRunningTasks) s_runningTasks[this] = "initial";
+			debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");
+
+			TaskFuncInfo task;
+			swap(task, m_taskFunc);
+			m_dynamicPriority = m_staticPriority = task.settings.priority;
+			Task handle = this.task;
+			try {
+				atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running
+				scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized
+
+				debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle);
+				if (!isEventLoopRunning) {
+					debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding.");
+					taskScheduler.yieldUninterruptible();
+					debug (VibeTaskLog) logTrace("Initial resume of task.");
+				}
 
-					task.call();
-					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle);
+				debug (VibeRunningTasks) s_runningTasks[this] = "initial";
 
-					debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt)
-						logDebugV("Task exited while an interrupt was in flight.");
-				} catch (Exception e) {
-					debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle);
-					e.logException!(LogLevel.critical)("Task terminated with uncaught exception");
-				}
+				task.call();
+				debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle);
 
-				debug (VibeRunningTasks) s_runningTasks.remove(this);
+				debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt)
+					logDebugV("Task exited while an interrupt was in flight.");
+			} catch (Exception e) {
+				debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle);
+				e.logException!(LogLevel.critical)("Task terminated with uncaught exception");
+			}
 
-				debug assert(Thread.getThis() is m_thread, "Fiber moved?");
+			debug (VibeRunningTasks) s_runningTasks.remove(this);
 
-				// re-create message box if it has been used to guarantee a
-				// unique Tid per task
-				if (atomicLoad(m_tidUsed)) {
-					m_tidInfo.cleanup();
-					m_tidInfo.ident = Tid.init;
-					try thisTid;
-					catch (Exception e) assert(false, e.msg);
-					atomicStore(m_tidUsed, false);
-				}
+			debug assert(Thread.getThis() is m_thread, "Fiber moved?");
 
-				debug (VibeTaskLog) logTrace("Notifying joining tasks.");
+			// re-create message box if it has been used to guarantee a
+			// unique Tid per task
+			if (atomicLoad(m_tidUsed)) {
+				try m_tidInfo.cleanup();
+				catch (Exception e) logException(e, "Failed to clean up TID after task finished");
+				m_tidInfo.ident = Tid.init;
+				try thisTid;
+				catch (Exception e) assert(false, e.msg);
+				atomicStore(m_tidUsed, false);
+			}
 
-				// Issue #161: This fiber won't be resumed before the next task
-				// is assigned, because it is already marked as de-initialized.
-				// Since ManualEvent.emit() will need to switch tasks, this
-				// would mean that only the first waiter is notified before
-				// this fiber gets a new task assigned.
-				// Using a yield lock forces all corresponding tasks to be
-				// enqueued into the schedule queue and resumed in sequence
-				// at the end of the scope.
-				auto l = yieldLock();
+			debug (VibeTaskLog) logTrace("Notifying joining tasks.");
 
-				m_onExit.emit();
+			// Issue #161: This fiber won't be resumed before the next task
+			// is assigned, because it is already marked as de-initialized.
+			// Since ManualEvent.emit() will need to switch tasks, this
+			// would mean that only the first waiter is notified before
+			// this fiber gets a new task assigned.
+			// Using a yield lock forces all corresponding tasks to be
+			// enqueued into the schedule queue and resumed in sequence
+			// at the end of the scope.
+			auto l = yieldLock();
 
-				// make sure that the task does not get left behind in the yielder queue if terminated during yield()
-				if (m_queue) m_queue.remove(this);
+			m_onExit.emit();
 
-				// zero the fls initialization ByteArray for memory safety
+			// make sure that the task does not get left behind in the yielder queue if terminated during yield()
+			if (m_queue) m_queue.remove(this);
+
+			// zero the fls initialization ByteArray for memory safety
+			try {
 				foreach (size_t i, ref bool b; m_flsInit) {
 					if (b) {
 						if (ms_flsInfo !is null && ms_flsInfo.length >= i && ms_flsInfo[i] != FLSInfo.init)
@@ -503,26 +519,15 @@ final package class TaskFiber : Fiber {
 						b = false;
 					}
 				}
+			} catch (Exception e) logException(e, "Failed to clean up FLS storage after task finished");
 
-				assert(!m_queue, "Fiber done but still scheduled to be resumed!?");
+			assert(!m_queue, "Fiber done but still scheduled to be resumed!?");
 
-				debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");
+			debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?");
 
-				// make the fiber available for the next task
-				if (!recycleFiber(this))
-					return;
-			}
-		} catch (UncaughtException th) {
-			th.logException("CoreTaskFiber was terminated unexpectedly");
-		} catch (Throwable th) {
-			import std.stdio : stderr, writeln;
-			import core.stdc.stdlib : abort;
-			try stderr.writeln(th);
-			catch (Exception e) {
-				try stderr.writeln(th.msg);
-				catch (Exception e) {}
-			}
-			abort();
+			// make the fiber available for the next task
+			if (!recycleFiber(this))
+				return;
 		}
 	}
 
@@ -1087,8 +1092,6 @@ package struct TaskScheduler {
 	/// Resumes execution of a yielded task.
 	private void resumeTask(Task t)
 	nothrow {
-		import std.encoding : sanitize;
-
 		assert(t != Task.init, "Resuming null task");
 
 		debug (VibeTaskLog) logTrace("task fiber resume");

From c7311b666e7dfc39521db3ec9ed311ff7854ea44 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6nke=20Ludwig?=
Date: Mon, 8 Jan 2024 13:55:51 +0100
Subject: [PATCH 2/3] Fix race condition in TaskMutex debug code.

m_owner could change between the two conditions checked in the assertion
in lock(), causing a false assertion failure. Use atomic operations to
avoid that.
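As a rough illustration of the intended pattern, here is a minimal,
self-contained D sketch. The names (DebugOwnerSlot, setOwner,
assertNotRecursive, clearOwner) and the size_t owner id are invented for
the example; the actual vibe-core code stores a Task handle instead:

    // Simplified stand-in for the debug owner bookkeeping in TaskMutexImpl.
    // Every read and write of the owner slot goes through core.atomic, so
    // the recursion check sees one consistent value instead of reading the
    // field twice while another task may be locking or unlocking.
    import core.atomic : atomicLoad, cas;

    struct DebugOwnerSlot
    {
        shared size_t m_owner; // 0 = unowned in this sketch

        // lock()/tryLock() path: claim the slot right after winning the lock.
        void setOwner(size_t self)
        {
            auto swapped = cas(&m_owner, size_t(0), self);
            assert(swapped, "owner slot was expected to be free");
        }

        // Recursion check: a single atomic load replaces two separate reads
        // of m_owner that could observe different values under contention.
        void assertNotRecursive(size_t self)
        {
            auto owner = atomicLoad(m_owner);
            assert(owner == 0 || owner != self, "Recursive mutex lock.");
        }

        // unlock() path: only the current owner may clear the slot.
        void clearOwner(size_t self)
        {
            auto swapped = cas(&m_owner, self, size_t(0));
            assert(swapped, "unlock from a task that does not own the mutex");
        }
    }

The change below compares Task handles rather than integer ids, which is
presumably also why the follow-up patch adds align(16) to the field: the
Task-sized compare-and-swap needs a suitably aligned operand.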
---
 source/vibe/core/sync.d | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d
index c19b26ee..5bbcfc1a 100644
--- a/source/vibe/core/sync.d
+++ b/source/vibe/core/sync.d
@@ -1934,7 +1934,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
 		shared(bool) m_locked = false;
 		shared(uint) m_waiters = 0;
 		shared(ManualEvent) m_signal;
-		debug Task m_owner;
+		debug shared Task m_owner;
 	}
 
 	shared:
@@ -1947,7 +1947,10 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
 	@trusted bool tryLock()
 	nothrow {
 		if (cas(&m_locked, false, true)) {
-			debug m_owner = Task.getThis();
+			debug {
+				auto swapped = cas(&m_owner, Task.init, Task.getThis());
+				assert(swapped);
+			}
 			debug(VibeMutexLog) logTrace("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters));
 			return true;
 		}
@@ -1957,7 +1960,11 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
 	@trusted bool lock(Duration timeout = Duration.max)
 	{
 		if (tryLock()) return true;
-		debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock.");
+		debug {
+			auto thist = Task.getThis();
+			auto owner = atomicLoad(m_owner);
+			assert(owner == Task.init || owner != thist, "Recursive mutex lock.");
+		}
 		atomicOp!"+="(m_waiters, 1);
 		debug(VibeMutexLog) logTrace("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
 		scope(exit) atomicOp!"-="(m_waiters, 1);
@@ -1979,8 +1986,9 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
 	{
 		assert(m_locked);
 		debug {
-			assert(m_owner == Task.getThis());
-			m_owner = Task();
+			auto thist = Task.getThis();
+			auto swapped = cas(&m_owner, thist, Task());
+			assert(swapped);
 		}
 		atomicStore!(MemoryOrder.rel)(m_locked, false);
 		debug(VibeMutexLog) logTrace("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));

From 1f63517ccc7a273cf5fbe188c3aa2f4b9488d58f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6nke=20Ludwig?=
Date: Mon, 8 Jan 2024 16:02:47 +0100
Subject: [PATCH 3/3] Fix field alignment for atomic operations.

---
 source/vibe/core/sync.d | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/source/vibe/core/sync.d b/source/vibe/core/sync.d
index 5bbcfc1a..35a62116 100644
--- a/source/vibe/core/sync.d
+++ b/source/vibe/core/sync.d
@@ -1354,6 +1354,13 @@ struct ManualEvent {
 
 	@disable this(this);
 
+	private bool canBeFreed()
+	shared nothrow {
+		import core.memory : GC;
+		if (GC.inFinalizer) return true;
+		return m_waiters.lock.active.empty;
+	}
+
 	private void initialize()
 	shared nothrow {
 		m_waiters.initialize(new shared Mutex);
@@ -1934,7 +1941,7 @@ private struct TaskMutexImpl(bool INTERRUPTIBLE) {
 		shared(bool) m_locked = false;
 		shared(uint) m_waiters = 0;
 		shared(ManualEvent) m_signal;
-		debug shared Task m_owner;
+		debug align(16) shared Task m_owner;
 	}
 
 	shared:
@@ -2094,6 +2101,12 @@ private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
 
 	@disable this(this);
 
+	~this()
+	nothrow {
+		assert(m_signal.canBeFreed());
+		destroy(m_signal);
+	}
+
 	shared:
 
 	void setup(shared(LOCKABLE) mtx