Skip to content

Commit

Permalink
CLEANUP: Refactored `MemcachedConnection.switchoverMemcachedReplGroup…
Browse files Browse the repository at this point in the history
…()` method.
  • Loading branch information
uhm0311 committed Dec 11, 2024
1 parent 7b301af commit fd4eeeb
Showing 1 changed file with 16 additions and 18 deletions.
34 changes: 16 additions & 18 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep

if (oldGroup.isDelayedSwitchover()) {
delayedSwitchoverGroups.remove(oldGroup);
switchoverMemcachedReplGroup(oldGroup.getMasterNode(), true);
switchoverMemcachedReplGroup(oldGroup, true);
}

MemcachedNode oldMasterNode = oldGroup.getMasterNode();
Expand Down Expand Up @@ -587,26 +587,24 @@ private Set<ArcusReplNodeAddress> getSlaveAddrsFromGroupAddrs(
/* ENABLE_REPLICATION end */

/* ENABLE_REPLICATION if */
private void switchoverMemcachedReplGroup(MemcachedNode node, boolean cancelNonIdempotent) {
MemcachedReplicaGroup group = node.getReplicaGroup();
private void switchoverMemcachedReplGroup(MemcachedReplicaGroup group,
boolean cancelNonIdempotent) {

MemcachedNode oldMaster = group.getMasterNode();

/* must keep the following execution order when switchover
* - first moveOperations
* - second, queueReconnect
*
* because moves all operations
*/
if (group.getMasterNode() != null && group.getMasterCandidate() != null) {
if (((ArcusReplNodeAddress) node.getSocketAddress()).isMaster()) {
((ArcusReplKetamaNodeLocator) locator).switchoverReplGroup(group);
}
node.moveOperations(group.getMasterNode(), cancelNonIdempotent);
addedQueue.offer(group.getMasterNode());
queueReconnect(node, ReconnDelay.IMMEDIATE,
"Discarded all pending reading state operation to move operations.");
} else {
getLogger().warn("Delay switchover because invalid group state : " + group);
}
((ArcusReplKetamaNodeLocator) locator).switchoverReplGroup(group);
MemcachedNode newMaster = group.getMasterNode();

oldMaster.moveOperations(newMaster, cancelNonIdempotent);
addedQueue.offer(group.getMasterNode());
queueReconnect(oldMaster, ReconnDelay.IMMEDIATE,
"Discarded all pending reading state operation to move operations.");
}
/* ENABLE_REPLICATION end */

Expand Down Expand Up @@ -1016,7 +1014,7 @@ private void handleReads(MemcachedNode qa)
if (currentOp != null && currentOp.getState() == OperationState.MOVING) {
((Buffer) rbuf).clear();
delayedSwitchoverGroups.remove(qa.getReplicaGroup());
switchoverMemcachedReplGroup(qa, false);
switchoverMemcachedReplGroup(qa.getReplicaGroup(), false);
break;
}
/* ENABLE_REPLICATION end */
Expand All @@ -1034,7 +1032,7 @@ private void handleReads(MemcachedNode qa)
if (qa.getReplicaGroup().isDelayedSwitchover() &&
qa.getReplicaGroup().masterNode == qa) {
delayedSwitchoverGroups.remove(qa.getReplicaGroup());
switchoverMemcachedReplGroup(qa, false);
switchoverMemcachedReplGroup(qa.getReplicaGroup(), false);
}
}
}
Expand Down Expand Up @@ -1182,7 +1180,7 @@ private void queueReconnect(MemcachedNode qa, ReconnDelay type, String cause) {
if (qa.getReplicaGroup().isDelayedSwitchover() &&
qa.getReplicaGroup().getMasterNode() == qa) {
delayedSwitchoverGroups.remove(qa.getReplicaGroup());
switchoverMemcachedReplGroup(qa, true);
switchoverMemcachedReplGroup(qa.getReplicaGroup(), true);
return;
}
}
Expand Down Expand Up @@ -1805,7 +1803,7 @@ public void switchover() {
} else {
iterator.remove();
entry.getValue().setDelayedSwitchover(false);
switchoverMemcachedReplGroup(entry.getValue().getMasterNode(), true);
switchoverMemcachedReplGroup(entry.getValue(), true);
}
}
}
Expand Down

0 comments on commit fd4eeeb

Please sign in to comment.