Skip to content

Commit 39a1630

Browse files
committed
Do not read chunk outs if none are expected.
Otherwise one can spurious encounter deserialization errors.
1 parent cdcab31 commit 39a1630

File tree

1 file changed

+43
-14
lines changed

1 file changed

+43
-14
lines changed

Diff for: martian/core/stage.go

+43-14
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,14 @@ func (self *Fork) Split() bool {
374374
}
375375
}
376376

377+
func (self *Fork) ChunkOutParams() *syntax.OutParams {
378+
if stage, ok := self.node.callable.(*syntax.Stage); ok {
379+
return stage.ChunkOuts
380+
} else {
381+
return nil
382+
}
383+
}
384+
377385
// Get the fork's output parameter list.
378386
func (self *Fork) OutParams() *syntax.OutParams {
379387
return self.node.callable.GetOutParams()
@@ -871,24 +879,45 @@ func (self *Fork) step() {
871879
self.join_metadata.Write(ChunkDefsFile, self.stageDefs.ChunkDefs)
872880
if self.Split() {
873881
ok := true
874-
chunkOuts := make([]interface{}, 0, len(self.chunks))
875882
if len(self.chunks) > 0 {
876-
readSize := self.node.rt.FreeMemBytes() / int64(2*len(self.chunks))
877-
for _, chunk := range self.chunks {
878-
if outs, err := chunk.metadata.read(OutsFile, readSize); err != nil {
879-
chunk.metadata.WriteRaw(Errors, err.Error())
880-
ok = false
881-
} else {
882-
chunkOuts = append(chunkOuts, outs)
883-
ok = chunk.verifyOutput(outs) && ok
883+
if co := self.ChunkOutParams(); len(self.OutParams().List) > 0 ||
884+
(co != nil && len(co.List) > 0) {
885+
chunkOuts := make([]LazyArgumentMap, 0, len(self.chunks))
886+
readSize := self.node.rt.FreeMemBytes() / int64(2*len(self.chunks))
887+
for _, chunk := range self.chunks {
888+
if outs, err := chunk.metadata.read(OutsFile, readSize); err != nil {
889+
chunk.metadata.WriteRaw(Errors, err.Error())
890+
ok = false
891+
} else {
892+
chunkOuts = append(chunkOuts, outs)
893+
ok = chunk.verifyOutput(outs) && ok
894+
}
884895
}
896+
self.join_metadata.Write(ChunkOutsFile, chunkOuts)
897+
} else {
898+
// Write a list of empty outs.
899+
var buf bytes.Buffer
900+
buf.Grow(1 + 3*len(self.chunks))
901+
buf.WriteByte('[')
902+
for i := range self.chunks {
903+
if i != 0 {
904+
buf.WriteByte(',')
905+
}
906+
buf.WriteString("{}")
907+
}
908+
buf.WriteByte(']')
909+
self.join_metadata.WriteRawBytes(ChunkOutsFile, buf.Bytes())
885910
}
911+
if !ok {
912+
return
913+
}
914+
} else {
915+
self.join_metadata.WriteRaw(ChunkOutsFile, "[]")
886916
}
887-
if !ok {
888-
return
889-
}
890-
self.join_metadata.Write(ChunkOutsFile, chunkOuts)
891-
self.join_metadata.Write(OutsFile, makeOutArgs(self.OutParams(), self.join_metadata.curFilesPath, false))
917+
self.join_metadata.Write(
918+
OutsFile,
919+
makeOutArgs(self.OutParams(),
920+
self.join_metadata.curFilesPath, false))
892921
if !self.join_has_run {
893922
self.join_has_run = true
894923
self.lastPrint = time.Now()

0 commit comments

Comments
 (0)