Skip to content

Commit 2a4764e

Browse files
committed
ENHANCE: Create hashUpdateThreadPool to decrease IO thread overhead.
1 parent e85c672 commit 2a4764e

File tree

4 files changed

+73
-18
lines changed

4 files changed

+73
-18
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package net.spy.memcached;
2+
3+
4+
import java.util.List;
5+
6+
public class CacheListUpdateTask implements Runnable {
7+
8+
private final MemcachedConnection memcachedConnection;
9+
private final List<MemcachedNode> attachNodes;
10+
private final List<MemcachedNode> removeNodes;
11+
12+
public CacheListUpdateTask(MemcachedConnection memcachedConnection,
13+
List<MemcachedNode> attachNodes,
14+
List<MemcachedNode> removeNodes) {
15+
16+
this.memcachedConnection = memcachedConnection;
17+
this.attachNodes = attachNodes;
18+
this.removeNodes = removeNodes;
19+
}
20+
21+
@Override
22+
public void run() {
23+
memcachedConnection.getLocator().update(attachNodes, removeNodes);
24+
25+
// Remove the unavailable nodes.
26+
memcachedConnection.handleNodesToRemove(removeNodes);
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package net.spy.memcached;
2+
3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
6+
public class HashRingUpdateService {
7+
private final ExecutorService pool;
8+
9+
public HashRingUpdateService() {
10+
pool = Executors.newSingleThreadExecutor();
11+
}
12+
13+
public void updateHashes(CacheListUpdateTask task) {
14+
pool.execute(task);
15+
}
16+
17+
public void shutdown() {
18+
pool.shutdown();
19+
}
20+
}

src/main/java/net/spy/memcached/MemcachedConnection.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ public final class MemcachedConnection extends SpyObject {
114114
private final DelayedSwitchoverGroups delayedSwitchoverGroups =
115115
new DelayedSwitchoverGroups(DELAYED_SWITCHOVER_TIMEOUT_MILLISECONDS);
116116
/* ENABLE_REPLICATION end */
117+
private final HashRingUpdateService hashUpdateService = new HashRingUpdateService();
117118

118119
/**
119120
* Construct a memcached connection.
@@ -312,7 +313,7 @@ public void handleIO() throws IOException {
312313
}
313314
}
314315

315-
private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
316+
void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
316317
for (MemcachedNode node : nodesToRemove) {
317318
getLogger().info("old memcached node removed %s", node);
318319
reconnectQueue.remove(node);
@@ -339,10 +340,9 @@ private void handleNodesToRemove(final List<MemcachedNode> nodesToRemove) {
339340
}
340341
}
341342

342-
private void updateConnections(List<InetSocketAddress> addrs) throws IOException {
343-
List<MemcachedNode> attachNodes = new ArrayList<MemcachedNode>();
344-
List<MemcachedNode> removeNodes = new ArrayList<MemcachedNode>();
345-
343+
private void getUpdateNodes(List<InetSocketAddress> addrs,
344+
List<MemcachedNode> attachNodes,
345+
List<MemcachedNode> removeNodes) throws IOException {
346346
for (MemcachedNode node : locator.getAll()) {
347347
if (addrs.contains(node.getSocketAddress())) {
348348
addrs.remove(node.getSocketAddress());
@@ -355,12 +355,6 @@ private void updateConnections(List<InetSocketAddress> addrs) throws IOException
355355
for (SocketAddress sa : addrs) {
356356
attachNodes.add(attachMemcachedNode(sa));
357357
}
358-
359-
// Update the hash.
360-
locator.update(attachNodes, removeNodes);
361-
362-
// Remove the unavailable nodes.
363-
handleNodesToRemove(removeNodes);
364358
}
365359

366360
/* ENABLE_REPLICATION if */
@@ -704,7 +698,11 @@ void handleCacheNodesChange() throws IOException {
704698
return;
705699
}
706700
/* ENABLE_REPLICATION end */
707-
updateConnections(AddrUtil.getAddresses(cacheList));
701+
List<MemcachedNode> attachNodes = new ArrayList<MemcachedNode>();
702+
List<MemcachedNode> removeNodes = new ArrayList<MemcachedNode>();
703+
getUpdateNodes(AddrUtil.getAddresses(cacheList), attachNodes, removeNodes);
704+
// Update the hash.
705+
hashUpdateService.updateHashes(new CacheListUpdateTask(this, attachNodes, removeNodes));
708706
}
709707
/* ENABLE_MIGRATION if */
710708
if (arcusMigrEnabled && alterList != null) {
@@ -1511,6 +1509,7 @@ public void shutdown() throws IOException {
15111509
}
15121510
}
15131511
selector.close();
1512+
hashUpdateService.shutdown();
15141513
getLogger().debug("Shut down selector %s", selector);
15151514
}
15161515

src/test/java/net/spy/memcached/MemcachedConnectionTest.java

+14-6
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public void testDebugBuffer() {
6464
assertEquals("this is a test \\x5f", s);
6565
}
6666

67+
// FIXME : Change to using callback instead of sleep when CompletableFuture enabled.
6768
public void testNodesChangeQueue() throws Exception {
6869
// when
6970
conn.setCacheNodesChange("0.0.0.0:11211");
@@ -72,7 +73,8 @@ public void testNodesChangeQueue() throws Exception {
7273
conn.handleCacheNodesChange();
7374

7475
// then
75-
assertTrue(1 == locator.getAll().size());
76+
Thread.sleep(500);
77+
assertEquals(1, locator.getAll().size());
7678

7779
// when
7880
conn.setCacheNodesChange("0.0.0.0:11211,0.0.0.0:11212,0.0.0.0:11213");
@@ -81,7 +83,8 @@ public void testNodesChangeQueue() throws Exception {
8183
conn.handleCacheNodesChange();
8284

8385
// then
84-
assertTrue(3 == locator.getAll().size());
86+
Thread.sleep(500);
87+
assertEquals(3, locator.getAll().size());
8588

8689
// when
8790
conn.setCacheNodesChange("0.0.0.0:11212");
@@ -90,7 +93,8 @@ public void testNodesChangeQueue() throws Exception {
9093
conn.handleCacheNodesChange();
9194

9295
// then
93-
assertTrue(1 == locator.getAll().size());
96+
Thread.sleep(500);
97+
assertEquals(1, locator.getAll().size());
9498
}
9599

96100
public void testNodesChangeQueue_empty() throws Exception {
@@ -101,7 +105,7 @@ public void testNodesChangeQueue_empty() throws Exception {
101105
conn.handleCacheNodesChange();
102106

103107
// then
104-
assertTrue(0 == locator.getAll().size());
108+
assertEquals(0, locator.getAll().size());
105109
}
106110

107111
public void testNodesChangeQueue_invalid_addr() {
@@ -120,6 +124,7 @@ public void testNodesChangeQueue_invalid_addr() {
120124
}
121125
}
122126

127+
// FIXME : Change to using callback instead of sleep when CompletableFuture enabled.
123128
public void testNodesChangeQueue_redundent() throws Exception {
124129
// when
125130
conn.setCacheNodesChange("0.0.0.0:11211,0.0.0.0:11211");
@@ -128,9 +133,11 @@ public void testNodesChangeQueue_redundent() throws Exception {
128133
conn.handleCacheNodesChange();
129134

130135
// then
131-
assertTrue(2 == locator.getAll().size());
136+
Thread.sleep(500);
137+
assertEquals(2, locator.getAll().size());
132138
}
133139

140+
// FIXME : Change to using callback instead of sleep when CompletableFuture enabled.
134141
public void testNodesChangeQueue_twice() throws Exception {
135142
// when
136143
conn.setCacheNodesChange("0.0.0.0:11211");
@@ -140,7 +147,8 @@ public void testNodesChangeQueue_twice() throws Exception {
140147
conn.handleCacheNodesChange();
141148

142149
// then
143-
assertTrue(1 == locator.getAll().size());
150+
Thread.sleep(500);
151+
assertEquals(1, locator.getAll().size());
144152
}
145153

146154
public void testAddOperations() throws Exception {

0 commit comments

Comments
 (0)