@@ -3119,99 +3119,100 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
3119
3119
}
3120
3120
3121
3121
@ Override
3122
- public CollectionFuture <Map <Object , Boolean >> asyncSopPipedExistBulk (String key ,
3123
- List <Object > values ) {
3124
- SetPipedExist <Object > exist = new SetPipedExist <Object >(key , values , collectionTranscoder );
3125
- return asyncSetPipedExist (key , exist );
3122
+ public CollectionFuture <Map <Object , Boolean >> asyncSopPipedExistBulk (String key , List <Object > values ) {
3123
+ return asyncSopPipedExistBulk (key , values , collectionTranscoder );
3126
3124
}
3127
3125
3128
3126
@ Override
3129
3127
public <T > CollectionFuture <Map <T , Boolean >> asyncSopPipedExistBulk (String key ,
3130
3128
List <T > values ,
3131
3129
Transcoder <T > tc ) {
3132
- SetPipedExist <T > exist = new SetPipedExist <T >(key , values , tc );
3133
- return asyncSetPipedExist (key , exist );
3130
+ if (values .size () == 0 ) {
3131
+ throw new IllegalArgumentException (
3132
+ "The number of piped operations must be larger than 0." );
3133
+ }
3134
+
3135
+ List <SetPipedExist <T >> existList = new ArrayList <SetPipedExist <T >>();
3136
+ if (values .size () <= SetPipedExist .MAX_PIPED_ITEM_COUNT ) {
3137
+ existList .add (new SetPipedExist <T >(key , values , tc ));
3138
+ } else {
3139
+ PartitionedList <T > partitionedList = new PartitionedList <T >(values , SetPipedExist .MAX_PIPED_ITEM_COUNT );
3140
+ for (List <T > partition : partitionedList ) {
3141
+ existList .add (new SetPipedExist <T >(key , partition , tc ));
3142
+ }
3143
+ }
3144
+ return asyncSetPipedExist (key , existList );
3134
3145
}
3135
3146
3136
3147
/**
3137
3148
* Generic pipelined existence operation for set items. Public methods call this method.
3138
3149
*
3139
3150
* @param key collection item's key
3140
- * @param exist operation parameters (element values)
3151
+ * @param existList list of operation parameters (element values)
3141
3152
* @return future holding the map of elements and their existence results
3142
3153
*/
3143
3154
<T > CollectionFuture <Map <T , Boolean >> asyncSetPipedExist (
3144
- final String key , final SetPipedExist <T > exist ) {
3145
-
3146
- if (exist .getItemCount () == 0 ) {
3147
- throw new IllegalArgumentException (
3148
- "The number of piped operations must be larger than 0." );
3149
- }
3150
- if (exist .getItemCount () > CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ) {
3151
- throw new IllegalArgumentException (
3152
- "The number of piped operations must not exceed a maximum of "
3153
- + CollectionPipedInsert .MAX_PIPED_ITEM_COUNT + "." );
3154
- }
3155
-
3156
- final CountDownLatch latch = new CountDownLatch (1 );
3157
- final CollectionFuture <Map <T , Boolean >> rv = new CollectionFuture <Map <T , Boolean >>(
3158
- latch , operationTimeout );
3159
-
3160
- Operation op = opFact .collectionPipedExist (key , exist ,
3161
- new CollectionPipedExistOperation .Callback () {
3162
-
3163
- private final Map <T , Boolean > result = new HashMap <T , Boolean >();
3164
- private boolean hasAnError = false ;
3165
-
3166
- public void receivedStatus (OperationStatus status ) {
3167
- if (hasAnError ) {
3168
- return ;
3169
- }
3170
-
3171
- CollectionOperationStatus cstatus ;
3172
- if (status instanceof CollectionOperationStatus ) {
3173
- cstatus = (CollectionOperationStatus ) status ;
3174
- } else {
3155
+ final String key , final List <SetPipedExist <T >> existList ) {
3156
+ final CountDownLatch latch = new CountDownLatch (existList .size ());
3157
+ final PipedCollectionFuture <T , Boolean > rv
3158
+ = new PipedCollectionFuture <T , Boolean >(latch , operationTimeout );
3159
+
3160
+ for (final SetPipedExist <T > exist : existList ) {
3161
+ Operation op = opFact .collectionPipedExist (key , exist , new CollectionPipedExistOperation .Callback () {
3162
+ private CollectionOperationStatus failedStatus = null ;
3163
+ private int failStatusCount = 0 ;
3164
+ public void gotStatus (Integer index , OperationStatus status ) {
3165
+ CollectionOperationStatus cstatus ;
3166
+ if (status instanceof CollectionOperationStatus ) {
3167
+ cstatus = (CollectionOperationStatus ) status ;
3168
+ } else {
3169
+ getLogger ().warn ("Unhandled state: " + status );
3170
+ cstatus = new CollectionOperationStatus (status );
3171
+ }
3172
+ switch (cstatus .getResponse ()) {
3173
+ case EXIST :
3174
+ rv .addEachResult (exist .getValues ().get (index ), true );
3175
+ break ;
3176
+ case NOT_EXIST :
3177
+ rv .addEachResult (exist .getValues ().get (index ), false );
3178
+ break ;
3179
+ case UNREADABLE :
3180
+ case TYPE_MISMATCH :
3181
+ case NOT_FOUND :
3182
+ if (failedStatus == null ) {
3183
+ failedStatus = cstatus ;
3184
+ failStatusCount ++;
3185
+ } else if (failedStatus .equals (cstatus )) {
3186
+ failStatusCount ++;
3187
+ }
3188
+ break ;
3189
+ default :
3175
3190
getLogger ().warn ("Unhandled state: " + status );
3176
- cstatus = new CollectionOperationStatus (status );
3177
- }
3178
- rv .set (result , cstatus );
3179
3191
}
3192
+ }
3180
3193
3181
- public void complete () {
3182
- latch .countDown ();
3194
+ public void receivedStatus (OperationStatus status ) {
3195
+ CollectionOperationStatus cstatus ;
3196
+ if (status instanceof CollectionOperationStatus ) {
3197
+ cstatus = (CollectionOperationStatus ) status ;
3198
+ } else {
3199
+ getLogger ().warn ("Unhandled state: " + status );
3200
+ cstatus = new CollectionOperationStatus (status );
3183
3201
}
3184
-
3185
- public void gotStatus (Integer index , OperationStatus status ) {
3186
- CollectionOperationStatus cstatus ;
3187
- if (status instanceof CollectionOperationStatus ) {
3188
- cstatus = (CollectionOperationStatus ) status ;
3189
- } else {
3190
- cstatus = new CollectionOperationStatus (status );
3191
- }
3192
-
3193
- switch (cstatus .getResponse ()) {
3194
- case EXIST :
3195
- case NOT_EXIST :
3196
- result .put (exist .getValues ().get (index ),
3197
- (CollectionResponse .EXIST .equals (cstatus
3198
- .getResponse ())));
3199
- break ;
3200
- case UNREADABLE :
3201
- case TYPE_MISMATCH :
3202
- case NOT_FOUND :
3203
- hasAnError = true ;
3204
- rv .set (new HashMap <T , Boolean >(0 ),
3205
- (CollectionOperationStatus ) status );
3206
- break ;
3207
- default :
3208
- getLogger ().warn ("Unhandled state: " + status );
3209
- }
3202
+ if (failedStatus != null && exist .getItemCount () == failStatusCount ) {
3203
+ rv .setOperationStatus (failedStatus );
3204
+ } else {
3205
+ rv .setOperationStatus (cstatus );
3210
3206
}
3211
- });
3207
+ }
3212
3208
3213
- rv .setOperation (op );
3214
- addOp (key , op );
3209
+ public void complete () {
3210
+ latch .countDown ();
3211
+ }
3212
+ });
3213
+ rv .addOperation (op );
3214
+ addOp (key , op );
3215
+ }
3215
3216
return rv ;
3216
3217
}
3217
3218
0 commit comments