Skip to content

Commit

Permalink
fix(626): remove file directly when delete segment
Browse files Browse the repository at this point in the history
  • Loading branch information
lsytj0413 committed Mar 1, 2025
1 parent d434094 commit 400f67a
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 258 deletions.
175 changes: 175 additions & 0 deletions server/wal/readonly_segment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2023 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wal

import (
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"time"

"github.com/edsrzf/mmap-go"
"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/streamnative/oxia/server/wal/codec"
)

// segmentPath returns the path of the segment whose first entry is firstOffset:
// basePath joined with the decimal representation of that offset.
func segmentPath(basePath string, firstOffset int64) string {
	name := fmt.Sprint(firstOffset)
	return filepath.Join(basePath, name)
}

// fileOffset looks up, in the index idx, the value stored for the logical
// entry `offset` of a segment that starts at firstOffset.
//
// Each entry occupies 4 bytes in idx, so the lookup position is
// (offset-firstOffset)*4. Callers must pass an offset that lies inside the
// segment; no bounds check is performed here.
func fileOffset(idx []byte, firstOffset, offset int64) uint32 {
	entryPos := uint32((offset - firstOffset) * 4)
	return codec.ReadInt(idx, entryPos)
}

// ReadOnlySegment is a WAL segment that can be read and removed but no longer
// appended to.
type ReadOnlySegment interface {
	io.Closer

	// BaseOffset returns the offset of the first entry in the segment.
	BaseOffset() int64
	// LastOffset returns the offset of the last entry in the segment.
	LastOffset() int64
	// LastCrc returns the CRC recorded for the last entry in the segment.
	LastCrc() uint32
	// OpenTimestamp returns the time at which the segment was opened.
	OpenTimestamp() time.Time

	// Read returns the payload of the entry stored at the given offset.
	Read(offset int64) ([]byte, error)

	// Delete closes the segment and removes its files from disk.
	Delete() error
}

// readOnlySegment implements ReadOnlySegment on top of a memory-mapped txn
// file and an in-memory copy of the segment index.
type readOnlySegment struct {
	// c provides the segment's txn/index paths, codec and base offset.
	c *segmentConfig

	// txnFile is the read-only handle on the txn file and txnMappedFile is
	// the memory mapping created from it.
	txnFile       *os.File
	txnMappedFile mmap.MMap

	// Index file maps a logical "offset" to a physical file offset within the wal segment
	idx []byte

	// lastOffset and lastCrc describe the final entry of the segment; both
	// are computed once when the segment is opened.
	lastOffset int64
	lastCrc    uint32

	// openTimestamp is the wall-clock time at which the segment was opened.
	openTimestamp time.Time
	// closed makes Close a no-op after the first call.
	closed bool
}

