Skip to content

Commit 576c51e

Browse files
committed
ENHANCE: Change asyncSetPipedExist method logic.
1 parent 5dde87b commit 576c51e

File tree

2 files changed

+109
-80
lines changed

2 files changed

+109
-80
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

+75-75
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,11 @@
123123
import net.spy.memcached.internal.CheckedOperationTimeoutException;
124124
import net.spy.memcached.internal.CollectionFuture;
125125
import net.spy.memcached.internal.CollectionGetBulkFuture;
126-
import net.spy.memcached.internal.OperationFuture;
127-
import net.spy.memcached.internal.SMGetFuture;
128-
import net.spy.memcached.internal.PipedCollectionFuture;
129126
import net.spy.memcached.internal.CollectionGetFuture;
130127
import net.spy.memcached.internal.BroadcastFuture;
128+
import net.spy.memcached.internal.OperationFuture;
129+
import net.spy.memcached.internal.PipedCollectionFuture;
130+
import net.spy.memcached.internal.SMGetFuture;
131131
import net.spy.memcached.ops.BTreeFindPositionOperation;
132132
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
133133
import net.spy.memcached.ops.BTreeGetBulkOperation;
@@ -3498,99 +3498,99 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
34983498
}
34993499

35003500
@Override
3501-
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key,
3502-
List<Object> values) {
3503-
SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder);
3504-
return asyncSetPipedExist(key, exist);
3501+
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, List<Object> values) {
3502+
return asyncSopPipedExistBulk(key, values, collectionTranscoder);
35053503
}
35063504

35073505
@Override
35083506
public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key,
35093507
List<T> values,
35103508
Transcoder<T> tc) {
3511-
SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc);
3512-
return asyncSetPipedExist(key, exist);
3509+
if (values.size() == 0) {
3510+
throw new IllegalArgumentException(
3511+
"The number of piped operations must be larger than 0.");
3512+
}
3513+
3514+
List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>();
3515+
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
3516+
existList.add(new SetPipedExist<T>(key, values, tc));
3517+
} else {
3518+
PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
3519+
for (List<T> partition : partitionedList) {
3520+
existList.add(new SetPipedExist<T>(key, partition, tc));
3521+
}
3522+
}
3523+
return asyncSetPipedExist(key, existList);
35133524
}
35143525

