Skip to content

Commit 3b76df3

Browse files
committed
Add support for setting VM ulimit with monitor.
Include mrjob's memory in accounting.
1 parent 927d547 commit 3b76df3

File tree

3 files changed

+72
-20
lines changed

3 files changed

+72
-20
lines changed

cmd/mrjob/mrjob.go

+51-14
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type runner struct {
3636
metadata *core.Metadata
3737
runType string
3838
jobInfo *core.JobInfo
39+
monitoring bool
3940
start time.Time
4041
isDone chan struct{}
4142
perfDone <-chan struct{}
@@ -92,12 +93,8 @@ func (self *runner) Init() {
9293
util.PrintError(jErr, "monitor",
9394
"Could not update log journal file. Continuing, hoping for the best.")
9495
}
95-
// Check that the vmem limit is enough for the parent process plus
96-
// the a half gigabyte of margin over the job's physical memory
97-
// requirement.
98-
mem, _ := core.GetProcessTreeMemory(self.jobInfo.Pid, true, nil)
9996
core.CheckMaxVmem(
100-
uint64(self.jobInfo.MemGB*1024+512)*1024*1024 + uint64(mem.Vmem))
97+
uint64(self.jobInfo.VMemGB) * 1024 * 1024 * 1024)
10198
self.setRlimit()
10299
}
103100

