Skip to content

Commit

Permalink
Add vibe.core.concurrency.performInWorker.
Browse files Browse the repository at this point in the history
Allows to perform a function call in a worker thread, while keeping a synchronous control flow.
  • Loading branch information
s-ludwig committed Dec 18, 2023
1 parent 1260c03 commit f665bd1
Showing 1 changed file with 118 additions and 0 deletions.
118 changes: 118 additions & 0 deletions source/vibe/core/concurrency.d
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import std.typecons;
import std.typetuple;
import std.string;
import vibe.core.task;
import vibe.core.taskpool;


private extern (C) pure nothrow void _d_monitorenter(Object h);
Expand Down Expand Up @@ -1233,6 +1234,123 @@ Future!(ReturnType!CALLABLE) asyncWork(CALLABLE, ARGS...)(CALLABLE callable, ARG
}


/** Performs work in a worker task and waits for the result to be available.
Other tasks can continue to run in the calling thread while the worker task
performs the function call.
Params:
pool = Optional task pool instance to use, uses the default worker pool by default
callable = Function or function-like object to call - must be weakly isolated and `nothrow`
args = Arguments to pass to callable - must be weakly isolated
See_also: `isWeaklyIsolated`, `asyncWork`
*/
ReturnType!CALLABLE performInWorker(CALLABLE, ARGS...)
(CALLABLE callable, ARGS arguments)
if (is(typeof(callable(arguments)) == ReturnType!CALLABLE) &&
isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS)
{
import vibe.core.core : workerTaskPool;
return performInWorker(workerTaskPool, callable, arguments);
}
/// ditto
ReturnType!CALLABLE performInWorker(CALLABLE, ARGS...)
(TaskPool pool, CALLABLE callable, ARGS arguments)
if (is(typeof(callable(arguments)) == ReturnType!CALLABLE) &&
isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS)
{
import core.sync.mutex : Mutex;
import vibe.core.sync : TaskCondition, scopedMutexLock;

// TODO: this could improve performance by re-using the Mutex/TaskCondition
// instances for later calls. When this optimization gets implemented,
// care should be taken to clean up the cached instances before the
// process/thread exits, so that nothing gets leaked to the GC

static shared(Mutex) s_mutex;
static struct CTX {
CALLABLE callable;
ARGS args;
ReturnType!CALLABLE result;
shared(Mutex) mutex;
shared(TaskCondition) condition;
bool done;
}

// set up context
CTX ctx = CTX(callable, arguments);
ctx.mutex = new shared Mutex;
ctx.condition = new shared TaskCondition(ctx.mutex);

// start worker task
pool.runTask((shared(CTX)* ctx_) nothrow {
auto ctx = () @trusted { return cast(CTX*)ctx_; } ();
auto res = ctx.callable(ctx.args);
{
auto l = scopedMutexLock(ctx.mutex);
ctx.result = res;
ctx.done = true;
ctx.condition.notifyAll();
}
}, () @trusted { return cast(shared)&ctx; } ());

scope (failure) assert(false);

{ // wait for result
auto l = scopedMutexLock(ctx.mutex);
while (!ctx.done) ctx.condition.wait();
}

// clean up resources
() @trusted {
destroy(ctx.condition);
destroy(ctx.mutex);
} ();

return ctx.result;
}

///
@safe unittest {
import vibe.core.core : runTask, sleepUninterruptible;
import vibe.core.log : logInfo;

// runs in parallel to the heavy work
int cnt = 0;
auto t = runTask({
foreach (i; 0 .. 100) {
sleepUninterruptible(1.msecs);
cnt++;
}
});

// perform some heavy CPU work in a worker thread, while the background task
// continues to run unaffected
auto res = performInWorker((long start_value) {
auto tm = MonoTime.currTime;
long res = start_value;
// simulate some CPU workload for 50 ms
while (MonoTime.currTime - tm < 50.msecs) {
res++;
res *= res;
if (!res) res++;
}
return res;
}, 1234);

logInfo("Result: %s, background task has made %s rounds", res, cnt);

// should always receive a non-zero result
assert(res != 0);
// background task should have made roughly 50 rounds up to here
version (OSX) {} else assert(cnt > 25 && cnt < 75);

// make sure our background task also finishes
t.join();
}


/******************************************************************************/
/* std.concurrency compatible interface for message passing */
/******************************************************************************/
Expand Down

0 comments on commit f665bd1

Please sign in to comment.