diff --git a/source/vibe/core/channel.d b/source/vibe/core/channel.d index 5e74a765..d9093307 100644 --- a/source/vibe/core/channel.d +++ b/source/vibe/core/channel.d @@ -155,6 +155,7 @@ struct Channel(T, size_t buffer_size = 100) { private final class ChannelImpl(T, size_t buffer_size) { import vibe.core.concurrency : isWeaklyIsolated; + import vibe.internal.allocator : Mallocator, makeGCSafe, disposeGCSafe; static assert(isWeaklyIsolated!T, "Channel data type "~T.stringof~" is not safe to pass between threads."); private { @@ -168,8 +169,8 @@ private final class ChannelImpl(T, size_t buffer_size) { this(ChannelConfig config) shared @trusted nothrow { - m_mutex = cast(shared)new Mutex; - m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex); + m_mutex = cast(shared)Mallocator.instance.makeGCSafe!Mutex(); + m_condition = cast(shared)Mallocator.instance.makeGCSafe!TaskCondition(cast()m_mutex); m_config = config; } @@ -183,11 +184,20 @@ private final class ChannelImpl(T, size_t buffer_size) { private void releaseRef() @safe nothrow shared { - m_mutex.lock_nothrow(); - scope (exit) m_mutex.unlock_nothrow(); - auto thisus = () @trusted { return cast(ChannelImpl)this; } (); - if (--thisus.m_refCount == 0) { - try () @trusted { destroy(m_condition); } (); + bool destroy = false; + { + m_mutex.lock_nothrow(); + scope (exit) m_mutex.unlock_nothrow(); + auto thisus = () @trusted { return cast(ChannelImpl)this; } (); + if (--thisus.m_refCount == 0) + destroy = true; + } + + if (destroy) { + try () @trusted { + Mallocator.instance.disposeGCSafe(m_condition); + Mallocator.instance.disposeGCSafe(m_mutex); + } (); catch (Exception e) assert(false); } } diff --git a/source/vibe/core/concurrency.d b/source/vibe/core/concurrency.d index b18cab89..68c512c9 100644 --- a/source/vibe/core/concurrency.d +++ b/source/vibe/core/concurrency.d @@ -18,6 +18,7 @@ import std.typecons; import std.typetuple; import std.string; import vibe.core.task; +import vibe.core.taskpool; private extern (C) pure nothrow void _d_monitorenter(Object h); @@ -1233,6 +1234,129 @@ Future!(ReturnType!CALLABLE) asyncWork(CALLABLE, ARGS...)(CALLABLE callable, ARG } +/** Performs work in a worker task and waits for the result to be available. + + Other tasks can continue to run in the calling thread while the worker task + performs the function call. + + Params: + pool = Optional task pool instance to use, uses the default worker pool by default + callable = Function or function-like object to call - must be weakly isolated and `nothrow` + args = Arguments to pass to callable - must be weakly isolated + + See_also: `isWeaklyIsolated`, `asyncWork` +*/ +ReturnType!CALLABLE performInWorker(CALLABLE, ARGS...) + (CALLABLE callable, ARGS arguments) + if (is(typeof(callable(arguments)) == ReturnType!CALLABLE) && + isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) +{ + import vibe.core.core : workerTaskPool; + return performInWorker!(CALLABLE, ARGS)(workerTaskPool, callable, arguments); +} +/// ditto +ReturnType!CALLABLE performInWorker(CALLABLE, ARGS...) + (shared(TaskPool) pool, CALLABLE callable, ARGS arguments) + if (is(typeof(callable(arguments)) == ReturnType!CALLABLE) && + isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) +{ + import core.sync.mutex : Mutex; + import vibe.core.sync : TaskCondition, scopedMutexLock; + + // TODO: this could improve performance by re-using the Mutex/TaskCondition + // instances for later calls. When this optimization gets implemented, + // care should be taken to clean up the cached instances before the + // process/thread exits, so that nothing gets leaked to the GC + + static shared(Mutex) s_mutex; + static struct CTX { + CALLABLE callable; + ARGS args; + ReturnType!CALLABLE result; + shared(Mutex) mutex; + shared(TaskCondition) condition; + bool done; + } + + // set up context + CTX ctx = CTX(callable, arguments); + auto mtx = new Mutex; + ctx.mutex = () @trusted { return cast(shared)mtx; } (); + ctx.condition = () @trusted { return cast(shared)new TaskCondition(mtx); } (); + + // start worker task + pool.runTask((shared(CTX)* ctx_) nothrow { + auto ctx = () @trusted { return cast(CTX*)ctx_; } (); + auto res = ctx.callable(ctx.args); + { + auto l = scopedMutexLock(ctx.mutex); + ctx.result = res; + ctx.done = true; + ctx.condition.notifyAll(); + } + }, () @trusted { return cast(shared)&ctx; } ()); + + scope (failure) assert(false); + + { // wait for result + auto l = scopedMutexLock(ctx.mutex); + while (!ctx.done) ctx.condition.wait(); + } + + // clean up resources + () @trusted { + import core.memory : GC; + destroy(ctx.condition); + destroy(ctx.mutex); + // NOTE: the GC will otherwise call the destructor on the destroy()ed + // Mutex, which causes a failure in DeleteCriticalSection on + // Windows + GC.free(cast(void*)ctx.mutex); + } (); + + return ctx.result; +} + +/// +@safe unittest { + import vibe.core.core : runTask, sleepUninterruptible; + import vibe.core.log : logInfo; + + // runs in parallel to the heavy work + int cnt = 0; + auto t = runTask({ + foreach (i; 0 .. 100) { + sleepUninterruptible(1.msecs); + cnt++; + } + }); + + // perform some heavy CPU work in a worker thread, while the background task + // continues to run unaffected + auto res = performInWorker((long start_value) { + auto tm = MonoTime.currTime; + long res = start_value; + // simulate some CPU workload for 50 ms + while (MonoTime.currTime - tm < 50.msecs) { + res++; + res *= res; + if (!res) res++; + } + return res; + }, 1234); + + logInfo("Result: %s, background task has made %s rounds", res, cnt); + + // should always receive a non-zero result + assert(res != 0); + // background task should have made roughly 50 rounds up to here + version (OSX) {} else assert(cnt > 25 && cnt < 75); + + // make sure our background task also finishes + t.join(); +} + + /******************************************************************************/ /* std.concurrency compatible interface for message passing */ /******************************************************************************/ diff --git a/source/vibe/core/core.d b/source/vibe/core/core.d index f0520d0e..92b350cb 100644 --- a/source/vibe/core/core.d +++ b/source/vibe/core/core.d @@ -458,6 +458,38 @@ unittest { // ensure task.running is true directly after runTask assert(hit); } +unittest { + import core.atomic : atomicOp; + + static struct S { + shared(int)* rc; + this(this) @safe nothrow { if (rc) atomicOp!"+="(*rc, 1); } + ~this() @safe nothrow { if (rc) atomicOp!"-="(*rc, 1); } + } + + S s; + s.rc = new int; + *s.rc = 1; + + runTask((ref S sc) { + auto rc = sc.rc; + assert(*rc == 2); + sc = S.init; + assert(*rc == 1); + }, s).joinUninterruptible(); + + assert(*s.rc == 1); + + runWorkerTaskH((ref S sc) { + auto rc = sc.rc; + assert(*rc == 2); + sc = S.init; + assert(*rc == 1); + }, s).joinUninterruptible(); + + assert(*s.rc == 1); +} + /** Runs a new asynchronous task in a worker thread. @@ -1752,13 +1784,8 @@ package @property ref TaskScheduler taskScheduler() @safe nothrow @nogc { return package void recycleFiber(TaskFiber fiber) @safe nothrow { if (s_availableFibers.length >= s_maxRecycledFibers) { - auto fl = s_availableFibers.front; - s_availableFibers.popFront(); - fl.shutdown(); - () @trusted { - try destroy(fl); - catch (Exception e) logWarn("Failed to destroy fiber: %s", e.msg); - } (); + fiber.shutdown(); + return; } if (s_availableFibers.full) @@ -1889,10 +1916,8 @@ static ~this() shutdownWorkerPool(); } - foreach (f; s_availableFibers) { + foreach (f; s_availableFibers) f.shutdown(); - destroy(f); - } ManualEvent.freeThreadResources(); diff --git a/source/vibe/core/file.d b/source/vibe/core/file.d index d6e3b518..4b6887ad 100644 --- a/source/vibe/core/file.d +++ b/source/vibe/core/file.d @@ -204,7 +204,7 @@ void moveFile(NativePath from, NativePath to, bool copy_fallback = false) /// ditto void moveFile(string from, string to, bool copy_fallback = false) { - auto fail = performInWorker((string from, string to) { + auto fail = performInIOWorker((string from, string to) { try { std.file.rename(from, to); } catch (Exception e) { @@ -287,7 +287,7 @@ void removeFile(NativePath path) /// ditto void removeFile(string path) { - auto fail = performInWorker((string path) { + auto fail = performInIOWorker((string path) { try { std.file.remove(path); } catch (Exception e) { @@ -309,7 +309,7 @@ bool existsFile(NativePath path) nothrow /// ditto bool existsFile(string path) nothrow { - try return performInWorker((string p) => std.file.exists(p), path); + try return performInIOWorker((string p) => std.file.exists(p), path); catch (Exception e) { logDebug("Failed to determine file existence for '%s': %s", path, e.msg); return false; @@ -329,7 +329,7 @@ FileInfo getFileInfo(string path) { import std.typecons : tuple; - auto ret = performInWorker((string p) { + auto ret = performInIOWorker((string p) { try { auto ent = DirEntry(p); return tuple(makeFileInfo(ent), ""); @@ -428,7 +428,7 @@ void createDirectory(NativePath path, Flag!"recursive" recursive) /// ditto void createDirectory(string path, Flag!"recursive" recursive = No.recursive) { - auto fail = performInWorker((string p, bool rec) { + auto fail = performInIOWorker((string p, bool rec) { try { if (rec) mkdirRecurse(p); else mkdir(p); @@ -472,7 +472,7 @@ void listDirectory(NativePath path, DirectoryListMode mode, req.directoryPredicate = directory_predicate; // NOTE: working around bogus "assigning scope variable warning on DMD 2.101.2 here with @trusted - ioWorkerTaskPool.runTask(ioTaskSettings, &performListDirectory, () @trusted { return req; } ()); + ioWorkerTaskPool.runTask(&performListDirectory, () @trusted { return req; } ()); ListDirectoryData itm; while (req.channel.tryConsumeOne(itm)) { @@ -1026,32 +1026,11 @@ unittest { } -private auto performInWorker(C, ARGS...)(C callable, auto ref ARGS args) +private auto performInIOWorker(C, ARGS...)(C callable, auto ref ARGS args) { - version (none) { - import vibe.core.concurrency : asyncWork; - return asyncWork(callable, args).getResult(); - } else { - import vibe.core.core : ioWorkerTaskPool; - import core.atomic : atomicFence; - import std.concurrency : Tid, send, receiveOnly, thisTid; - - struct R {} - - alias RET = typeof(callable(args)); - shared(RET) ret; - ioWorkerTaskPool.runTask(ioTaskSettings, (shared(RET)* r, Tid caller, C c, ref ARGS a) nothrow { - *() @trusted { return cast(RET*)r; } () = c(a); - // Just as a precaution, because ManualEvent is not well defined in - // terms of fence semantics - atomicFence(); - try caller.send(R.init); - catch (Exception e) assert(false, e.msg); - }, () @trusted { return &ret; } (), thisTid, callable, args); - () @trusted { receiveOnly!R(); } (); - atomicFence(); - return ret; - } + import vibe.core.concurrency : performInWorker; + import vibe.core.core : ioWorkerTaskPool; + return performInWorker(ioWorkerTaskPool, callable, args); } private void performListDirectory(ListDirectoryRequest req) @@ -1261,8 +1240,6 @@ version (Posix) { } } -private immutable TaskSettings ioTaskSettings = { priority: 20 * Task.basePriority }; - private struct ListDirectoryData { FileInfo info; string error; diff --git a/source/vibe/core/task.d b/source/vibe/core/task.d index f729ec0e..ab1d0fdd 100644 --- a/source/vibe/core/task.d +++ b/source/vibe/core/task.d @@ -1369,7 +1369,7 @@ package string callWithMove(ARGS...)(string func, string args) return ret ~ ");"; } -private template needsMove(T) +package template needsMove(T) { template isCopyable(T) { diff --git a/source/vibe/core/taskpool.d b/source/vibe/core/taskpool.d index 732fb85a..42dd887d 100644 --- a/source/vibe/core/taskpool.d +++ b/source/vibe/core/taskpool.d @@ -155,12 +155,7 @@ shared final class TaskPool { { foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - // workaround for runWorkerTaskH to work when called outside of a task - if (Task.getThis() == Task.init) { - Task ret; - .runTask((FT func, ARGS args) nothrow { ret = doRunTaskH(TaskSettings.init, func, args); }, func, args).joinUninterruptible(); - return ret; - } else return doRunTaskH(TaskSettings.init, func, args); + return doRunTaskH(TaskSettings.init, func, args); } /// ditto Task runTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args) @@ -177,12 +172,7 @@ shared final class TaskPool { { foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); - // workaround for runWorkerTaskH to work when called outside of a task - if (Task.getThis() == Task.init) { - Task ret; - .runTask((TaskSettings settings, FT func, ARGS args) nothrow { ret = doRunTaskH(settings, func, args); }, settings, func, args).joinUninterruptible(); - return ret; - } else return doRunTaskH(settings, func, args); + return doRunTaskH(settings, func, args); } /// ditto Task runTaskH(alias method, T, ARGS...)(TaskSettings settings, shared(T) object, auto ref ARGS args) @@ -201,6 +191,7 @@ shared final class TaskPool { { import std.typecons : Typedef; import vibe.core.channel : Channel, createChannel; + import vibe.core.task : needsMove; foreach (T; ARGS) static assert(isWeaklyIsolated!T, "Argument type "~T.stringof~" is not safe to pass between threads."); @@ -208,12 +199,34 @@ shared final class TaskPool { auto ch = createChannel!Task(); - static void taskFun(Channel!Task ch, FT func, ARGS args) { - try ch.put(Task.getThis()); - catch (Exception e) assert(false, e.msg); - ch = Channel!Task.init; - mixin(callWithMove!ARGS("func", "args")); + static string argdefs() + { + string ret; + foreach (i, A; ARGS) { + if (i > 0) ret ~= ", "; + if (!needsMove!A) ret ~= "ref "; + ret ~= "ARGS["~i.stringof~"] arg_"~i.stringof; + } + return ret; } + + static string argvals() + { + string ret; + foreach (i, A; ARGS) { + if (i > 0) ret ~= ", "; + ret ~= "arg_"~i.stringof; + if (needsMove!A) ret ~= ".move"; + } + return ret; + } + + mixin("static void taskFun(Channel!Task ch, FT func, " ~ argdefs() ~ ") {" + ~ " try ch.put(Task.getThis());" + ~ " catch (Exception e) assert(false, e.msg);" + ~ " ch = Channel!Task.init;" + ~ " func("~argvals()~");" + ~ "}"); runTask_unsafe(settings, &taskFun, ch, func, args); Task ret;