From 400f67a9ca8e46131dfd8f1e1cd58e24d3e971a6 Mon Sep 17 00:00:00 2001 From: Soren Yang Date: Sat, 1 Mar 2025 14:11:14 +0800 Subject: [PATCH] fix(626): remove file directly when delete segment --- server/wal/readonly_segment.go | 175 +++++++++++++++++ ...gment_test.go => readonly_segment_test.go} | 66 +------ ..._segment.go => readonly_segments_group.go} | 178 ++---------------- server/wal/readonly_segments_group_test.go | 58 ++++++ ...wal_rw_segment.go => readwrite_segment.go} | 50 +++-- ...ment_test.go => readwrite_segment_test.go} | 2 +- server/wal/segment_config.go | 42 +++++ server/wal/{wal_trimmer.go => trimmer.go} | 0 .../{wal_trimmer_test.go => trimmer_test.go} | 0 9 files changed, 313 insertions(+), 258 deletions(-) create mode 100644 server/wal/readonly_segment.go rename server/wal/{wal_ro_segment_test.go => readonly_segment_test.go} (59%) rename server/wal/{wal_ro_segment.go => readonly_segments_group.go} (51%) create mode 100644 server/wal/readonly_segments_group_test.go rename server/wal/{wal_rw_segment.go => readwrite_segment.go} (76%) rename server/wal/{wal_rw_segment_test.go => readwrite_segment_test.go} (99%) create mode 100644 server/wal/segment_config.go rename server/wal/{wal_trimmer.go => trimmer.go} (100%) rename server/wal/{wal_trimmer_test.go => trimmer_test.go} (100%) diff --git a/server/wal/readonly_segment.go b/server/wal/readonly_segment.go new file mode 100644 index 00000000..d9b15e3b --- /dev/null +++ b/server/wal/readonly_segment.go @@ -0,0 +1,175 @@ +// Copyright 2023 StreamNative, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "fmt" + "io" + "log/slog" + "os" + "path/filepath" + "time" + + "github.com/edsrzf/mmap-go" + "github.com/pkg/errors" + "go.uber.org/multierr" + + "github.com/streamnative/oxia/server/wal/codec" +) + +func segmentPath(basePath string, firstOffset int64) string { + return filepath.Join(basePath, fmt.Sprintf("%d", firstOffset)) +} + +func fileOffset(idx []byte, firstOffset, offset int64) uint32 { + return codec.ReadInt(idx, uint32((offset-firstOffset)*4)) +} + +type ReadOnlySegment interface { + io.Closer + + BaseOffset() int64 + LastOffset() int64 + + LastCrc() uint32 + + Read(offset int64) ([]byte, error) + + Delete() error + + OpenTimestamp() time.Time +} + +type readOnlySegment struct { + c *segmentConfig + // codec codec.Codec + // txnPath string + // idxPath string + // baseOffset int64 + lastOffset int64 + lastCrc uint32 + closed bool + + txnFile *os.File + txnMappedFile mmap.MMap + + // Index file maps a logical "offset" to a physical file offset within the wal segment + idx []byte + openTimestamp time.Time +} + +func newReadOnlySegment(basePath string, baseOffset int64) (ReadOnlySegment, error) { + c, err := newSegmentConfig(basePath, baseOffset) + if err != nil { + return nil, err + } + + ms := &readOnlySegment{ + c: c, + openTimestamp: time.Now(), + } + + if ms.txnFile, err = os.OpenFile(ms.c.txnPath, os.O_RDONLY, 0); err != nil { + return nil, errors.Wrapf(err, "failed to open segment txn file %s", ms.c.txnPath) + } + + if ms.txnMappedFile, err = mmap.MapRegion(ms.txnFile, -1, mmap.RDONLY, 0, 0); err != nil { + return nil, errors.Wrapf(err, "failed to map segment txn file %s", ms.c.txnPath) + } + + if ms.idx, err = ms.c.codec.ReadIndex(ms.c.idxPath); err != nil { + if !errors.Is(err, codec.ErrDataCorrupted) { + return nil, errors.Wrapf(err, "failed to decode segment index file %s", ms.c.idxPath) + } + slog.Warn("The segment index file is corrupted and the index is being rebuilt.", slog.String("path", ms.c.idxPath)) + // recover from txn + if ms.idx, _, _, _, err = ms.c.codec.RecoverIndex(ms.txnMappedFile, 0, + ms.c.baseOffset, nil); err != nil { + slog.Error("The segment index file rebuild failed.", slog.String("path", ms.c.idxPath)) + return nil, errors.Wrapf(err, "failed to rebuild segment index file %s", ms.c.idxPath) + } + slog.Info("The segment index file has been rebuilt.", slog.String("path", ms.c.idxPath)) + if err := ms.c.codec.WriteIndex(ms.c.idxPath, ms.idx); err != nil { + slog.Warn("write recovered segment index failed. it can continue work but will retry writing after restart.", + slog.String("path", ms.c.idxPath)) + } + } + + ms.lastOffset = ms.c.baseOffset + int64(len(ms.idx)/4-1) + + // recover the last crc + fo := fileOffset(ms.idx, ms.c.baseOffset, ms.lastOffset) + if _, _, ms.lastCrc, err = ms.c.codec.ReadHeaderWithValidation(ms.txnMappedFile, fo); err != nil { + return nil, err + } + return ms, nil +} + +func (ms *readOnlySegment) LastCrc() uint32 { + return ms.lastCrc +} + +func (ms *readOnlySegment) BaseOffset() int64 { + return ms.c.baseOffset +} + +func (ms *readOnlySegment) LastOffset() int64 { + return ms.lastOffset +} + +func (ms *readOnlySegment) Read(offset int64) ([]byte, error) { + if offset < ms.c.baseOffset || offset > ms.lastOffset { + return nil, codec.ErrOffsetOutOfBounds + } + fileReadOffset := fileOffset(ms.idx, ms.c.baseOffset, offset) + var payload []byte + var err error + if payload, err = ms.c.codec.ReadRecordWithValidation(ms.txnMappedFile, fileReadOffset); err != nil { + if errors.Is(err, codec.ErrDataCorrupted) { + return nil, errors.Wrapf(err, "read record failed. entryOffset: %d", offset) + } + return nil, err + } + return payload, nil +} + +func (ms *readOnlySegment) Close() error { + if ms.closed { + return nil + } + + ms.closed = true + return multierr.Combine( + ms.txnMappedFile.Unmap(), + ms.txnFile.Close(), + ) +} + +func (ms *readOnlySegment) Delete() error { + return multierr.Combine( + ms.Close(), + os.Remove(ms.c.txnPath), + os.Remove(ms.c.idxPath), + ) +} + +func (ms *readOnlySegment) OpenTimestamp() time.Time { + return ms.openTimestamp +} + +const ( + maxReadOnlySegmentsInCacheCount = 5 + maxReadOnlySegmentsInCacheTime = 5 * time.Minute +) diff --git a/server/wal/wal_ro_segment_test.go b/server/wal/readonly_segment_test.go similarity index 59% rename from server/wal/wal_ro_segment_test.go rename to server/wal/readonly_segment_test.go index 02c5012f..19cd9f45 100644 --- a/server/wal/wal_ro_segment_test.go +++ b/server/wal/readonly_segment_test.go @@ -17,14 +17,11 @@ package wal import ( "fmt" "os" - "path/filepath" "testing" - "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" - "github.com/streamnative/oxia/proto" "github.com/streamnative/oxia/server/wal/codec" ) @@ -71,7 +68,7 @@ func TestRO_auto_recover_broken_index(t *testing.T) { rwSegment := rw.(*readWriteSegment) assert.NoError(t, rw.Close()) - idxPath := rwSegment.idxPath + idxPath := rwSegment.c.idxPath // inject wrong data file, err := os.OpenFile(idxPath, os.O_RDWR, 0644) assert.NoError(t, err) @@ -102,64 +99,3 @@ func TestRO_auto_recover_broken_index(t *testing.T) { assert.NoError(t, ro.Close()) } - -func TestReadOnlySegmentsGroupTrimSegments(t *testing.T) { - basePath := t.TempDir() - t.Run("when newReadOnlySegment failed", func(t *testing.T) { - walFactory := NewWalFactory(&FactoryOptions{ - BaseWalDir: basePath, - Retention: 1 * time.Hour, - SegmentSize: 128, - SyncData: true, - }) - w, err := walFactory.NewWal("test", 0, nil) - assert.NoError(t, err) - for i := int64(0); i < 1000; i++ { - assert.NoError(t, w.Append(&proto.LogEntry{ - Term: 1, - Offset: i, - Value: fmt.Appendf(nil, "test-%d", i), - })) - } - walBasePath := w.(*wal).walPath - readOnlySegments, err := newReadOnlySegmentsGroup(walBasePath) - assert.NoError(t, err) - - // Ensure newReadOnlySegment will return an error - err = os.Truncate(filepath.Join(walBasePath, "0.txnx"), 0) - assert.NoError(t, err) - - assert.NoError(t, err) - err = readOnlySegments.TrimSegments(6) - assert.Error(t, err) - }) - - t.Run("when txnFile is not exists", func(t *testing.T) { - walFactory := NewWalFactory(&FactoryOptions{ - BaseWalDir: basePath, - Retention: 1 * time.Hour, - SegmentSize: 128, - SyncData: true, - }) - w, err := walFactory.NewWal("test", 1, nil) - assert.NoError(t, err) - for i := int64(0); i < 1000; i++ { - assert.NoError(t, w.Append(&proto.LogEntry{ - Term: 1, - Offset: i, - Value: fmt.Appendf(nil, "test-%d", i), - })) - } - walBasePath := w.(*wal).walPath - readOnlySegments, err := newReadOnlySegmentsGroup(walBasePath) - assert.NoError(t, err) - - // Ensure newReadOnlySegment will return an NotExists error - err = os.Remove(filepath.Join(walBasePath, "0.txnx")) - assert.NoError(t, err) - - assert.NoError(t, err) - err = readOnlySegments.TrimSegments(6) - assert.NoError(t, err) - }) -} diff --git a/server/wal/wal_ro_segment.go b/server/wal/readonly_segments_group.go similarity index 51% rename from server/wal/wal_ro_segment.go rename to server/wal/readonly_segments_group.go index 9a64d779..4eaf35bf 100644 --- a/server/wal/wal_ro_segment.go +++ b/server/wal/readonly_segments_group.go @@ -1,4 +1,4 @@ -// Copyright 2023 StreamNative, Inc. +// Copyright 2025 StreamNative, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,169 +15,17 @@ package wal import ( - "fmt" "io" - "log/slog" "os" - "path/filepath" "sync" "time" - "github.com/edsrzf/mmap-go" - "github.com/pkg/errors" "go.uber.org/multierr" "github.com/streamnative/oxia/common" "github.com/streamnative/oxia/server/wal/codec" ) -func segmentPath(basePath string, firstOffset int64) string { - return filepath.Join(basePath, fmt.Sprintf("%d", firstOffset)) -} - -func fileOffset(idx []byte, firstOffset, offset int64) uint32 { - return codec.ReadInt(idx, uint32((offset-firstOffset)*4)) -} - -type ReadOnlySegment interface { - io.Closer - - BaseOffset() int64 - LastOffset() int64 - - LastCrc() uint32 - - Read(offset int64) ([]byte, error) - - Delete() error - - OpenTimestamp() time.Time -} - -type readonlySegment struct { - codec codec.Codec - txnPath string - idxPath string - baseOffset int64 - lastOffset int64 - lastCrc uint32 - closed bool - - txnFile *os.File - txnMappedFile mmap.MMap - - // Index file maps a logical "offset" to a physical file offset within the wal segment - idx []byte - openTimestamp time.Time -} - -func newReadOnlySegment(basePath string, baseOffset int64) (ReadOnlySegment, error) { - _codec, _, err := codec.GetOrCreate(segmentPath(basePath, baseOffset)) - if err != nil { - return nil, err - } - - ms := &readonlySegment{ - codec: _codec, - txnPath: segmentPath(basePath, baseOffset) + _codec.GetTxnExtension(), - idxPath: segmentPath(basePath, baseOffset) + _codec.GetIdxExtension(), - baseOffset: baseOffset, - openTimestamp: time.Now(), - } - - if ms.txnFile, err = os.OpenFile(ms.txnPath, os.O_RDONLY, 0); err != nil { - return nil, errors.Wrapf(err, "failed to open segment txn file %s", ms.txnPath) - } - - if ms.txnMappedFile, err = mmap.MapRegion(ms.txnFile, -1, mmap.RDONLY, 0, 0); err != nil { - return nil, errors.Wrapf(err, "failed to map segment txn file %s", ms.txnPath) - } - - if ms.idx, err = ms.codec.ReadIndex(ms.idxPath); err != nil { - if !errors.Is(err, codec.ErrDataCorrupted) { - return nil, errors.Wrapf(err, "failed to decode segment index file %s", ms.idxPath) - } - slog.Warn("The segment index file is corrupted and the index is being rebuilt.", slog.String("path", ms.idxPath)) - // recover from txn - if ms.idx, _, _, _, err = ms.codec.RecoverIndex(ms.txnMappedFile, 0, - ms.baseOffset, nil); err != nil { - slog.Error("The segment index file rebuild failed.", slog.String("path", ms.idxPath)) - return nil, errors.Wrapf(err, "failed to rebuild segment index file %s", ms.idxPath) - } - slog.Info("The segment index file has been rebuilt.", slog.String("path", ms.idxPath)) - if err := ms.codec.WriteIndex(ms.idxPath, ms.idx); err != nil { - slog.Warn("write recovered segment index failed. it can continue work but will retry writing after restart.", - slog.String("path", ms.idxPath)) - } - } - - ms.lastOffset = ms.baseOffset + int64(len(ms.idx)/4-1) - - // recover the last crc - fo := fileOffset(ms.idx, ms.baseOffset, ms.lastOffset) - if _, _, ms.lastCrc, err = ms.codec.ReadHeaderWithValidation(ms.txnMappedFile, fo); err != nil { - return nil, err - } - return ms, nil -} - -func (ms *readonlySegment) LastCrc() uint32 { - return ms.lastCrc -} - -func (ms *readonlySegment) BaseOffset() int64 { - return ms.baseOffset -} - -func (ms *readonlySegment) LastOffset() int64 { - return ms.lastOffset -} - -func (ms *readonlySegment) Read(offset int64) ([]byte, error) { - if offset < ms.baseOffset || offset > ms.lastOffset { - return nil, codec.ErrOffsetOutOfBounds - } - fileReadOffset := fileOffset(ms.idx, ms.baseOffset, offset) - var payload []byte - var err error - if payload, err = ms.codec.ReadRecordWithValidation(ms.txnMappedFile, fileReadOffset); err != nil { - if errors.Is(err, codec.ErrDataCorrupted) { - return nil, errors.Wrapf(err, "read record failed. entryOffset: %d", offset) - } - return nil, err - } - return payload, nil -} - -func (ms *readonlySegment) Close() error { - if ms.closed { - return nil - } - - ms.closed = true - return multierr.Combine( - ms.txnMappedFile.Unmap(), - ms.txnFile.Close(), - ) -} - -func (ms *readonlySegment) Delete() error { - return multierr.Combine( - ms.Close(), - os.Remove(ms.txnPath), - os.Remove(ms.idxPath), - ) -} - -func (ms *readonlySegment) OpenTimestamp() time.Time { - return ms.openTimestamp -} - -const ( - maxReadOnlySegmentsInCacheCount = 5 - maxReadOnlySegmentsInCacheTime = 5 * time.Minute -) - type ReadOnlySegmentsGroup interface { io.Closer @@ -311,17 +159,19 @@ func (r *readOnlySegmentsGroup) TrimSegments(offset int64) error { if segment, ok := r.openSegments.Get(s); ok { err = multierr.Append(err, segment.Get().Delete()) r.openSegments.Remove(s) - } else { - if segment, err2 := newReadOnlySegment(r.basePath, s); err2 != nil { - // When the error is NotExists, it means the segment was deleted, - // so we can ignore it. - // TODO: There is a lot of errors when newReadOnlySegment, we should avoid all of it. - if !errors.Is(err2, os.ErrNotExist) { - err = multierr.Append(err, err2) - } - } else { - err = multierr.Append(err, segment.Delete()) - } + continue + } + + c, err2 := newSegmentConfig(r.basePath, s) + if err != nil { + err = multierr.Append(err, err2) + } + err2 = multierr.Combine( + os.Remove(c.idxPath), + os.Remove(c.txnPath), + ) + if err != nil { + err = multierr.Append(err, err2) } } diff --git a/server/wal/readonly_segments_group_test.go b/server/wal/readonly_segments_group_test.go new file mode 100644 index 00000000..03a23153 --- /dev/null +++ b/server/wal/readonly_segments_group_test.go @@ -0,0 +1,58 @@ +// Copyright 2025 StreamNative, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/streamnative/oxia/proto" +) + +func TestReadOnlySegmentsGroupTrimSegments(t *testing.T) { + t.Run("when txnFile is not exists", func(t *testing.T) { + walFactory := NewWalFactory(&FactoryOptions{ + BaseWalDir: t.TempDir(), + Retention: 1 * time.Hour, + SegmentSize: 128, + SyncData: true, + }) + w, err := walFactory.NewWal("test", 1, nil) + assert.NoError(t, err) + for i := int64(0); i < 1000; i++ { + assert.NoError(t, w.Append(&proto.LogEntry{ + Term: 1, + Offset: i, + Value: fmt.Appendf(nil, "test-%d", i), + })) + } + walBasePath := w.(*wal).walPath + readOnlySegments, err := newReadOnlySegmentsGroup(walBasePath) + assert.NoError(t, err) + + // Ensure newReadOnlySegment will return an NotExists error + err = os.Remove(filepath.Join(walBasePath, "0.txnx")) + assert.NoError(t, err) + + assert.NoError(t, err) + err = readOnlySegments.TrimSegments(6) + assert.NoError(t, err) + }) +} diff --git a/server/wal/wal_rw_segment.go b/server/wal/readwrite_segment.go similarity index 76% rename from server/wal/wal_rw_segment.go rename to server/wal/readwrite_segment.go index 60d0b2da..5611affa 100644 --- a/server/wal/wal_rw_segment.go +++ b/server/wal/readwrite_segment.go @@ -42,11 +42,8 @@ type ReadWriteSegment interface { type readWriteSegment struct { sync.RWMutex - codec codec.Codec - txnPath string - idxPath string + c *segmentConfig - baseOffset int64 lastOffset int64 lastCrc uint32 txnFile *os.File @@ -67,32 +64,29 @@ func newReadWriteSegment(basePath string, baseOffset int64, segmentSize uint32, } } - _codec, segmentExists, err := codec.GetOrCreate(segmentPath(basePath, baseOffset)) + c, err := newSegmentConfig(basePath, baseOffset) if err != nil { return nil, err } ms := &readWriteSegment{ - codec: _codec, - txnPath: segmentPath(basePath, baseOffset) + _codec.GetTxnExtension(), - idxPath: segmentPath(basePath, baseOffset) + _codec.GetIdxExtension(), - baseOffset: baseOffset, + c: c, segmentSize: segmentSize, lastCrc: lastCrc, } - if ms.txnFile, err = os.OpenFile(ms.txnPath, os.O_CREATE|os.O_RDWR, 0644); err != nil { - return nil, errors.Wrapf(err, "failed to open segment file %s", ms.txnPath) + if ms.txnFile, err = os.OpenFile(ms.c.txnPath, os.O_CREATE|os.O_RDWR, 0644); err != nil { + return nil, errors.Wrapf(err, "failed to open segment file %s", ms.c.txnPath) } - if !segmentExists { + if !c.segmentExists { if err = initFileWithZeroes(ms.txnFile, segmentSize); err != nil { return nil, err } } if ms.txnMappedFile, err = mmap.MapRegion(ms.txnFile, int(segmentSize), mmap.RDWR, 0, 0); err != nil { - return nil, errors.Wrapf(err, "failed to map segment file %s", ms.txnPath) + return nil, errors.Wrapf(err, "failed to map segment file %s", ms.c.txnPath) } var commitOffset *int64 @@ -102,9 +96,9 @@ func newReadWriteSegment(basePath string, baseOffset int64, segmentSize uint32, } else { commitOffset = nil } - if ms.writingIdx, ms.lastCrc, ms.currentFileOffset, ms.lastOffset, err = ms.codec.RecoverIndex(ms.txnMappedFile, - ms.currentFileOffset, ms.baseOffset, commitOffset); err != nil { - return nil, errors.Wrapf(err, "failed to rebuild index for segment file %s", ms.txnPath) + if ms.writingIdx, ms.lastCrc, ms.currentFileOffset, ms.lastOffset, err = ms.c.codec.RecoverIndex(ms.txnMappedFile, + ms.currentFileOffset, ms.c.baseOffset, commitOffset); err != nil { + return nil, errors.Wrapf(err, "failed to rebuild index for segment file %s", ms.c.txnPath) } return ms, nil } @@ -116,7 +110,7 @@ func (ms *readWriteSegment) LastCrc() uint32 { } func (ms *readWriteSegment) BaseOffset() int64 { - return ms.baseOffset + return ms.c.baseOffset } func (ms *readWriteSegment) LastOffset() int64 { @@ -131,10 +125,10 @@ func (ms *readWriteSegment) Read(offset int64) ([]byte, error) { defer ms.Unlock() // todo: we might need validate if the offset less than base offset - fileReadOffset := fileOffset(ms.writingIdx, ms.baseOffset, offset) + fileReadOffset := fileOffset(ms.writingIdx, ms.c.baseOffset, offset) var payload []byte var err error - if payload, err = ms.codec.ReadRecordWithValidation(ms.txnMappedFile, fileReadOffset); err != nil { + if payload, err = ms.c.codec.ReadRecordWithValidation(ms.txnMappedFile, fileReadOffset); err != nil { if errors.Is(err, codec.ErrDataCorrupted) { return nil, errors.Wrapf(err, "read record failed. entryOffset: %d", offset) } @@ -144,7 +138,7 @@ func (ms *readWriteSegment) Read(offset int64) ([]byte, error) { } func (ms *readWriteSegment) HasSpace(l int) bool { - return ms.currentFileOffset+ms.codec.GetHeaderSize()+uint32(l) <= ms.segmentSize + return ms.currentFileOffset+ms.c.codec.GetHeaderSize()+uint32(l) <= ms.segmentSize } func (ms *readWriteSegment) Append(offset int64, data []byte) error { @@ -163,7 +157,7 @@ func (ms *readWriteSegment) Append(offset int64, data []byte) error { fOffset := ms.currentFileOffset var recordSize uint32 - recordSize, ms.lastCrc = ms.codec.WriteRecord(ms.txnMappedFile, fOffset, ms.lastCrc, data) + recordSize, ms.lastCrc = ms.c.codec.WriteRecord(ms.txnMappedFile, fOffset, ms.lastCrc, data) ms.currentFileOffset += recordSize ms.lastOffset = offset ms.writingIdx = binary.BigEndian.AppendUint32(ms.writingIdx, fOffset) @@ -186,7 +180,7 @@ func (ms *readWriteSegment) Close() error { ms.txnMappedFile.Unmap(), ms.txnFile.Close(), // Write index file - ms.codec.WriteIndex(ms.idxPath, ms.writingIdx), + ms.c.codec.WriteIndex(ms.c.idxPath, ms.writingIdx), ) codec.ReturnIndexBuf(&ms.writingIdx) return err @@ -195,21 +189,21 @@ func (ms *readWriteSegment) Close() error { func (ms *readWriteSegment) Delete() error { return multierr.Combine( ms.Close(), - os.Remove(ms.txnPath), - os.Remove(ms.idxPath), + os.Remove(ms.c.txnPath), + os.Remove(ms.c.idxPath), ) } func (ms *readWriteSegment) Truncate(lastSafeOffset int64) error { - if lastSafeOffset < ms.baseOffset || lastSafeOffset > ms.lastOffset { + if lastSafeOffset < ms.c.baseOffset || lastSafeOffset > ms.lastOffset { return codec.ErrOffsetOutOfBounds } // Write zeroes in the section to clear - fileLastSafeOffset := fileOffset(ms.writingIdx, ms.baseOffset, lastSafeOffset) + fileLastSafeOffset := fileOffset(ms.writingIdx, ms.c.baseOffset, lastSafeOffset) var recordSize uint32 var err error - if recordSize, err = ms.codec.GetRecordSize(ms.txnMappedFile, fileLastSafeOffset); err != nil { + if recordSize, err = ms.c.codec.GetRecordSize(ms.txnMappedFile, fileLastSafeOffset); err != nil { return err } fileEndOffset := fileLastSafeOffset + recordSize @@ -218,7 +212,7 @@ func (ms *readWriteSegment) Truncate(lastSafeOffset int64) error { } // Truncate the index - ms.writingIdx = ms.writingIdx[:4*(lastSafeOffset-ms.baseOffset+1)] + ms.writingIdx = ms.writingIdx[:4*(lastSafeOffset-ms.c.baseOffset+1)] ms.currentFileOffset = fileEndOffset ms.lastOffset = lastSafeOffset return ms.Flush() diff --git a/server/wal/wal_rw_segment_test.go b/server/wal/readwrite_segment_test.go similarity index 99% rename from server/wal/wal_rw_segment_test.go rename to server/wal/readwrite_segment_test.go index de5ba337..b1c2d6eb 100644 --- a/server/wal/wal_rw_segment_test.go +++ b/server/wal/readwrite_segment_test.go @@ -102,7 +102,7 @@ func TestReadWriteSegment_HasSpace(t *testing.T) { rw, err := newReadWriteSegment(t.TempDir(), 0, 1024, 0, nil) assert.NoError(t, err) segment := rw.(*readWriteSegment) - headerSize := int(segment.codec.GetHeaderSize()) + headerSize := int(segment.c.codec.GetHeaderSize()) assert.True(t, rw.HasSpace(10)) assert.False(t, rw.HasSpace(1024)) diff --git a/server/wal/segment_config.go b/server/wal/segment_config.go new file mode 100644 index 00000000..470a4532 --- /dev/null +++ b/server/wal/segment_config.go @@ -0,0 +1,42 @@ +// Copyright 2025 StreamNative, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "github.com/streamnative/oxia/server/wal/codec" +) + +type segmentConfig struct { + codec codec.Codec + segmentExists bool + txnPath string + idxPath string + baseOffset int64 +} + +func newSegmentConfig(basePath string, baseOffset int64) (*segmentConfig, error) { + _codec, segmentExists, err := codec.GetOrCreate(segmentPath(basePath, baseOffset)) + if err != nil { + return nil, err + } + + return &segmentConfig{ + codec: _codec, + segmentExists: segmentExists, + txnPath: segmentPath(basePath, baseOffset) + _codec.GetTxnExtension(), + idxPath: segmentPath(basePath, baseOffset) + _codec.GetIdxExtension(), + baseOffset: baseOffset, + }, nil +} diff --git a/server/wal/wal_trimmer.go b/server/wal/trimmer.go similarity index 100% rename from server/wal/wal_trimmer.go rename to server/wal/trimmer.go diff --git a/server/wal/wal_trimmer_test.go b/server/wal/trimmer_test.go similarity index 100% rename from server/wal/wal_trimmer_test.go rename to server/wal/trimmer_test.go