Skip to content

Commit

Permalink
dirty JoinChunksAndRestoreSnapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
altergui committed Jan 10, 2024
1 parent 749e700 commit 0f7f26c
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 9 deletions.
78 changes: 69 additions & 9 deletions vochain/cometbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"sort"
"time"

cometabcitypes "github.com/cometbft/cometbft/abci/types"
v1 "github.com/cometbft/cometbft/api/cometbft/abci/v1"
cometapitypes "github.com/cometbft/cometbft/api/cometbft/types/v1"
crypto256k1 "github.com/cometbft/cometbft/crypto/secp256k1"
ethcommon "github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -491,7 +493,19 @@ func (app *BaseApplication) ProcessProposal(_ context.Context,
}, nil
}

// ListSnapshots returns a list of available snapshots.
// Example StateSync (Snapshot) successful flow:
// Alice is a comet node, up-to-date with RPC open on port 26657
// Bob is a fresh node that is bootstrapping, with params:
// * StateSync.Enable = true
// * StateSync.RPCServers = alice:26657
// * Bob comet will ask Alice for snapshots (app doesn't intervene here)
// * Alice comet calls ListSnapshots, App returns a []*v1.Snapshot
// * Bob comet calls OfferSnapshot passing a single *v1.Snapshot, App returns ACCEPT
// * Bob comet asks Alice for chunks (app doesn't intervene here)
// * Alice comet calls (N times) LoadSnapshotChunk passing height, format, chunk index. App returns []byte
// * Bob comet calls (N times) ApplySnapshotChunk passing []byte, chunk index and sender. App returns ACCEPT

