Skip to content

Commit 139665b

Browse files
committed
ENHANCE: Create hashUpdateThreadPool to decrease IO thread overhead.
1 parent c1b9b96 commit 139665b

File tree

4 files changed

+87
-18
lines changed

4 files changed

+87
-18
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package net.spy.memcached;
2+
3+
4+
import java.util.List;
5+
import java.util.concurrent.Callable;
6+
7+
public class CacheListUpdateTask implements Callable<Boolean> {
8+
9+
private final MemcachedConnection memcachedConnection;
10+
private final List<MemcachedNode> attachNodes;
11+
private final List<MemcachedNode> removeNodes;
12+
13+
public CacheListUpdateTask(MemcachedConnection memcachedConnection,
14+
List<MemcachedNode> attachNodes,
15+
List<MemcachedNode> removeNodes) {
16+
17+
this.memcachedConnection = memcachedConnection;
18+
this.attachNodes = attachNodes;
19+
this.removeNodes = removeNodes;
20+
}
21+
22+
@Override
23+
public Boolean call() throws Exception {
24+
memcachedConnection.getLocator().update(attachNodes, removeNodes);
25+
26+
// Remove the unavailable nodes.
27+
memcachedConnection.handleNodesToRemove(removeNodes);
28+
return true;
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package net.spy.memcached;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
import java.util.concurrent.Future;
6+
7+
public class HashRingUpdateService {
8+
private final ExecutorService pool;
9+
10+
public HashRingUpdateService() {
11+
pool = Executors.newSingleThreadExecutor();
12+
}
13+
14+
public Future<Boolean> updateHashes(CacheListUpdateTask task) {
15+
return pool.submit(task);
16+
}
17+
18+
public void shutdown() {
19+
pool.shutdown();
20+
}
21+
}

src/main/java/net/spy/memcached/MemcachedConnection.java

+25-12
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.TreeMap;
4646
import java.util.concurrent.ConcurrentLinkedQueue;
4747
import java.util.concurrent.CountDownLatch;
48+
import java.util.concurrent.Future;
4849
import java.util.concurrent.TimeUnit;
4950
import java.util.concurrent.atomic.AtomicReference;
5051

@@ -115,6 +116,9 @@ public final class MemcachedConnection extends SpyObject {
115116
private final DelayedSwitchoverGroups delayedSwitchoverGroups =
116117
new DelayedSwitchoverGroups(DELAYED_SWITCHOVER_TIMEOUT_MILLISECONDS);
117118
/* ENABLE_REPLICATION end */
119+
private final HashRingUpdateService hashUpdateService = new HashRingUpdateService();
120+
121+
private Future<Boolean> hashUpdateResult;
118122

119123
/**
120124
* Construct a memcached connection.
@@ -313,7 +317,7 @@ public void handleIO() throws IOException {
313317
}
314318
}
315319

316-
private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
320+
void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
317321
for (MemcachedNode node : nodesToRemove) {
318322
getLogger().info("old memcached node removed %s", node);
319323
reconnectQueue.remove(node);
@@ -340,10 +344,9 @@ private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
340344
}
341345
}
342346

343-
private void updateConnections(List<InetSocketAddress> addrs) throws IOException {
344-
List<MemcachedNode> attachNodes = new ArrayList<MemcachedNode>();
345-
List<MemcachedNode> removeNodes = new ArrayList<MemcachedNode>();
346-
347+
private void getUpdateNodes(List<InetSocketAddress> addrs,
348+
List<MemcachedNode> attachNodes,
349+
List<MemcachedNode> removeNodes) throws IOException {
347350
for (MemcachedNode node : locator.getAll()) {
348351
if (addrs.contains(node.getSocketAddress())) {
349352
addrs.remove(node.getSocketAddress());
@@ -356,12 +359,6 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
356359
for (SocketAddress sa : addrs) {
357360
attachNodes.add(attachMemcachedNode(sa));
358361
}
359-
360-
// Update the hash.
361-
locator.update(attachNodes, removeNodes);
362-
363-
// Remove the unavailable nodes.
364-
handleNodesToRemove(removeNodes);
365362
}
366363

367364
/* ENABLE_REPLICATION if */
@@ -704,7 +701,12 @@ void handleCacheNodesChange() throws IOException {
704701
return;
705702
}
706703
/* ENABLE_REPLICATION end */
707-
updateConnections(AddrUtil.getAddresses(cacheList));
704+
List<MemcachedNode> attachNodes = new ArrayList<MemcachedNode>();
705+
List<MemcachedNode> removeNodes = new ArrayList<MemcachedNode>();
706+
getUpdateNodes(AddrUtil.getAddresses(cacheList), attachNodes, removeNodes);
707+
// Update the hash.
708+
CacheListUpdateTask task = new CacheListUpdateTask(this, attachNodes, removeNodes);
709+
hashUpdateResult = hashUpdateService.updateHashes(task);
708710
}
709711
/* ENABLE_MIGRATION if */
710712
if (arcusMigrEnabled && alterList != null) {
@@ -725,6 +727,16 @@ void handleCacheNodesChange() throws IOException {
725727
/* ENABLE_MIGRATION end */
726728
}
727729

730+
// Called By MemcachedConnectionTest.
731+
boolean getHashUpdateResult() {
732+
try {
733+
hashUpdateResult.get();
734+
} catch (Exception e) {
735+
return false;
736+
}
737+
return true;
738+
}
739+
728740
// Called by CacheManger to add the memcached server group.
729741
public void setCacheNodesChange(String addrs) {
730742
String old = cacheNodesChange.getAndSet(addrs);
@@ -1533,6 +1545,7 @@ public void shutdown() throws IOException {
15331545
}
15341546
}
15351547
selector.close();
1548+
hashUpdateService.shutdown();
15361549
getLogger().debug("Shut down selector %s", selector);
15371550
}
15381551

src/test/java/net/spy/memcached/MemcachedConnectionTest.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public void testNodesChangeQueue() throws Exception {
7272
conn.handleCacheNodesChange();
7373

7474
// then
75-
assertTrue(1 == locator.getAll().size());
75+
assertTrue(conn.getHashUpdateResult());
76+
assertEquals(1, locator.getAll().size());
7677

7778
// when
7879
conn.setCacheNodesChange("0.0.0.0:11211,0.0.0.0:11212,0.0.0.0:11213");
@@ -81,7 +82,8 @@ public void testNodesChangeQueue() throws Exception {
8182
conn.handleCacheNodesChange();
8283

8384
// then
84-
assertTrue(3 == locator.getAll().size());
85+
assertTrue(conn.getHashUpdateResult());
86+
assertEquals(3, locator.getAll().size());
8587

8688
// when
8789
conn.setCacheNodesChange("0.0.0.0:11212");
@@ -90,7 +92,8 @@ public void testNodesChangeQueue() throws Exception {
9092
conn.handleCacheNodesChange();
9193

9294
// then
93-
assertTrue(1 == locator.getAll().size());
95+
assertTrue(conn.getHashUpdateResult());
96+
assertEquals(1, locator.getAll().size());
9497
}
9598

9699
public void testNodesChangeQueue_empty() throws Exception {
@@ -101,7 +104,7 @@ public void testNodesChangeQueue_empty() throws Exception {
101104
conn.handleCacheNodesChange();
102105

103106
// then
104-
assertTrue(0 == locator.getAll().size());
107+
assertEquals(0, locator.getAll().size());
105108
}
106109

107110
public void testNodesChangeQueue_invalid_addr() {
@@ -128,7 +131,8 @@ public void testNodesChangeQueue_redundent() throws Exception {
128131
conn.handleCacheNodesChange();
129132

130133
// then
131-
assertTrue(2 == locator.getAll().size());
134+
assertTrue(conn.getHashUpdateResult());
135+
assertEquals(2, locator.getAll().size());
132136
}
133137

134138
public void testNodesChangeQueue_twice() throws Exception {
@@ -140,7 +144,8 @@ public void testNodesChangeQueue_twice() throws Exception {
140144
conn.handleCacheNodesChange();
141145

142146
// then
143-
assertTrue(1 == locator.getAll().size());
147+
assertTrue(conn.getHashUpdateResult());
148+
assertEquals(1, locator.getAll().size());
144149
}
145150

146151
public void testAddOperations() throws Exception {

0 commit comments

Comments
 (0)