Skip to content

Commit bc90d78

Browse files
committed
ENHANCE: Create hashUpdateThreadPool to decrease IO thread overhead.
1 parent 5dde87b commit bc90d78

File tree

4 files changed

+88
-18
lines changed

4 files changed

+88
-18
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package net.spy.memcached;
2+
3+
4+
import java.util.List;
5+
import java.util.concurrent.Callable;
6+
7+
public class CacheListUpdateTask implements Callable<Boolean> {
8+
9+
private final MemcachedConnection memcachedConnection;
10+
private final List<MemcachedNode> attachNodes;
11+
private final List<MemcachedNode> removeNodes;
12+
13+
public CacheListUpdateTask(MemcachedConnection memcachedConnection,
14+
List<MemcachedNode> attachNodes,
15+
List<MemcachedNode> removeNodes) {
16+
17+
this.memcachedConnection = memcachedConnection;
18+
this.attachNodes = attachNodes;
19+
this.removeNodes = removeNodes;
20+
}
21+
22+
@Override
23+
public Boolean call() throws Exception {
24+
memcachedConnection.getLocator().update(attachNodes, removeNodes);
25+
26+
// Remove the unavailable nodes.
27+
memcachedConnection.handleNodesToRemove(removeNodes);
28+
return true;
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package net.spy.memcached;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
import java.util.concurrent.Future;
6+
7+
public class HashRingUpdateService {
8+
private final ExecutorService pool;
9+
10+
public HashRingUpdateService() {
11+
pool = Executors.newSingleThreadExecutor();
12+
}
13+
14+
public Future<Boolean> updateHashes(CacheListUpdateTask task) {
15+
return pool.submit(task);
16+
}
17+
18+
public void shutdown() {
19+
pool.shutdown();
20+
}
21+
}

src/main/java/net/spy/memcached/MemcachedConnection.java

+26-12
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.SortedMap;
4545
import java.util.TreeMap;
4646
import java.util.concurrent.ConcurrentLinkedQueue;
47+
import java.util.concurrent.Future;
4748
import java.util.concurrent.TimeUnit;
4849
import java.util.concurrent.atomic.AtomicReference;
4950

@@ -114,6 +115,9 @@ public final class MemcachedConnection extends SpyObject {
114115
private final DelayedSwitchoverGroups delayedSwitchoverGroups =
115116
new DelayedSwitchoverGroups(DELAYED_SWITCHOVER_TIMEOUT_MILLISECONDS);
116117
/* ENABLE_REPLICATION end */
118+
private final HashRingUpdateService hashUpdateService = new HashRingUpdateService();
119+
120+
private Future<Boolean> hashUpdateResult;
117121

118122
/**
119123
* Construct a memcached connection.
@@ -312,7 +316,7 @@ public void handleIO() throws IOException {
312316
}
313317
}
314318

315-
private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
319+
void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
316320
for (MemcachedNode node : nodesToRemove) {
317321
getLogger().info("old memcached node removed %s", node);
318322
reconnectQueue.remove(node);
@@ -339,10 +343,9 @@ private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
339343
}
340344
}
341345

342-
private void updateConnections(List<InetSocketAddress> addrs) throws IOException {
343-
List<MemcachedNode> attachNodes = new ArrayList<MemcachedNode>();
344-
List<MemcachedNode> removeNodes = new ArrayList<MemcachedNode>();
345-
346+
private void getUpdateNodes(List<InetSocketAddress> addrs,
347+
List<MemcachedNode> attachNodes,
348+
List<MemcachedNode> removeNodes) throws IOException {
346349
for (MemcachedNode node : locator.getAll()) {
347350
if (addrs.contains(node.getSocketAddress())) {
348351
addrs.remove(node.getSocketAddress());
@@ -355,12 +358,6 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
355358
for (SocketAddress sa : addrs) {
356359
attachNodes.add(attachMemcachedNode(sa));
357360
}
358-
359-
// Update the hash.
360-
locator.update(attachNodes, removeNodes);
361-
362-
// Remove the unavailable nodes.
363-
handleNodesToRemove(removeNodes);
364361
}
365362

366363
/* ENABLE_REPLICATION if */
@@ -703,7 +700,12 @@ void handleCacheNodesChange() throws IOException {
703700
return;
704701
}
705702
/* ENABLE_REPLICATION end */
706-
updateConnections(AddrUtil.getAddresses(cacheList));
703+
List<MemcachedNode> attachNodes = new ArrayList<MemcachedNode>();
704+
List<MemcachedNode> removeNodes = new ArrayList<MemcachedNode>();
705+
getUpdateNodes(AddrUtil.getAddresses(cacheList), attachNodes, removeNodes);
706+
// Update the hash.
707+
CacheListUpdateTask task = new CacheListUpdateTask(this, attachNodes, removeNodes);
708+
hashUpdateResult = hashUpdateService.updateHashes(task);
707709
}
708710
/* ENABLE_MIGRATION if */
709711
if (arcusMigrEnabled && alterList != null) {
@@ -724,6 +726,17 @@ void handleCacheNodesChange() throws IOException {
724726
/* ENABLE_MIGRATION end */
725727
}
726728

729+
// Called By MemcachedConnectionTest.
730+
boolean getHashUpdateResult() {
731+
try {
732+
hashUpdateResult.get();
733+
} catch (Exception e) {
734+
getLogger().warn("Failed to update hash.", e);
735+
return false;
736+
}
737+
return true;
738+
}
739+
727740
// Called by CacheManger to add the memcached server group.
728741
public void setCacheNodesChange(String addrs) {
729742
String old = cacheNodesChange.getAndSet(addrs);
@@ -1509,6 +1522,7 @@ public void shutdown() throws IOException {
15091522
}
15101523
}
15111524
selector.close();
1525+
hashUpdateService.shutdown();
15121526
getLogger().debug("Shut down selector %s", selector);
15131527
}
15141528

src/test/java/net/spy/memcached/MemcachedConnectionTest.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public void testNodesChangeQueue() throws Exception {
7272
conn.handleCacheNodesChange();
7373

7474
// then
75-
assertTrue(1 == locator.getAll().size());
75+
assertTrue(conn.getHashUpdateResult());
76+
assertEquals(1, locator.getAll().size());
7677

7778
// when
7879
conn.setCacheNodesChange("0.0.0.0:11211,0.0.0.0:11212,0.0.0.0:11213");
@@ -81,7 +82,8 @@ public void testNodesChangeQueue() throws Exception {
8182
conn.handleCacheNodesChange();
8283

8384
// then
84-
assertTrue(3 == locator.getAll().size());
85+
assertTrue(conn.getHashUpdateResult());
86+
assertEquals(3, locator.getAll().size());
8587

8688
// when
8789
conn.setCacheNodesChange("0.0.0.0:11212");
@@ -90,7 +92,8 @@ public void testNodesChangeQueue() throws Exception {
9092
conn.handleCacheNodesChange();
9193

9294
// then
93-
assertTrue(1 == locator.getAll().size());
95+
assertTrue(conn.getHashUpdateResult());
96+
assertEquals(1, locator.getAll().size());
9497
}
9598

9699
public void testNodesChangeQueue_empty() throws Exception {
@@ -101,7 +104,7 @@ public void testNodesChangeQueue_empty() throws Exception {
101104
conn.handleCacheNodesChange();
102105

103106
// then
104-
assertTrue(0 == locator.getAll().size());
107+
assertEquals(0, locator.getAll().size());
105108
}
106109

107110
public void testNodesChangeQueue_invalid_addr() {
@@ -128,7 +131,8 @@ public void testNodesChangeQueue_redundent() throws Exception {
128131
conn.handleCacheNodesChange();
129132

130133
// then
131-
assertTrue(2 == locator.getAll().size());
134+
assertTrue(conn.getHashUpdateResult());
135+
assertEquals(2, locator.getAll().size());
132136
}
133137

134138
public void testNodesChangeQueue_twice() throws Exception {
@@ -140,7 +144,8 @@ public void testNodesChangeQueue_twice() throws Exception {
140144
conn.handleCacheNodesChange();
141145

142146
// then
143-
assertTrue(1 == locator.getAll().size());
147+
assertTrue(conn.getHashUpdateResult());
148+
assertEquals(1, locator.getAll().size());
144149
}
145150

146151
public void testAddOperations() throws Exception {

0 commit comments

Comments
 (0)