Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compact): add archive logic #10

Merged
merged 1 commit into from
Jun 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions docs/wal_compact.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
### compaction approach

- wal file exists

- every operation runs through wal

- check wal file size on regular basis

- have a default size configured

- if current size exceeds configured size

- start compaction

- create a new wal file

- redirect operations to newly created file

- create a gzip-compressed tar file (the archive)

- add wal file with data to tar

- remove the old file

#### considerations

- how to handle operations to wal file during compaction?

create a new wal file, redirect operations to new file

- how to handle any error or failure during compaction?

remove the tar file created

log a warn-level message to indicate the failure

**Todo**

- [ ] find a way to recover from failure and deal with multiple wal files during recovery cycle

- [ ] what happens when the drive used to store the archive is full?

- [ ] dependency on `.local` folder in `home` location
139 changes: 139 additions & 0 deletions wal/compact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// compact checks the wal file size;
// when the size exceeds the configured value
// it creates an archive and uses a new file
package wal

import (
"archive/tar"
"compress/gzip"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strconv"
"time"
)

// defaultMaxFileSize is the wal file size, in bytes, above which compact
// archives the file. It is assigned by getDefaultFileSize.
var defaultMaxFileSize int64

// DEFAULT_SIZE is the name of the environment variable that
// getDefaultFileSize reads to obtain the maximum wal file size.
const DEFAULT_SIZE = "DEFAULT_SIZE"

// getDefaultFileSize loads the maximum allowed wal file size, in bytes, into
// the package-level defaultMaxFileSize.
//
// The value is read from the environment variable named by DEFAULT_SIZE; when
// that variable is unset or empty, the built-in default returned by
// getDefaultSize (`3*4096`) is used.
//
// A value that is not a positive base-10 integer is a configuration error:
// the function reports it on stderr and terminates the process with exit
// status 1.
func getDefaultFileSize() {
	raw := os.Getenv(DEFAULT_SIZE)
	if raw == "" {
		raw = getDefaultSize()
	}
	// Parse straight into int64 so the limit does not depend on the
	// platform's int width.
	size, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		fmt.Fprintf(os.Stderr,
			"error getting wal file size from env var %s=%q: %v\n",
			DEFAULT_SIZE, raw, err)
		os.Exit(1)
	}
	// A zero or negative limit would make every size check trigger compaction.
	if size <= 0 {
		fmt.Fprintf(os.Stderr,
			"error getting wal file size from env var %s=%q: size must be positive\n",
			DEFAULT_SIZE, raw)
		os.Exit(1)
	}
	defaultMaxFileSize = size
}

// getDefaultSize returns the built-in maximum wal file size, three 4096-byte
// units, formatted as a decimal string.
func getDefaultSize() string {
	const (
		unit  = 4096
		count = 3
	)
	return strconv.Itoa(count * unit)
}

// compact checks the wal file size and, when it exceeds defaultMaxFileSize,
// writes the wal file into a newly created archive on disk.
//
// Failures are reported on stderr and end the compaction attempt; nothing is
// returned to the caller. When the archive cannot be written completely, the
// partial archive file is removed so that no truncated archive is left behind.
func (w *Wal) compact() {
	// check existing wal file size
	walInfo, err := w.getWalFileInfo()
	if err != nil {
		fmt.Fprintf(os.Stderr, "error:[%v] getting wal file size\n", err)
		return
	}
	if walInfo.Size() <= defaultMaxFileSize {
		return
	}
	fmt.Fprintf(os.Stdout, "[info]: start compacting:%s\n", walInfo.Name())

	// archiveFileName embeds the current time, so it must be called exactly
	// once; otherwise the name that is logged differs from the file created.
	name := archiveFileName()
	archive, err := createArchive(name)
	if err != nil {
		fmt.Fprintf(os.Stderr,
			"error:[%v] creating archive with name-%s\n", err, name)
		return
	}

	writeErr := w.writeToArchive(archive, walInfo, name)
	if writeErr != nil {
		fmt.Fprintf(os.Stderr, "error:[%v] writing file to archive\n", writeErr)
	}
	// Always release the archive handle, even when writing failed.
	closeErr := archive.Close()
	if closeErr != nil {
		fmt.Fprintf(os.Stderr, "error:[%v] closing archive-%s\n", closeErr, name)
	}
	if writeErr == nil && closeErr == nil {
		return
	}

	// The archive is incomplete: remove it and warn.
	if err := os.Remove(name); err != nil {
		fmt.Fprintf(os.Stderr,
			"warn:[%v] removing incomplete archive-%s\n", err, name)
		return
	}
	fmt.Fprintf(os.Stderr,
		"warn: compaction failed, removed incomplete archive-%s\n", name)
}

