Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(compact): integrate wal in to api endpoint #11

Merged
merged 1 commit into from
Jun 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/ehrktia/memcache/datastructure"
"github.com/ehrktia/memcache/server"
"github.com/ehrktia/memcache/wal"
"golang.org/x/sync/errgroup"
)

Expand All @@ -26,18 +27,27 @@ func main() {
createDataStruct(once, queueSize)
// start http
mux := http.NewServeMux()
httpServer := server.NewHTTPServer()
registerHandler(mux)
httpServer.Handler = mux
w := wal.NewWal()
walFile := w.WalFileName()
webServer := server.NewWebServer(w, server.NewHTTPServer())
webServer.Server.Handler = mux
registerHandler(mux, webServer)
// wait for interrupt
shutdown(sig)
// start server
eg, _ := errgroup.WithContext(ctx)
eg.Go(func() error { return httpServer.ListenAndServe() })
// stop app in case of error
// create wal file
eg.Go(func() error {
return wal.CreateFile(walFile)
})
eg.Go(func() error {
return wal.Compact(w)
})
eg.Go(func() error {
return webServer.Server.ListenAndServe()
})
if err := eg.Wait(); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
os.Exit(0)
fmt.Fprintf(os.Stderr, "error:%v\n", err)
os.Exit(1)
}
}

Expand All @@ -46,9 +56,10 @@ func createDataStruct(once *sync.Once, queueSize int) {

}

