Skip to content

Commit 9cbc0b4

Browse files
committed
Chunk: Optimize fetch/collect
1 parent d60fd3f commit 9cbc0b4

File tree

2 files changed

+26
-12
lines changed

2 files changed

+26
-12
lines changed

src/chunks.jl

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,24 +67,38 @@ is_task_or_chunk(c::Chunk) = true
6767
Base.:(==)(c1::Chunk, c2::Chunk) = c1.handle == c2.handle
6868
Base.hash(c::Chunk, x::UInt64) = hash(c.handle, hash(Chunk, x))
6969

70-
collect_remote(chunk::Chunk) =
71-
move(chunk.processor, OSProc(), poolget(chunk.handle))
70+
macro remotecall_fetch_fast(id, ex)
71+
@assert Meta.isexpr(ex, :call)
72+
f = ex.args[1]
73+
args = ex.args[2:end]
74+
quote
75+
if $myid() == id
76+
$(ex)
77+
else
78+
$remotecall_fetch($f, id, $(args...))
79+
end
80+
end
81+
end
7282

73-
function collect(ctx::Context, chunk::Chunk; options=nothing)
83+
@inline function collect(chunk::Chunk)
7484
# delegate fetching to handle by default.
75-
if chunk.handle isa DRef && !(chunk.processor isa OSProc)
85+
#=if chunk.handle isa DRef && !(chunk.processor isa OSProc)
7686
return remotecall_fetch(collect_remote, chunk.handle.owner, chunk)
87+
#return @remotecall_fetch_fast chunk.handle.owner collect_remote(chunk)
7788
elseif chunk.handle isa FileRef
7889
return poolget(chunk.handle)
79-
else
80-
return move(chunk.processor, OSProc(), chunk.handle)
81-
end
90+
else=#
91+
return move(chunk.processor, OSProc(), chunk)
92+
#end
8293
end
94+
collect_remote(chunk::Chunk) =
95+
move(chunk.processor, OSProc(), poolget(chunk.handle))
96+
collect(ctx::Context, chunk::Chunk; options=nothing) = collect(chunk)
8397
collect(ctx::Context, ref::DRef; options=nothing) =
8498
move(OSProc(ref.owner), OSProc(), ref)
8599
collect(ctx::Context, ref::FileRef; options=nothing) =
86100
poolget(ref) # FIXME: Do move call
87-
function Base.fetch(chunk::Chunk; raw=false)
101+
@inline function Base.fetch(chunk::Chunk; raw=false)
88102
if raw
89103
poolget(chunk.handle)
90104
else
@@ -93,9 +107,9 @@ function Base.fetch(chunk::Chunk; raw=false)
93107
end
94108

95109
# Unwrap Chunk, DRef, and FileRef by default
96-
move(from_proc::Processor, to_proc::Processor, x::Chunk) =
110+
@inline move(from_proc::Processor, to_proc::Processor, x::Chunk) =
97111
move(from_proc, to_proc, x.handle)
98-
move(from_proc::Processor, to_proc::Processor, x::Union{DRef,FileRef}) =
112+
@inline move(from_proc::Processor, to_proc::Processor, x::Union{DRef,FileRef}) =
99113
move(from_proc, to_proc, poolget(x))
100114

101115
# Determine from_proc when unspecified

src/compute.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ compute(ctx, c::Chunk; options=nothing) = c
77

88
collect(ctx::Context, t::Thunk; options=nothing) =
99
collect(ctx, compute(ctx, t; options=options); options=options)
10-
collect(d::Union{Chunk,Thunk}; options=nothing) =
11-
collect(Context(global_context()), d; options=options)
10+
collect(t::Thunk; options=nothing) =
11+
collect(Context(global_context()), t; options=options)
1212

1313
abstract type Computation end
1414

0 commit comments

Comments
 (0)