diff --git a/docs/wal_compact.md b/docs/wal_compact.md new file mode 100644 index 0000000..5b63215 --- /dev/null +++ b/docs/wal_compact.md @@ -0,0 +1,43 @@ +### compaction approach + +- wal file exists + +- every operation runs through wal + +- check wal file size on regular basis + +- have a default size configured + +- if current size exceeds configured size + +- start compaction + +- create a new wal file + +- redirect operations to newly created file + +- create a tar file + +- add wal file with data to tar + +- remove the old file + +#### considerations + +- how to handle operations to wal file during compaction? + + create a new wal file, redirect operations to new file + +- how to handle any error or failure during compaction? + + remove the tar file created + + raise a warn level message to indicate failure + +**Todo** + +- [ ] find a way to recover from failure and deal with multiple wal files during recovery cycle + +- [ ] what happens when drive used to store archive is full + +- [ ] dependency on `.local` folder in `home` location diff --git a/wal/compact.go b/wal/compact.go new file mode 100644 index 0000000..005802a --- /dev/null +++ b/wal/compact.go @@ -0,0 +1,139 @@ +// compact check file size +// when size exceeds configured value +// it creates archive and uses a new file +package wal + +import ( + "archive/tar" + "compress/gzip" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "strconv" + "time" +) + +var defaultMaxFileSize int64 + +const DEFAULT_SIZE = "DEFAULT_SIZE" + +// getDefaultFileSize gets the configured +// allowed file size for wal, when none defined +// it uses default `3*4096` as size +func getDefaultFileSize() { + ev := os.Getenv(DEFAULT_SIZE) + if ev == "" { + ev = getDefaultSize() + } + fs, err := strconv.Atoi(ev) + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", "error getting wal file size from env var") + os.Exit(1) + } + defaultMaxFileSize = int64(fs) +} + +func getDefaultSize() string { + dsize := strconv.Itoa(3 * 4096) + return dsize +} + +// compact checks the wal file size +// when size exceeds predefined size `4*4096` +// it triggers the compact routine +// creates a new archive file and save it disk +func (w *Wal) compact() { + // check existing wal file size + walInfo, err := w.getWalFileInfo() + if err != nil { + fmt.Fprintf(os.Stderr, "error:[%v] getting wal file size", err) + } + if walInfo.Size() > defaultMaxFileSize { + fmt.Fprintf(os.Stdout, "[info]: start compacting:%s\n", walInfo.Name()) + archive, err := createArchive(archiveFileName()) + if err != nil { + fmt.Fprintf(os.Stderr, + "error:[%v] creating archive with name-%s\n", err, archiveFileName()) + } + if err := w.writeToArchive(archive, walInfo, archiveFileName()); err != nil { + fmt.Fprintf(os.Stderr, "error:[%v] writing file to archive", err) + } + } +} + +func (w *Wal) writeToArchive(archiveFile io.ReadWriter, + walInfo fs.FileInfo, archiveFileName string) error { + gzr := gzip.NewWriter(archiveFile) + defer gzr.Flush() + defer gzr.Close() + twr := tar.NewWriter(gzr) + defer twr.Close() + tarHeader, err := tar.FileInfoHeader(walInfo, walInfo.Name()) + if err != nil { + fmt.Fprintf(os.Stderr, + "error:[%v] generating tar file header for %q", + err, walInfo.Name()) + } + tarHeader.Name = walInfo.Name() + if err := twr.WriteHeader(tarHeader); err != nil { + fmt.Fprintf(os.Stderr, + "error:[%v] writing header to tar archive-%s", + err, archiveFileName) + } + f, err := os.OpenFile(w.fileName, os.O_RDONLY, fs.FileMode(os.O_RDONLY)) + if err != nil { + fmt.Fprintf(os.Stderr, + "error:[%v] opening wal file-%s to copy into archive", + err, walInfo.Name()) + } + if _, err := io.Copy(twr, f); err != nil { + fmt.Fprintf(os.Stderr, + "error:[%v] writing file-%s to archive-%s", + err, walInfo.Name(), archiveFile) + } + return f.Close() +} + +func (w *Wal) getWalFileInfo() (fs.FileInfo, error) { + f, err := os.Open(w.fileName) + defer func() { + if err := f.Close(); err != nil { + fmt.Fprintf(os.Stderr, + "error:[%v] closing wal file-%s\n", err, w.WalFile()) + } + }() + if err != nil { + fmt.Fprintf(os.Stderr, "error:[%v] opening file\n", err) + return nil, err + } + info, err := f.Stat() + if err != nil { + fmt.Fprintf(os.Stderr, "error:[%v] getting file stats\n", err) + return nil, err + } + return info, nil +} + +// createArchive creates new archive file +func createArchive(fname string) (*os.File, error) { + archiveFile, err := os.Create(fname) + if err != nil { + fmt.Fprintf(os.Stderr, "error:[%v] creating archive file with name-%s", err, fname) + return archiveFile, err + } + return archiveFile, err +} + +// archiveFileName generates file name to be used for archive +// with timestamp numeric in unix micro format +// ex - archive-17188751512222238.tar +func archiveFileName() string { + now := time.Now().UnixMicro() + fname := fmt.Sprintf("archive-%d.tar", now) + local := ".local" + home := os.Getenv("HOME") + fileName := filepath.Join(home, local, fname) + return fileName +} diff --git a/wal/compact_test.go b/wal/compact_test.go new file mode 100644 index 0000000..9254fa5 --- /dev/null +++ b/wal/compact_test.go @@ -0,0 +1,42 @@ +package wal + +import ( + "fmt" + "io/fs" + "os" + "testing" +) + +func Test_compact(t *testing.T) { + // create a sample wal file locally + w := new() + if err := w.createFile(); err != nil { + t.Fatalf("error-[%v] creating wal file-%q\n", err, w.WalFile()) + } + // insert some dummy data + f, err := os.OpenFile(w.WalFile(), os.O_APPEND|os.O_WRONLY, fs.FileMode(os.O_APPEND)) + if err != nil { + t.Fatalf("error-[%v] opening wal file-%q", err, w.WalFile()) + } + for i := 1; i <= 3; i++ { + if _, err := fmt.Fprintf(f, "some test data-%d\n", i); err != nil { + t.Fatalf("error-[%v] writing data to wal file %q", err, w.WalFile()) + } + } + fInfo, err := w.getWalFileInfo() + if err != nil { + t.Fatalf("error-[%v] getting wal file info", err) + } + // set a default file size + defaultMaxFileSize = int64(fInfo.Size() - 1) + t.Logf("file size:%d\n", defaultMaxFileSize) + if _, err := fmt.Fprintf(f, "some more data to increase size of exisiting file\n"); err != nil { + t.Fatalf("error-[%v] adding data to trigger compact action", err) + } + // test if condition to compact is triggered + t.Logf("new file size:%d\n", int64(fInfo.Size())) + if err := f.Close(); err != nil { + t.Fatalf("error-[%v] closing file %q", err, w.WalFile()) + } + w.compact() +} diff --git a/wal/sync_test.go b/wal/sync_test.go index a0f9640..346444d 100644 --- a/wal/sync_test.go +++ b/wal/sync_test.go @@ -14,9 +14,9 @@ func Test_compare_file_size(t *testing.T) { // create test file and write some data // for testing testWal := new() - fileName := fileName() + fileName := testWal.WalFile() // test file operations - if err := createFile(); err != nil { + if err := testWal.createFile(); err != nil { t.Fatal(err) } // open file to add some data @@ -52,9 +52,9 @@ func Test_upd_in_memory_cache(t *testing.T) { // create test file and write some data // for testing testWal := new() - fileName := fileName() + fileName := testWal.WalFile() // test file operations - if err := createFile(); err != nil { + if err := testWal.createFile(); err != nil { t.Fatal(err) } // open file to add some data diff --git a/wal/wal.go b/wal/wal.go index e651892..6acca66 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -8,28 +8,25 @@ import ( "io" "os" "path/filepath" + "time" ) // receive data and persist in file // keep track of pointer up to where the data is being loaded in to cache type Wal struct { - filePointer int - size int - fileName string -} - -func fileName() string { - home := os.Getenv("HOME") - fileName := filepath.Join(home, ".local", "wal.txt") - return fileName + filePointer int + size int + stamp int64 + fileName string + defaultWalDir string } // createFile checks if file exist // and create new file when not found -func createFile() error { - if !exists() { - _, err := os.Create(fileName()) +func (w *Wal) createFile() error { + if !w.exists() { + _, err := os.Create(w.fileName) if err != nil { return err } @@ -39,10 +36,15 @@ func createFile() error { // creates file if one does not exist func new() *Wal { + now := time.Now().UnixMicro() + home := os.Getenv("HOME") + local := ".local" return &Wal{ - filePointer: 0, - size: 4096, - fileName: fileName(), + filePointer: 0, + size: 4096, + stamp: now, + defaultWalDir: filepath.Join(home, local), + fileName: filepath.Join(home, local, fmt.Sprintf("wal-%d.txt", now)), } } @@ -50,12 +52,15 @@ func (w *Wal) WalFile() string { return w.fileName } -func exists() bool { - f, err := os.Open(fileName()) +func (w *Wal) exists() bool { + f, err := os.Open(w.fileName) if err != nil && errors.Is(err, os.ErrNotExist) { return false } - f.Close() + if err := f.Close(); err != nil { + fmt.Fprintf(os.Stderr, "error:[%v] closing-%s file\n", err, w.WalFile()) + os.Exit(1) + } return true } diff --git a/wal/wal_test.go b/wal/wal_test.go index 34ce321..9d4b791 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -11,9 +11,9 @@ import ( func Test_encode_decode(t *testing.T) { testWal := new() - fileName := fileName() + fileName := testWal.WalFile() b := &bytes.Buffer{} - if err := createFile(); err != nil { + if err := testWal.createFile(); err != nil { t.Fatal(err) } d := &datastructure.Data{ @@ -40,9 +40,9 @@ func Test_encode_decode(t *testing.T) { func Test_encode_read_at(t *testing.T) { testWal := new() - fileName:=fileName() + fileName := testWal.WalFile() b := &bytes.Buffer{} - if err := createFile(); err != nil { + if err := testWal.createFile(); err != nil { t.Fatal(err) } d := &datastructure.Data{