Improve thread safety (#2)
* fix: thread safety | unit tests

* doc: add documentation
enh: optimized for thread safety under high concurrency
ukashazia authored Feb 16, 2025
1 parent afb1158 commit 3ce9fc7
Showing 3 changed files with 230 additions and 41 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
main.go
build/
.idea/
80 changes: 39 additions & 41 deletions ttl/ttl.go
@@ -4,52 +4,54 @@ import (
"context"
"sync"
"time"

"github.com/google/uuid"
)

// TTL represents a time-based cache system with sharding and dynamic eviction.
type TTL struct {
DefaultTTL time.Duration
ShardSize uint64
CleanupInterval time.Duration
DefaultTTL time.Duration // Default time-to-live for items if not specified
ShardSize uint64 // Maximum number of items per shard
CleanupInterval time.Duration // Frequency of cleanup operations

shardLookupTable shardLookupTable
shardLookupTable shardLookupTable // Internal structure to manage shards
}

// Item represents a cache entry with expiration management.
type Item struct {
Key string
Value any
expirationTime time.Time
TTL time.Duration
mutex sync.RWMutex
Key string // Unique key identifier for the item
Value any // The stored value
expirationTime time.Time // The time when the item expires
TTL time.Duration // Custom TTL for the item (overrides DefaultTTL)
}

// shard represents an individual partition of the cache.
type shard struct {
id uint64
uuid uuid.UUID
data map[string]*Item
termFunc context.CancelFunc
mutex sync.RWMutex
id uint64 // Unique identifier for the shard
data map[string]*Item // Storage for cached items
termFunc context.CancelFunc // Function to terminate cleanup routines
isTerminated bool // Flag to indicate if the shard has been terminated
mutex sync.RWMutex // Mutex for safe concurrent access
}

// shards is a map of shard IDs to shard instances.
type shards map[uint64]*shard

// shardLookupTable maintains the mapping of shard IDs and the current active shard.
type shardLookupTable struct {
shards shards
currentShardId uint64
mutex sync.RWMutex
shards shards // Mapping of shard IDs to shards
currentShardId uint64 // ID of the currently active shard
mutex sync.RWMutex // Mutex for safe concurrent access
}

// Init initializes the TTL cache by creating the first shard.
func (ttl *TTL) Init() error {

newShard := shard{}
ttl.shardLookupTable = shardLookupTable{shards: make(shards)}

ttl.newShard(&newShard)

return nil
}

// Put inserts an item into the cache and returns the shard ID it was stored in.
func (ttl *TTL) Put(item *Item) (uint64, error) {

ttl.shardLookupTable.mutex.RLock()
@@ -64,27 +66,26 @@ func (ttl *TTL) Put(item *Item) (uint64, error) {
}

currentShard.mutex.Lock()
if uint64(len(currentShard.data)) < ttl.ShardSize {

defer currentShard.mutex.Unlock()
if uint64(len(currentShard.data)) < ttl.ShardSize && !currentShard.isTerminated && currentShard != nil {

currentShard.data[item.Key] = item
currentShard.mutex.Unlock()
} else {
currentShard.mutex.Unlock()

newShard := shard{}
ttl.newShard(&newShard)

newShard.mutex.Lock()
defer newShard.mutex.Unlock()

newShard.data[item.Key] = item
shardId = newShard.id
newShard.mutex.Unlock()
}

return shardId, nil
}

// Get retrieves an item from the cache given a key and shard ID.
func (ttl *TTL) Get(key string, shardId uint64) any {
ttl.shardLookupTable.mutex.RLock()
shard, exists := ttl.shardLookupTable.shards[shardId]
@@ -102,31 +103,29 @@ func (ttl *TTL) Get(key string, shardId uint64) any {
return nil
}

data.mutex.RLock()
defer data.mutex.RUnlock()

if data.expirationTime.After(time.Now()) {
return data.Value
} else {
return nil
}
return nil
}

// Delete removes an item from the cache.
func (ttl *TTL) Delete(key string, shardId uint64) {
ttl.shardLookupTable.mutex.RLock()
shard, exists := ttl.shardLookupTable.shards[shardId]
ttl.shardLookupTable.mutex.RUnlock()

if !exists {
return
}

shard.mutex.Lock()
defer shard.mutex.Unlock()

delete(shard.data, key)
}

// newShard creates and initializes a new shard.
func (ttl *TTL) newShard(shard *shard) {

ttl.shardLookupTable.mutex.Lock()
defer ttl.shardLookupTable.mutex.Unlock()

@@ -135,7 +134,6 @@ func (ttl *TTL) newShard(shard *shard) {

shard.id = newShardId
shard.data = make(map[string]*Item)
shard.uuid = uuid.New()
shard.termFunc = cancel

ttl.shardLookupTable.shards[newShardId] = shard
@@ -144,8 +142,9 @@ func (ttl *TTL) newShard(shard *shard) {
go shard.cleanup(ctx, ttl)
}

// cleanup periodically removes expired items and terminates empty shards.
func (shard *shard) cleanup(ctx context.Context, ttl *TTL) {
ticker := time.NewTicker(*&ttl.CleanupInterval)
ticker := time.NewTicker(ttl.CleanupInterval)
defer ticker.Stop()

for {
@@ -162,15 +161,12 @@ func (shard *shard) cleanup(ctx context.Context, ttl *TTL) {
}
}

if len(expiredKeys) > 0 {
for _, k := range expiredKeys {
delete(shard.data, k)
}
for _, k := range expiredKeys {
delete(shard.data, k)
}

shardEmpty := len(shard.data) == 0
if len(shard.data) == 0 {

if shardEmpty {
shard.mutex.Unlock()
ttl.terminateShard(shard)
return
@@ -180,6 +176,7 @@ func (shard *shard) cleanup(ctx context.Context, ttl *TTL) {
}
}

// terminateShard removes an empty shard from the lookup table.
func (ttl *TTL) terminateShard(shard *shard) {

ttl.shardLookupTable.mutex.Lock()
@@ -190,6 +187,7 @@ func (ttl *TTL) terminateShard(shard *shard) {

if _, exists := ttl.shardLookupTable.shards[shard.id]; exists {
shard.termFunc()
shard.isTerminated = true
delete(ttl.shardLookupTable.shards, shard.id)
}
}
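
Example usage (not part of the commit). This is a minimal sketch based only on the signatures visible in this diff: Init, Put, Get, Delete, and the exported TTL and Item fields. The import path is a placeholder, and the sketch assumes Put derives each item's expiration time from Item.TTL (falling back to DefaultTTL), which happens outside the hunks shown above. It illustrates the concurrent-access pattern this commit is hardening, not code from the repository.

package main

import (
	"fmt"
	"sync"
	"time"

	// Placeholder import path; substitute the module's real path.
	"example.com/ttlcache/ttl"
)

func main() {
	cache := &ttl.TTL{
		DefaultTTL:      5 * time.Second, // assumed fallback when Item.TTL is zero
		ShardSize:       1024,            // items per shard before Put rolls over to a new shard
		CleanupInterval: time.Second,     // how often each shard's cleanup goroutine ticks
	}
	if err := cache.Init(); err != nil {
		panic(err)
	}

	// Hammer the cache from many goroutines; the shard-level locking and
	// deferred unlocks added in this commit are what keep this race-free.
	const n = 100
	var wg sync.WaitGroup
	shardIDs := make([]uint64, n) // each goroutine writes only its own index
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			key := fmt.Sprintf("key-%d", i)
			id, err := cache.Put(&ttl.Item{Key: key, Value: i, TTL: 2 * time.Second})
			if err != nil {
				return
			}
			shardIDs[i] = id       // Put reports which shard holds the item
			_ = cache.Get(key, id) // read back through the same shard ID
		}(i)
	}
	wg.Wait()

	fmt.Println(cache.Get("key-0", shardIDs[0])) // expected: 0 while the item is live

	// Once the TTL elapses, Get returns nil and the per-shard cleanup
	// goroutine eventually evicts expired items (and empty shards).
	time.Sleep(3 * time.Second)
	fmt.Println(cache.Get("key-0", shardIDs[0])) // expected: <nil>

	cache.Delete("key-1", shardIDs[1]) // explicit removal by key and shard ID
}

Note that callers have to retain the shard ID returned by Put: Get and Delete address items by (key, shardId) rather than by key alone.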