Skip to content

Commit 07918e6

Browse files
brido4125jhpark816
authored andcommitted
INTERNAL: Using addOperations for atomicity in Broadcast Op.
1 parent 215b2d9 commit 07918e6

File tree

4 files changed

+23
-18
lines changed

4 files changed

+23
-18
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1901,6 +1901,7 @@ public OperationFuture<Boolean> flush(final String prefix, final int delay) {
19011901
Collection<MemcachedNode> nodes = getAllNodes();
19021902
final BroadcastFuture<Boolean> rv
19031903
= new BroadcastFuture<Boolean>(operationTimeout, Boolean.TRUE, nodes.size());
1904+
final Map<MemcachedNode, Operation> opsMap = new HashMap<MemcachedNode, Operation>();
19041905

19051906
checkState();
19061907
for (MemcachedNode node : nodes) {
@@ -1917,9 +1918,10 @@ public void complete() {
19171918
rv.complete();
19181919
}
19191920
});
1920-
rv.addOp(op);
1921-
getMemcachedConnection().addOperation(node, op);
1921+
opsMap.put(node, op);
19221922
}
1923+
rv.addOperations(opsMap.values());
1924+
getMemcachedConnection().addOperations(opsMap);
19231925
return rv;
19241926
}
19251927

src/main/java/net/spy/memcached/MemcachedClient.java

+16-8
Original file line numberDiff line numberDiff line change
@@ -1549,6 +1549,7 @@ public Map<SocketAddress, String> getVersions() {
15491549
final BroadcastFuture<Map<SocketAddress, String>> future
15501550
= new BroadcastFuture<Map<SocketAddress, String>>(
15511551
operationTimeout, result, nodes.size());
1552+
final Map<MemcachedNode, Operation> opsMap = new HashMap<MemcachedNode, Operation>();
15521553

15531554
checkState();
15541555
for (MemcachedNode node : nodes) {
@@ -1564,9 +1565,10 @@ public void complete() {
15641565
future.complete();
15651566
}
15661567
});
1567-
future.addOp(op);
1568-
conn.addOperation(node, op);
1568+
opsMap.put(node, op);
15691569
}
1570+
future.addOperations(opsMap.values());
1571+
conn.addOperations(opsMap);
15701572

15711573
Map<SocketAddress, String> rv = null;
15721574
try {
@@ -1612,6 +1614,7 @@ public Map<SocketAddress, Map<String, String>> getStats(final String arg) {
16121614
final BroadcastFuture<Map<SocketAddress, Map<String, String>>> future
16131615
= new BroadcastFuture<Map<SocketAddress, Map<String, String>>>(
16141616
operationTimeout, resultMap, nodes.size());
1617+
final Map<MemcachedNode, Operation> opsMap = new HashMap<MemcachedNode, Operation>();
16151618

16161619
checkState();
16171620
for (MemcachedNode node : nodes) {
@@ -1637,9 +1640,10 @@ public void complete() {
16371640
future.complete();
16381641
}
16391642
});
1640-
future.addOp(op);
1641-
conn.addOperation(node, op);
1643+
opsMap.put(node, op);
16421644
}
1645+
future.addOperations(opsMap.values());
1646+
conn.addOperations(opsMap);
16431647

16441648
Map<SocketAddress, Map<String, String>> rv = null;
16451649
try {
@@ -1965,6 +1969,7 @@ public Future<Boolean> flush(final int delay) {
19651969
Collection<MemcachedNode> nodes = getAllNodes();
19661970
final BroadcastFuture<Boolean> rv
19671971
= new BroadcastFuture<Boolean>(operationTimeout, Boolean.TRUE, nodes.size());
1972+
final Map<MemcachedNode, Operation> opsMap = new HashMap<MemcachedNode, Operation>();
19681973

19691974
checkState();
19701975
for (MemcachedNode node : nodes) {
@@ -1981,9 +1986,10 @@ public void complete() {
19811986
rv.complete();
19821987
}
19831988
});
1984-
rv.addOp(op);
1985-
conn.addOperation(node, op);
1989+
opsMap.put(node, op);
19861990
}
1991+
rv.addOperations(opsMap.values());
1992+
conn.addOperations(opsMap);
19871993
return rv;
19881994
}
19891995

@@ -2005,6 +2011,7 @@ public Set<String> listSaslMechanisms() {
20052011
final BroadcastFuture<ConcurrentMap<String, String>> future
20062012
= new BroadcastFuture<ConcurrentMap<String, String>>(
20072013
operationTimeout, resultMap, nodes.size());
2014+
final Map<MemcachedNode, Operation> opsMap = new HashMap<MemcachedNode, Operation>();
20082015

20092016
checkState();
20102017
for (MemcachedNode node : nodes) {
@@ -2021,9 +2028,10 @@ public void complete() {
20212028
future.complete();
20222029
}
20232030
});
2024-
future.addOp(op);
2025-
conn.addOperation(node, op);
2031+
opsMap.put(node, op);
20262032
}
2033+
future.addOperations(opsMap.values());
2034+
conn.addOperations(opsMap);
20272035

20282036
Set<String> rv = null;
20292037
try {

src/main/java/net/spy/memcached/MemcachedConnection.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -1482,13 +1482,8 @@ public void addOperation(final MemcachedNode node, final Operation o) {
14821482

14831483
public void addOperations(final Map<MemcachedNode, Operation> ops) {
14841484
for (Map.Entry<MemcachedNode, Operation> me : ops.entrySet()) {
1485-
final MemcachedNode node = me.getKey();
1486-
Operation o = me.getValue();
1487-
node.addOpToInputQ(o);
1488-
addedQueue.offer(node);
1485+
addOperation(me.getKey(), me.getValue());
14891486
}
1490-
Selector s = selector.wakeup();
1491-
assert s == selector : "Wakeup returned the wrong selector.";
14921487
}
14931488

14941489
public void wakeUpSelector() {

src/main/java/net/spy/memcached/internal/BroadcastFuture.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ public T get(long duration, TimeUnit units)
8484
return objRef.get();
8585
}
8686

87-
public void addOp(Operation op) {
88-
ops.add(op);
87+
public void addOperations(Collection<Operation> ops) {
88+
this.ops.addAll(ops);
8989
}
9090

9191
public void complete() {

0 commit comments

Comments
 (0)