Skip to content

Commit dd35dd9

Browse files
committed
ENHANCE: Change asyncSetPipedExist method logic.
1 parent 4bc1139 commit dd35dd9

File tree

1 file changed

+79
-74
lines changed

1 file changed

+79
-74
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

+79-74
Original file line numberDiff line numberDiff line change
@@ -3531,97 +3531,102 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
35313531
@Override
35323532
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key,
35333533
List<Object> values) {
3534-
SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder);
3535-
return asyncSetPipedExist(key, exist);
3534+
if (values.size() == 0) {
3535+
throw new IllegalArgumentException(
3536+
"The number of piped operations must be larger than 0.");
3537+
}
3538+
3539+
List<SetPipedExist<Object>> existList = new ArrayList<SetPipedExist<Object>>();
3540+
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
3541+
existList.add(new SetPipedExist<Object>(key, values, collectionTranscoder));
3542+
} else {
3543+
PartitionedList<Object> partitionedList = new PartitionedList<Object>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
3544+
for (List<Object> partition : partitionedList) {
3545+
existList.add(new SetPipedExist<Object>(key, partition, collectionTranscoder));
3546+
}
3547+
}
3548+
return asyncSetPipedExist(key, existList);
35363549
}
35373550

35383551
@Override
35393552
public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key,
35403553
List<T> values,
35413554
Transcoder<T> tc) {
3542-
SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc);
3543-
return asyncSetPipedExist(key, exist);
3555+
if (values.size() == 0) {
3556+
throw new IllegalArgumentException(
3557+
"The number of piped operations must be larger than 0.");
3558+
}
3559+
3560+
List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>();
3561+
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
3562+
existList.add(new SetPipedExist<T>(key, values, tc));
3563+
} else {
3564+
PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
3565+
for (List<T> partition : partitionedList) {
3566+
existList.add(new SetPipedExist<T>(key, partition, tc));
3567+
}
3568+
}
3569+
return asyncSetPipedExist(key, existList);
35443570
}
35453571

35463572
/**
35473573
* Generic pipelined existence operation for set items. Public methods call this method.
35483574
*
35493575
* @param key collection item's key
3550-
* @param exist operation parameters (element values)
3576+
* @param existList list of operation parameters (element values)
35513577
* @return future holding the map of elements and their existence results
35523578
*/
35533579
<T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
3554-
final String key, final SetPipedExist<T> exist) {
3555-
3556-
if (exist.getItemCount() == 0) {
3557-
throw new IllegalArgumentException(
3558-
"The number of piped operations must be larger than 0.");
3559-
}
3560-
if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
3561-
throw new IllegalArgumentException(
3562-
"The number of piped operations must not exceed a maximum of "
3563-
+ CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + ".");
3564-
}
3565-
3566-
final CountDownLatch latch = new CountDownLatch(1);
3567-
final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>(
3568-
latch, operationTimeout);
3580+
final String key, final List<SetPipedExist<T>> existList) {
3581+
final CountDownLatch latch = new CountDownLatch(existList.size());
35693582

3570-
Operation op = opFact.collectionPipedExist(key, exist,
3571-
new CollectionPipedExistOperation.Callback() {
3583+
final PipedCollectionFuture<T, Boolean> rv
3584+
= new PipedCollectionFuture<T, Boolean>(latch, operationTimeout);
35723585

3573-
private final Map<T, Boolean> result = new HashMap<T, Boolean>();
3574-
private boolean hasAnError = false;
3575-
3576-
public void receivedStatus(OperationStatus status) {
3577-
if (hasAnError) {
3578-
return;
3579-
}
3580-
3581-
CollectionOperationStatus cstatus;
3582-
if (status instanceof CollectionOperationStatus) {
3583-
cstatus = (CollectionOperationStatus) status;
3584-
} else {
3585-
getLogger().warn("Unhandled state: " + status);
3586-
cstatus = new CollectionOperationStatus(status);
3587-
}
3588-
rv.set(result, cstatus);
3589-
}
3590-
3591-
public void complete() {
3592-
latch.countDown();
3593-
}
3594-
3595-
public void gotStatus(Integer index, OperationStatus status) {
3596-
CollectionOperationStatus cstatus;
3597-
if (status instanceof CollectionOperationStatus) {
3598-
cstatus = (CollectionOperationStatus) status;
3599-
} else {
3600-
cstatus = new CollectionOperationStatus(status);
3601-
}
3602-
3603-
switch (cstatus.getResponse()) {
3604-
case EXIST:
3605-
case NOT_EXIST:
3606-
result.put(exist.getValues().get(index),
3607-
(CollectionResponse.EXIST.equals(cstatus
3608-
.getResponse())));
3609-
break;
3610-
case UNREADABLE:
3611-
case TYPE_MISMATCH:
3612-
case NOT_FOUND:
3613-
hasAnError = true;
3614-
rv.set(new HashMap<T, Boolean>(0),
3615-
(CollectionOperationStatus) status);
3616-
break;
3617-
default:
3618-
getLogger().warn("Unhandled state: " + status);
3619-
}
3620-
}
3621-
});
3622-
3623-
rv.setOperation(op);
3624-
addOp(key, op);
3586+
for (final SetPipedExist<T> exist : existList) {
3587+
Operation op = opFact.collectionPipedExist(key, exist,
3588+
new CollectionPipedExistOperation.Callback() {
3589+
public void receivedStatus(OperationStatus status) {
3590+
CollectionOperationStatus cstatus;
3591+
if (status instanceof CollectionOperationStatus) {
3592+
cstatus = (CollectionOperationStatus) status;
3593+
} else {
3594+
getLogger().warn("Unhandled state: " + status);
3595+
cstatus = new CollectionOperationStatus(status);
3596+
}
3597+
rv.addOperationStatus(cstatus);
3598+
}
3599+
public void complete() {
3600+
latch.countDown();
3601+
}
3602+
public void gotStatus(Integer index, OperationStatus status) {
3603+
CollectionOperationStatus cstatus;
3604+
if (status instanceof CollectionOperationStatus) {
3605+
cstatus = (CollectionOperationStatus) status;
3606+
} else {
3607+
getLogger().warn("Unhandled state: " + status);
3608+
cstatus = new CollectionOperationStatus(status);
3609+
}
3610+
switch (cstatus.getResponse()) {
3611+
case EXIST:
3612+
case NOT_EXIST:
3613+
rv.addEachResult(exist.getValues().get(index),
3614+
(CollectionResponse.EXIST.equals(cstatus
3615+
.getResponse())));
3616+
break;
3617+
case UNREADABLE:
3618+
case TYPE_MISMATCH:
3619+
case NOT_FOUND:
3620+
rv.addOperationStatus(cstatus);
3621+
break;
3622+
default:
3623+
getLogger().warn("Unhandled state: " + status);
3624+
}
3625+
}
3626+
});
3627+
rv.addOperation(op);
3628+
addOp(key, op);
3629+
}
36253630
return rv;
36263631
}
36273632

0 commit comments

Comments
 (0)