
Commit 8d65ae0

committed
WIP: Basic Working LogCacheAsync with fuzzy test
1 parent d09d941 commit 8d65ae0

10 files changed: +761 -22 lines

fuzzy/.gitignore

+1

@@ -0,0 +1 @@
+data

fuzzy/async_logs_test.go

+50

@@ -0,0 +1,50 @@
+package fuzzy
+
+import (
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/raft"
+)
+
+// 5 node cluster where the leader and another node get regularly partitioned off
+// eventually all partitions heal, but using async log cache.
+func TestRaft_AsyncLogWithPartitions(t *testing.T) {
+	hooks := NewPartitioner()
+
+	cluster := newRaftClusterWithFactory(t, testLogWriter, "lp", 5, hooks, newAsyncRaft)
+	cluster.Leader(time.Second * 10)
+	s := newApplySource("LeaderPartitions")
+	applier := s.apply(t, cluster, 5)
+	for i := 0; i < 10; i++ {
+		pg := hooks.PartitionOff(cluster.log, cluster.LeaderPlus(rand.Intn(4)))
+		time.Sleep(time.Second * 4)
+		r := rand.Intn(10)
+		if r < 1 {
+			cluster.log.Logf("Healing no partitions!")
+		} else if r < 4 {
+			hooks.HealAll(cluster.log)
+		} else {
+			hooks.Heal(cluster.log, pg)
+		}
+		time.Sleep(time.Second * 5)
+	}
+	hooks.HealAll(cluster.log)
+	cluster.Leader(time.Hour)
+	applier.stop()
+	cluster.Stop(t, time.Minute*10)
+	hooks.Report(t)
+	cluster.VerifyLog(t, applier.applied)
+	cluster.VerifyFSM(t)
+}
+
+func newAsyncRaft(conf *raft.Config, fsm raft.FSM, logs raft.LogStore, stable raft.StableStore, snaps raft.SnapshotStore, trans raft.Transport) (*raft.Raft, error) {
+	// Wrap the log store in an async cache
+	asyncLogs, err := raft.NewLogCacheAsync(128, logs)
+	if err != nil {
+		return nil, err
+	}
+
+	return raft.NewRaft(conf, fsm, asyncLogs, stable, snaps, trans)
+}
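
The newAsyncRaft factory above is the only async-specific wiring the fuzz harness needs: it wraps whatever LogStore a node was built with in the new cache before calling raft.NewRaft. As a rough illustration of the same wiring outside the harness, here is a minimal sketch using the library's in-memory stores and transport; newInMemAsyncNode and its id/fsm parameters are hypothetical helpers, not part of this commit.

// Hypothetical helper (not in this commit): builds a single, fully in-memory
// node whose log store is wrapped by the new async cache via newAsyncRaft.
func newInMemAsyncNode(id string, fsm raft.FSM) (*raft.Raft, error) {
	conf := raft.DefaultConfig()
	conf.LocalID = raft.ServerID(id)

	logs := raft.NewInmemStore()   // wrapped by NewLogCacheAsync inside newAsyncRaft
	stable := raft.NewInmemStore() // the stable store stays unwrapped
	snaps := raft.NewInmemSnapshotStore()
	_, trans := raft.NewInmemTransport(raft.NewInmemAddr())

	return newAsyncRaft(conf, fsm, logs, stable, snaps, trans)
}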

fuzzy/cluster.go

+6 -2

@@ -55,6 +55,10 @@ func (a *LoggerAdapter) Logf(s string, v ...interface{}) {
 }
 
 func newRaftCluster(t *testing.T, logWriter io.Writer, namePrefix string, n uint, transportHooks TransportHooks) *cluster {
+	return newRaftClusterWithFactory(t, logWriter, namePrefix, n, transportHooks, raft.NewRaft)
+}
+
+func newRaftClusterWithFactory(t *testing.T, logWriter io.Writer, namePrefix string, n uint, transportHooks TransportHooks, factory factoryFn) *cluster {
 	res := make([]*raftNode, 0, n)
 	names := make([]string, 0, n)
 	for i := uint(0); i < n; i++ {
@@ -67,11 +71,11 @@ func newRaftCluster(t *testing.T, logWriter io.Writer, namePrefix string, n uint
 	transports := newTransports(l)
 	for _, i := range names {
 
-		r, err := newRaftNode(hclog.New(&hclog.LoggerOptions{
+		r, err := newRaftNodeFromFactory(hclog.New(&hclog.LoggerOptions{
 			Name:   i + ":",
 			Output: logWriter,
 			Level:  hclog.DefaultLevel,
-		}), transports, transportHooks, names, i)
+		}), transports, transportHooks, names, i, factory)
 		if err != nil {
 			t.Fatalf("Unable to create raftNode:%v : %v", i, err)
 		}

fuzzy/go.sum

+2

@@ -95,6 +95,8 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
 github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=

fuzzy/node.go

+9 -3

@@ -24,6 +24,13 @@ type raftNode struct {
 }
 
 func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes []string, name string) (*raftNode, error) {
+	return newRaftNodeFromFactory(logger, tc, h, nodes, name, raft.NewRaft)
+}
+
+// Same type as raft.NewRaft
+type factoryFn func(conf *raft.Config, fsm raft.FSM, logs raft.LogStore, stable raft.StableStore, snaps raft.SnapshotStore, trans raft.Transport) (*raft.Raft, error)
+
+func newRaftNodeFromFactory(logger hclog.Logger, tc *transports, h TransportHooks, nodes []string, name string, factory factoryFn) (*raftNode, error) {
 	var err error
 	var datadir string
 	datadir, err = resolveDirectory(fmt.Sprintf("data/%v", name), true)
@@ -46,8 +53,7 @@ func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes []
 	config.ShutdownOnRemove = false
 	config.LocalID = raft.ServerID(name)
 
-	var store *rdb.BoltStore
-	store, err = rdb.NewBoltStore(filepath.Join(datadir, "store.bolt"))
+	store, err := rdb.NewBoltStore(filepath.Join(datadir, "store.bolt"))
 	if err != nil {
 		return nil, fmt.Errorf("unable to initialize log %v", err.Error())
 	}
@@ -65,7 +71,7 @@ func newRaftNode(logger hclog.Logger, tc *transports, h TransportHooks, nodes []
 	}
 	fsm := &fuzzyFSM{}
 	var r *raft.Raft
-	r, err = raft.NewRaft(config, fsm, store, store, ss, transport)
+	r, err = factory(config, fsm, store, store, ss, transport)
 	if err != nil {
 		return nil, err
 	}

inmem_store.go

+47 -3

@@ -6,12 +6,19 @@ package raft
 import (
 	"errors"
 	"sync"
+	"sync/atomic"
 )
 
 // InmemStore implements the LogStore and StableStore interface.
 // It should NOT EVER be used for production. It is used only for
 // unit tests. Use the MDBStore implementation instead.
 type InmemStore struct {
+	storeFail uint32 // accessed atomically as a bool 0/1
+
+	// storeSem lets the test control exactly when a StoreLog(s) call takes
+	// effect.
+	storeSem chan struct{}
+
 	l         sync.RWMutex
 	lowIndex  uint64
 	highIndex uint64
@@ -24,13 +31,35 @@ type InmemStore struct {
 // use for production. Only for testing.
 func NewInmemStore() *InmemStore {
 	i := &InmemStore{
-		logs:  make(map[uint64]*Log),
-		kv:    make(map[string][]byte),
-		kvInt: make(map[string]uint64),
+		storeSem: make(chan struct{}, 1),
+		logs:     make(map[uint64]*Log),
+		kv:       make(map[string][]byte),
+		kvInt:    make(map[string]uint64),
 	}
 	return i
 }
 
+// BlockStore will cause further calls to StoreLog(s) to block indefinitely
+// until the returned cancel func is called. Note that if the code or test is
+// buggy this could cause a deadlock.
+func (i *InmemStore) BlockStore() func() {
+	i.storeSem <- struct{}{}
+	cancelled := false
+	return func() {
+		// Allow multiple calls; subsequent ones are a no-op.
+		if !cancelled {
+			<-i.storeSem
+			cancelled = true
+		}
+	}
+}
+
+// FailNext signals that the next call to StoreLog(s) should return an error
+// without modifying the log contents. Subsequent calls will succeed again.
+func (i *InmemStore) FailNext() {
+	atomic.StoreUint32(&i.storeFail, 1)
+}
+
 // FirstIndex implements the LogStore interface.
 func (i *InmemStore) FirstIndex() (uint64, error) {
 	i.l.RLock()
@@ -64,8 +93,23 @@ func (i *InmemStore) StoreLog(log *Log) error {
 
 // StoreLogs implements the LogStore interface.
 func (i *InmemStore) StoreLogs(logs []*Log) error {
+	// Block waiting for the semaphore slot if BlockStore has been called. We
+	// must do this before we take the lock because otherwise we'd block GetLog
+	// and others too by holding the lock while blocked.
+	i.storeSem <- struct{}{}
+	defer func() {
+		<-i.storeSem
+	}()
+
+	// Swap the fail flag back to zero if it is set so we only fail once.
+	shouldFail := atomic.SwapUint32(&i.storeFail, 0)
+	if shouldFail == 1 {
+		return errors.New("IO error")
+	}
+
 	i.l.Lock()
 	defer i.l.Unlock()
+
 	for _, l := range logs {
 		i.logs[l.Index] = l
 		if i.lowIndex == 0 {
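
BlockStore and FailNext give tests deterministic control over when a StoreLogs call completes and whether it fails, without touching the store's lock. Here is a minimal sketch of how a test might drive both hooks, assuming the package raft context above plus the standard testing and time imports; TestInmemStoreHooks is a hypothetical test, not part of this commit.

func TestInmemStoreHooks(t *testing.T) {
	s := NewInmemStore()

	// FailNext: exactly one StoreLogs call returns an error, then the flag resets.
	s.FailNext()
	if err := s.StoreLogs([]*Log{{Index: 1}}); err == nil {
		t.Fatal("expected the first StoreLogs to fail")
	}
	if err := s.StoreLogs([]*Log{{Index: 1}}); err != nil {
		t.Fatalf("expected the second StoreLogs to succeed: %v", err)
	}

	// BlockStore: further StoreLogs calls park on the semaphore until cancel is called.
	cancel := s.BlockStore()
	done := make(chan error, 1)
	go func() { done <- s.StoreLogs([]*Log{{Index: 2}}) }()

	select {
	case <-done:
		t.Fatal("StoreLogs should still be blocked")
	case <-time.After(50 * time.Millisecond):
	}

	cancel() // releasing the semaphore lets the pending write proceed
	if err := <-done; err != nil {
		t.Fatalf("unexpected error after unblocking: %v", err)
	}
}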

log.go

+48 -8

@@ -130,18 +130,58 @@ type LogStore interface {
 }
 
 // MonotonicLogStore is an optional interface for LogStore implementations that
-// cannot tolerate gaps in between the Index values of consecutive log entries. For example,
-// this may allow more efficient indexing because the Index values are densely populated. If true is
-// returned, Raft will avoid relying on gaps to trigger re-synching logs on followers after a
-// snapshot is restored. The LogStore must have an efficient implementation of
-// DeleteLogs for the case where all logs are removed, as this must be called after snapshot restore when gaps are not allowed.
-// We avoid deleting all records for LogStores that do not implement MonotonicLogStore
-// because although it's always correct to do so, it has a major negative performance impact on the BoltDB store that is currently
-// the most widely used.
+// cannot tolerate gaps in between the Index values of consecutive log entries.
+// For example, this may allow more efficient indexing because the Index values
+// are densely populated. If true is returned, Raft will avoid relying on gaps
+// to trigger re-synching logs on followers after a snapshot is restored. The
+// LogStore must have an efficient implementation of DeleteLogs for the case
+// where all logs are removed, as this must be called after snapshot restore
+// when gaps are not allowed. We avoid deleting all records for LogStores that
+// do not implement MonotonicLogStore because although it's always correct to do
+// so, it has a major negative performance impact on the BoltDB store that is
+// currently the most widely used.
 type MonotonicLogStore interface {
 	IsMonotonic() bool
 }
 
+type LogWriteCompletion struct {
+	PersistentIndex uint64
+	Error           error
+	Duration        time.Duration
+}
+
+type AsyncLogStore interface {
+	LogStore
+
+	// EnableAsync is called on the log store when a node starts the leader loop.
+	// A channel is passed to deliver write completion events. The implementation
+	// chooses how many events to buffer but the chan may block and this should be
+	// used as a back-pressure mechanism to slow down syncs to disk. Must be
+	// called serially with StoreLog* and DeleteRange (i.e. from the main
+	// leader/follower thread). After this returns, calls to StoreLog(s) will
+	// return an error and only StoreLogsAsync should be used until DisableAsync
+	// is called.
+	EnableAsync(chan<- LogWriteCompletion)
+
+	// DisableAsync is called when the leader steps down to return the LogStore to
+	// sync mode since followers currently use sync writes. They may in the future
+	// use async writes too, however explicitly switching modes makes it easier to
+	// reason about the behaviour of async vs sync storage calls, as well as
+	// providing the channel to deliver updates explicitly. DisableAsync will
+	// block until all in-flight writes are persisted (or fail).
+	DisableAsync()
+
+	// StoreLogsAsync may only be called after EnableAsync but before the
+	// corresponding DisableAsync call. It will return as soon as the logs are
+	// available to read from GetLog and reflected in LastIndex, though they may
+	// still be in-memory only. It will trigger background writing of the logs to
+	// disk. The background process must eventually deliver a LogWriteCompletion
+	// to the channel provided to the last EnableAsync call. Each
+	// LogWriteCompletion indicates that all logs up to the PersistentIndex are
+	// safely stored on durable storage, or an error has occurred.
+	StoreLogsAsync(logs []*Log) error
+}
+
 func oldestLog(s LogStore) (Log, error) {
 	var l Log
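
The contract above is that StoreLogsAsync makes entries readable immediately while durability is reported later on the completion channel, and DisableAsync is the barrier that drains everything before the store returns to synchronous writes. Here is a minimal sketch, assuming the package raft context, of a caller driving that contract; writeBatches, its channel size, and the error handling are illustrative assumptions, not the library's leader loop.

func writeBatches(store AsyncLogStore, batches [][]*Log) error {
	completions := make(chan LogWriteCompletion, 1)
	stop := make(chan struct{})
	defer close(stop) // registered first so it runs after DisableAsync (defers are LIFO)

	store.EnableAsync(completions)
	// DisableAsync blocks until every in-flight write has persisted or failed,
	// returning the store to synchronous StoreLog(s) mode.
	defer store.DisableAsync()

	// Consume completion events until told to stop.
	go func() {
		for {
			select {
			case c := <-completions:
				if c.Error != nil {
					// Only entries up to c.PersistentIndex are durable; a real
					// leader would step down or retry here.
					continue
				}
				// Everything up to c.PersistentIndex is now on durable storage.
				_ = c.Duration // e.g. feed into write-latency metrics
			case <-stop:
				return
			}
		}
	}()

	for _, batch := range batches {
		// Returns once the entries are readable via GetLog/LastIndex, possibly
		// before they are durable; durability is reported on the completions chan.
		if err := store.StoreLogsAsync(batch); err != nil {
			return err
		}
	}
	return nil
}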
