Skip to content

Commit

Permalink
Merge pull request #404 from vibe-d/add_parallel_map_length
Browse files Browse the repository at this point in the history
Forward the length of a range passed to parallel(Unordered)Map
  • Loading branch information
l-kramer authored Jul 27, 2024
2 parents fac01af + 1b23be5 commit 68dc1e4
Showing 1 changed file with 56 additions and 1 deletion.
57 changes: 56 additions & 1 deletion source/vibe/core/parallelism.d
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import vibe.core.channel;
import vibe.core.concurrency : isWeaklyIsolated;
import vibe.core.log;
import std.range : ElementType, isInputRange;
import std.traits : hasMember;


/** Processes a range of items in worker tasks and returns them as an unordered
Expand Down Expand Up @@ -65,6 +66,9 @@ auto parallelUnorderedMap(alias fun, R)(R items, shared(TaskPool) task_pool, Cha
chout.close();
}

static if (hasMember!(R, "length"))
auto length = items.length;

runTask(&senderFun, items, chin);

auto rc = new shared int;
Expand All @@ -77,8 +81,21 @@ auto parallelUnorderedMap(alias fun, R)(R items, shared(TaskPool) task_pool, Cha
Channel!O m_channel;
O m_front;
bool m_gotFront = false;
static if (hasMember!(R, "length")) typeof(items.length) m_length;
}

static if (hasMember!(R, "length"))
this(Channel!O chout, typeof(items.length) length)
{
m_channel = chout;
m_length = length;
}
else
this(Channel!O chout)
{
m_channel = chout;
}

@property bool empty()
{
fetchFront();
Expand All @@ -96,16 +113,23 @@ auto parallelUnorderedMap(alias fun, R)(R items, shared(TaskPool) task_pool, Cha
{
fetchFront();
m_gotFront = false;
static if (hasMember!(R, "length")) m_length--;
}

static if (hasMember!(R, "length"))
auto length() const { return m_length; }

private void fetchFront()
{
if (m_gotFront) return;
m_gotFront = m_channel.tryConsumeOne(m_front);
}
}

return Result(chout);
static if (hasMember!(R, "length"))
return Result(chout, length);
else
return Result(chout);
}

/// ditto
Expand All @@ -128,6 +152,16 @@ unittest {
assert(res.isPermutation(iota(100).map!(i => 2 * i).array));
}

unittest {
import std.range : iota;

auto res = iota(100)
.parallelUnorderedMap!(i => 2 * i);
assert(res.length == 100);
res.popFront();
assert(res.length == 99);
}


/** Processes a range of items in worker tasks and returns them as an ordered
range.
Expand Down Expand Up @@ -159,6 +193,13 @@ auto parallelMap(alias fun, R)(R items, shared(TaskPool) task_pool, ChannelConfi
size_t m_index = 0, m_minIndex = -1;
Array!SR m_buffer;
int m_refCount = 0;
static if (hasMember!(R, "length")) typeof(items.length) m_length;

this(typeof(resunord) source)
{
m_source = source.move;
static if (hasMember!(R, "length")) m_length = m_source.length;
}

@property bool empty()
{
Expand Down Expand Up @@ -207,6 +248,8 @@ auto parallelMap(alias fun, R)(R items, shared(TaskPool) task_pool, ChannelConfi
@property bool empty() { return state.empty; }
@property ref O front() { return state.front; }
void popFront() { state.popFront; }

static if (hasMember!(R, "length")) auto length() const { return state.m_length - state.m_index; }
}

return Result(RefCounted!State(resunord.move));
Expand Down Expand Up @@ -251,3 +294,15 @@ unittest {
.array;
assert(res == iota(100).map!(i => 2 * i).array);
}

unittest {
import std.range : iota;

auto res = iota(100)
.parallelMap!(i => 2 * i);
assert(res.length == 100);
assert(res.front == 0);
res.popFront();
assert(res.length == 99);
assert(res.front == 2);
}

0 comments on commit 68dc1e4

Please sign in to comment.