Skip to content

Commit 9a955aa

Browse files
committed
ENHANCE: Create hashUpdateThreadPool to decrease IO thread overhead.
1 parent c1b9b96 commit 9a955aa

File tree

4 files changed

+67
-10
lines changed

4 files changed

+67
-10
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package net.spy.memcached;
2+
3+
4+
import java.util.List;
5+
import java.util.concurrent.Callable;
6+
7+
public class CacheListUpdateTask implements Callable<Boolean> {
8+
9+
private final MemcachedConnection memcachedConnection;
10+
private final List<MemcachedNode> attachNodes;
11+
private final List<MemcachedNode> removeNodes;
12+
13+
public CacheListUpdateTask(MemcachedConnection memcachedConnection,
14+
List<MemcachedNode> attachNodes,
15+
List<MemcachedNode> removeNodes) {
16+
17+
this.memcachedConnection = memcachedConnection;
18+
this.attachNodes = attachNodes;
19+
this.removeNodes = removeNodes;
20+
}
21+
22+
@Override
23+
public Boolean call() throws Exception {
24+
memcachedConnection.getLocator().update(attachNodes, removeNodes);
25+
26+
// Remove the unavailable nodes.
27+
memcachedConnection.handleNodesToRemove(removeNodes);
28+
return true;
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package net.spy.memcached;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
6+
public class HashRingUpdateService {
7+
private final ExecutorService pool;
8+
9+
public HashRingUpdateService() {
10+
pool = Executors.newSingleThreadExecutor();
11+
}
12+
13+
public Boolean updateHashes(CacheListUpdateTask task) {
14+
Boolean res = null;
15+
try {
16+
res = pool.submit(task).get();
17+
} catch (Exception e) {
18+
e.printStackTrace();
19+
}
20+
return res;
21+
}
22+
23+
public void shutdown() {
24+
pool.shutdown();
25+
}
26+
}

src/main/java/net/spy/memcached/MemcachedConnection.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public final class MemcachedConnection extends SpyObject {
115115
private final DelayedSwitchoverGroups delayedSwitchoverGroups =
116116
new DelayedSwitchoverGroups(DELAYED_SWITCHOVER_TIMEOUT_MILLISECONDS);
117117
/* ENABLE_REPLICATION end */
118+
private final HashRingUpdateService hashUpdateService = new HashRingUpdateService();
118119

119120
/**
120121
* Construct a memcached connection.
@@ -313,7 +314,7 @@ public void handleIO() throws IOException {
313314
}
314315
}
315316

316-
private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
317+
void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
317318
for (MemcachedNode node : nodesToRemove) {
318319
getLogger().info("old memcached node removed %s", node);
319320
reconnectQueue.remove(node);
@@ -358,10 +359,9 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
358359
}
359360

360361
// Update the hash.
361-
locator.update(attachNodes, removeNodes);
362-
363-
// Remove the unavailable nodes.
364-
handleNodesToRemove(removeNodes);
362+
hashUpdateService.updateHashes(new CacheListUpdateTask(
363+
this, attachNodes, removeNodes
364+
));
365365
}
366366

367367
/* ENABLE_REPLICATION if */
@@ -1533,6 +1533,7 @@ public void shutdown() throws IOException {
15331533
}
15341534
}
15351535
selector.close();
1536+
hashUpdateService.shutdown();
15361537
getLogger().debug("Shut down selector %s", selector);
15371538
}
15381539

src/test/java/net/spy/memcached/MemcachedConnectionTest.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void testNodesChangeQueue() throws Exception {
7272
conn.handleCacheNodesChange();
7373

7474
// then
75-
assertTrue(1 == locator.getAll().size());
75+
assertEquals(1, locator.getAll().size());
7676

7777
// when
7878
conn.setCacheNodesChange("0.0.0.0:11211,0.0.0.0:11212,0.0.0.0:11213");
@@ -81,7 +81,7 @@ public void testNodesChangeQueue() throws Exception {
8181
conn.handleCacheNodesChange();
8282

8383
// then
84-
assertTrue(3 == locator.getAll().size());
84+
assertEquals(3, locator.getAll().size());
8585

8686
// when
8787
conn.setCacheNodesChange("0.0.0.0:11212");
@@ -90,7 +90,7 @@ public void testNodesChangeQueue() throws Exception {
9090
conn.handleCacheNodesChange();
9191

9292
// then
93-
assertTrue(1 == locator.getAll().size());
93+
assertEquals(1, locator.getAll().size());
9494
}
9595

9696
public void testNodesChangeQueue_empty() throws Exception {
@@ -101,7 +101,7 @@ public void testNodesChangeQueue_empty() throws Exception {
101101
conn.handleCacheNodesChange();
102102

103103
// then
104-
assertTrue(0 == locator.getAll().size());
104+
assertEquals(0, locator.getAll().size());
105105
}
106106

107107
public void testNodesChangeQueue_invalid_addr() {
@@ -128,7 +128,7 @@ public void testNodesChangeQueue_redundent() throws Exception {
128128
conn.handleCacheNodesChange();
129129

130130
// then
131-
assertTrue(2 == locator.getAll().size());
131+
assertEquals(2, locator.getAll().size());
132132
}
133133

134134
public void testNodesChangeQueue_twice() throws Exception {

0 commit comments

Comments
 (0)