Skip to content

Commit

Permalink
snapshot: implement versioning system, bump snapshot.Version=2
Browse files Browse the repository at this point in the history
  • Loading branch information
altergui committed Mar 20, 2024
1 parent a0aca9f commit a75b688
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 52 deletions.
91 changes: 45 additions & 46 deletions snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import (
"path/filepath"
"sort"
"sync"
"time"

"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/vochain/state"
)

const (
snapshotHeaderVersion = 1
// Version is the version that is accepted
Version = 2
snapshotHeaderLenSize = 32
)

Expand Down Expand Up @@ -55,6 +55,7 @@ const chunksDir = "chunks"
// - blobN is the raw bytes dump of all trees and databases.
type Snapshot struct {
path string
size int64
file *os.File
lock sync.Mutex
header SnapshotHeader
Expand Down Expand Up @@ -86,14 +87,6 @@ type SnapshotBlobHeader struct {
Root []byte
}

// DiskSnapshotInfo describes a file on disk
type DiskSnapshotInfo struct {
Path string
ModTime time.Time
Size int64
Hash []byte
}

func (h *SnapshotHeader) String() string {
return fmt.Sprintf("version=%d root=%s chainID=%s height=%d blobs=%+v",
h.Version, hex.EncodeToString(h.Root), h.ChainID, h.Height, h.Blobs)
Expand Down Expand Up @@ -172,6 +165,11 @@ func (s *Snapshot) Path() string {
return s.path
}

// Path returns the size of the snapshot file.
func (s *Snapshot) Size() int64 {
return s.size
}

// Finish builds the snapshot started with `New` and stores in disk its contents.
// After calling this method the snapshot is finished.
func (s *Snapshot) Finish() error {
Expand Down Expand Up @@ -222,7 +220,7 @@ func (s *Snapshot) Finish() error {
"snapHash", hex.EncodeToString(s.header.Hash), "appRoot", hex.EncodeToString(s.header.Root))

// close and remove the temporary file
if err := s.file.Close(); err != nil {
if err := s.Close(); err != nil {
return err
}
if err := finalFile.Close(); err != nil {
Expand All @@ -231,6 +229,11 @@ func (s *Snapshot) Finish() error {
return os.Remove(s.file.Name())
}

// Close closes the file descriptor used by the snapshot
func (s *Snapshot) Close() error {
return s.file.Close()
}

// Restore restores the State snapshot into a temp directory
// inside the passed dataDir, and returns the path. Caller is expected to move that tmpDir
// into the location normally used by statedb,
Expand Down Expand Up @@ -410,30 +413,35 @@ func (sm *SnapshotManager) Do(v *state.State) (string, error) {
// To open an existing snapshot file, use `Open` instead.
func (sm *SnapshotManager) New(height uint32) (*Snapshot, error) {
filePath := filepath.Join(sm.dataDir, fmt.Sprintf("%d", height))
file, err := os.Create(filePath + ".tmp")
tmpFile, err := os.Create(filePath + ".tmp")
if err != nil {
return nil, err
}
return &Snapshot{
path: filePath,
file: file,
file: tmpFile,
header: SnapshotHeader{
Version: snapshotHeaderVersion,
Version: Version,
hasher: md5.New(),
},
}, nil
}

// Open reads an existing snapshot file, decodes the header and returns a Snapshot
// Open reads an existing snapshot file, decodes the header and returns a Snapshot.
// On the returned Snapshot, you are expected to call SeekToNextBlob(), read from CurrentBlobReader()
// and iterate until SeekToNextBlob() returns io.EOF.
//
// When done, caller should call Close()
// This method performs the opposite operation of `New`.
func (*SnapshotManager) Open(filePath string) (*Snapshot, error) {
file, err := os.Open(filePath)
if err != nil {
return nil, err
}
fileInfo, err := file.Stat()
if err != nil {
return nil, fmt.Errorf("could not fetch snapshot file info: %w", err)
}
headerSizeBytes := make([]byte, snapshotHeaderLenSize)
if _, err = io.ReadFull(file, headerSizeBytes); err != nil {
return nil, err
Expand All @@ -442,19 +450,25 @@ func (*SnapshotManager) Open(filePath string) (*Snapshot, error) {
s := &Snapshot{
path: filePath,
file: file,
size: fileInfo.Size(),
headerSize: binary.LittleEndian.Uint32(headerSizeBytes),
currentBlob: -1, // in order for the first SeekToNextBlob seek to blob 0
}
decoder := gob.NewDecoder(io.LimitReader(s.file, int64(s.headerSize)))
if err := decoder.Decode(&s.header); err != nil {
return nil, fmt.Errorf("cannot decode header: %w", err)
}
if s.header.Version != snapshotHeaderVersion {
return nil, fmt.Errorf("snapshot version not compatible")
if s.header.Version > Version {
return nil, fmt.Errorf("snapshot version %d unsupported (must be <=%d)", s.header.Version, Version)
}
return s, nil
}

// OpenByHeight reads an existing snapshot file, decodes the header and returns a Snapshot.
func (sm *SnapshotManager) OpenByHeight(height int64) (*Snapshot, error) {
return sm.Open(filepath.Join(sm.dataDir, fmt.Sprintf("%d", height)))
}

// Prune removes old snapshots stored on disk, keeping the N most recent ones
func (sm *SnapshotManager) Prune(keepRecent int) error {
files, err := os.ReadDir(sm.dataDir)
Expand Down Expand Up @@ -495,67 +509,52 @@ func (sm *SnapshotManager) Prune(keepRecent int) error {
}

// List returns the list of the current snapshots stored on disk, indexed by height
func (sm *SnapshotManager) List() map[uint32]DiskSnapshotInfo {
func (sm *SnapshotManager) List() map[uint32]*Snapshot {
files, err := os.ReadDir(sm.dataDir)
if err != nil {
log.Fatal(err)
}
dsi := make(map[uint32]DiskSnapshotInfo)
snaps := make(map[uint32]*Snapshot)
for _, file := range files {
if !file.IsDir() {
if path.Ext(file.Name()) == ".tmp" {
// ignore incomplete snapshots
continue
}
fileInfo, err := file.Info()
if err != nil {
log.Errorw(err, "could not fetch snapshot file info")
continue
}
s, err := sm.Open(filepath.Join(sm.dataDir, file.Name()))
if err != nil {
log.Errorw(err, fmt.Sprintf("could not open snapshot file %q", filepath.Join(sm.dataDir, file.Name())))
continue
}

dsi[uint32(s.header.Height)] = DiskSnapshotInfo{
Path: filepath.Join(sm.dataDir, file.Name()),
Size: fileInfo.Size(),
ModTime: fileInfo.ModTime(),
Hash: s.header.Hash,
// for the list we don't need the file descriptors open
if err := s.Close(); err != nil {
log.Error(err)
}
snaps[uint32(s.header.Height)] = s
}
}
return dsi
return snaps
}

// SliceChunk returns a chunk of a snapshot
func (sm *SnapshotManager) SliceChunk(height uint64, format uint32, chunk uint32) ([]byte, error) {
_ = format // TBD: we don't support different formats

dsi := sm.List()

snapshot, found := dsi[uint32(height)]
if !found {
return nil, fmt.Errorf("snapshot not found for height %d", height)
}

file, err := os.Open(snapshot.Path)
s, err := sm.OpenByHeight(int64(height))
if err != nil {
return nil, err
return nil, fmt.Errorf("snapshot not found for height %d", height)
}
defer s.Close()

defer file.Close()

chunks := int(math.Ceil(float64(snapshot.Size) / float64(sm.ChunkSize)))
partSize := int(math.Min(float64(sm.ChunkSize), float64(snapshot.Size-int64(chunk)*sm.ChunkSize)))
chunks := int(math.Ceil(float64(s.Size()) / float64(sm.ChunkSize)))
partSize := int(math.Min(float64(sm.ChunkSize), float64(s.Size()-int64(chunk)*sm.ChunkSize)))
partBuffer := make([]byte, partSize)
if _, err := file.ReadAt(partBuffer, int64(chunk)*sm.ChunkSize); err != nil && !errors.Is(err, io.EOF) {
if _, err := s.file.ReadAt(partBuffer, int64(chunk)*sm.ChunkSize); err != nil && !errors.Is(err, io.EOF) {
return nil, err
}

log.Debugf("splitting snapshot for height %d (size=%d, hash=%x), serving chunk %d of %d",
height, snapshot.Size, snapshot.Hash, chunk, chunks)
height, s.Size(), s.header.Hash, chunk, chunks)

return partBuffer, nil
}
Expand Down
32 changes: 26 additions & 6 deletions vochain/cometbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,20 +517,23 @@ func (app *BaseApplication) ProcessProposal(_ context.Context,

// ListSnapshots provides cometbft with a list of available snapshots.
func (app *BaseApplication) ListSnapshots(_ context.Context,
req *cometabcitypes.ListSnapshotsRequest,
_ *cometabcitypes.ListSnapshotsRequest,
) (*cometabcitypes.ListSnapshotsResponse, error) {
list := app.Snapshots.List()

response := &cometabcitypes.ListSnapshotsResponse{}
for height, dsi := range list {
chunks := uint32(math.Ceil(float64(dsi.Size) / float64(app.Snapshots.ChunkSize)))

for height, snap := range list {
chunks := uint32(math.Ceil(float64(snap.Size()) / float64(app.Snapshots.ChunkSize)))
metadataBytes, err := json.Marshal(snap.Header())
if err != nil {
return nil, fmt.Errorf("couldn't marshal metadata: %w", err)
}
response.Snapshots = append(response.Snapshots, &cometabcitypes.Snapshot{
Height: uint64(height),
Format: 0,
Chunks: chunks,
Hash: dsi.Hash,
Metadata: []byte{},
Hash: snap.Header().Hash,
Metadata: metadataBytes,
})
}
log.Debugf("cometbft requests our list of snapshots, we offer %d options", len(response.Snapshots))
Expand All @@ -553,6 +556,18 @@ func (app *BaseApplication) OfferSnapshot(_ context.Context,
"snapHash", hex.EncodeToString(req.Snapshot.Hash),
"height", req.Snapshot.Height, "format", req.Snapshot.Format, "chunks", req.Snapshot.Chunks)

var metadata snapshot.SnapshotHeader
if err := json.Unmarshal(req.Snapshot.Metadata, &metadata); err != nil {
return nil, fmt.Errorf("couldn't unmarshal metadata: %w", err)
}
if metadata.Version > snapshot.Version {
log.Debugw("reject snapshot due to unsupported version",
"height", req.Snapshot.Height, "version", metadata.Version, "chunks", req.Snapshot.Chunks)
return &cometabcitypes.OfferSnapshotResponse{
Result: cometabcitypes.OFFER_SNAPSHOT_RESULT_REJECT,
}, nil
}

snapshotFromComet.height.Store(int64(req.Snapshot.Height))
snapshotFromComet.chunks.Store(int32(req.Snapshot.Chunks))

Expand Down Expand Up @@ -604,6 +619,11 @@ func (app *BaseApplication) ApplySnapshotChunk(_ context.Context,
Result: cometabcitypes.APPLY_SNAPSHOT_CHUNK_RESULT_REJECT_SNAPSHOT,
}, nil
}
defer func() {
if err := s.Close(); err != nil {
log.Error(err)
}
}()

if err := app.RestoreStateFromSnapshot(s); err != nil {
log.Error(err)
Expand Down

0 comments on commit a75b688

Please sign in to comment.