diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index a8c2318b0..3f9bdcdc1 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -453,7 +453,7 @@ private void updateReplConnections(List addrs) throws IOExcep if (oldGroup.isDelayedSwitchover()) { delayedSwitchoverGroups.remove(oldGroup); - switchoverMemcachedReplGroup(oldGroup.getMasterNode(), true); + switchoverMemcachedReplGroup(oldGroup, true); } MemcachedNode oldMasterNode = oldGroup.getMasterNode(); @@ -587,8 +587,10 @@ private Set getSlaveAddrsFromGroupAddrs( /* ENABLE_REPLICATION end */ /* ENABLE_REPLICATION if */ - private void switchoverMemcachedReplGroup(MemcachedNode node, boolean cancelNonIdempotent) { - MemcachedReplicaGroup group = node.getReplicaGroup(); + private void switchoverMemcachedReplGroup(MemcachedReplicaGroup group, + boolean cancelNonIdempotent) { + + MemcachedNode oldMaster = group.getMasterNode(); /* must keep the following execution order when switchover * - first moveOperations @@ -596,17 +598,13 @@ private void switchoverMemcachedReplGroup(MemcachedNode node, boolean cancelNonI * * because moves all operations */ - if (group.getMasterNode() != null && group.getMasterCandidate() != null) { - if (((ArcusReplNodeAddress) node.getSocketAddress()).isMaster()) { - ((ArcusReplKetamaNodeLocator) locator).switchoverReplGroup(group); - } - node.moveOperations(group.getMasterNode(), cancelNonIdempotent); - addedQueue.offer(group.getMasterNode()); - queueReconnect(node, ReconnDelay.IMMEDIATE, - "Discarded all pending reading state operation to move operations."); - } else { - getLogger().warn("Delay switchover because invalid group state : " + group); - } + ((ArcusReplKetamaNodeLocator) locator).switchoverReplGroup(group); + MemcachedNode newMaster = group.getMasterNode(); + + oldMaster.moveOperations(newMaster, cancelNonIdempotent); + addedQueue.offer(group.getMasterNode()); + queueReconnect(oldMaster, ReconnDelay.IMMEDIATE, + "Discarded all pending reading state operation to move operations."); } /* ENABLE_REPLICATION end */ @@ -1016,7 +1014,7 @@ private void handleReads(MemcachedNode qa) if (currentOp != null && currentOp.getState() == OperationState.MOVING) { ((Buffer) rbuf).clear(); delayedSwitchoverGroups.remove(qa.getReplicaGroup()); - switchoverMemcachedReplGroup(qa, false); + switchoverMemcachedReplGroup(qa.getReplicaGroup(), false); break; } /* ENABLE_REPLICATION end */ @@ -1034,7 +1032,7 @@ private void handleReads(MemcachedNode qa) if (qa.getReplicaGroup().isDelayedSwitchover() && qa.getReplicaGroup().masterNode == qa) { delayedSwitchoverGroups.remove(qa.getReplicaGroup()); - switchoverMemcachedReplGroup(qa, false); + switchoverMemcachedReplGroup(qa.getReplicaGroup(), false); } } } @@ -1182,7 +1180,7 @@ private void queueReconnect(MemcachedNode qa, ReconnDelay type, String cause) { if (qa.getReplicaGroup().isDelayedSwitchover() && qa.getReplicaGroup().getMasterNode() == qa) { delayedSwitchoverGroups.remove(qa.getReplicaGroup()); - switchoverMemcachedReplGroup(qa, true); + switchoverMemcachedReplGroup(qa.getReplicaGroup(), true); return; } } @@ -1805,7 +1803,7 @@ public void switchover() { } else { iterator.remove(); entry.getValue().setDelayedSwitchover(false); - switchoverMemcachedReplGroup(entry.getValue().getMasterNode(), true); + switchoverMemcachedReplGroup(entry.getValue(), true); } } }