Skip to content

Add ability for aggro cache cleanup to only consider Kraken's disk usage #386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 85 additions & 45 deletions lib/store/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -16,6 +16,7 @@ package store
import (
"fmt"
"os"
"strconv"
"sync"
"time"

Expand All @@ -30,19 +31,15 @@ import (

// CleanupConfig defines configuration for periodically cleaning up idle files.
type CleanupConfig struct {
Disabled bool `yaml:"disabled"`
Interval time.Duration `yaml:"interval"` // How often cleanup runs.
TTI time.Duration `yaml:"tti"` // Time to idle based on last access time.
TTL time.Duration `yaml:"ttl"` // Time to live regardless of access. If 0, disables TTL.
AggressiveThreshold int `yaml:"aggressive_threshold"` // The disk util threshold to trigger aggressive cleanup. If 0, disables aggressive cleanup.
AggressiveTTL time.Duration `yaml:"aggressive_ttL"` // Time to live regardless of access if aggressive cleanup is triggered.
Disabled bool `yaml:"disabled"`
Interval time.Duration `yaml:"interval"` // How often cleanup runs.
TTI time.Duration `yaml:"tti"` // Time to idle based on last access time.
TTL time.Duration `yaml:"ttl"` // Time to live regardless of access. If 0, disables TTL.
ExcludeOtherServices bool `yaml:"exclude_other_services"` // Whether to exclude other services from the disk util calculation.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Anton-Kalpakchiev why are we making this config-driven?
In my opinion, Kraken's cache cleanup should always exclude other services from the disk utilization calculation. I don’t see a scenario where including other services’ disk utilization would improve the cache cleanup behavior.

Copy link
Collaborator Author

@Anton-Kalpakchiev Anton-Kalpakchiev Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The biggest reason is backward compatibility. Other than that, I agree that we don't need the option.

AggressiveThreshold int `yaml:"aggressive_threshold"` // The disk util threshold to trigger aggressive cleanup. If 0, disables aggressive cleanup.
AggressiveTTL time.Duration `yaml:"aggressive_ttL"` // Time to live regardless of access if aggressive cleanup is triggered.
}

type (
// Define a func type for mocking diskSpaceUtil function.
diskSpaceUtilFunc func() (int, error)
)

func (c CleanupConfig) applyDefaults() CleanupConfig {
if c.Interval == 0 {
c.Interval = 30 * time.Minute
Expand Down Expand Up @@ -87,6 +84,7 @@ func newCleanupManager(clk clock.Clock, stats tally.Scope) (*cleanupManager, err
// on the settings in config. op must set the desired states to clean before addJob
// is called.
func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp) {
jobStats := m.stats.Tagged(map[string]string{"job": tag})
config = config.applyDefaults()
if config.Disabled {
log.Warnf("Cleanup disabled for %s", op)
Expand All @@ -95,26 +93,17 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp
if config.TTL == 0 {
log.Warnf("TTL disabled for %s", op)
}

if config.AggressiveThreshold == 0 {
log.Warnf("Aggressive cleanup disabled for %s", op)
}

ticker := m.clk.Ticker(config.Interval)

usageGauge := m.stats.Tagged(map[string]string{"job": tag}).Gauge("disk_usage")

go func() {
for {
select {
case <-ticker.C:
log.Debugf("Performing cleanup of %s", op)
ttl := m.checkAggressiveCleanup(op, config, diskspaceutil.DiskSpaceUtil)
usage, err := m.scan(op, config.TTI, ttl)
if err != nil {
log.Errorf("Error scanning %s: %s", op, err)
}
usageGauge.Update(float64(usage))
m.clean(jobStats, config, op)
case <-m.stopc:
ticker.Stop()
return
Expand All @@ -123,32 +112,94 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp
}()
}

func (m *cleanupManager) stop() {
m.stopOnce.Do(func() { close(m.stopc) })
}

// scan scans the op for idle or expired files. Also returns the total disk usage
// of op.
func (m *cleanupManager) scan(
op base.FileOp, tti time.Duration, ttl time.Duration) (usage int64, err error) {
// clean deletes idle files from op based on the config.
func (m *cleanupManager) clean(jobStats tally.Scope, config CleanupConfig, op base.FileOp) {
log.Debugf("Performing cleanup of %s", op)
ttl := m.calculateTTL(jobStats, op, config, calculateDiskUtil)

names, err := op.ListNames()
if err != nil {
return 0, fmt.Errorf("list names: %s", err)
log.Errorf("Error cleaning cache: list names: %v", err)
}

var absUsage int64

for _, name := range names {
info, err := op.GetFileStat(name)
if err != nil {
log.With("name", name).Errorf("Error getting file stat: %s", err)
continue
}
if ready, err := m.readyForDeletion(op, name, info, tti, ttl); err != nil {
if ready, err := m.readyForDeletion(op, name, info, config.TTI, ttl); err != nil {
log.With("name", name).Errorf("Error checking if file expired: %s", err)
} else if ready {
if err := op.DeleteFile(name); err != nil && err != base.ErrFilePersisted {
log.With("name", name).Errorf("Error deleting expired file: %s", err)
}
}
absUsage += info.Size()
}

jobStats.Gauge("disk_usage").Update(float64(absUsage))
}

type diskUtilFn func(op base.FileOp, c CleanupConfig) (int, error)

// calculateTTL returns the TTL used for cleanup based on the config and current disk utilization.
func (m *cleanupManager) calculateTTL(jobStats tally.Scope, op base.FileOp, config CleanupConfig, calculateDiskUtil diskUtilFn) time.Duration {
if config.AggressiveThreshold == 0 {
return config.TTL
}

utilPercent, err := calculateDiskUtil(op, config)
if err != nil {
log.Errorf("Defaulting to normal TTL due to error calculating disk space util of %s: %v", op, err)
return config.TTL
}
jobStats.Tagged(map[string]string{"exclude_other_services": strconv.FormatBool(config.ExcludeOtherServices)}).
Gauge("disk_util").Update(float64(utilPercent))

if utilPercent >= config.AggressiveThreshold {
log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, utilPercent)
return config.AggressiveTTL
}
return config.TTL
}

// calculateDiskUtil calculates the disk space utilization based on the config.
// If 'ExcludeOtherServices' is turned on, only this op's utilization of the filesystem is considered.
func calculateDiskUtil(op base.FileOp, config CleanupConfig) (int, error) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How could I test this?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no way around dependency injection

if config.ExcludeOtherServices {
fsSize, err := diskspaceutil.FileSystemSize()
if err != nil {
return 0, err
}
opSize, err := size(op)
if err != nil {
return 0, err
}
utilPercent := int(100 * opSize / fsSize)
return utilPercent, nil
}

utilPercent, err := diskspaceutil.FileSystemUtil()
if err != nil {
return 0, err
}
return utilPercent, nil
}

func size(op base.FileOp) (usage int64, err error) {
names, err := op.ListNames()
if err != nil {
return 0, fmt.Errorf("list names: %s", err)
}
for _, name := range names {
info, err := op.GetFileStat(name)
if err != nil {
log.With("name", name).Errorf("Error getting file stat: %s", err)
continue
}
usage += info.Size()
}
return usage, nil
Expand All @@ -174,17 +225,6 @@ func (m *cleanupManager) readyForDeletion(
return m.clk.Now().Sub(lat.Time) > tti, nil
}

func (m *cleanupManager) checkAggressiveCleanup(op base.FileOp, config CleanupConfig, util diskSpaceUtilFunc) time.Duration {
if config.AggressiveThreshold != 0 {
diskspaceutil, err := util()
if err != nil {
log.Errorf("Error checking disk space util %s: %s", op, err)
return config.TTL
}
if diskspaceutil >= config.AggressiveThreshold {
log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, diskspaceutil)
return config.AggressiveTTL
}
}
return config.TTL
func (m *cleanupManager) stop() {
m.stopOnce.Do(func() { close(m.stopc) })
}
Loading