diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 26ff126af..398aa9ef5 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; @@ -1867,7 +1868,8 @@ public CollectionFuture> asyncLopPip insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + + return syncCollectionPipedInsert(key, Collections.unmodifiableList(insertList)); } @Override @@ -3170,6 +3172,79 @@ public void gotStatus(Integer index, OperationStatus status) { return rv; } + /** + * Pipe insert method for collection items. + * + * @param key arcus cache key + * @param insertList must not be empty. + * @return future holding the map of element index and the reason why insert operation failed + */ + private CollectionFuture> syncCollectionPipedInsert( + final String key, final List> insertList) { + final CountDownLatch latch = new CountDownLatch(1); + final PipedCollectionFuture rv = + new PipedCollectionFuture<>(latch, operationTimeout); + + for (int i = 0; i < insertList.size(); i++) { + final CollectionPipedInsert insert = insertList.get(i); + final int idx = i; + Operation op = opFact.collectionPipedInsert(key, insert, + new CollectionPipedInsertOperation.Callback() { + // each result status + public void receivedStatus(OperationStatus status) { + CollectionOperationStatus cstatus; + + if (status instanceof CollectionOperationStatus) { + cstatus = (CollectionOperationStatus) status; + } else { + getLogger().warn("Unhandled state: " + status); + cstatus = new CollectionOperationStatus(status); + } + rv.setOperationStatus(cstatus); + } + + // complete + public void complete() { + if (idx == insertList.size() - 1) { + // countdown if this is last op + latch.countDown(); + } else { + // if error or cancel occurred by this operation, + // do not add all remaining operations and mark as cancelled + if (rv.hasErrored() || !rv.getOperationStatus().isSuccess()) { + for (int chunkIdx = idx + 1; chunkIdx < insertList.size(); chunkIdx++) { + for (int itemIdx = 0; itemIdx < insertList.get(chunkIdx).getItemCount(); itemIdx++) { + rv.addEachResult(itemIdx + (chunkIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT), + new CollectionOperationStatus(new CollectionOperationStatus( + false, "CANCELED", CollectionResponse.CANCELED))); + } + } + latch.countDown(); + } else { + // add next operation if this is not last op + Operation nextOp = rv.getOp(idx + 1); + addOp(key, nextOp); + } + } + } + + // got status + public void gotStatus(Integer index, OperationStatus status) { + if (status instanceof CollectionOperationStatus) { + rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT), + (CollectionOperationStatus) status); + } else { + rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT), + new CollectionOperationStatus(status)); + } + } + }); + rv.addOperation(op); + } + addOp(key, rv.getOp(0)); + return rv; + } + @Override public Future> asyncBopInsertBulk( List keyList, long bkey, byte[] eFlag, Object value, diff --git a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java index 271f789d9..cf9a566b6 100644 --- a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java +++ b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java @@ -1,10 +1,9 @@ package net.spy.memcached.internal; -import java.util.Collection; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -18,7 +17,7 @@ public class PipedCollectionFuture extends CollectionFuture> { - private final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue<>(); + private final List ops = new ArrayList<>(); private final AtomicReference operationStatus = new AtomicReference<>(null); @@ -31,11 +30,12 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) { @Override public boolean cancel(boolean ign) { - boolean rv = false; for (Operation op : ops) { - rv |= op.cancel("by application."); + if (!op.getState().equals(OperationState.COMPLETE)) { + return op.cancel("by application."); + } } - return rv; + return false; } @Override @@ -48,50 +48,49 @@ public boolean isCancelled() { return false; } - @Override - public boolean isDone() { + public boolean hasErrored() { for (Operation op : ops) { - if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { - return false; + if (op.hasErrored()) { + return true; } } - return true; + return false; + } + + @Override + public boolean isDone() { + return latch.getCount() == 0; } @Override public Map get(long duration, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { + System.out.println("ops size:" + ops.size()); long beforeAwait = System.currentTimeMillis(); + Operation lastOp = ops.get(ops.size() - 1); if (!latch.await(duration, unit)) { - Collection timedOutOps = new HashSet<>(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - timedOutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (!timedOutOps.isEmpty()) { - // set timeout only once for piped ops requested to single node. - MemcachedConnection.opTimedOut(timedOutOps.iterator().next()); + if (lastOp.getState() != OperationState.COMPLETE) { + MemcachedConnection.opTimedOut(lastOp); long elapsed = System.currentTimeMillis() - beforeAwait; - throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps); + throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastOp); + } else { + for (Operation op : ops) { + MemcachedConnection.opSucceeded(op); + } } } else { // continuous timeout counter will be reset only once in pipe - MemcachedConnection.opSucceeded(ops.iterator().next()); + MemcachedConnection.opSucceeded(lastOp); } - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } + if (lastOp != null && lastOp.hasErrored()) { + throw new ExecutionException(lastOp.getException()); + } - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } + if (lastOp != null && lastOp.isCancelled()) { + throw new ExecutionException(new RuntimeException(lastOp.getCancelCause())); } return failedResult; @@ -120,4 +119,8 @@ public void addEachResult(K index, V status) { public void addOperation(Operation op) { ops.add(op); } + + public Operation getOp(int index) { + return this.ops.get(index); + } } diff --git a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java b/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java index eb1ffc006..60261fbd2 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java @@ -25,6 +25,7 @@ import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.CollectionOperationStatus; import org.junit.Assert; @@ -44,9 +45,7 @@ public void testInsertAndGet() { int valueCount = 500; Object[] valueList = new Object[valueCount]; - for (int i = 0; i < valueList.length; i++) { - valueList[i] = "MyValue"; - } + Arrays.fill(valueList, "MyValue"); try { // REMOVE @@ -102,9 +101,7 @@ public void testInsertAndGet() { public void testErrorCount() { int valueCount = 1200; Object[] valueList = new Object[valueCount]; - for (int i = 0; i < valueList.length; i++) { - valueList[i] = "MyValue"; - } + Arrays.fill(valueList, "MyValue"); try { // SET @@ -115,7 +112,10 @@ public void testErrorCount() { Map map = future.get(1000L, TimeUnit.MILLISECONDS); assertEquals(valueCount, map.size()); - + assertEquals(map.get(mc.getMaxPipedItemCount() - 1).getResponse(), + CollectionResponse.NOT_FOUND); + assertEquals(map.get(mc.getMaxPipedItemCount()).getResponse(), + CollectionResponse.CANCELED); } catch (Exception e) { e.printStackTrace(); Assert.fail();