-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinternal.go
148 lines (121 loc) · 3.01 KB
/
internal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package hermes
import (
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
)
// internalStatus is a single entry of the in-memory status store: the stored
// value together with the timing metadata used to decide when it expires.
type internalStatus struct {
// value is the payload stored under the key.
value string
// ttl is the lifetime that was requested when the entry was stored.
ttl time.Duration
// createdAt is the time at which the entry was stored.
createdAt time.Time
// expiresAt is createdAt plus ttl; RunGC purges entries whose expiresAt
// lies in the past.
expiresAt time.Time
}
// Package-level state shared by the functions in this file. It is given its
// initial values in init.
var (
// lock guards statusStore and changeIndex.
lock sync.RWMutex
// gcInterval is how long the background GC sleeps between two cycles.
gcInterval time.Duration
// statusStore maps each key to its stored status entry.
statusStore map[string]internalStatus
// logger receives the package's debug output.
logger *logrus.Logger
// changeIndex is bumped when the store is modified; see GetChangeIndex.
changeIndex int64
)
func init() {
lock = sync.RWMutex{}
gcInterval = time.Duration(30) * time.Second
statusStore = make(map[string]internalStatus)
logger = logrus.StandardLogger()
go runPeriodicGc()
}
func SetGcInterval(interval time.Duration) {
gcInterval = interval
}
// SetLogger replaces the logger that receives the package's debug output.
//
// Passing nil resets the logger to logrus' standard logger (the same default
// that init installs) rather than storing a nil pointer: a nil logger would
// be dereferenced by later logging code, for example when RunGC reads
// logger.Level, and panic.
func SetLogger(l *logrus.Logger) {
	if l == nil {
		l = logrus.StandardLogger()
	}
	logger = l
}
// GetChangeIndex returns the current change index.
//
// The change index is a monotonically increasing integer that is bumped
// whenever the store changes. It is useful for caching: a caller can compare
// a remembered index with the current one to find out whether anything has
// changed before acting on it.
func GetChangeIndex() int64 {
	lock.RLock()
	current := changeIndex
	lock.RUnlock()
	return current
}
// RunGC performs one garbage-collection cycle over the store.
//
// While holding the write lock it walks every entry and deletes those whose
// expiry time lies before the current time. If at least one entry was purged
// the change index is incremented once for the whole cycle, regardless of
// how many entries were removed.
func RunGC() {
	logger.Debugf("Hermes: Running GC cycle.")

	lock.Lock()
	defer lock.Unlock()

	purged := 0
	for name, entry := range statusStore {
		if !entry.expiresAt.Before(time.Now()) {
			// Not expired yet; keep it.
			continue
		}
		// Only build the (fairly expensive) message when debug logging is on.
		if logger.Level >= logrus.DebugLevel {
			logger.Debugf("Hermes: Key %s was created at %s with TTL %s (making expiry %s). It is now expired since the time is %s. Purging it.", name, entry.createdAt.String(), entry.ttl.String(), entry.expiresAt.String(), time.Now().String())
		}
		delete(statusStore, name)
		purged++
	}

	if purged > 0 {
		changeIndex++
	}
}
// runPeriodicGc is the body of the background GC goroutine started by init.
// It loops forever: sleep for the configured GC interval, then run one GC
// cycle via RunGC.
//
// The interval is read exactly once per iteration, under the read lock, so
// that the duration that is logged is the duration that is actually slept and
// the read does not happen unsynchronised. The lock is released before
// sleeping and before RunGC, which takes the write lock itself.
func runPeriodicGc() {
	logger.Debugf("Hermes: Started background GC.")
	for {
		lock.RLock()
		interval := gcInterval
		lock.RUnlock()

		logger.Debugf("Hermes: Sleeping for %s before next GC.", interval.String())
		time.Sleep(interval)
		RunGC()
	}
}
// putStatus stores value under key with the given time-to-live and returns
// the change index after the operation.
//
// A zero or negative ttl is rejected: nothing is stored, the change index is
// not bumped, and the current index is returned. Otherwise the entry is
// written (replacing any existing entry for key), stamped with the current
// time and the resulting expiry, and the change index is incremented by one.
func putStatus(key string, value string, ttl time.Duration) int64 {
	if ttl <= 0 {
		logger.Debugf("Hermes: Attempted to set key %s with a zero or negative TTL. Not setting it.", key)
		// changeIndex is guarded by lock; read it through GetChangeIndex so
		// this path does not race with concurrent writers.
		return GetChangeIndex()
	}

	lock.Lock()
	defer lock.Unlock()

	changeIndex++
	logger.Debugf("Hermes: storing key %s", key)

	now := time.Now()
	statusStore[key] = internalStatus{
		value:     value,
		ttl:       ttl,
		createdAt: now,
		expiresAt: now.Add(ttl),
	}
	return changeIndex
}
// getStatus looks up key in the store.
//
// It returns the stored value, whether the key was present, and the change
// index observed under the same read lock as the lookup. For a missing key
// the value is the empty string and the boolean is false.
func getStatus(key string) (string, bool, int64) {
	lock.RLock()
	entry, found := statusStore[key]
	index := changeIndex
	lock.RUnlock()

	return entry.value, found, index
}
// deleteStatus removes key from the store and returns the change index after
// the operation.
//
// The change index is incremented only when the key was actually present:
// deleting a key that does not exist changes nothing, so the index is left
// alone (as RunGC does when it purges nothing) and caches keyed on the index
// are not invalidated needlessly.
func deleteStatus(key string) int64 {
	lock.Lock()
	defer lock.Unlock()

	logger.Debugf("Hermes: deleting key %s", key)
	if _, present := statusStore[key]; !present {
		return changeIndex
	}

	delete(statusStore, key)
	changeIndex++
	return changeIndex
}
// listKeys returns every key in the store that starts with prefix, together
// with the change index observed under the same read lock.
//
// An empty prefix matches every key. The returned slice is never nil, and
// its order follows map iteration and is therefore unspecified.
func listKeys(prefix string) ([]string, int64) {
	lock.RLock()
	defer lock.RUnlock()

	matches := make([]string, 0, len(statusStore))
	for name := range statusStore {
		if !strings.HasPrefix(name, prefix) {
			continue
		}
		matches = append(matches, name)
	}
	return matches, changeIndex
}
// shallowCopyStore returns a snapshot of the store together with the change
// index observed under the same read lock.
//
// The snapshot is a new map holding a copy of every entry, so the caller may
// read or modify it without holding the lock and without affecting the store.
func shallowCopyStore() (map[string]internalStatus, int64) {
	lock.RLock()
	defer lock.RUnlock()

	snapshot := make(map[string]internalStatus, len(statusStore))
	for name := range statusStore {
		snapshot[name] = statusStore[name]
	}
	return snapshot, changeIndex
}