-
Notifications
You must be signed in to change notification settings - Fork 430
Add ability for aggro cache cleanup to only consider Kraken's disk usage #386
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Anton-Kalpakchiev
wants to merge
1
commit into
master
Choose a base branch
from
aggro-cleanup-two
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+225
−93
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,7 @@ | |
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
|
@@ -16,6 +16,7 @@ package store | |
import ( | ||
"fmt" | ||
"os" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -30,19 +31,15 @@ import ( | |
|
||
// CleanupConfig defines configuration for periodically cleaning up idle files. | ||
type CleanupConfig struct { | ||
Disabled bool `yaml:"disabled"` | ||
Interval time.Duration `yaml:"interval"` // How often cleanup runs. | ||
TTI time.Duration `yaml:"tti"` // Time to idle based on last access time. | ||
TTL time.Duration `yaml:"ttl"` // Time to live regardless of access. If 0, disables TTL. | ||
AggressiveThreshold int `yaml:"aggressive_threshold"` // The disk util threshold to trigger aggressive cleanup. If 0, disables aggressive cleanup. | ||
AggressiveTTL time.Duration `yaml:"aggressive_ttL"` // Time to live regardless of access if aggressive cleanup is triggered. | ||
Disabled bool `yaml:"disabled"` | ||
Interval time.Duration `yaml:"interval"` // How often cleanup runs. | ||
TTI time.Duration `yaml:"tti"` // Time to idle based on last access time. | ||
TTL time.Duration `yaml:"ttl"` // Time to live regardless of access. If 0, disables TTL. | ||
ExcludeOtherServices bool `yaml:"exclude_other_services"` // Whether to exclude other services from the disk util calculation. | ||
AggressiveThreshold int `yaml:"aggressive_threshold"` // The disk util threshold to trigger aggressive cleanup. If 0, disables aggressive cleanup. | ||
AggressiveTTL time.Duration `yaml:"aggressive_ttL"` // Time to live regardless of access if aggressive cleanup is triggered. | ||
} | ||
|
||
type ( | ||
// Define a func type for mocking diskSpaceUtil function. | ||
diskSpaceUtilFunc func() (int, error) | ||
) | ||
|
||
func (c CleanupConfig) applyDefaults() CleanupConfig { | ||
if c.Interval == 0 { | ||
c.Interval = 30 * time.Minute | ||
|
@@ -87,6 +84,7 @@ func newCleanupManager(clk clock.Clock, stats tally.Scope) (*cleanupManager, err | |
// on the settings in config. op must set the desired states to clean before addJob | ||
// is called. | ||
func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp) { | ||
jobStats := m.stats.Tagged(map[string]string{"job": tag}) | ||
config = config.applyDefaults() | ||
if config.Disabled { | ||
log.Warnf("Cleanup disabled for %s", op) | ||
|
@@ -95,26 +93,17 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp | |
if config.TTL == 0 { | ||
log.Warnf("TTL disabled for %s", op) | ||
} | ||
|
||
if config.AggressiveThreshold == 0 { | ||
log.Warnf("Aggressive cleanup disabled for %s", op) | ||
} | ||
|
||
ticker := m.clk.Ticker(config.Interval) | ||
|
||
usageGauge := m.stats.Tagged(map[string]string{"job": tag}).Gauge("disk_usage") | ||
|
||
go func() { | ||
for { | ||
select { | ||
case <-ticker.C: | ||
log.Debugf("Performing cleanup of %s", op) | ||
ttl := m.checkAggressiveCleanup(op, config, diskspaceutil.DiskSpaceUtil) | ||
usage, err := m.scan(op, config.TTI, ttl) | ||
if err != nil { | ||
log.Errorf("Error scanning %s: %s", op, err) | ||
} | ||
usageGauge.Update(float64(usage)) | ||
m.clean(jobStats, config, op) | ||
case <-m.stopc: | ||
ticker.Stop() | ||
return | ||
|
@@ -123,32 +112,94 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp | |
}() | ||
} | ||
|
||
func (m *cleanupManager) stop() { | ||
m.stopOnce.Do(func() { close(m.stopc) }) | ||
} | ||
|
||
// scan scans the op for idle or expired files. Also returns the total disk usage | ||
// of op. | ||
func (m *cleanupManager) scan( | ||
op base.FileOp, tti time.Duration, ttl time.Duration) (usage int64, err error) { | ||
// clean deletes idle files from op based on the config. | ||
func (m *cleanupManager) clean(jobStats tally.Scope, config CleanupConfig, op base.FileOp) { | ||
log.Debugf("Performing cleanup of %s", op) | ||
ttl := m.calculateTTL(jobStats, op, config, calculateDiskUtil) | ||
|
||
names, err := op.ListNames() | ||
if err != nil { | ||
return 0, fmt.Errorf("list names: %s", err) | ||
log.Errorf("Error cleaning cache: list names: %v", err) | ||
} | ||
|
||
var absUsage int64 | ||
|
||
for _, name := range names { | ||
info, err := op.GetFileStat(name) | ||
if err != nil { | ||
log.With("name", name).Errorf("Error getting file stat: %s", err) | ||
continue | ||
} | ||
if ready, err := m.readyForDeletion(op, name, info, tti, ttl); err != nil { | ||
if ready, err := m.readyForDeletion(op, name, info, config.TTI, ttl); err != nil { | ||
log.With("name", name).Errorf("Error checking if file expired: %s", err) | ||
} else if ready { | ||
if err := op.DeleteFile(name); err != nil && err != base.ErrFilePersisted { | ||
log.With("name", name).Errorf("Error deleting expired file: %s", err) | ||
} | ||
} | ||
absUsage += info.Size() | ||
} | ||
|
||
jobStats.Gauge("disk_usage").Update(float64(absUsage)) | ||
} | ||
|
||
type diskUtilFn func(op base.FileOp, c CleanupConfig) (int, error) | ||
|
||
// calculateTTL returns the TTL used for cleanup based on the config and current disk utilization. | ||
func (m *cleanupManager) calculateTTL(jobStats tally.Scope, op base.FileOp, config CleanupConfig, calculateDiskUtil diskUtilFn) time.Duration { | ||
if config.AggressiveThreshold == 0 { | ||
return config.TTL | ||
} | ||
|
||
utilPercent, err := calculateDiskUtil(op, config) | ||
if err != nil { | ||
log.Errorf("Defaulting to normal TTL due to error calculating disk space util of %s: %v", op, err) | ||
return config.TTL | ||
} | ||
jobStats.Tagged(map[string]string{"exclude_other_services": strconv.FormatBool(config.ExcludeOtherServices)}). | ||
Gauge("disk_util").Update(float64(utilPercent)) | ||
|
||
if utilPercent >= config.AggressiveThreshold { | ||
log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, utilPercent) | ||
return config.AggressiveTTL | ||
} | ||
return config.TTL | ||
} | ||
|
||
// calculateDiskUtil calculates the disk space utilization based on the config. | ||
// If 'ExcludeOtherServices' is turned on, only this op's utilization of the filesystem is considered. | ||
func calculateDiskUtil(op base.FileOp, config CleanupConfig) (int, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How could I test this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no way around dependency injection |
||
if config.ExcludeOtherServices { | ||
fsSize, err := diskspaceutil.FileSystemSize() | ||
if err != nil { | ||
return 0, err | ||
} | ||
opSize, err := size(op) | ||
if err != nil { | ||
return 0, err | ||
} | ||
utilPercent := int(100 * opSize / fsSize) | ||
return utilPercent, nil | ||
} | ||
|
||
utilPercent, err := diskspaceutil.FileSystemUtil() | ||
if err != nil { | ||
return 0, err | ||
} | ||
return utilPercent, nil | ||
} | ||
|
||
func size(op base.FileOp) (usage int64, err error) { | ||
names, err := op.ListNames() | ||
if err != nil { | ||
return 0, fmt.Errorf("list names: %s", err) | ||
} | ||
for _, name := range names { | ||
info, err := op.GetFileStat(name) | ||
if err != nil { | ||
log.With("name", name).Errorf("Error getting file stat: %s", err) | ||
continue | ||
} | ||
usage += info.Size() | ||
} | ||
return usage, nil | ||
|
@@ -174,17 +225,6 @@ func (m *cleanupManager) readyForDeletion( | |
return m.clk.Now().Sub(lat.Time) > tti, nil | ||
} | ||
|
||
func (m *cleanupManager) checkAggressiveCleanup(op base.FileOp, config CleanupConfig, util diskSpaceUtilFunc) time.Duration { | ||
if config.AggressiveThreshold != 0 { | ||
diskspaceutil, err := util() | ||
if err != nil { | ||
log.Errorf("Error checking disk space util %s: %s", op, err) | ||
return config.TTL | ||
} | ||
if diskspaceutil >= config.AggressiveThreshold { | ||
log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, diskspaceutil) | ||
return config.AggressiveTTL | ||
} | ||
} | ||
return config.TTL | ||
func (m *cleanupManager) stop() { | ||
m.stopOnce.Do(func() { close(m.stopc) }) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Anton-Kalpakchiev why are we making this config-driven?
In my opinion, Kraken's cache cleanup should always exclude other services from the disk utilization calculation. I don’t see a scenario where including other services’ disk utilization would improve the cache cleanup behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The biggest reason is backward compatibility. Other than that, I agree that we don't need the option.