Skip to content

Commit

Permalink
fix(619): append into ReadWriteSegment when it exceeded size should n…
Browse files Browse the repository at this point in the history
…ot panic (#627)

Fix: #619

---------

Co-authored-by: Matteo Merli <mmerli@apache.org>
  • Loading branch information
lsytj0413 and merlimat authored Feb 27, 2025
1 parent 36819c9 commit 8c1d357
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 6 deletions.
1 change: 1 addition & 0 deletions server/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
ErrEntryNotFound = errors.New("oxia: entry not found")
ErrReaderClosed = errors.New("oxia: reader already closed")
ErrInvalidNextOffset = errors.New("oxia: invalid next offset in wal")
ErrSegmentFull = errors.New("oxia: current segment is full")

InvalidTerm int64 = -1
InvalidOffset int64 = -1
Expand Down
16 changes: 11 additions & 5 deletions server/wal/wal_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,23 @@ func (t *wal) AppendAsync(entry *proto.LogEntry) error {
}
}

if !t.currentSegment.HasSpace(len(val)) {
if err = t.currentSegment.Append(entry.Offset, val); err != nil {
if !errors.Is(err, ErrSegmentFull) {
t.writeErrors.Inc()
return err
}
if err = t.rolloverSegment(); err != nil {
t.writeErrors.Inc()
return errors.Wrap(err, "failed to rollover segment")
}
}

if err = t.currentSegment.Append(entry.Offset, val); err != nil {
t.writeErrors.Inc()
return err
// After the rollover, try to append again
if err = t.currentSegment.Append(entry.Offset, val); err != nil {
t.writeErrors.Inc()
return err
}
}

t.lastAppendedOffset.Store(entry.Offset)
t.firstOffset.CompareAndSwap(InvalidOffset, entry.Offset)

Expand Down
3 changes: 3 additions & 0 deletions server/wal/wal_rw_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ func (ms *readWriteSegment) Append(offset int64, data []byte) error {
if len(data) == 0 {
return codec.ErrEmptyPayload
}
if !ms.HasSpace(len(data)) {
return ErrSegmentFull
}
if offset != ms.lastOffset+1 {
return ErrInvalidNextOffset
}
Expand Down
14 changes: 14 additions & 0 deletions server/wal/wal_rw_segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package wal

import (
"encoding/binary"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -296,3 +297,16 @@ func TestReadWriteSegment_BrokenCommittedData_ErrDataCorrupted(t *testing.T) {
_, err = newReadWriteSegment(dir, 0, 1024, 0, commitOffsetProvider)
assert.ErrorIs(t, err, codec.ErrDataCorrupted)
}

func TestSegmentAppendShouldNotPanic(t *testing.T) {
basePath := t.TempDir()
rw, err := newReadWriteSegment(basePath, 0, 1024, 0, nil)
assert.NoError(t, err)
for i := int64(0); i < 51; i++ {
err := rw.Append(i, fmt.Appendf(nil, "entry-%d", i))
assert.NoError(t, err)
}

err = rw.Append(51, fmt.Appendf(nil, "entry-%d", 51))
assert.ErrorIs(t, err, ErrSegmentFull)
}
7 changes: 6 additions & 1 deletion server/wal/wal_trimmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ func TestWalTrimUpToCommitOffset(t *testing.T) {
Retention: 2 * time.Millisecond,
SegmentSize: 128 * 1024,
}

slog.Info("Starting",
slog.String("TestName", t.Name()),
slog.String("BaseWalDir", t.TempDir()),
)
clock := &common.MockedClock{}
commitOffsetProvider := &mockedCommitOffsetProvider{}
commitOffsetProvider.commitOffset.Store(math.MaxInt64)
Expand Down Expand Up @@ -133,6 +136,7 @@ func TestWalTrimUpToCommitOffset(t *testing.T) {
slog.Info(
"checking...",
slog.Int64("first-offset", w.FirstOffset()),
slog.String("TestName", t.Name()),
)
return w.FirstOffset() == 2
}, 10*time.Second, 10*time.Millisecond)
Expand All @@ -151,6 +155,7 @@ func TestWalTrimUpToCommitOffset(t *testing.T) {
slog.Info(
"checking...",
slog.Int64("first-offset", w.FirstOffset()),
slog.String("TestName", t.Name()),
)
return w.FirstOffset() == 87
}, 10*time.Second, 10*time.Millisecond)
Expand Down

0 comments on commit 8c1d357

Please sign in to comment.