// newReadOnlySegment opens, for reading, the segment under basePath whose
// first entry is baseOffset.
//
// It opens and memory-maps the txn file, then loads the index file. If the
// index is reported as corrupted (codec.ErrDataCorrupted) it is rebuilt from
// the txn data and written back; a failure to write it back is only logged,
// because the in-memory index is still usable. Finally the CRC of the last
// entry is read from the txn data.
//
// On any error the file handle and the mapping acquired so far are released
// before returning, so a failed open does not leak resources.
func newReadOnlySegment(basePath string, baseOffset int64) (ReadOnlySegment, error) {
	c, err := newSegmentConfig(basePath, baseOffset)
	if err != nil {
		return nil, err
	}

	ms := &readOnlySegment{
		c:             c,
		openTimestamp: time.Now(),
	}

	if ms.txnFile, err = os.OpenFile(ms.c.txnPath, os.O_RDONLY, 0); err != nil {
		return nil, errors.Wrapf(err, "failed to open segment txn file %s", ms.c.txnPath)
	}

	// From here on the segment owns OS resources. Release them on every
	// failure path below; on success ownership passes to the caller.
	opened := false
	defer func() {
		if opened {
			return
		}
		// Best-effort cleanup: the error that aborted the open is the one the
		// caller needs to see, so cleanup errors are intentionally dropped.
		if ms.txnMappedFile != nil {
			_ = ms.txnMappedFile.Unmap()
		}
		_ = ms.txnFile.Close()
	}()

	// A negative length asks mmap to map the whole file.
	if ms.txnMappedFile, err = mmap.MapRegion(ms.txnFile, -1, mmap.RDONLY, 0, 0); err != nil {
		return nil, errors.Wrapf(err, "failed to map segment txn file %s", ms.c.txnPath)
	}

	if ms.idx, err = ms.c.codec.ReadIndex(ms.c.idxPath); err != nil {
		if !errors.Is(err, codec.ErrDataCorrupted) {
			return nil, errors.Wrapf(err, "failed to decode segment index file %s", ms.c.idxPath)
		}
		slog.Warn("The segment index file is corrupted and the index is being rebuilt.", slog.String("path", ms.c.idxPath))
		// recover from txn
		if ms.idx, _, _, _, err = ms.c.codec.RecoverIndex(ms.txnMappedFile, 0,
			ms.c.baseOffset, nil); err != nil {
			slog.Error("The segment index file rebuild failed.", slog.String("path", ms.c.idxPath))
			return nil, errors.Wrapf(err, "failed to rebuild segment index file %s", ms.c.idxPath)
		}
		slog.Info("The segment index file has been rebuilt.", slog.String("path", ms.c.idxPath))
		if err := ms.c.codec.WriteIndex(ms.c.idxPath, ms.idx); err != nil {
			slog.Warn("write recovered segment index failed. it can continue work but will retry writing after restart.",
				slog.String("path", ms.c.idxPath), slog.Any("error", err))
		}
	}

	// Every entry takes 4 bytes in the index. Without at least one entry there
	// is no last offset, and the lookup below would use a wrapped-around
	// position, so fail explicitly instead.
	if len(ms.idx) < 4 {
		return nil, errors.Errorf("segment index file %s contains no entries", ms.c.idxPath)
	}

	ms.lastOffset = ms.c.baseOffset + int64(len(ms.idx)/4-1)

	// recover the last crc
	fo := fileOffset(ms.idx, ms.c.baseOffset, ms.lastOffset)
	if _, _, ms.lastCrc, err = ms.c.codec.ReadHeaderWithValidation(ms.txnMappedFile, fo); err != nil {
		return nil, err
	}

	opened = true
	return ms, nil
}

// LastCrc returns the CRC of the segment's last entry, as read from the txn
// data when the segment was opened.
func (ms *readOnlySegment) LastCrc() uint32 {
	return ms.lastCrc
}

// BaseOffset returns the offset of the first entry of the segment, taken from
// the segment configuration.
func (ms *readOnlySegment) BaseOffset() int64 {
	return ms.c.baseOffset
}

// LastOffset returns the offset of the last entry of the segment, derived
// from the index size when the segment was opened.
func (ms *readOnlySegment) LastOffset() int64 {
	return ms.lastOffset
}

// Read returns the payload of the entry stored at the given logical offset.
//
// It returns codec.ErrOffsetOutOfBounds when offset is outside
// [BaseOffset, LastOffset]. A corrupted record is reported with the entry
// offset added to the error; any other codec error is returned unchanged.
func (ms *readOnlySegment) Read(offset int64) ([]byte, error) {
	if offset > ms.lastOffset || offset < ms.c.baseOffset {
		return nil, codec.ErrOffsetOutOfBounds
	}

	pos := fileOffset(ms.idx, ms.c.baseOffset, offset)
	payload, err := ms.c.codec.ReadRecordWithValidation(ms.txnMappedFile, pos)
	switch {
	case err == nil:
		return payload, nil
	case errors.Is(err, codec.ErrDataCorrupted):
		return nil, errors.Wrapf(err, "read record failed. entryOffset: %d", offset)
	default:
		return nil, err
	}
}

// Close unmaps the txn file and closes its file handle, returning the
// combination of both errors. Calls after the first one are no-ops that
// return nil.
func (ms *readOnlySegment) Close() error {
	if ms.closed {
		return nil
	}
	ms.closed = true

	// Unmap first, then close the handle the mapping was created from.
	unmapErr := ms.txnMappedFile.Unmap()
	closeErr := ms.txnFile.Close()
	return multierr.Combine(unmapErr, closeErr)
}

// Delete closes the segment and removes its txn and index files from disk.
// All three steps are always attempted; their errors are combined into the
// returned error.
func (ms *readOnlySegment) Delete() error {
	closeErr := ms.Close()
	txnErr := os.Remove(ms.c.txnPath)
	idxErr := os.Remove(ms.c.idxPath)
	return multierr.Combine(closeErr, txnErr, idxErr)
}

