From def5e7bf5a04206c047cb85404f6abbdfa316a78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Ludwig?= Date: Mon, 8 Jan 2024 13:53:15 +0100 Subject: [PATCH] Separate TaskFiber's task handling loop into a separate method. Also removes the VibeDebugCatchAll version statement, as tasks may not throw exceptions anymore anyway. --- source/vibe/core/task.d | 187 ++++++++++++++++++++-------------------- 1 file changed, 95 insertions(+), 92 deletions(-) diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index 1c252bb6..6212b1df 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -403,99 +403,115 @@ final package class TaskFiber : Fiber { private void run() nothrow { - import std.algorithm.mutation : swap; - import std.encoding : sanitize; - import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock; - - version (VibeDebugCatchAll) alias UncaughtException = Throwable; - else alias UncaughtException = Exception; try { // force creation of a message box/Tid try thisTid; catch (Exception e) assert(false, e.msg); - while (true) { - while (!m_taskFunc.func) { - try { - debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task..."); - static if (__VERSION__ >= 2090) { - import core.memory : GC; - import core.stdc.stdlib : abort; - import std.stdio : stderr; - if (GC.inFinalizer) { - stderr.writeln("Yiedling within finalizer - this would most likely result in a dead-lock!"); - abort(); - } - } - Fiber.yield(); - } catch (Exception e) { - e.logException!(LogLevel.warn)( - "CoreTaskFiber was resumed with exception but without active task"); - } - if (m_shutdown) return; - } + handleTasks(); + } catch (Throwable th) { + import std.stdio : stderr, writeln; + import core.stdc.stdlib : abort; + try { + stderr.writeln("TaskFiber getting terminated due to an uncaught ", th.classinfo.name); + stderr.writeln(th); + } catch (Exception e) { + try stderr.writeln(th.msg); + catch (Exception e) {} + } + abort(); + } + } - debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?"); + private void handleTasks() + nothrow { + import std.algorithm.mutation : swap; + import vibe.core.core : isEventLoopRunning, recycleFiber, taskScheduler, yield, yieldLock; - TaskFuncInfo task; - swap(task, m_taskFunc); - m_dynamicPriority = m_staticPriority = task.settings.priority; - Task handle = this.task; + while (true) { + while (!m_taskFunc.func) { try { - atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running - scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized - - debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle); - if (!isEventLoopRunning) { - debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding."); - taskScheduler.yieldUninterruptible(); - debug (VibeTaskLog) logTrace("Initial resume of task."); + debug (VibeTaskLog) logTrace("putting fiber to sleep waiting for new task..."); + static if (__VERSION__ >= 2090) { + import core.memory : GC; + import core.stdc.stdlib : abort; + import std.stdio : stderr; + if (GC.inFinalizer) { + stderr.writeln("Yiedling within finalizer - this would most likely result in a dead-lock!"); + abort(); + } } + Fiber.yield(); + } catch (Exception e) { + e.logException!(LogLevel.warn)( + "CoreTaskFiber was resumed with exception but without active task"); + } + if (m_shutdown) return; + } - debug (VibeRunningTasks) s_runningTasks[this] = "initial"; + debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?"); + + TaskFuncInfo task; + swap(task, m_taskFunc); + m_dynamicPriority = m_staticPriority = task.settings.priority; + Task handle = this.task; + try { + atomicOp!"|="(m_taskCounterAndFlags, Flags.running); // set running + scope(exit) atomicOp!"&="(m_taskCounterAndFlags, ~Flags.flagsMask); // clear running/initialized + + debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.start, handle); + if (!isEventLoopRunning) { + debug (VibeTaskLog) logTrace("Event loop not running at task start - yielding."); + taskScheduler.yieldUninterruptible(); + debug (VibeTaskLog) logTrace("Initial resume of task."); + } - task.call(); - debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle); + debug (VibeRunningTasks) s_runningTasks[this] = "initial"; - debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt) - logDebugV("Task exited while an interrupt was in flight."); - } catch (Exception e) { - debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle); - e.logException!(LogLevel.critical)("Task terminated with uncaught exception"); - } + task.call(); + debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.end, handle); - debug (VibeRunningTasks) s_runningTasks.remove(this); + debug if (() @trusted { return (cast(shared)this); } ().getTaskStatus().interrupt) + logDebugV("Task exited while an interrupt was in flight."); + } catch (Exception e) { + debug if (ms_taskEventCallback) ms_taskEventCallback(TaskEvent.fail, handle); + e.logException!(LogLevel.critical)("Task terminated with uncaught exception"); + } - debug assert(Thread.getThis() is m_thread, "Fiber moved?"); + debug (VibeRunningTasks) s_runningTasks.remove(this); - // re-create message box if it has been used to guarantee a - // unique Tid per task - if (atomicLoad(m_tidUsed)) { - m_tidInfo.cleanup(); - m_tidInfo.ident = Tid.init; - try thisTid; - catch (Exception e) assert(false, e.msg); - atomicStore(m_tidUsed, false); - } + debug assert(Thread.getThis() is m_thread, "Fiber moved?"); - debug (VibeTaskLog) logTrace("Notifying joining tasks."); + // re-create message box if it has been used to guarantee a + // unique Tid per task + if (atomicLoad(m_tidUsed)) { + try m_tidInfo.cleanup(); + catch (Exception e) logException(e, "Failed to clean up TID after task finished"); + m_tidInfo.ident = Tid.init; + try thisTid; + catch (Exception e) assert(false, e.msg); + atomicStore(m_tidUsed, false); + } - // Issue #161: This fiber won't be resumed before the next task - // is assigned, because it is already marked as de-initialized. - // Since ManualEvent.emit() will need to switch tasks, this - // would mean that only the first waiter is notified before - // this fiber gets a new task assigned. - // Using a yield lock forces all corresponding tasks to be - // enqueued into the schedule queue and resumed in sequence - // at the end of the scope. - auto l = yieldLock(); + debug (VibeTaskLog) logTrace("Notifying joining tasks."); - m_onExit.emit(); + // Issue #161: This fiber won't be resumed before the next task + // is assigned, because it is already marked as de-initialized. + // Since ManualEvent.emit() will need to switch tasks, this + // would mean that only the first waiter is notified before + // this fiber gets a new task assigned. + // Using a yield lock forces all corresponding tasks to be + // enqueued into the schedule queue and resumed in sequence + // at the end of the scope. + auto l = yieldLock(); - // make sure that the task does not get left behind in the yielder queue if terminated during yield() - if (m_queue) m_queue.remove(this); + m_onExit.emit(); - // zero the fls initialization ByteArray for memory safety + // make sure that the task does not get left behind in the yielder queue if terminated during yield() + if (m_queue) m_queue.remove(this); + + // zero the fls initialization ByteArray for memory safety + try { foreach (size_t i, ref bool b; m_flsInit) { if (b) { if (ms_flsInfo !is null && ms_flsInfo.length >= i && ms_flsInfo[i] != FLSInfo.init) @@ -503,26 +519,15 @@ final package class TaskFiber : Fiber { b = false; } } + } catch (Exception e) logException(e, "Failed to clean up FLS storage after task finished"); - assert(!m_queue, "Fiber done but still scheduled to be resumed!?"); + assert(!m_queue, "Fiber done but still scheduled to be resumed!?"); - debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?"); + debug assert(Thread.getThis() is m_thread, "Fiber moved between threads!?"); - // make the fiber available for the next task - if (!recycleFiber(this)) - return; - } - } catch (UncaughtException th) { - th.logException("CoreTaskFiber was terminated unexpectedly"); - } catch (Throwable th) { - import std.stdio : stderr, writeln; - import core.stdc.stdlib : abort; - try stderr.writeln(th); - catch (Exception e) { - try stderr.writeln(th.msg); - catch (Exception e) {} - } - abort(); + // make the fiber available for the next task + if (!recycleFiber(this)) + return; } } @@ -1087,8 +1092,6 @@ package struct TaskScheduler { /// Resumes execution of a yielded task. private void resumeTask(Task t) nothrow { - import std.encoding : sanitize; - assert(t != Task.init, "Resuming null task"); debug (VibeTaskLog) logTrace("task fiber resume");