Skip to content

Commit 7c39158

Browse files
committed
ENHANCE: Change asyncSetPipedExist method logic.
1 parent ffde235 commit 7c39158

File tree

2 files changed

+94
-74
lines changed

2 files changed

+94
-74
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

+64-74
Original file line numberDiff line numberDiff line change
@@ -3565,99 +3565,89 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
35653565
}
35663566

35673567
@Override
3568-
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key,
3569-
List<Object> values) {
3570-
SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder);
3571-
return asyncSetPipedExist(key, exist);
3568+
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, List<Object> values) {
3569+
return asyncSopPipedExistBulk(key, values, collectionTranscoder);
35723570
}
35733571

35743572
@Override
35753573
public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key,
35763574
List<T> values,
35773575
Transcoder<T> tc) {
3578-
SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc);
3579-
return asyncSetPipedExist(key, exist);
3576+
if (values.size() == 0) {
3577+
throw new IllegalArgumentException(
3578+
"The number of piped operations must be larger than 0.");
3579+
}
3580+
3581+
List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>();
3582+
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
3583+
existList.add(new SetPipedExist<T>(key, values, tc));
3584+
} else {
3585+
PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
3586+
for (List<T> partition : partitionedList) {
3587+
existList.add(new SetPipedExist<T>(key, partition, tc));
3588+
}
3589+
}
3590+
return asyncSetPipedExist(key, existList);
35803591
}
35813592

