
Enhancement: persist commit index in LogStore to accelerate recovery #613

Open
lalalalatt wants to merge 38 commits into base: main

Conversation

lalalalatt

@lalalalatt lalalalatt commented Sep 1, 2024

@lalalalatt lalalalatt requested review from a team as code owners September 1, 2024 08:25
@lalalalatt lalalalatt requested review from rboyer and removed request for a team September 1, 2024 08:25

hashicorp-cla-app bot commented Sep 1, 2024

CLA assistant check
All committers have signed the CLA.

@lalalalatt
Author

lalalalatt commented Sep 1, 2024

@banks This is the first PR for #549.
Thanks for spending your time reviewing this~

@lalalalatt
Author

lalalalatt commented Sep 1, 2024

Proposal for the next PR:
Persist the commit index every time processLogs is called. (The commit index only changes there.)

raft/raft.go

Lines 1292 to 1359 in 42d3446

func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
	// Reject logs we've applied already
	lastApplied := r.getLastApplied()
	if index <= lastApplied {
		r.logger.Warn("skipping application of old log", "index", index)
		return
	}

	applyBatch := func(batch []*commitTuple) {
		select {
		case r.fsmMutateCh <- batch:
		case <-r.shutdownCh:
			for _, cl := range batch {
				if cl.future != nil {
					cl.future.respond(ErrRaftShutdown)
				}
			}
		}
	}

	// Store maxAppendEntries for this call in case it ever becomes reloadable. We
	// need to use the same value for all lines here to get the expected result.
	maxAppendEntries := r.config().MaxAppendEntries
	batch := make([]*commitTuple, 0, maxAppendEntries)

	// Apply all the preceding logs
	for idx := lastApplied + 1; idx <= index; idx++ {
		var preparedLog *commitTuple
		// Get the log, either from the future or from our log store
		future, futureOk := futures[idx]
		if futureOk {
			preparedLog = r.prepareLog(&future.log, future)
		} else {
			l := new(Log)
			if err := r.logs.GetLog(idx, l); err != nil {
				r.logger.Error("failed to get log", "index", idx, "error", err)
				panic(err)
			}
			preparedLog = r.prepareLog(l, nil)
		}

		switch {
		case preparedLog != nil:
			// If we have a log ready to send to the FSM add it to the batch.
			// The FSM thread will respond to the future.
			batch = append(batch, preparedLog)

			// If we have filled up a batch, send it to the FSM
			if len(batch) >= maxAppendEntries {
				applyBatch(batch)
				batch = make([]*commitTuple, 0, maxAppendEntries)
			}
		case futureOk:
			// Invoke the future if given.
			future.respond(nil)
		}
	}

	// If there are any remaining logs in the batch apply them
	if len(batch) != 0 {
		applyBatch(batch)
	}

	// Update the lastApplied index and term
	r.setLastApplied(index)
}

func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
	// Reject logs we've applied already
	lastApplied := r.getLastApplied()
	if index <= lastApplied {
		r.logger.Warn("skipping application of old log", "index", index)
		return
	}

+	if r.fastRecovery && isCommitTrackingLogStore(r.logs) {
+		store := r.logs.(CommitTrackingLogStore)
+		if err := store.SetCommitIndex(index); err != nil {
+			r.logger.Error("failed to persist commit index", "index", index, "error", err)
+		}
+	}

	....

	// Update the lastApplied index and term
	r.setLastApplied(index)
}

@otoolep
Copy link
Contributor

otoolep commented Sep 1, 2024

As a long-term user of this library, this could be useful. However I would strongly recommend that this functionality be wrapped in a flag, which is settable in the Raft Config object (similar to NoSnapshotRestoreOnStart), and be disabled by default. Systems built on Raft need to be solid, so taking a conservative approach is warranted.

@lalalalatt
Author

lalalalatt commented Sep 2, 2024

As a long-term user of this library, this could be useful. However I would strongly recommend that this functionality be wrapped in a flag, which is settable in the Raft Config object (similar to NoSnapshotRestoreOnStart), and be disabled by default. Systems built on Raft need to be solid, so taking a conservative approach is warranted.

@otoolep Thanks for the suggestion, I'll add r.fastRecovery as the feature flag~
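
For illustration, a minimal sketch of what that opt-in flag could look like in the Config struct (field name per this discussion; the wording and placement are assumptions, not the final API):

type Config struct {
	// ... existing fields ...

	// FastRecovery, when enabled and when the LogStore implements
	// CommitTrackingLogStore, lets a restarting node replay already-committed
	// logs into the FSM using the persisted commit index instead of waiting to
	// rediscover it from a leader. Disabled by default, like
	// NoSnapshotRestoreOnStart.
	FastRecovery bool
}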

Member

@banks banks left a comment


Thanks for starting on this @lalalalatt.

The interface looks good. I have a comment inline and given that I'm suggesting removing half the lines in this PR, it might make sense to also add the change you proposed for your "next PR" into this one?

I also think it might be worth rebasing this PR on to a feature branch like f/fast-recovery so we can keep PRs small to review but not merge code into main until it's a working feature that can be released. (I realise this would be a no-op, but still if we don't complete the feature it will be unused code that will need later cleanup or completion.)

What do you think of that approach?

@lalalalatt
Author

I also think it might be worth rebasing this PR on to a feature branch like f/fast-recovery so we can keep PRs small to review but not merge code into main until it's a working feature that can be released. (I realise this would be a no-op, but still if we don't complete the feature it will be unused code that will need later cleanup or completion.)

@banks Sure, that sounds like a good plan. Could you help me create the f/fast-recovery branch? Thanks!

@otoolep
Contributor

otoolep commented Sep 3, 2024

Drive-by comment.

@banks Sure, that sounds like a good plan. Could you help me create the f/fast-recovery branch? Thanks!

That's not the right way to think about the development flow (not unless this repo is managed in some unusual way). You create the branch in your own fork of this repo, and then generate a PR from that branch in your fork to the main branch in this repo.

@banks
Member

banks commented Sep 3, 2024

@otoolep in general that is usually how GH works. In this case I wonder if we should have a long running branch here so that we can keep PRs small and review the changes in small parts rather than wait until the entire feature is built and have to review it all in one huge PR from the fork to here.

One way to do that would be for me to create a long-lived feature branch here, another would be for us to review PRs in a fork but that leaves all the interim review in a separate place 🤔 .

On reflection, I'm not sure what is gained by trying to make this many small PRs yet. Let's continue work here and see how large this PR gets as more of the code is built before we worry too much about making feature branches. Sorry for the suggestion that wasn't very well thought through!

@lalalalatt
Author

On reflection, I'm not sure what is gained by trying to make this many small PRs yet. Let's continue work here and see how large this PR gets as more of the code is built before we worry too much about making feature branches. Sorry for the suggestion that wasn't very well thought through!

Ok, looks good!

- Introduced a `fastRecovery` flag in the Raft structure and configuration to enable fast recovery mode.
- Updated `NewRaft` to initialize `fastRecovery` from the configuration.
- Added `persistCommitIndex` function to store the commit index when fast recovery is enabled.
- Modified `processLogs` to persist the commit index before updating `lastApplied`.
- Documented the `FastRecovery` option in the config.
- Implemented `recoverFromCommitedLogs` function to recover the Raft node from committed logs.
- If `fastRecovery` is enabled and the log store implements `CommitTrackingLogStore`, the commit index is read from the store, avoiding the need to replay logs.
- Logs between the last applied and commit index are fed into the FSM for faster recovery.
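
A rough sketch of the recovery path these commits describe (not the PR's actual code; helper names such as setCommitIndex and the exact error handling are assumptions):

func (r *Raft) recoverFromCommitedLogs() error {
	// Only applies when the feature flag is on and the store tracks commits.
	store, ok := r.logs.(CommitTrackingLogStore)
	if !r.fastRecovery || !ok {
		return nil
	}
	commitIndex, err := store.GetCommitIndex()
	if err != nil {
		return err
	}
	// Never replay past the last index actually present in the log.
	lastIdx, err := r.logs.LastIndex()
	if err != nil {
		return err
	}
	if commitIndex > lastIdx {
		commitIndex = lastIdx
	}
	if commitIndex <= r.getLastApplied() {
		return nil // nothing committed beyond what is already applied
	}
	// Feed the committed-but-unapplied logs straight into the FSM.
	r.setCommitIndex(commitIndex)
	r.processLogs(commitIndex, nil)
	return nil
}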
…ency

- Refactor `ReadCommitIndex` to `GetCommitIndex` across `LogStore` and `InmemCommitTrackingStore`.
- Introduce `InmemCommitTrackingStore` to track commit index in memory for testing purposes.
- Add locking mechanism to safely read/write commit index in `InmemCommitTrackingStore`.
@lalalalatt lalalalatt changed the title Add CommitTrackingLogStore and its checker in LogStore Enhancement: persist commit index in LogStore to accelerate recovery Sep 6, 2024
Member

@banks banks left a comment


Hey @lalalalatt, thanks for working on this.

I spotted a couple of things that we should tweak here. I've added comments inline - let me know if anything is confusing. I didn't do a "full" final review yet but these should get it more inline with my original design. Feel free to explain if I'm misunderstanding anything though!

@peterxcli

peterxcli commented Sep 10, 2024

tl;dr, we're fine here 😅 (on the correctness issue, the other comments about this PR stand). We should be careful to preserve the behaviour in this PR - even if we only replay up to commitIndex on the FSM, we should still process any config changes in the rest of the log after that!

@banks Thanks for all of these comments 😄

But for the current change, it is possible to apply the same configuration twice; although that wouldn't violate correctness, it would still be wasteful.
So I would reverse the order of r.recoverFromCommitedLogs() and "iterating the whole log to see if any logs are config changes": once the persisted commits are applied, we only need to iterate r.logs[max(lastApplied, snapshotIndex) + 1 :] to see if any logs are config changes, and that prevents the duplication~
upd: reverse order

@peterxcli

I don't think this is the right place for this call. I think we should call it in storeLogs just before the actual call to StoreLogs on the storage since that is the write we want our log stores to be able to add this state to disk in. Calling it here will not do anything until the next StoreLogs call at least in the design I proposed in the issue.

I don't think storing the commit index every time before StoreLogs is a good idea.

For followers, when they are appending the logs received from the leader, the commit index may not increase, because the leader may not yet have majority agreement on the corresponding logs.

For the leader, as I mentioned previously, it persists logs by calling log.StoreLogs() when receiving new logs from applyCh (requested by clients), but that doesn't mean the logs are committed unless a majority accepts them.

My idea is that the commit index is only updated in two places:

  1. A follower receives a higher leaderCommit from the leader.
  2. The leader's commitment tracker detects that a majority has accepted a higher log index; it then sends a message to c.commitCh, and the leader's main thread receives that new commit index and updates the in-memory raft state.

They both trigger r.processLogs eventually, so we can just update the commit index there.

Maybe that's my misunderstanding 😅; if there is anything wrong, please correct me~

@banks
Member

banks commented Sep 10, 2024

Hi @peterxcli

The idea behind persisting commit index during StoreLogs is to make sure it doesn't cause worse performance. Writing to disk is the slowest part of raft in most implementations.

If we add a new write to disk anywhere then performance will decrease, and that's not acceptable for our products at least, and probably not for other users of this library.

So the whole design was intended to persist the currently known commit index along with the logs every time we write new logs - yes, it's a few extra bytes, but it's the fsync calls that are slow, so if we can include it in the same write it's virtually "free" for all the LogStore implementations we use currently and doesn't impact performance.

For followers, when they are appending the logs received from the leader, the commit index may not increase, because the leader may not yet have majority agreement on the corresponding logs.

This is true, but the AppendEntries RPC is the method by which a follower learns about previous commits. In a given AppendEntries RPC there will be one or more new logs (ignoring heartbeats for now); let's say those are logs {1000, 1001}, and the leader will also include CommitIndex = 998, which lets this follower know that everything up to 998 is committed. My proposal is that we call store.SetCommitIndex() with the commit index the leader just sent in each AppendEntries RPC before we call StoreLogs; the log store can then just include that metadata in the same write, and we always have persisted the most up-to-date commit info each follower knows, all without any additional disk fsyncs.
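
In code terms, the ordering being described could look roughly like this (a sketch only; storeEntriesWithCommitIndex and newEntries are hypothetical names, not code from this PR, while LeaderCommitIndex is the field on the existing AppendEntriesRequest):

// Sketch: how a follower's appendEntries handling could stage the leader's
// commit index before persisting the new entries.
func (r *Raft) storeEntriesWithCommitIndex(req *AppendEntriesRequest, newEntries []*Log) error {
	if store, ok := r.logs.(CommitTrackingLogStore); ok && r.fastRecovery {
		// Stage the leader's commit index so the store can persist it in the
		// same write (and the same fsync) as the entries below.
		if err := store.SetCommitIndex(req.LeaderCommitIndex); err != nil {
			return err
		}
	}
	// The staged commit index rides along in this write; no extra fsync.
	return r.logs.StoreLogs(newEntries)
}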

If we only call this during processLogs on the follower as you propose, then there are two possibilities, depending on how the LogStore implements it:

  1. The LogStore buffers that index in memory until the next call to StoreLogs - that would work and have the same performance as my proposal, but I think it's slightly worse than my proposal for two reasons:
    1. The commitIndex sent by the leader is not actually persisted until the next RPC arrives, which means that whenever the node restarts it can only restore a smaller number of logs, even though it did "know" about (and possibly applied) more logs than its persisted commitIndex before it restarted. This difference is likely small, but it's easy to avoid.
    2. I find it much more surprising and harder to reason about if we actually don't persist the commitIndex we learned from the leader until the next RPC (where we'll most likely learn about a new commit index).
  2. Or, the LogStore could write the commitIndex to disk synchronously when SetCommitIndex is called. This would be a whole extra fsync (or two for the BoltDB or WAL implementations), which would likely cause a significant slowdown in the speed we can write to raft. I don't think we should consider this option, especially when there's an easy way to avoid it (my proposal).

For the leader, as I mentioned previously, it persists logs by calling log.StoreLogs() when receiving new logs from applyCh (requested by clients), but that doesn't mean the logs are committed unless a majority accepts them.

You're correct that all the logs we store when we call StoreLogs on the leader are not yet committed; in fact, with our current implementation we guarantee that none of them are committed (except in the trivial case of a single-node cluster) because we don't replicate to followers until after this. But again, the leader would not be calling SetCommitIndex with the new log index, it would just be calling it with whatever its current in-memory commitIndex is, which is guaranteed to be some way behind.

Again this is so that we can "re-use" the fsync on the LogStore and so avoid more disk IO or waiting on the critical path of writing data.

Most of the same arguments as above apply here: if we wait until commitIndex updates and processLogs is called on the leader we'll either have a more confusing model where the commitIndex persisted is a batch behind the last known state on the leader for any given set of logs, or we'll make it all slower by adding a new sync disk write just to persist this state during processLogs.

As an aside, if we were OK with adding more disk writes, I'd not have proposed this as an extension of LogStore at all - we already have a StableStore that we could just add a new KV pair to. But if we did that and updated it on every commit it would massively impact the throughput of a raft cluster since disk writes (and fsyncs) are the slowest part and we'd effectively double the number of them on both leader and follower for every commit.

Does that make sense?

@peterxcli

@banks oh~~ It seems like I completely misunderstood your design.

Again this is so that we can "re-use" the fsync on the LogStore and so avoid more disk IO or waiting on the critical path of writing data.

Got it, thanks~ 😄

As an aside, if we were OK with adding more disk writes, I'd not have proposed this as an extension of LogStore at all - we already have a StableStore that we could just add a new KV pair to. But if we did that and updated it on every commit it would massively impact the throughput of a raft cluster since disk writes (and fsyncs) are the slowest part and we'd effectively double the number of them on both leader and follower for every commit.

That's true 😅

@banks Thanks for your clarification very much~

- Update makeCluster to return (*cluster, error)
- Modify MakeCluster, MakeClusterNoBootstrap, and MakeClusterCustom to return error
- Update all test cases to handle potential errors from cluster creation
- Replace t.Fatalf() calls with t.Logf() and error returns in makeCluster

BREAKING CHANGE: MakeCluster, MakeClusterNoBootstrap, and MakeClusterCustom
now return an additional error value, which needs to be handled in existing tests.
@peterxcli

peterxcli commented Oct 11, 2024

To support asserting on the NewRaft error when making a cluster, I added an error return to makeCluster, but it requires a bunch of changes to existing tests to handle the returned error.
I think it is necessary because I expect more and more log store variants to appear, and their error modes will need tests.


Document fix commits:


Error mode testing:

schmichael
schmichael previously approved these changes Oct 11, 2024
Member

@schmichael schmichael left a comment


🎉

}

// makeCluster will return a cluster with the given config and number of peers.
// If bootstrap is true, the servers will know about each other before starting,
// otherwise their transports will be wired up but they won't yet have configured
// each other.
-func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
+func makeCluster(t *testing.T, opts *MakeClusterOpts) (*cluster, error) {
Member


AFAICT this was added to support 1 test that asserts a specific error is returned. Since this causes tons of other test churn I have a slight preference for reverting the error return here and special casing the 1 test that needs to check the error return of NewRaft. Not a blocker though.

- Add PropagateError option to MakeClusterOpts
- Update makeCluster to return (*cluster, error)
- Modify MakeCluster, MakeClusterNo
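
For context, a call site under the new signature might look roughly like this (a sketch only; option field names other than PropagateError are assumptions about the test helpers, not code from this PR):

c, err := makeCluster(t, &MakeClusterOpts{
	Peers:          3,
	Bootstrap:      true,
	PropagateError: true, // surface NewRaft errors to the caller instead of failing the test
})
if err != nil {
	t.Fatalf("failed to make cluster: %v", err)
}
defer c.Close()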
Contributor

@dhiaayachi dhiaayachi left a comment


Thank you again for the prompt update @lalalalatt!
I left a minor suggestion for a comment; other than that, I think from a testing perspective we can add more tests covering the following cases:

  • the log store returns 0, nil all the time (no commit index is in the store).
  • GetCommitIndex returns a value that is bigger than the last index

log.go Outdated

// GetCommitIndex returns the latest persisted commit index from the latest log entry
// in the store at startup.
// It is ok to return a value higher than the last index in the log (But it should never happen).
Contributor


Suggested change
- // It is ok to return a value higher than the last index in the log (But it should never happen).
+ // GetCommitIndex should not return a value higher than the last index in the log. If that happens, the last index in the log will be used.

Contributor


I think we should also document here that GetCommitIndex needs to return 0, nil when no commit index is found in the log.

…face

- Specify that GetCommitIndex should not return a value higher than the last
  index in the log
- Clarify that if a higher value is returned, the last index in the log will
  be used instead
- Add instruction to return (0, nil) when no commit index is found in the
  log store
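
Putting the documented contract together, the extension interface discussed in this thread looks roughly like this (a sketch assembled from the method names used above; exact signatures and comment wording are assumptions, not the merged code):

// CommitTrackingLogStore is an optional extension of LogStore for stores that
// can persist the commit index alongside the log entries.
type CommitTrackingLogStore interface {
	LogStore

	// SetCommitIndex stages the given commit index so the store can persist it
	// atomically with the next StoreLogs call, reusing the same fsync.
	SetCommitIndex(index uint64) error

	// GetCommitIndex returns the latest persisted commit index at startup.
	// It must return (0, nil) when no commit index is found in the store, and
	// it should never return a value higher than the last index in the log; if
	// it does, the last index in the log will be used instead.
	GetCommitIndex() (uint64, error)
}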
@lalalalatt
Author

Hi, @banks, any new progress on this?

@banks banks dismissed their stale review February 12, 2025 11:22

Concerns no longer valid

@banks
Member

banks commented Feb 12, 2025

Thanks @lalalalatt, and sorry it's been a while on this one, time passes! I think @dhiaayachi, @schmichael, and @tgross did a great job following up after my initial review, so I've dismissed my concerns, which seem to be outdated now.

I've not had a chance to do another thorough review yet, so I won't approve right now, but I've unblocked the merge in case one of the others was already at a point ready to approve.

@peterxcli

peterxcli commented Mar 21, 2025

Ratis has a similar mechanism. But instead of storing the local lastCommittedIndex in each node's persisted store, it treats the lastCommittedIndex as a raft log entry (called a MetadataLog), then replicates it just like a normal raft log:

Just for reference, cc @banks

@banks
Member

banks commented Mar 24, 2025

@peterxcli thanks for the links.

I think their approach is essentially the same as this one - note that this is why I pushed this design towards CommitTrackingLogStore as an extension of log store rather than just storing it in the existing StableStore. Either way it lets you persist the extra data in the logs without additional fsyncs on every AppendEntries.

Their specific approach of having an actual log record type for it, instead of having the LogStore guarantee atomic commitment of that metadata, is interesting and is an alternative we could consider - it would require no changes to log stores, for example. I think it would need slightly more change to raft to introduce a new log type, and there would be compatibility issues to consider where a new leader can't replicate those messages to old followers because they would panic... That was one of the reasons I proposed an out-of-band metadata solution rather than a new log type, I think, because it avoids the need for a complex compatibility dance that could break upgrades.

tgross
tgross previously approved these changes Mar 27, 2025
Member

@tgross tgross left a comment


I've had a chance to re-review and I'm 👍 on merging, once the merge conflicts have been resolved.

@lalalalatt
Author

@tgross Resolved, PTAL. Thanks!

Member

@tgross tgross left a comment


LGTM

schmichael added a commit to hashicorp/nomad that referenced this pull request Mar 31, 2025
This is just a test of hashicorp/raft#613 and
not intended for merging as-is.

Since there is no corresponding raft-boltdb implementation, it's not
possible to functionally test this change as-is.
Member

@schmichael schmichael left a comment


Thanks for sticking with this @lalalalatt and everyone else. I'm +1 on merging.

A note to others: there's currently no corresponding https://github.com/hashicorp/raft-boltdb implementation (AFAICT), so it's not possible to functionally test this in products like HashiCorp Nomad.

That being said, I'm fine with merging first so we can treat the API contract as finalized.
