diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 6a48c5832..e9c6a0f24 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.jar.JarFile; import java.util.jar.Manifest; @@ -1783,7 +1784,7 @@ public CollectionFuture> asyncBopPip insertList.add(new BTreePipedInsert<>(key, elementMap, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1806,7 +1807,7 @@ public CollectionFuture> asyncBopPip insertList.add(new ByteArraysBTreePipedInsert<>(key, elementList, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1833,7 +1834,7 @@ public CollectionFuture> asyncMopPip insertList.add(new MapPipedInsert<>(key, elementMap, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1859,7 +1860,7 @@ public CollectionFuture> asyncLopPip } } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1882,7 +1883,7 @@ public CollectionFuture> asyncSopPip insertList.add(new SetPipedInsert<>(key, elementList, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -3106,6 +3107,87 @@ public void gotStatus(Integer index, OperationStatus status) { return rv; } + /** + * insert items into collection synchronously. + * + * @param key arcus cache key + * @param insertList must not be empty. + * @return future holding the map of element index and the reason why insert operation failed + */ + private CollectionFuture> syncCollectionPipedInsert( + final String key, final List> insertList) { + final CountDownLatch latch = new CountDownLatch(1); + final PipedCollectionFuture rv = + new PipedCollectionFuture<>(latch, operationTimeout); + + final List ops = new ArrayList<>(insertList.size()); + BiFunction makeCallback = (opIdx, itemCount) -> new PipedOperationCallback() { + + private int lastExecutedIndex; + + public void receivedStatus(OperationStatus status) { + CollectionOperationStatus cstatus; + + if (status instanceof CollectionOperationStatus) { + cstatus = (CollectionOperationStatus) status; + } else { + getLogger().warn("Unhandled state: " + status); + cstatus = new CollectionOperationStatus(status); + } + rv.setOperationStatus(cstatus); + } + + public void complete() { + if (!rv.getOperationStatus().isSuccess()) { + // If this operation has been errored or cancelled, + // add the command not executed in a operation as not executed state. + // The remaining operations will be also not executed state but not added into failed result. + if (lastExecutedIndex < itemCount - 2) { + // command remained in the same operation object. + rv.addEachResult(lastExecutedIndex + 1 + (opIdx * MAX_PIPED_ITEM_COUNT), + new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED)); + } else if (opIdx + 1 < ops.size()) { + // command remained in the next operation object. + rv.addEachResult((opIdx + 1) * MAX_PIPED_ITEM_COUNT, + new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED)); + } + latch.countDown(); + } else { + if (opIdx + 1 < ops.size()) { + // If operations are succeed and next operation exists, then add it. + Operation nextOp = ops.get(opIdx + 1); + rv.addOperation(nextOp); + addOp(key, nextOp); + } else { + latch.countDown(); + } + } + } + + public void gotStatus(Integer index, OperationStatus status) { + if (!status.isSuccess()) { + if (status instanceof CollectionOperationStatus) { + rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT), + (CollectionOperationStatus) status); + } else { + rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT), + new CollectionOperationStatus(status)); + } + } + this.lastExecutedIndex = index; + } + }; + + for (int i = 0; i < insertList.size(); i++) { + final CollectionPipedInsert insert = insertList.get(i); + Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i, insert.getItemCount())); + ops.add(op); + } + rv.addOperation(ops.get(0)); + addOp(key, ops.get(0)); + return rv; + } + @Override public Future> asyncBopInsertBulk( List keyList, long bkey, byte[] eFlag, Object value, diff --git a/src/main/java/net/spy/memcached/collection/CollectionResponse.java b/src/main/java/net/spy/memcached/collection/CollectionResponse.java index cce6bab0e..5ec72f9cd 100644 --- a/src/main/java/net/spy/memcached/collection/CollectionResponse.java +++ b/src/main/java/net/spy/memcached/collection/CollectionResponse.java @@ -44,6 +44,7 @@ public enum CollectionResponse { UNDEFINED, CANCELED, + NOT_EXECUTED, INTERRUPT_EXCEPTION, EXECUTION_EXCEPTION, diff --git a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java index 4cfc67c9c..243626100 100644 --- a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java +++ b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java @@ -1,7 +1,7 @@ package net.spy.memcached.internal; import java.util.ArrayList; -import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -11,13 +11,15 @@ import java.util.concurrent.atomic.AtomicReference; import net.spy.memcached.MemcachedConnection; +import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; public class PipedCollectionFuture extends CollectionFuture> { - private final Collection ops = new ArrayList<>(); + // operations that are completed or in progress + private final List ops = new ArrayList<>(); private final AtomicReference operationStatus = new AtomicReference<>(null); @@ -30,31 +32,33 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) { @Override public boolean cancel(boolean ign) { - boolean rv = false; - for (Operation op : ops) { - rv |= op.cancel("by application."); - } - return rv; + return ops.get(ops.size() - 1).cancel("by application."); } + /** + * if previous op is cancelled, then next ops are not added to the opQueue. + * So we only need to check current op. + * + * @return true if operation is cancelled. + */ @Override public boolean isCancelled() { - for (Operation op : ops) { - if (op.isCancelled()) { - return true; - } - } - return false; + return operationStatus.get().getResponse() == CollectionResponse.CANCELED; + } + + /** + * if previous op threw exception, then next ops are not added to the opQueue. + * So we only need to check current op. + * + * @return true if operation has errored by exception. + */ + public boolean hasErrored() { + return ops.get(ops.size() - 1).hasErrored(); } @Override public boolean isDone() { - for (Operation op : ops) { - if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { - return false; - } - } - return true; + return latch.getCount() == 0; } @Override @@ -62,35 +66,29 @@ public Map get(long duration, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { long beforeAwait = System.currentTimeMillis(); + Operation lastExecutedOp; if (!latch.await(duration, unit)) { - Collection timedOutOps = new ArrayList<>(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - timedOutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (!timedOutOps.isEmpty()) { - // set timeout only once for piped ops requested to single node. - MemcachedConnection.opTimedOut(timedOutOps.iterator().next()); + lastExecutedOp = ops.get(ops.size() - 1); + if (lastExecutedOp.getState() != OperationState.COMPLETE) { + MemcachedConnection.opTimedOut(lastExecutedOp); long elapsed = System.currentTimeMillis() - beforeAwait; - throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps); + throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastExecutedOp); } } else { // continuous timeout counter will be reset only once in pipe - MemcachedConnection.opSucceeded(ops.iterator().next()); + lastExecutedOp = ops.get(ops.size() - 1); + MemcachedConnection.opSucceeded(lastExecutedOp); } - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } + // If previous op has errored or cancelled, operations are not executed anymore. + // Therefore, we only need to check last executed op. + if (lastExecutedOp != null && lastExecutedOp.hasErrored()) { + throw new ExecutionException(lastExecutedOp.getException()); + } - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } + if (lastExecutedOp != null && lastExecutedOp.isCancelled()) { + throw new ExecutionException(new RuntimeException(lastExecutedOp.getCancelCause())); } return failedResult; diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java index 65c9b6fb3..bd58c6d37 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java @@ -29,7 +29,7 @@ /** * Operation to store collection data in a memcached server. */ -public final class CollectionPipedInsertOperationImpl extends PipeOperationImpl +public final class CollectionPipedInsertOperationImpl extends SingleKeyPipeOperationImpl implements CollectionPipedInsertOperation { public CollectionPipedInsertOperationImpl(String key, diff --git a/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java index f982f7334..5ecb8f35a 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java @@ -62,13 +62,13 @@ abstract class PipeOperationImpl extends OperationImpl { protected boolean successAll = true; - private final CollectionPipe collectionPipe; - private final PipedOperationCallback cb; - private final List keys; + protected final CollectionPipe collectionPipe; + protected final PipedOperationCallback cb; + protected final List keys; private final boolean isIdempotent; - private int index = 0; - private boolean readUntilLastLine = false; + protected int index = 0; + protected boolean readUntilLastLine = false; protected PipeOperationImpl(List keys, CollectionPipe collectionPipe, OperationCallback cb) { diff --git a/src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java new file mode 100644 index 000000000..36bc20bdc --- /dev/null +++ b/src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java @@ -0,0 +1,98 @@ +package net.spy.memcached.protocol.ascii; + +import java.util.List; + +import net.spy.memcached.collection.CollectionPipe; +import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationState; +import net.spy.memcached.ops.OperationStatus; + +public abstract class SingleKeyPipeOperationImpl extends PipeOperationImpl { + + protected SingleKeyPipeOperationImpl(List keys, + CollectionPipe collectionPipe, + OperationCallback cb) { + super(keys, collectionPipe, cb); + } + + @Override + public void handleLine(String line) { + assert getState() == OperationState.READING + : "Read ``" + line + "'' when in " + getState() + " state"; + + /* ENABLE_REPLICATION if */ + if (isWriteOperation() && hasSwitchedOver(line)) { + collectionPipe.setNextOpIndex(index); + prepareSwitchover(line); + return; + } + /* ENABLE_REPLICATION end */ + + /* ENABLE_MIGRATION if */ + if (hasNotMyKey(line)) { + // Only one NOT_MY_KEY is provided in response of + // single key piped operation when redirection. + addRedirectSingleKeyOperation(line, keys.get(0)); + if (collectionPipe.isNotPiped()) { + transitionState(OperationState.REDIRECT); + } else { + collectionPipe.setNextOpIndex(index); + } + } + /* ENABLE_MIGRATION end */ + + if (collectionPipe.isNotPiped()) { + OperationStatus status = checkStatus(line); + if (!status.isSuccess()) { + successAll = false; + } + cb.gotStatus(index, status); + + cb.receivedStatus((successAll) ? END : FAILED_END); + transitionState(OperationState.COMPLETE); + return; + } + + /* + RESPONSE \r\n + \r\n + [ ... ] + \r\n + END|PIPE_ERROR \r\n + */ + if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { + /* ENABLE_MIGRATION if */ + if (needRedirect()) { + transitionState(OperationState.REDIRECT); + return; + } + /* ENABLE_MIGRATION end */ + cb.receivedStatus((index == collectionPipe.getItemCount() && successAll) ? END : FAILED_END); + transitionState(OperationState.COMPLETE); + } else if (line.startsWith("RESPONSE ")) { + getLogger().debug("Got line %s", line); + + // TODO server should be fixed + line = line.replace(" ", " "); + line = line.replace(" ", " "); + + String[] stuff = line.split(" "); + assert "RESPONSE".equals(stuff[0]); + readUntilLastLine = true; + } else { + OperationStatus status = checkStatus(line); + if (!status.isSuccess()) { + successAll = false; + } + cb.gotStatus(index, status); + + index++; + } + } + + @Override + protected OperationStatus checkStatus(String line) { + return null; + } + +} diff --git a/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java b/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java index 4634854ae..04a40bd57 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.collection.Element; @@ -122,7 +123,7 @@ void testErrorCount() { Map map = future.get(2000L, TimeUnit.MILLISECONDS); - assertEquals(bkeySize, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java b/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java index 2d8a5f8f0..220efecb8 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java @@ -23,8 +23,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.CollectionOperationStatus; import org.junit.jupiter.api.AfterEach; @@ -111,9 +113,7 @@ void testInsertAndGet() { void testErrorCount() { int valueCount = 1200; Object[] valueList = new Object[valueCount]; - for (int i = 0; i < valueList.length; i++) { - valueList[i] = "MyValue"; - } + Arrays.fill(valueList, "MyValue"); try { // SET @@ -123,8 +123,11 @@ void testErrorCount() { Map map = future.get(1000L, TimeUnit.MILLISECONDS); - assertEquals(valueCount, map.size()); - + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); + assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT - 1).getResponse(), + CollectionResponse.NOT_FOUND); + assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT).getResponse(), + CollectionResponse.NOT_EXECUTED); } catch (Exception e) { e.printStackTrace(); fail(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java b/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java index 1902675c1..80675642f 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.ops.CollectionOperationStatus; @@ -116,7 +117,7 @@ void testErrorCount() { Map map = future.get(2000L, TimeUnit.MILLISECONDS); - assertEquals(elementSize, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java b/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java index 7faa9bc69..feae56357 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java @@ -22,6 +22,7 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.collection.Element; @@ -253,7 +254,7 @@ void testMopPipeInsert() { Map map = future.get(5000L, TimeUnit.MILLISECONDS); - assertEquals(1000, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); Map rmap = mc.asyncMopGet(KEY, false, false) .get(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java b/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java index 96a621b55..c1e9b20a1 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java @@ -122,7 +122,7 @@ void testErrorCount() { Map map = future.get(2000L, TimeUnit.MILLISECONDS); - assertEquals(valueCount, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); } catch (Exception e) { e.printStackTrace(); fail();