35153526
/**
35163527
* Generic pipelined existence operation for set items. Public methods call this method.
35173528
*
35183529
* @param key collection item's key
3519-
* @param exist operation parameters (element values)
3530+
* @param existList list of operation parameters (element values)
35203531
* @return future holding the map of elements and their existence results
35213532
*/
35223533
<T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
3523-
final String key, final SetPipedExist<T> exist) {
3524-
3525-
if (exist.getItemCount() == 0) {
3526-
throw new IllegalArgumentException(
3527-
"The number of piped operations must be larger than 0.");
3528-
}
3529-
if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
3530-
throw new IllegalArgumentException(
3531-
"The number of piped operations must not exceed a maximum of "
3532-
+ CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + ".");
3533-
}
3534-
3535-
final CountDownLatch latch = new CountDownLatch(1);
3536-
final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>(
3537-
latch, operationTimeout);
3538-
3539-
Operation op = opFact.collectionPipedExist(key, exist,
3540-
new CollectionPipedExistOperation.Callback() {
3541-
3542-
private final Map<T, Boolean> result = new HashMap<T, Boolean>();
3543-
private boolean hasAnError = false;
3544-
3545-
public void receivedStatus(OperationStatus status) {
3546-
if (hasAnError) {
3547-
return;
3548-
}
3549-
3550-
CollectionOperationStatus cstatus;
3551-
if (status instanceof CollectionOperationStatus) {
3552-
cstatus = (CollectionOperationStatus) status;
3553-
} else {
3534+
final String key, final List<SetPipedExist<T>> existList) {
3535+
final CountDownLatch latch = new CountDownLatch(existList.size());
3536+
3537+
final PipedCollectionFuture<T, Boolean> rv
3538+
= new PipedCollectionFuture<T, Boolean>(latch, operationTimeout);
3539+
3540+
for (final SetPipedExist<T> exist : existList) {
3541+
Operation op = opFact.collectionPipedExist(key, exist, new CollectionPipedExistOperation.Callback() {
3542+
private CollectionOperationStatus failedStatus = null;
3543+
private int failedStatusCount = 0;
3544+
public void gotStatus(Integer index, OperationStatus status) {
3545+
CollectionOperationStatus cstatus;
3546+
if (status instanceof CollectionOperationStatus) {
3547+
cstatus = (CollectionOperationStatus) status;
3548+
} else {
3549+
getLogger().warn("Unhandled state: " + status);
3550+
cstatus = new CollectionOperationStatus(status);
3551+
}
3552+
switch (cstatus.getResponse()) {
3553+
case EXIST:
3554+
case NOT_EXIST:
3555+
rv.addEachResult(exist.getValues().get(index), (CollectionResponse.EXIST.equals(cstatus.getResponse())));
3556+
break;
3557+
case UNREADABLE:
3558+
case TYPE_MISMATCH:
3559+
case NOT_FOUND:
3560+
if (failedStatus == null) {
3561+
failedStatus = cstatus;
3562+
failedStatusCount++;
3563+
} else if (failedStatus.equals(cstatus)) {
3564+
failedStatusCount++;
3565+
}
3566+
break;
3567+
default:
35543568
getLogger().warn("Unhandled state: " + status);
3555-
cstatus = new CollectionOperationStatus(status);
3556-
}
3557-
rv.set(result, cstatus);
35583569
}
3570+
}
35593571

3560-
public void complete() {
3561-
latch.countDown();
3572+
public void receivedStatus(OperationStatus status) {
3573+
CollectionOperationStatus cstatus;
3574+
if (status instanceof CollectionOperationStatus) {
3575+
cstatus = (CollectionOperationStatus) status;
3576+
} else {
3577+
getLogger().warn("Unhandled state: " + status);
3578+
cstatus = new CollectionOperationStatus(status);
35623579
}
3563-
3564-
public void gotStatus(Integer index, OperationStatus status) {
3565-
CollectionOperationStatus cstatus;
3566-
if (status instanceof CollectionOperationStatus) {
3567-
cstatus = (CollectionOperationStatus) status;
3568-
} else {
3569-
cstatus = new CollectionOperationStatus(status);
3570-
}
3571-
3572-
switch (cstatus.getResponse()) {
3573-
case EXIST:
3574-
case NOT_EXIST:
3575-
result.put(exist.getValues().get(index),
3576-
(CollectionResponse.EXIST.equals(cstatus
3577-
.getResponse())));
3578-
break;
3579-
case UNREADABLE:
3580-
case TYPE_MISMATCH:
3581-
case NOT_FOUND:
3582-
hasAnError = true;
3583-
rv.set(new HashMap<T, Boolean>(0),
3584-
(CollectionOperationStatus) status);
3585-
break;
3586-
default:
3587-
getLogger().warn("Unhandled state: " + status);
3588-
}
3580+
if (failedStatusCount == exist.getItemCount()) {
3581+
rv.addOperationStatus(failedStatus);
3582+
} else {
3583+
rv.addOperationStatus(cstatus);
35893584
}
3590-
});
3585+
}
35913586

3592-
rv.setOperation(op);
3593-
addOp(key, op);
3587+
public void complete() {
3588+
latch.countDown();
3589+
}
3590+
});
3591+
rv.addOperation(op);
3592+
addOp(key, op);
3593+
}
35943594
return rv;
35953595
}
35963596

src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java

+34-5
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,17 @@
1616
*/
1717
package net.spy.memcached.collection.set;
1818

19-
import java.util.ArrayList;
20-
import java.util.List;
21-
import java.util.Map;
22-
2319
import net.spy.memcached.collection.BaseIntegrationTest;
2420
import net.spy.memcached.collection.CollectionAttributes;
2521
import net.spy.memcached.collection.CollectionResponse;
2622
import net.spy.memcached.collection.ElementValueType;
2723
import net.spy.memcached.internal.CollectionFuture;
28-
2924
import org.junit.Assert;
3025

26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.Map;
29+
3130
public class SopPipedExistTest extends BaseIntegrationTest {
3231

3332
private final String KEY = this.getClass().getSimpleName();
@@ -166,6 +165,36 @@ public void testMaxPipedExist() {
166165
Assert.fail(e.getMessage());
167166
}
168167
}
168+
public void testMaxOverPipedExist() {
169+
int OVER_COUNT = 1000;
170+
171+
try {
172+
List<Object> findValues = new ArrayList<Object>();
173+
174+
// insert items
175+
for (int i = 0; i < OVER_COUNT; i++) {
176+
findValues.add("VALUE" + i);
177+
178+
Assert.assertTrue(mc.asyncSopInsert(KEY, "VALUE" + i, new CollectionAttributes()).get());
179+
}
180+
181+
// exist bulk
182+
CollectionFuture<Map<Object, Boolean>> future = mc
183+
.asyncSopPipedExistBulk(KEY, findValues);
184+
185+
Map<Object, Boolean> map = future.get();
186+
187+
Assert.assertTrue(future.getOperationStatus().isSuccess());
188+
189+
for (int i = 0; i < OVER_COUNT; i++) {
190+
Assert.assertTrue(map.get("VALUE" + i));
191+
}
192+
193+
} catch (Exception e) {
194+
e.printStackTrace();
195+
Assert.fail(e.getMessage());
196+
}
197+
}
169198

170199
public void testPipedExistNotExistsKey() {
171200
try {

0 commit comments

Comments
 (0)