// ListSnapshots provides cometbft with a list of available snapshots.
func (*BaseApplication) ListSnapshots(_ context.Context,
req *cometabcitypes.ListSnapshotsRequest,
) (*cometabcitypes.ListSnapshotsResponse, error) {
Expand All @@ -500,18 +514,27 @@ func (*BaseApplication) ListSnapshots(_ context.Context,
list.Snapshots = append(list.Snapshots, &cometabcitypes.Snapshot{
Height: 1,
Format: 2,
Chunks: 3,
Chunks: 10,
Hash: []byte{0xff},
Metadata: []byte{'h', 'e', 'l', 'l', 'o'},
})
return list, nil
}

// OfferSnapshot returns the response to a snapshot offer.
// horrible hack, turn this into a field of app or state package, and use something concurrent-safe
var snaps = struct {
Snap *v1.Snapshot
ChunkFilenames map[uint32]string
}{}

// OfferSnapshot is called by cometbft during StateSync, when another node offers a Snapshot.
func (*BaseApplication) OfferSnapshot(_ context.Context,
req *cometabcitypes.OfferSnapshotRequest,
) (*cometabcitypes.OfferSnapshotResponse, error) {
log.Warn("Snapshot offered:", req) // debug
snaps.ChunkFilenames = make(map[uint32]string)
snaps.Snap = req.Snapshot

if true {
return &cometabcitypes.OfferSnapshotResponse{
Result: cometabcitypes.OFFER_SNAPSHOT_RESULT_ACCEPT,
Expand All @@ -522,21 +545,58 @@ func (*BaseApplication) OfferSnapshot(_ context.Context,
}, nil
}

// LoadSnapshotChunk returns the response to a snapshot chunk loading request.
// LoadSnapshotChunk provides cometbft with a snapshot chunk, during StateSync.
//
// cometbft will reject a len(Chunk) > 16M
func (*BaseApplication) LoadSnapshotChunk(_ context.Context,
req *cometabcitypes.LoadSnapshotChunkRequest,
) (*cometabcitypes.LoadSnapshotChunkResponse, error) {
log.Warn("LoadSnapshotChunk:", req) // debug
log.Warn("LoadSnapshotChunk: req is ", req)

chunks := [][]byte{
{'0', 'p'},
{'1', 'p', 'p'},
{'2', 'p', 'p', 'p'},
{'3', 'p', 'p', 'p', 'p'},
{'4', 'p', 'p', 'p', 'p', 'p'},
{'5', 'p', 'p', 'p', 'p', 'p', 'p'},
{'6', 'p', 'p', 'p', 'p', 'p', 'p', 'p'},
{'7', 'p', 'p', 'p', 'p', 'p', 'p', 'p', 'p'},
{'8', 'p', 'p', 'p', 'p', 'p', 'p', 'p', 'p', 'p'},
{'9', 'p', 'p', 'p', 'p', 'p', 'p', 'p', 'p', 'p', 'p'},
}

buf := chunks[req.Chunk]
log.Warn("LoadSnapshotChunk: len(buf) is ", len(buf)) // debug
return &cometabcitypes.LoadSnapshotChunkResponse{
Chunk: []byte{'c', 'h', 'u', 'n', 'k'},
Chunk: buf,
}, nil
}

// ApplySnapshotChunk returns the response to a snapshot chunk applying request.
func (*BaseApplication) ApplySnapshotChunk(_ context.Context,
// ApplySnapshotChunk applies a snapshot chunk provided by cometbft StateSync.
//
// cometbft will never pass a Chunk bigger than 16M
func (app *BaseApplication) ApplySnapshotChunk(_ context.Context,
req *cometabcitypes.ApplySnapshotChunkRequest,
) (*cometabcitypes.ApplySnapshotChunkResponse, error) {
log.Warn("ApplySnapshotChunk:", req) // debug
log.Warn("ApplySnapshotChunk: index ", req.Index, " size: ", len(req.Chunk)) // debug
f, err := os.CreateTemp("", "SnapshotChunk")
if err != nil {
return nil, err
}
defer f.Close()

if _, err := f.Write(req.Chunk); err != nil {
defer os.Remove(f.Name())
return nil, err
}

snaps.ChunkFilenames[req.Index] = f.Name()

if len(snaps.ChunkFilenames) == int(snaps.Snap.GetChunks()) {
app.State.JoinChunksAndRestoreSnapshot(snaps.ChunkFilenames, snaps.Snap.Height)
}

return &cometabcitypes.ApplySnapshotChunkResponse{
Result: cometabcitypes.APPLY_SNAPSHOT_CHUNK_RESULT_ACCEPT,
}, nil
Expand Down
96 changes: 96 additions & 0 deletions vochain/state/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -399,6 +400,52 @@ func (v *State) installSnapshot(height uint32) error {
}
*/

// SplitSnapshotIntoChunks TBD
func SplitSnapshotIntoChunks() {
fileToBeChunked := "./somebigfile"

file, err := os.Open(fileToBeChunked)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

defer file.Close()

fileInfo, _ := file.Stat()

var fileSize int64 = fileInfo.Size()

const fileChunk = 1 * (1 << 20) // 1 MB, change this to your requirement

// calculate total number of parts the file will be chunked into

totalPartsNum := uint64(math.Ceil(float64(fileSize) / float64(fileChunk)))

fmt.Printf("Splitting to %d pieces.\n", totalPartsNum)

for i := uint64(0); i < totalPartsNum; i++ {

partSize := int(math.Min(fileChunk, float64(fileSize-int64(i*fileChunk))))
partBuffer := make([]byte, partSize)

file.Read(partBuffer)

// write to disk
fileName := "somebigfile_" + strconv.FormatUint(i, 10)
_, err := os.Create(fileName)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

// write/save buffer to disk
os.WriteFile(fileName, partBuffer, os.ModeAppend)

fmt.Println("Split to : ", fileName)
}
}

type DiskSnapshotInfo struct {
ModTime time.Time
Height uint32
Expand Down Expand Up @@ -437,6 +484,55 @@ func (v *State) ListSnapshots() []DiskSnapshotInfo {
return list
}

func (v *State) JoinChunksAndRestoreSnapshot(chunkFilenames map[uint32]string, height uint64) error {
snapshotFilename := filepath.Join(
v.dataDir,
storageDirectory,
snapshotsDirectory,
fmt.Sprintf("%d", height),
)

log.Warnf("Joining %d chunks into snapshot %s (height %d)", len(chunkFilenames), snapshotFilename, height) // debug
// Create or truncate the destination file
_, err := os.Create(snapshotFilename)
if err != nil {
return err
}

// Open for APPEND
snapFile, err := os.OpenFile(snapshotFilename, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
if err != nil {
return err
}

// according to https://socketloop.com/tutorials/golang-recombine-chunked-files-example
// we shouldn't defer a file.Close when opening a file for APPEND mode, but no idea why
defer snapFile.Close()

// we can't use range chunkFilenames since that would restore in random order
for i := uint32(0); i < uint32(len(chunkFilenames)); i++ {
// open a chunk
chunk, err := os.ReadFile(chunkFilenames[i])
if err != nil {
return err
}

if _, err := snapFile.Write(chunk); err != nil {
return err
}

snapFile.Sync() // flush to disk

chunk = nil // reset or empty our buffer
}

// TBD: restore snapshot. until then just dump the mock contents of the file to console
c, _ := os.ReadFile(snapFile.Name())
log.Warnf("file %s contents %s", snapFile.Name(), c)

return nil
}

// DBPair is a key value pair for the no state db.
type DBPair struct {
Key []byte
Expand Down

0 comments on commit 0f7f26c

Please sign in to comment.