35823593
/**
35833594
* Generic pipelined existence operation for set items. Public methods call this method.
35843595
*
35853596
* @param key collection item's key
3586-
* @param exist operation parameters (element values)
3597+
* @param existList list of operation parameters (element values)
35873598
* @return future holding the map of elements and their existence results
35883599
*/
35893600
<T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
3590-
final String key, final SetPipedExist<T> exist) {
3591-
3592-
if (exist.getItemCount() == 0) {
3593-
throw new IllegalArgumentException(
3594-
"The number of piped operations must be larger than 0.");
3595-
}
3596-
if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
3597-
throw new IllegalArgumentException(
3598-
"The number of piped operations must not exceed a maximum of "
3599-
+ CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + ".");
3600-
}
3601-
3602-
final CountDownLatch latch = new CountDownLatch(1);
3603-
final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>(
3604-
latch, operationTimeout);
3605-
3606-
Operation op = opFact.collectionPipedExist(key, exist,
3607-
new CollectionPipedExistOperation.Callback() {
3608-
3609-
private final Map<T, Boolean> result = new HashMap<T, Boolean>();
3610-
private boolean hasAnError = false;
3611-
3612-
public void receivedStatus(OperationStatus status) {
3613-
if (hasAnError) {
3614-
return;
3615-
}
3616-
3617-
CollectionOperationStatus cstatus;
3618-
if (status instanceof CollectionOperationStatus) {
3619-
cstatus = (CollectionOperationStatus) status;
3620-
} else {
3621-
getLogger().warn("Unhandled state: " + status);
3622-
cstatus = new CollectionOperationStatus(status);
3623-
}
3624-
rv.set(result, cstatus);
3625-
}
3601+
final String key, final List<SetPipedExist<T>> existList) {
3602+
final CountDownLatch latch = new CountDownLatch(existList.size());
36263603

3627-
public void complete() {
3628-
latch.countDown();
3629-
}
3604+
final PipedCollectionFuture<T, Boolean> rv
3605+
= new PipedCollectionFuture<T, Boolean>(latch, operationTimeout);
36303606

3631-
public void gotStatus(Integer index, OperationStatus status) {
3632-
CollectionOperationStatus cstatus;
3633-
if (status instanceof CollectionOperationStatus) {
3634-
cstatus = (CollectionOperationStatus) status;
3635-
} else {
3636-
cstatus = new CollectionOperationStatus(status);
3637-
}
3607+
for (final SetPipedExist<T> exist : existList) {
3608+
Operation op = opFact.collectionPipedExist(key, exist,
3609+
new CollectionPipedExistOperation.Callback() {
3610+
public void gotStatus(Integer index, OperationStatus status) {
3611+
CollectionOperationStatus cstatus;
3612+
if (status instanceof CollectionOperationStatus) {
3613+
cstatus = (CollectionOperationStatus) status;
3614+
} else {
3615+
getLogger().warn("Unhandled state: " + status);
3616+
cstatus = new CollectionOperationStatus(status);
3617+
}
3618+
switch (cstatus.getResponse()) {
3619+
case EXIST:
3620+
case NOT_EXIST:
3621+
rv.addEachResult(exist.getValues().get(index), (CollectionResponse.EXIST.equals(cstatus.getResponse())));
3622+
break;
3623+
case UNREADABLE:
3624+
case TYPE_MISMATCH:
3625+
case NOT_FOUND:
3626+
rv.addOperationStatus(cstatus);
3627+
break;
3628+
default:
3629+
getLogger().warn("Unhandled state: " + status);
3630+
}
3631+
}
36383632

3639-
switch (cstatus.getResponse()) {
3640-
case EXIST:
3641-
case NOT_EXIST:
3642-
result.put(exist.getValues().get(index),
3643-
(CollectionResponse.EXIST.equals(cstatus
3644-
.getResponse())));
3645-
break;
3646-
case UNREADABLE:
3647-
case TYPE_MISMATCH:
3648-
case NOT_FOUND:
3649-
hasAnError = true;
3650-
rv.set(new HashMap<T, Boolean>(0),
3651-
(CollectionOperationStatus) status);
3652-
break;
3653-
default:
3654-
getLogger().warn("Unhandled state: " + status);
3655-
}
3656-
}
3657-
});
3633+
public void receivedStatus(OperationStatus status) {
3634+
CollectionOperationStatus cstatus;
3635+
if (status instanceof CollectionOperationStatus) {
3636+
cstatus = (CollectionOperationStatus) status;
3637+
} else {
3638+
getLogger().warn("Unhandled state: " + status);
3639+
cstatus = new CollectionOperationStatus(status);
3640+
}
3641+
rv.addOperationStatus(cstatus);
3642+
}
36583643

3659-
rv.setOperation(op);
3660-
addOp(key, op);
3644+
public void complete() {
3645+
latch.countDown();
3646+
}
3647+
});
3648+
rv.addOperation(op);
3649+
addOp(key, op);
3650+
}
36613651
return rv;
36623652
}
36633653

src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,36 @@ public void testMaxPipedExist() {
166166
Assert.fail(e.getMessage());
167167
}
168168
}
169+
public void testMaxOverPipedExist() {
170+
int OVER_COUNT = 1000;
171+
172+
try {
173+
List<Object> findValues = new ArrayList<Object>();
174+
175+
// insert items
176+
for (int i = 0; i < OVER_COUNT; i++) {
177+
findValues.add("VALUE" + i);
178+
179+
Assert.assertTrue(mc.asyncSopInsert(KEY, "VALUE" + i, new CollectionAttributes()).get());
180+
}
181+
182+
// exist bulk
183+
CollectionFuture<Map<Object, Boolean>> future = mc
184+
.asyncSopPipedExistBulk(KEY, findValues);
185+
186+
Map<Object, Boolean> map = future.get();
187+
188+
Assert.assertTrue(future.getOperationStatus().isSuccess());
189+
190+
for (int i = 0; i < OVER_COUNT; i++) {
191+
Assert.assertTrue(map.get("VALUE" + i));
192+
}
193+
194+
} catch (Exception e) {
195+
e.printStackTrace();
196+
Assert.fail(e.getMessage());
197+
}
198+
}
169199

170200
public void testPipedExistNotExistsKey() {
171201
try {

0 commit comments

Comments
 (0)