diff --git a/source/vibe/core/parallelism.d b/source/vibe/core/parallelism.d index 8a3b2d8..811cce2 100644 --- a/source/vibe/core/parallelism.d +++ b/source/vibe/core/parallelism.d @@ -13,6 +13,7 @@ import vibe.core.channel; import vibe.core.concurrency : isWeaklyIsolated; import vibe.core.log; import std.range : ElementType, isInputRange; +import std.traits : hasMember; /** Processes a range of items in worker tasks and returns them as an unordered @@ -65,6 +66,9 @@ auto parallelUnorderedMap(alias fun, R)(R items, shared(TaskPool) task_pool, Cha chout.close(); } + static if (hasMember!(R, "length")) + auto length = items.length; + runTask(&senderFun, items, chin); auto rc = new shared int; @@ -77,8 +81,21 @@ auto parallelUnorderedMap(alias fun, R)(R items, shared(TaskPool) task_pool, Cha Channel!O m_channel; O m_front; bool m_gotFront = false; + static if (hasMember!(R, "length")) typeof(items.length) m_length; } + static if (hasMember!(R, "length")) + this(Channel!O chout, typeof(items.length) length) + { + m_channel = chout; + m_length = length; + } + else + this(Channel!O chout) + { + m_channel = chout; + } + @property bool empty() { fetchFront(); @@ -96,8 +113,12 @@ auto parallelUnorderedMap(alias fun, R)(R items, shared(TaskPool) task_pool, Cha { fetchFront(); m_gotFront = false; + static if (hasMember!(R, "length")) m_length--; } + static if (hasMember!(R, "length")) + auto length() const { return m_length; } + private void fetchFront() { if (m_gotFront) return; @@ -105,7 +126,10 @@ auto parallelUnorderedMap(alias fun, R)(R items, shared(TaskPool) task_pool, Cha } } - return Result(chout); + static if (hasMember!(R, "length")) + return Result(chout, length); + else + return Result(chout); } /// ditto @@ -128,6 +152,16 @@ unittest { assert(res.isPermutation(iota(100).map!(i => 2 * i).array)); } +unittest { + import std.range : iota; + + auto res = iota(100) + .parallelUnorderedMap!(i => 2 * i); + assert(res.length == 100); + res.popFront(); + assert(res.length == 99); +} + /** Processes a range of items in worker tasks and returns them as an ordered range. @@ -159,6 +193,13 @@ auto parallelMap(alias fun, R)(R items, shared(TaskPool) task_pool, ChannelConfi size_t m_index = 0, m_minIndex = -1; Array!SR m_buffer; int m_refCount = 0; + static if (hasMember!(R, "length")) typeof(items.length) m_length; + + this(typeof(resunord) source) + { + m_source = source.move; + static if (hasMember!(R, "length")) m_length = m_source.length; + } @property bool empty() { @@ -207,6 +248,8 @@ auto parallelMap(alias fun, R)(R items, shared(TaskPool) task_pool, ChannelConfi @property bool empty() { return state.empty; } @property ref O front() { return state.front; } void popFront() { state.popFront; } + + static if (hasMember!(R, "length")) auto length() const { return state.m_length - state.m_index; } } return Result(RefCounted!State(resunord.move)); @@ -251,3 +294,15 @@ unittest { .array; assert(res == iota(100).map!(i => 2 * i).array); } + +unittest { + import std.range : iota; + + auto res = iota(100) + .parallelMap!(i => 2 * i); + assert(res.length == 100); + assert(res.front == 0); + res.popFront(); + assert(res.length == 99); + assert(res.front == 2); +}