diff --git a/source/vibe/core/concurrency.d b/source/vibe/core/concurrency.d index b18cab89..01432cc6 100644 --- a/source/vibe/core/concurrency.d +++ b/source/vibe/core/concurrency.d @@ -18,6 +18,7 @@ import std.typecons; import std.typetuple; import std.string; import vibe.core.task; +import vibe.core.taskpool; private extern (C) pure nothrow void _d_monitorenter(Object h); @@ -1233,6 +1234,123 @@ Future!(ReturnType!CALLABLE) asyncWork(CALLABLE, ARGS...)(CALLABLE callable, ARG } +/** Performs work in a worker task and waits for the result to be available. + + Other tasks can continue to run in the calling thread while the worker task + performs the function call. + + Params: + pool = Optional task pool instance to use, uses the default worker pool by default + callable = Function or function-like object to call - must be weakly isolated and `nothrow` + args = Arguments to pass to callable - must be weakly isolated + + See_also: `isWeaklyIsolated`, `asyncWork` +*/ +ReturnType!CALLABLE performInWorker(CALLABLE, ARGS...) + (CALLABLE callable, ARGS arguments) + if (is(typeof(callable(arguments)) == ReturnType!CALLABLE) && + isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) +{ + import vibe.core.core : workerTaskPool; + return performInWorker(workerTaskPool, callable, arguments); +} +/// ditto +ReturnType!CALLABLE performInWorker(CALLABLE, ARGS...) + (TaskPool pool, CALLABLE callable, ARGS arguments) + if (is(typeof(callable(arguments)) == ReturnType!CALLABLE) && + isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) +{ + import core.sync.mutex : Mutex; + import vibe.core.sync : TaskCondition, scopedMutexLock; + + // TODO: this could improve performance by re-using the Mutex/TaskCondition + // instances for later calls. When this optimization gets implemented, + // care should be taken to clean up the cached instances before the + // process/thread exits, so that nothing gets leaked to the GC + + static shared(Mutex) s_mutex; + static struct CTX { + CALLABLE callable; + ARGS args; + ReturnType!CALLABLE result; + shared(Mutex) mutex; + shared(TaskCondition) condition; + bool done; + } + + // set up context + CTX ctx = CTX(callable, arguments); + ctx.mutex = new shared Mutex; + ctx.condition = new shared TaskCondition(ctx.mutex); + + // start worker task + pool.runTask((shared(CTX)* ctx_) nothrow { + auto ctx = () @trusted { return cast(CTX*)ctx_; } (); + auto res = ctx.callable(ctx.args); + { + auto l = scopedMutexLock(ctx.mutex); + ctx.result = res; + ctx.done = true; + ctx.condition.notifyAll(); + } + }, () @trusted { return cast(shared)&ctx; } ()); + + scope (failure) assert(false); + + { // wait for result + auto l = scopedMutexLock(ctx.mutex); + while (!ctx.done) ctx.condition.wait(); + } + + // clean up resources + () @trusted { + destroy(ctx.condition); + destroy(ctx.mutex); + } (); + + return ctx.result; +} + +/// +@safe unittest { + import vibe.core.core : runTask, sleepUninterruptible; + import vibe.core.log : logInfo; + + // runs in parallel to the heavy work + int cnt = 0; + auto t = runTask({ + foreach (i; 0 .. 100) { + sleepUninterruptible(1.msecs); + cnt++; + } + }); + + // perform some heavy CPU work in a worker thread, while the background task + // continues to run unaffected + auto res = performInWorker((long start_value) { + auto tm = MonoTime.currTime; + long res = start_value; + // simulate some CPU workload for 50 ms + while (MonoTime.currTime - tm < 50.msecs) { + res++; + res *= res; + if (!res) res++; + } + return res; + }, 1234); + + logInfo("Result: %s, background task has made %s rounds", res, cnt); + + // should always receive a non-zero result + assert(res != 0); + // background task should have made roughly 50 rounds up to here + version (OSX) {} else assert(cnt > 25 && cnt < 75); + + // make sure our background task also finishes + t.join(); +} + + /******************************************************************************/ /* std.concurrency compatible interface for message passing */ /******************************************************************************/