Skip to content

Commit 7eedc76

Browse files
committed
Add WAL pruning logic
1 parent 220d9d5 commit 7eedc76

File tree

1 file changed

+29
-14
lines changed

1 file changed

+29
-14
lines changed

stream/changelog/changelog.go

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"path/filepath"
8+
"time"
89

910
errorutils "github.com/sei-protocol/sei-db/common/errors"
1011
"github.com/sei-protocol/sei-db/common/logger"
@@ -16,12 +17,14 @@ import (
1617
var _ types.Stream[proto.ChangelogEntry] = (*Stream)(nil)
1718

1819
type Stream struct {
20+
dir string
1921
log *wal.Log
2022
config Config
2123
logger logger.Logger
2224
writeChannel chan *Message
2325
errSignal chan error
2426
nextOffset uint64
27+
isClosed bool
2528
}
2629

2730
type Message struct {
@@ -33,7 +36,8 @@ type Config struct {
3336
DisableFsync bool
3437
ZeroCopy bool
3538
WriteBufferSize int
36-
KeepLast int
39+
KeepRecent uint64
40+
PruneInterval time.Duration
3741
}
3842

3943
// NewStream creates a new changelog stream that persist the changesets in the log
@@ -45,20 +49,22 @@ func NewStream(logger logger.Logger, dir string, config Config) (*Stream, error)
4549
if err != nil {
4650
return nil, err
4751
}
48-
firstEntry, err := log.FirstIndex()
52+
stream := &Stream{
53+
dir: dir,
54+
log: log,
55+
config: config,
56+
logger: logger,
57+
isClosed: false,
58+
}
59+
startIndex, err := log.FirstIndex()
4960
if err != nil {
5061
return nil, err
5162
}
52-
if firstEntry <= 0 {
53-
}
54-
if config.KeepLast > 0 {
55-
63+
stream.nextOffset = startIndex + 1
64+
if config.KeepRecent > 0 {
65+
go stream.StartPruning(config.KeepRecent, config.PruneInterval)
5666
}
57-
return &Stream{
58-
log: log,
59-
config: config,
60-
logger: logger,
61-
}, nil
67+
return stream, nil
6268

6369
}
6470

@@ -195,9 +201,17 @@ func (stream *Stream) Replay(start uint64, end uint64, processFn func(index uint
195201
return nil
196202
}
197203

198-
//
199-
func (stream *Stream) Pruning() {
200-
204+
func (stream *Stream) StartPruning(keepRecent uint64, pruneInterval time.Duration) {
205+
for !stream.isClosed {
206+
lastIndex, _ := stream.log.LastIndex()
207+
firstIndex, _ := stream.log.FirstIndex()
208+
if lastIndex > keepRecent && (lastIndex-keepRecent) > firstIndex {
209+
prunePos := lastIndex - keepRecent
210+
err := stream.TruncateBefore(prunePos)
211+
stream.logger.Error(fmt.Sprintf("failed to prune changelog till index %d", prunePos), "err", err)
212+
}
213+
time.Sleep(pruneInterval)
214+
}
201215
}
202216

203217
func (stream *Stream) Close() error {
@@ -209,6 +223,7 @@ func (stream *Stream) Close() error {
209223
stream.writeChannel = nil
210224
stream.errSignal = nil
211225
errClose := stream.log.Close()
226+
stream.isClosed = true
212227
return errorutils.Join(err, errClose)
213228
}
214229

0 commit comments

Comments
 (0)