// OpenTimestamp returns the time recorded when the segment was opened by
// newReadOnlySegment.
func (ms *readOnlySegment) OpenTimestamp() time.Time {
	return ms.openTimestamp
}

// Limits for cached read-only segments.
// NOTE(review): the code consuming these is not in this file; presumably they
// bound how many opened segments are kept and for how long — confirm at the
// use site.
const (
	// maxReadOnlySegmentsInCacheCount is the count limit.
	maxReadOnlySegmentsInCacheCount = 5
	// maxReadOnlySegmentsInCacheTime is the time limit.
	maxReadOnlySegmentsInCacheTime = 5 * time.Minute
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ package wal
import (
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/streamnative/oxia/proto"
"github.com/streamnative/oxia/server/wal/codec"
)

Expand Down Expand Up @@ -71,7 +68,7 @@ func TestRO_auto_recover_broken_index(t *testing.T) {
rwSegment := rw.(*readWriteSegment)
assert.NoError(t, rw.Close())

idxPath := rwSegment.idxPath
idxPath := rwSegment.c.idxPath
// inject wrong data
file, err := os.OpenFile(idxPath, os.O_RDWR, 0644)
assert.NoError(t, err)
Expand Down Expand Up @@ -102,64 +99,3 @@ func TestRO_auto_recover_broken_index(t *testing.T) {

assert.NoError(t, ro.Close())
}

// TestReadOnlySegmentsGroupTrimSegments checks how TrimSegments reacts when a
// segment's txn file has been damaged or removed behind the group's back.
//
// Both subtests write 1000 entries with a 128-byte segment size so that many
// segments are produced, tamper with the first segment's txn file and then
// trim up to offset 6.
func TestReadOnlySegmentsGroupTrimSegments(t *testing.T) {
	basePath := t.TempDir()

	t.Run("when newReadOnlySegment failed", func(t *testing.T) {
		walFactory := NewWalFactory(&FactoryOptions{
			BaseWalDir:  basePath,
			Retention:   1 * time.Hour,
			SegmentSize: 128,
			SyncData:    true,
		})
		w, err := walFactory.NewWal("test", 0, nil)
		assert.NoError(t, err)
		for i := int64(0); i < 1000; i++ {
			assert.NoError(t, w.Append(&proto.LogEntry{
				Term:   1,
				Offset: i,
				Value:  fmt.Appendf(nil, "test-%d", i),
			}))
		}
		walBasePath := w.(*wal).walPath
		readOnlySegments, err := newReadOnlySegmentsGroup(walBasePath)
		assert.NoError(t, err)

		// Ensure newReadOnlySegment will return an error: an emptied txn file
		// can no longer be opened as a segment.
		assert.NoError(t, os.Truncate(filepath.Join(walBasePath, "0.txnx"), 0))

		assert.Error(t, readOnlySegments.TrimSegments(6))
	})

	t.Run("when txnFile is not exists", func(t *testing.T) {
		walFactory := NewWalFactory(&FactoryOptions{
			BaseWalDir:  basePath,
			Retention:   1 * time.Hour,
			SegmentSize: 128,
			SyncData:    true,
		})
		// A different shard than the first subtest, so the two WALs do not
		// share a directory under the common basePath.
		w, err := walFactory.NewWal("test", 1, nil)
		assert.NoError(t, err)
		for i := int64(0); i < 1000; i++ {
			assert.NoError(t, w.Append(&proto.LogEntry{
				Term:   1,
				Offset: i,
				Value:  fmt.Appendf(nil, "test-%d", i),
			}))
		}
		walBasePath := w.(*wal).walPath
		readOnlySegments, err := newReadOnlySegmentsGroup(walBasePath)
		assert.NoError(t, err)

		// Ensure newReadOnlySegment will return a not-exist error: a segment
		// whose txn file is already gone is expected to be trimmed cleanly.
		assert.NoError(t, os.Remove(filepath.Join(walBasePath, "0.txnx")))

		assert.NoError(t, readOnlySegments.TrimSegments(6))
	})
}
Loading

0 comments on commit 400f67a

Please sign in to comment.