// writeToArchive copies the wal file at w.fileName into archiveFile as a
// single entry of a gzip-compressed tar stream.
//
// walInfo supplies the tar header metadata (size, mode, modification time)
// and the entry name; archiveFileName is only used in error messages.
//
// It returns the first error encountered, wrapped with context. The tar and
// gzip writers are closed explicitly before a nil return, so success means
// the complete stream, including trailers, has been handed to archiveFile.
// The caller keeps ownership of archiveFile and must close it.
func (w *Wal) writeToArchive(archiveFile io.ReadWriter,
	walInfo fs.FileInfo, archiveFileName string) (err error) {
	src, err := os.Open(w.fileName)
	if err != nil {
		return fmt.Errorf("opening wal file-%s to copy into archive: %w",
			w.fileName, err)
	}
	defer func() {
		// Report a close failure only when nothing else went wrong first.
		if cerr := src.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("closing wal file-%s: %w", w.fileName, cerr)
		}
	}()

	// The second argument is the symlink target; the wal file is a regular
	// file, so it is left empty.
	hdr, err := tar.FileInfoHeader(walInfo, "")
	if err != nil {
		return fmt.Errorf("generating tar file header for %q: %w",
			w.fileName, err)
	}
	hdr.Name = walInfo.Name()

	gzw := gzip.NewWriter(archiveFile)
	tw := tar.NewWriter(gzw)

	if err := tw.WriteHeader(hdr); err != nil {
		return fmt.Errorf("writing header to tar archive-%s: %w",
			archiveFileName, err)
	}
	if _, err := io.Copy(tw, src); err != nil {
		return fmt.Errorf("writing file-%s to archive-%s: %w",
			hdr.Name, archiveFileName, err)
	}
	// Close order matters: the tar writer flushes its padding and trailer
	// into the gzip writer, which then flushes its own footer.
	if err := tw.Close(); err != nil {
		return fmt.Errorf("finishing tar stream of archive-%s: %w",
			archiveFileName, err)
	}
	if err := gzw.Close(); err != nil {
		return fmt.Errorf("finishing gzip stream of archive-%s: %w",
			archiveFileName, err)
	}
	return nil
}

// getWalFileInfo returns the file information (name, size, mode, ...) of the
// wal file at w.fileName.
//
// On failure it returns a nil FileInfo and an error that wraps the underlying
// cause, so callers can still test it with errors.Is (for example against
// fs.ErrNotExist).
func (w *Wal) getWalFileInfo() (fs.FileInfo, error) {
	// os.Stat needs no file handle, so there is nothing to close and no
	// close error to report when the file cannot be opened.
	info, err := os.Stat(w.fileName)
	if err != nil {
		return nil, fmt.Errorf("getting file stats for wal file-%s: %w",
			w.fileName, err)
	}
	return info, nil
}

// createArchive creates a new archive file at fname and returns it opened for
// writing. Missing parent directories are created first, so the archive
// location does not have to exist beforehand. An existing file with the same
// name is truncated.
//
// On failure it returns a nil file and an error wrapping the cause. The
// caller is responsible for closing the returned file.
func createArchive(fname string) (*os.File, error) {
	if err := os.MkdirAll(filepath.Dir(fname), 0o755); err != nil {
		return nil, fmt.Errorf("creating directory for archive file with name-%s: %w",
			fname, err)
	}
	archiveFile, err := os.Create(fname)
	if err != nil {
		return nil, fmt.Errorf("creating archive file with name-%s: %w", fname, err)
	}
	return archiveFile, nil
}

// archiveFileName builds the path of a new archive file inside the `.local`
// directory under $HOME. The base name carries the current time as Unix
// microseconds, ex - archive-17188751512222238.tar, so every call yields a
// different name.
func archiveFileName() string {
	return filepath.Join(
		os.Getenv("HOME"),
		".local",
		fmt.Sprintf("archive-%d.tar", time.Now().UnixMicro()),
	)
}
42 changes: 42 additions & 0 deletions wal/compact_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package wal

import (
"fmt"
"io/fs"
"os"
"testing"
)