@@ -123,6 +120,7 @@ func (self *runner) writeJobinfo() {
123120
self.Fail(err, "Error reading jobInfo.")
124121
} else {
125122
self.jobInfo = jobInfo
123+
self.monitoring = jobInfo.Monitor == "monitor"
126124
}
127125
self.jobInfo.Cwd = self.metadata.FilesPath()
128126
self.jobInfo.Host, _ = os.Hostname()
@@ -216,7 +214,7 @@ func totalCpu(ru *core.RusageInfo) float64 {
216214
func (self *runner) Complete() {
217215
self.done()
218216
target := core.CompleteFile
219-
if self.jobInfo.Monitor == "monitor" {
217+
if self.monitoring {
220218
if t := time.Since(self.start); t > time.Minute*15 {
221219
if threads := totalCpu(self.jobInfo.RusageInfo) /
222220
t.Seconds(); threads > 1.5*float64(self.jobInfo.Threads) {
@@ -228,6 +226,14 @@ func (self *runner) Complete() {
228226
util.PrintError(writeError, "monitor", "Could not write errors file.")
229227
}
230228
}
229+
} else if self.jobInfo.RusageInfo.Children.MaxRss > self.jobInfo.MemGB*1024*1024 {
230+
target = core.Errors
231+
if writeError := self.metadata.WriteRaw(target, fmt.Sprintf(
232+
"Stage exceeded its memory quota (using %.1f, allowed %d)",
233+
float64(self.jobInfo.RusageInfo.Children.MaxRss)/(1024*1024),
234+
self.jobInfo.MemGB)); writeError != nil {
235+
util.PrintError(writeError, "monitor", "Could not write errors file.")
236+
}
231237
}
232238
}
233239
if target == core.CompleteFile {
@@ -277,6 +283,18 @@ func (self *runner) StartJob(args []string) error {
277283
self.metadata.MetadataFilePath(core.PerfData),
278284
self.metadata.MetadataFilePath(core.ProfileOut))
279285
}
286+
if self.monitoring && self.jobInfo.VMemGB > 0 {
287+
// Exclude mrjob's vmem usage from the rlimit.
288+
mem, _ := core.GetProcessTreeMemory(self.jobInfo.Pid, true, nil)
289+
amount := int64(self.jobInfo.VMemGB)*1024*1024*1024 - mem.Vmem
290+
if amount < mem.Vmem+1024*1024 {
291+
amount = mem.Vmem + 1024*1024
292+
}
293+
if err := core.SetVMemRLimit(uint64(amount)); err != nil {
294+
util.LogError(err, "monitor",
295+
"Could not set VM rlimit.")
296+
}
297+
}
280298
if err := func() error {
281299
util.EnterCriticalSection()
282300
defer util.ExitCriticalSection()
@@ -481,33 +499,52 @@ func (self *runner) WaitLoop() {
481499
}
482500
}
483501

484-
func (self *runner) getChildMemGB() float64 {
502+
func (self *runner) getChildMemGB() (rss, vmem float64) {
485503
proc := self.job.Process
486504
if proc == nil {
487-
return 0
505+
return 0, 0
488506
}
489507
io := make(map[int]*core.IoAmount)
490508
mem, err := core.GetProcessTreeMemory(proc.Pid, true, io)
509+
if selfMem, err := core.GetRunningMemory(self.jobInfo.Pid); err == nil {
510+
// Do this rather than just calling core.GetProcessTreeMemory,
511+
// above, because we don't want to include the profiling child
512+
// process (if any).
513+
mem.Add(selfMem)
514+
}
491515
mem.IncreaseRusage(core.GetRusage())
492516
self.highMem.IncreaseTo(mem)
493517
if err != nil {
494518
util.LogError(err, "monitor", "Error updating job statistics.")
495519
} else {
496520
self.ioStats.Update(io, time.Now())
497521
}
498-
return float64(mem.Rss) / (1024 * 1024 * 1024)
522+
return float64(mem.Rss) / (1024 * 1024 * 1024),
523+
float64(mem.Vmem) / (1024 * 1024 * 1024)
499524
}
500525

501526
func (self *runner) monitor(lastHeartbeat *time.Time) error {
502-
if mem := self.getChildMemGB(); mem > float64(self.jobInfo.MemGB) {
503-
if self.jobInfo.Monitor == "monitor" {
527+
if rss, vmem := self.getChildMemGB(); rss > float64(self.jobInfo.MemGB) {
528+
if self.monitoring {
504529
self.job.Process.Kill()
505-
return fmt.Errorf("Stage exceeded its memory quota (using %.1f, allowed %dG)",
506-
mem, self.jobInfo.MemGB)
530+
return fmt.Errorf(
531+
"Stage exceeded its memory quota (using %.1f, allowed %dG)",
532+
rss, self.jobInfo.MemGB)
507533
} else {
508534
util.LogInfo("monitor",
509535
"Stage exceeded its memory quota (using %.1f, allowed %dG)",
510-
mem, self.jobInfo.MemGB)
536+
rss, self.jobInfo.MemGB)
537+
}
538+
} else if self.jobInfo.VMemGB > 0 && vmem > float64(self.jobInfo.VMemGB) {
539+
if self.monitoring {
540+
self.job.Process.Kill()
541+
return fmt.Errorf(
542+
"Stage exceeded its address space quota (using %.1f, allowed %dG)",
543+
vmem, self.jobInfo.VMemGB)
544+
} else {
545+
util.LogInfo("monitor",
546+
"Stage exceeded its address space quota (using %.1f, allowed %dG)",
547+
vmem, self.jobInfo.MemGB)
511548
}
512549
}
513550
if time.Since(*lastHeartbeat) > HeartbeatInterval {

martian/core/perf_unix.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -230,13 +230,14 @@ func getChildTreeMemory(procFd, tfd int, mem *ObservedMemory, io map[int]*IoAmou
230230
}
231231
}
232232

233+
var sysPagesize = int64(syscall.Getpagesize())
234+
233235
func (self *ObservedMemory) pagesToBytes() {
234-
pagesize := int64(syscall.Getpagesize())
235-
self.Rss *= pagesize
236-
self.Vmem *= pagesize
237-
self.Shared *= pagesize
238-
self.Text *= pagesize
239-
self.Stack *= pagesize
236+
self.Rss *= sysPagesize
237+
self.Vmem *= sysPagesize
238+
self.Shared *= sysPagesize
239+
self.Text *= sysPagesize
240+
self.Stack *= sysPagesize
240241
}
241242

242243
type unexpectedContentError struct {

martian/core/rlimit.go

+14
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package core
66

77
import (
88
// syscall package lacks RLIMIT_NPROC
9+
"fmt"
10+
911
"github.com/martian-lang/martian/martian/util"
1012
"golang.org/x/sys/unix"
1113
)
@@ -111,3 +113,15 @@ func CheckMaxVmem(amount uint64) uint64 {
111113
}
112114
return min
113115
}
116+
117+
func SetVMemRLimit(amount uint64) error {
118+
var rlim unix.Rlimit
119+
if err := unix.Getrlimit(unix.RLIMIT_AS, &rlim); err != nil {
120+
return err
121+
} else if rlim.Max != unix.RLIM_INFINITY && rlim.Max < amount {
122+
return fmt.Errorf("could not set RLIMIT_AS %d > %d",
123+
amount, rlim.Max)
124+
}
125+
rlim.Cur = amount
126+
return unix.Setrlimit(unix.RLIMIT_AS, &rlim)
127+
}

0 commit comments

Comments
 (0)