Skip to content

Commit

Permalink
Merge pull request #374 from vibe-d/perform_in_worker
Browse files Browse the repository at this point in the history
Add performInWorker and use it for blocking I/O
  • Loading branch information
l-kramer authored Dec 19, 2023
2 parents f24acdc + 8e0aa82 commit ee9870c
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 68 deletions.
24 changes: 17 additions & 7 deletions source/vibe/core/channel.d
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ struct Channel(T, size_t buffer_size = 100) {

private final class ChannelImpl(T, size_t buffer_size) {
import vibe.core.concurrency : isWeaklyIsolated;
import vibe.internal.allocator : Mallocator, makeGCSafe, disposeGCSafe;
static assert(isWeaklyIsolated!T, "Channel data type "~T.stringof~" is not safe to pass between threads.");

private {
Expand All @@ -168,8 +169,8 @@ private final class ChannelImpl(T, size_t buffer_size) {

this(ChannelConfig config)
shared @trusted nothrow {
m_mutex = cast(shared)new Mutex;
m_condition = cast(shared)new TaskCondition(cast(Mutex)m_mutex);
m_mutex = cast(shared)Mallocator.instance.makeGCSafe!Mutex();
m_condition = cast(shared)Mallocator.instance.makeGCSafe!TaskCondition(cast()m_mutex);
m_config = config;
}

Expand All @@ -183,11 +184,20 @@ private final class ChannelImpl(T, size_t buffer_size) {

private void releaseRef()
@safe nothrow shared {
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
if (--thisus.m_refCount == 0) {
try () @trusted { destroy(m_condition); } ();
bool destroy = false;
{
m_mutex.lock_nothrow();
scope (exit) m_mutex.unlock_nothrow();
auto thisus = () @trusted { return cast(ChannelImpl)this; } ();
if (--thisus.m_refCount == 0)
destroy = true;
}

if (destroy) {
try () @trusted {
Mallocator.instance.disposeGCSafe(m_condition);
Mallocator.instance.disposeGCSafe(m_mutex);
} ();
catch (Exception e) assert(false);
}
}
Expand Down
124 changes: 124 additions & 0 deletions source/vibe/core/concurrency.d
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import std.typecons;
import std.typetuple;
import std.string;
import vibe.core.task;
import vibe.core.taskpool;


private extern (C) pure nothrow void _d_monitorenter(Object h);
Expand Down Expand Up @@ -1233,6 +1234,129 @@ Future!(ReturnType!CALLABLE) asyncWork(CALLABLE, ARGS...)(CALLABLE callable, ARG
}


/** Performs work in a worker task and waits for the result to be available.

	While the worker task executes the function call, other tasks in the
	calling thread are free to continue running.

	Params:
		pool = Optional task pool instance to use, uses the default worker pool by default
		callable = Function or function-like object to call - must be weakly isolated and `nothrow`
		args = Arguments to pass to callable - must be weakly isolated

	See_also: `isWeaklyIsolated`, `asyncWork`
*/
ReturnType!CALLABLE performInWorker(CALLABLE, ARGS...)
	(CALLABLE callable, ARGS arguments)
	if (is(typeof(callable(arguments)) == ReturnType!CALLABLE) &&
		isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS)
{
	import vibe.core.core : workerTaskPool;

	// forward to the pool-based overload, using the default worker pool;
	// the template arguments are inferred via IFTI
	return performInWorker(workerTaskPool, callable, arguments);
}
/// ditto
ReturnType!CALLABLE performInWorker(CALLABLE, ARGS...)
	(shared(TaskPool) pool, CALLABLE callable, ARGS arguments)
	if (is(typeof(callable(arguments)) == ReturnType!CALLABLE) &&
		isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS)
{
	import core.sync.mutex : Mutex;
	import vibe.core.sync : TaskCondition, scopedMutexLock;

	// TODO: this could improve performance by re-using the Mutex/TaskCondition
	//       instances for later calls. When this optimization gets implemented,
	//       care should be taken to clean up the cached instances before the
	//       process/thread exits, so that nothing gets leaked to the GC

	// Shared state between the calling task and the worker task. It lives on
	// the caller's stack, which is safe because the caller blocks on
	// `condition` until the worker has set `done` and released the mutex.
	static struct CTX {
		CALLABLE callable;
		ARGS args;
		ReturnType!CALLABLE result;
		shared(Mutex) mutex;
		shared(TaskCondition) condition;
		bool done;
	}

	// set up context
	CTX ctx = CTX(callable, arguments);
	auto mtx = new Mutex;
	ctx.mutex = () @trusted { return cast(shared)mtx; } ();
	ctx.condition = () @trusted { return cast(shared)new TaskCondition(mtx); } ();

	// start worker task - it publishes the result under the mutex and wakes
	// up the waiting caller via the condition variable
	pool.runTask((shared(CTX)* ctx_) nothrow {
		auto ctx = () @trusted { return cast(CTX*)ctx_; } ();
		auto res = ctx.callable(ctx.args);
		{
			auto l = scopedMutexLock(ctx.mutex);
			ctx.result = res;
			ctx.done = true;
			ctx.condition.notifyAll();
		}
	}, () @trusted { return cast(shared)&ctx; } ());

	// the wait below must not throw, as the worker still references ctx
	scope (failure) assert(false);

	{ // wait for result
		auto l = scopedMutexLock(ctx.mutex);
		while (!ctx.done) ctx.condition.wait();
	}

	// clean up resources
	() @trusted {
		import core.memory : GC;
		destroy(ctx.condition);
		destroy(ctx.mutex);
		// NOTE: the GC will otherwise call the destructor on the destroy()ed
		//       Mutex, which causes a failure in DeleteCriticalSection on
		//       Windows
		GC.free(cast(void*)ctx.mutex);
	} ();

	return ctx.result;
}

///
// Example: performInWorker keeps the calling thread's tasks responsive while
// CPU-bound work runs on a worker thread. NOTE(review): the round-count
// assertion below is timing based and could be flaky on heavily loaded CI
// machines - presumably why OSX is already exempted.
@safe unittest {
	import vibe.core.core : runTask, sleepUninterruptible;
	import vibe.core.log : logInfo;

	// runs in parallel to the heavy work
	int cnt = 0;
	auto t = runTask({
		foreach (i; 0 .. 100) {
			sleepUninterruptible(1.msecs);
			cnt++;
		}
	});

	// perform some heavy CPU work in a worker thread, while the background task
	// continues to run unaffected
	auto res = performInWorker((long start_value) {
		auto tm = MonoTime.currTime;
		long res = start_value;
		// simulate some CPU workload for 50 ms
		while (MonoTime.currTime - tm < 50.msecs) {
			res++;
			res *= res;
			// keep the accumulator non-zero even if the squaring overflows to 0
			if (!res) res++;
		}
		return res;
	}, 1234);

	logInfo("Result: %s, background task has made %s rounds", res, cnt);

	// should always receive a non-zero result
	assert(res != 0);
	// background task should have made roughly 50 rounds up to here
	version (OSX) {} else assert(cnt > 25 && cnt < 75);

	// make sure our background task also finishes
	t.join();
}


/******************************************************************************/
/* std.concurrency compatible interface for message passing */
/******************************************************************************/
Expand Down
45 changes: 35 additions & 10 deletions source/vibe/core/core.d
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,38 @@ unittest { // ensure task.running is true directly after runTask
assert(hit);
}

// Verifies that a struct with copy semantics (postblit/destructor) passed to
// runTask/runWorkerTaskH is copied exactly once into the task and destroyed
// there, leaving the caller's reference count unchanged afterwards.
unittest {
	import core.atomic : atomicOp;

	// Counting struct: postblit increments, destructor decrements the shared
	// counter, so *rc always reflects the number of live copies.
	static struct S {
		shared(int)* rc;
		this(this) @safe nothrow { if (rc) atomicOp!"+="(*rc, 1); }
		~this() @safe nothrow { if (rc) atomicOp!"-="(*rc, 1); }
	}

	S s;
	s.rc = new int;
	*s.rc = 1;

	// same-thread task: exactly one additional copy must exist inside the
	// task callback, and resetting it must drop the count back to one
	runTask((ref S sc) {
		auto rc = sc.rc;
		assert(*rc == 2);
		sc = S.init;
		assert(*rc == 1);
	}, s).joinUninterruptible();

	assert(*s.rc == 1);

	// same expectation when the copy crosses a thread boundary via the
	// worker task pool (join ensures the worker's destructor already ran)
	runWorkerTaskH((ref S sc) {
		auto rc = sc.rc;
		assert(*rc == 2);
		sc = S.init;
		assert(*rc == 1);
	}, s).joinUninterruptible();

	assert(*s.rc == 1);
}


/**
Runs a new asynchronous task in a worker thread.
Expand Down Expand Up @@ -1752,13 +1784,8 @@ package @property ref TaskScheduler taskScheduler() @safe nothrow @nogc { return
package void recycleFiber(TaskFiber fiber)
@safe nothrow {
if (s_availableFibers.length >= s_maxRecycledFibers) {
auto fl = s_availableFibers.front;
s_availableFibers.popFront();
fl.shutdown();
() @trusted {
try destroy(fl);
catch (Exception e) logWarn("Failed to destroy fiber: %s", e.msg);
} ();
fiber.shutdown();
return;
}

if (s_availableFibers.full)
Expand Down Expand Up @@ -1889,10 +1916,8 @@ static ~this()
shutdownWorkerPool();
}

foreach (f; s_availableFibers) {
foreach (f; s_availableFibers)
f.shutdown();
destroy(f);
}

ManualEvent.freeThreadResources();

Expand Down
43 changes: 10 additions & 33 deletions source/vibe/core/file.d
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ void moveFile(NativePath from, NativePath to, bool copy_fallback = false)
/// ditto
void moveFile(string from, string to, bool copy_fallback = false)
{
auto fail = performInWorker((string from, string to) {
auto fail = performInIOWorker((string from, string to) {
try {
std.file.rename(from, to);
} catch (Exception e) {
Expand Down Expand Up @@ -287,7 +287,7 @@ void removeFile(NativePath path)
/// ditto
void removeFile(string path)
{
auto fail = performInWorker((string path) {
auto fail = performInIOWorker((string path) {
try {
std.file.remove(path);
} catch (Exception e) {
Expand All @@ -309,7 +309,7 @@ bool existsFile(NativePath path) nothrow
/// ditto
bool existsFile(string path) nothrow
{
try return performInWorker((string p) => std.file.exists(p), path);
try return performInIOWorker((string p) => std.file.exists(p), path);
catch (Exception e) {
logDebug("Failed to determine file existence for '%s': %s", path, e.msg);
return false;
Expand All @@ -329,7 +329,7 @@ FileInfo getFileInfo(string path)
{
import std.typecons : tuple;

auto ret = performInWorker((string p) {
auto ret = performInIOWorker((string p) {
try {
auto ent = DirEntry(p);
return tuple(makeFileInfo(ent), "");
Expand Down Expand Up @@ -428,7 +428,7 @@ void createDirectory(NativePath path, Flag!"recursive" recursive)
/// ditto
void createDirectory(string path, Flag!"recursive" recursive = No.recursive)
{
auto fail = performInWorker((string p, bool rec) {
auto fail = performInIOWorker((string p, bool rec) {
try {
if (rec) mkdirRecurse(p);
else mkdir(p);
Expand Down Expand Up @@ -472,7 +472,7 @@ void listDirectory(NativePath path, DirectoryListMode mode,
req.directoryPredicate = directory_predicate;

// NOTE: working around bogus "assigning scope variable warning on DMD 2.101.2 here with @trusted
ioWorkerTaskPool.runTask(ioTaskSettings, &performListDirectory, () @trusted { return req; } ());
ioWorkerTaskPool.runTask(&performListDirectory, () @trusted { return req; } ());

ListDirectoryData itm;
while (req.channel.tryConsumeOne(itm)) {
Expand Down Expand Up @@ -1026,32 +1026,11 @@ unittest {
}


private auto performInWorker(C, ARGS...)(C callable, auto ref ARGS args)
private auto performInIOWorker(C, ARGS...)(C callable, auto ref ARGS args)
{
version (none) {
import vibe.core.concurrency : asyncWork;
return asyncWork(callable, args).getResult();
} else {
import vibe.core.core : ioWorkerTaskPool;
import core.atomic : atomicFence;
import std.concurrency : Tid, send, receiveOnly, thisTid;

struct R {}

alias RET = typeof(callable(args));
shared(RET) ret;
ioWorkerTaskPool.runTask(ioTaskSettings, (shared(RET)* r, Tid caller, C c, ref ARGS a) nothrow {
*() @trusted { return cast(RET*)r; } () = c(a);
// Just as a precaution, because ManualEvent is not well defined in
// terms of fence semantics
atomicFence();
try caller.send(R.init);
catch (Exception e) assert(false, e.msg);
}, () @trusted { return &ret; } (), thisTid, callable, args);
() @trusted { receiveOnly!R(); } ();
atomicFence();
return ret;
}
import vibe.core.concurrency : performInWorker;
import vibe.core.core : ioWorkerTaskPool;
return performInWorker(ioWorkerTaskPool, callable, args);
}

private void performListDirectory(ListDirectoryRequest req)
Expand Down Expand Up @@ -1261,8 +1240,6 @@ version (Posix) {
}
}

private immutable TaskSettings ioTaskSettings = { priority: 20 * Task.basePriority };

private struct ListDirectoryData {
FileInfo info;
string error;
Expand Down
2 changes: 1 addition & 1 deletion source/vibe/core/task.d
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ package string callWithMove(ARGS...)(string func, string args)
return ret ~ ");";
}

private template needsMove(T)
package template needsMove(T)
{
template isCopyable(T)
{
Expand Down
Loading

0 comments on commit ee9870c

Please sign in to comment.