func Test_compact(t *testing.T) {
// create a sample wal file locally
w := new()
if err := w.createFile(); err != nil {
t.Fatalf("error-[%v] creating wal file-%q\n", err, w.WalFile())
}
// insert some dummy data
f, err := os.OpenFile(w.WalFile(), os.O_APPEND|os.O_WRONLY, fs.FileMode(os.O_APPEND))
if err != nil {
t.Fatalf("error-[%v] opening wal file-%q", err, w.WalFile())
}
for i := 1; i <= 3; i++ {
if _, err := fmt.Fprintf(f, "some test data-%d\n", i); err != nil {
t.Fatalf("error-[%v] writing data to wal file %q", err, w.WalFile())
}
}
fInfo, err := w.getWalFileInfo()
if err != nil {
t.Fatalf("error-[%v] getting wal file info", err)
}
// set a default file size
defaultMaxFileSize = int64(fInfo.Size() - 1)
t.Logf("file size:%d\n", defaultMaxFileSize)
if _, err := fmt.Fprintf(f, "some more data to increase size of exisiting file\n"); err != nil {
t.Fatalf("error-[%v] adding data to trigger compact action", err)
}
// test if condition to compact is triggered
t.Logf("new file size:%d\n", int64(fInfo.Size()))
if err := f.Close(); err != nil {
t.Fatalf("error-[%v] closing file %q", err, w.WalFile())
}
w.compact()
}
8 changes: 4 additions & 4 deletions wal/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ func Test_compare_file_size(t *testing.T) {
// create test file and write some data
// for testing
testWal := new()
fileName := fileName()
fileName := testWal.WalFile()
// test file operations
if err := createFile(); err != nil {
if err := testWal.createFile(); err != nil {
t.Fatal(err)
}
// open file to add some data
Expand Down Expand Up @@ -52,9 +52,9 @@ func Test_upd_in_memory_cache(t *testing.T) {
// create test file and write some data
// for testing
testWal := new()
fileName := fileName()
fileName := testWal.WalFile()
// test file operations
if err := createFile(); err != nil {
if err := testWal.createFile(); err != nil {
t.Fatal(err)
}
// open file to add some data
Expand Down
41 changes: 23 additions & 18 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,25 @@ import (
"io"
"os"
"path/filepath"
"time"
)

// receive data and persist in file
// keep track of pointer up to where the data is being loaded in to cache

type Wal struct {
filePointer int
size int
fileName string
}

func fileName() string {
home := os.Getenv("HOME")
fileName := filepath.Join(home, ".local", "wal.txt")
return fileName
filePointer int
size int
stamp int64
fileName string
defaultWalDir string
}

// createFile checks if file exist
// and create new file when not found
func createFile() error {
if !exists() {
_, err := os.Create(fileName())
func (w *Wal) createFile() error {
if !w.exists() {
_, err := os.Create(w.fileName)
if err != nil {
return err
}
Expand All @@ -39,23 +36,31 @@ func createFile() error {

// creates file if one does not exist
func new() *Wal {
now := time.Now().UnixMicro()
home := os.Getenv("HOME")
local := ".local"
return &Wal{
filePointer: 0,
size: 4096,
fileName: fileName(),
filePointer: 0,
size: 4096,
stamp: now,
defaultWalDir: filepath.Join(home, local),
fileName: filepath.Join(home, local, fmt.Sprintf("wal-%d.txt", now)),
}
}

// WalFile returns the value of w.fileName, the path of the wal file.
func (w *Wal) WalFile() string {
return w.fileName
}

func exists() bool {
f, err := os.Open(fileName())
func (w *Wal) exists() bool {
f, err := os.Open(w.fileName)
if err != nil && errors.Is(err, os.ErrNotExist) {
return false
}
f.Close()
if err := f.Close(); err != nil {
fmt.Fprintf(os.Stderr, "error:[%v] closing-%s file\n", err, w.WalFile())
os.Exit(1)
}
return true
}

Expand Down
8 changes: 4 additions & 4 deletions wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

func Test_encode_decode(t *testing.T) {
testWal := new()
fileName := fileName()
fileName := testWal.WalFile()
b := &bytes.Buffer{}
if err := createFile(); err != nil {
if err := testWal.createFile(); err != nil {
t.Fatal(err)
}
d := &datastructure.Data{
Expand All @@ -40,9 +40,9 @@ func Test_encode_decode(t *testing.T) {

func Test_encode_read_at(t *testing.T) {
testWal := new()
fileName:=fileName()
fileName := testWal.WalFile()
b := &bytes.Buffer{}
if err := createFile(); err != nil {
if err := testWal.createFile(); err != nil {
t.Fatal(err)
}
d := &datastructure.Data{
Expand Down
Loading