func registerHandler(mux *http.ServeMux) {
mux.HandleFunc("/save", server.Store)
mux.HandleFunc("/get", server.Get)
func registerHandler(mux *http.ServeMux, w *server.WebServer) {
mux.HandleFunc("/save", w.Store)
mux.HandleFunc("/get", w.Get)
mux.HandleFunc("/getall", w.GetAll)
}

func shutdown(s chan os.Signal) {
Expand All @@ -63,7 +74,7 @@ func shutdown(s chan os.Signal) {
func getQueueSize() int {
size := os.Getenv("QUEUE_SIZE")
if size == "" {
size = "10"
size = "50"
}
s, err := strconv.Atoi(size)
if err != nil {
Expand Down
40 changes: 25 additions & 15 deletions datastructure/datastructure.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datastructure

import (
"fmt"
"strings"
"sync"
)
Expand Down Expand Up @@ -36,23 +37,23 @@ func buildInp(k, v any) {
func Add(k, v any) (any, bool) {
buildInp(k, v)
go func() {
data := <-storeCh
// load data
_, l := inMemoryCache.LoadOrStore(data.Key, data.Value)
// return output
result <- data.Value
load <- l
// update queue
idx := <-inMemoryIdx.getIdx(k)
// add when missing
if idx < 0 {
inMemoryIdx.add(k)
data := <-storeCh
// load data
_, l := inMemoryCache.LoadOrStore(data.Key, data.Value)
// return output
result <- data.Value
load <- l
// update queue
idx := <-inMemoryIdx.getIdx(k)
// add when missing
if idx < 0 {
inMemoryIdx.add(k)
// when present move to top
done <- true
return
}
// when present move to top
inMemoryIdx.swap(idx)
done <- true
}
inMemoryIdx.swap(idx)
done <- true
}()
r := <-result
l := <-load
Expand Down Expand Up @@ -98,6 +99,15 @@ func Get(k any) any {
return out
}

func GetAll() map[string]any {
m := make(map[string]any)
inMemoryCache.Range(func(key, value any) bool {
m[fmt.Sprintf("%s", key)] = value
return true
})
return m
}

// NewQueue creates new inmemory store and queue
// each instance of `once` creates new store and queue
func NewQueue(once *sync.Once, s int) {
Expand Down
30 changes: 30 additions & 0 deletions datastructure/datastructure_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package datastructure

import (
"fmt"
"sync"
"testing"
)

func Test_get_all(t *testing.T) {
s := 5
once := &sync.Once{}
NewQueue(once, s)
// test add
for i := 1; i < s; i++ {
key := fmt.Sprintf("%s-%d", t.Name(), i)
value := fmt.Sprintf("%d", i)
r, l := Add(key, value)
t.Logf("r:%v\n", r)
t.Logf("loaded:%t\n", l)
}
// test get
val := GetAll()
for k, v := range val {
t.Logf("key:%v\tval:%v\n", k, v)
}
if len(val) != s-1 {
t.Fatalf("expected-%d,got-%d\n", s-1, len(val))
}

}
5 changes: 4 additions & 1 deletion datastructure/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ func (q *queue) add(k any) {
func (q *queue) swap(idx int) {
q.lock.Lock()
defer q.lock.Unlock()
q.list[0], q.list[idx] = q.list[idx], q.list[0]
if idx > 0 {
q.list[0], q.list[idx] = q.list[idx], q.list[0]
return
}
}

func (q *queue) getIdx(k any) chan int {
Expand Down
14 changes: 11 additions & 3 deletions docs/wal_compact.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
### wal cache sync approach

- data is persisted in wal file

- at end of persistance, trigger update to cache



### compaction approach

- wal file exists
Expand Down Expand Up @@ -36,8 +44,8 @@

**Todo**

- [ ] find a way to recover from failure and deal with multiple wal files during recovery cycle
- [ ] find a way to recover from failure and deal with multiple wal files during recovery cycle

- [ ] what happens when drive used to store archive is full
- [ ] what happens when drive used to store archive is full

- [ ] dependency on `.local` folder in `home` location
- [ ] dependency on `.local` folder in `home` location
21 changes: 21 additions & 0 deletions scripts/save_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/bash
for i in {1..100}
do
key="key-"$i
value="value-"$i
data="{
\"key\": \"${key}\",
\"value\": \"${value}\"
}"
# echo $data
curl --location 'http://localhost:8080/save' \
--data "${data}"
sleep 1
done

# curl --location 'http://localhost:8080/save' \
# --header 'Content-Type: application/json' \
# --data '{
# "key": "some-key",
# "value": "some-value"
# }'
63 changes: 41 additions & 22 deletions server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"fmt"
"net/http"
"os"
"time"
"strings"
"time"

"github.com/ehrktia/memcache/datastructure"
"github.com/ehrktia/memcache/wal"
)

func NewHTTPServer() *http.Server {
Expand All @@ -28,8 +29,21 @@ func NewHTTPServer() *http.Server {
return s
}

type WebServer struct {
Wal *wal.Wal
Server *http.Server
}

func Store(res http.ResponseWriter, req *http.Request) {
func NewWebServer(w *wal.Wal, h *http.Server) *WebServer {
return &WebServer{
Wal: w,
Server: h,
}
}

// Store receives values which are required to be stored
// writes data to wal file
func (w *WebServer) Store(res http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
if err := reqPostMethod(req); err != nil {
if err := writeResponse(res, []byte(err.Error())); err != nil {
Expand All @@ -43,30 +57,20 @@ func Store(res http.ResponseWriter, req *http.Request) {
if err := readReqBody(buf, req, res); err != nil {
return
}
// extract req data
d, err := extractReqData(buf)
if err != nil {
// add data to wal
if err := wal.UpdCache(w.Wal, buf.Bytes()); err != nil {
if err := writeResponse(res, []byte(err.Error())); err != nil {
return
}
}

result, _ := datastructure.Add(d.Key, d.Value)
if result != nil {
_, err := json.Marshal(result)
if err != nil {
if err := writeResponse(res, []byte(err.Error())); err != nil {
return
}
}
if err := writeResponse(res, []byte("successfully added to cache")); err != nil {
return
}

// write response
if err := writeResponse(res, []byte(fmt.Sprintf("%s\n", "successfully added to cache"))); err != nil {
return
}
}

func Get(res http.ResponseWriter, req *http.Request) {
// Get retrieves the value associated with key from in-memory cache store
func (w *WebServer) Get(res http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
if err := reqGetMethod(req); err != nil {
if err := writeResponse(res, []byte(err.Error())); err != nil {
Expand All @@ -86,10 +90,10 @@ func Get(res http.ResponseWriter, req *http.Request) {
}
}
// get data from cache
v:= datastructure.Get(d.Key)
v := datastructure.Get(d.Key)
// data not found
if strings.EqualFold(v.(string),datastructure.NotFound){
err := fmt.Errorf("%v matching value not found", d.Key)
if strings.EqualFold(v.(string), datastructure.NotFound) {
err := fmt.Errorf("[%v] matching %s", d.Key, datastructure.NotFound)
if err := writeResponse(res, []byte(err.Error())); err != nil {
return
}
Expand All @@ -106,3 +110,18 @@ func Get(res http.ResponseWriter, req *http.Request) {
return
}
}

// GetAll emits all the data stored in-memory cache
// this is expensive
func (w *WebServer) GetAll(res http.ResponseWriter, r *http.Request) {
v := datastructure.GetAll()
resultBytes, err := json.Marshal(v)
if err != nil {
if err := writeResponse(res, []byte(fmt.Sprintf("%s\n", err.Error()))); err != nil {
return
}
}
if err := writeResponse(res, resultBytes); err != nil {
return
}
}
Loading
Loading