diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 6a48c5832..29c8966d0 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.jar.JarFile; import java.util.jar.Manifest; @@ -1783,7 +1784,7 @@ public CollectionFuture> asyncBopPip insertList.add(new BTreePipedInsert<>(key, elementMap, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1806,7 +1807,7 @@ public CollectionFuture> asyncBopPip insertList.add(new ByteArraysBTreePipedInsert<>(key, elementList, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1833,7 +1834,7 @@ public CollectionFuture> asyncMopPip insertList.add(new MapPipedInsert<>(key, elementMap, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1859,7 +1860,7 @@ public CollectionFuture> asyncLopPip } } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1882,7 +1883,7 @@ public CollectionFuture> asyncSopPip insertList.add(new SetPipedInsert<>(key, elementList, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -3106,6 +3107,91 @@ public void gotStatus(Integer index, OperationStatus status) { return rv; } + /** + * insert items into collection synchronously. + * + * @param key arcus cache key + * @param insertList must not be empty. + * @return future holding the map of element index and the reason why insert operation failed + */ + private CollectionFuture> syncCollectionPipedInsert( + final String key, final List> insertList) { + final CountDownLatch latch = new CountDownLatch(1); + final PipedCollectionFuture rv = + new PipedCollectionFuture<>(latch, operationTimeout); + + BiFunction makeCallback = (opIdx, itemCount) -> new PipedOperationCallback() { + + private int currentCommandIdx = -1; + + public void receivedStatus(OperationStatus status) { + CollectionOperationStatus cstatus; + + if (status instanceof CollectionOperationStatus) { + cstatus = (CollectionOperationStatus) status; + } else { + getLogger().warn("Unhandled state: " + status); + cstatus = new CollectionOperationStatus(status); + } + rv.setOperationStatus(cstatus); + } + + public void complete() { + if (rv.getOperationStatus().isSuccess()) { + int nextOpIdx = rv.getCurrentOpIdx() + 1; + if (nextOpIdx < insertList.size()) { + Operation nextOp = rv.getOperation(nextOpIdx); + if (!nextOp.isCancelled()) { + addOp(key, nextOp); + rv.setCurrentOpIdx(nextOpIdx); + } + } else { + latch.countDown(); + } + } else { + // If this operation has been errored or cancelled, + // add the command not executed in a operation as not executed state. + // The remaining operations will be also not executed state but not added into failed result. + int nextCommandIdx = 0; + if (currentCommandIdx < itemCount - 1) { + // command remained in the same operation object. + nextCommandIdx = currentCommandIdx + 1 + (opIdx * MAX_PIPED_ITEM_COUNT); + } else if (opIdx + 1 < insertList.size()) { + // command remained in the next operation object. + nextCommandIdx = (opIdx + 1) * MAX_PIPED_ITEM_COUNT; + } + if (nextCommandIdx > 0) { + rv.addEachResult(nextCommandIdx, + new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED)); + } + latch.countDown(); + } + } + + public void gotStatus(Integer index, OperationStatus status) { + if (!status.isSuccess()) { + if (status instanceof CollectionOperationStatus) { + rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT), + (CollectionOperationStatus) status); + } else { + rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT), + new CollectionOperationStatus(status)); + } + } + currentCommandIdx = index; + } + }; + + for (int i = 0; i < insertList.size(); i++) { + final CollectionPipedInsert insert = insertList.get(i); + Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i, insert.getItemCount())); + rv.addOperation(op); + } + addOp(key, rv.getOperation(0)); + rv.setCurrentOpIdx(0); + return rv; + } + @Override public Future> asyncBopInsertBulk( List keyList, long bkey, byte[] eFlag, Object value, diff --git a/src/main/java/net/spy/memcached/collection/CollectionResponse.java b/src/main/java/net/spy/memcached/collection/CollectionResponse.java index cce6bab0e..5ec72f9cd 100644 --- a/src/main/java/net/spy/memcached/collection/CollectionResponse.java +++ b/src/main/java/net/spy/memcached/collection/CollectionResponse.java @@ -44,6 +44,7 @@ public enum CollectionResponse { UNDEFINED, CANCELED, + NOT_EXECUTED, INTERRUPT_EXCEPTION, EXECUTION_EXCEPTION, diff --git a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java index 4cfc67c9c..a9ca4d06a 100644 --- a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java +++ b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java @@ -2,6 +2,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -17,9 +18,10 @@ public class PipedCollectionFuture extends CollectionFuture> { - private final Collection ops = new ArrayList<>(); + private final List ops = new ArrayList<>(); private final AtomicReference operationStatus = new AtomicReference<>(null); + private int currentOpIdx = 0; private final Map failedResult = new ConcurrentHashMap<>(); @@ -30,13 +32,17 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) { @Override public boolean cancel(boolean ign) { - boolean rv = false; - for (Operation op : ops) { - rv |= op.cancel("by application."); + for (int i = currentOpIdx; i < ops.size(); i++) { + if (ops.get(i).cancel("by application.")) { + return true; + } } - return rv; + return false; } + /** + * @return true if any operation is cancelled. + */ @Override public boolean isCancelled() { for (Operation op : ops) { @@ -119,4 +125,16 @@ public void addEachResult(K index, V status) { public void addOperation(Operation op) { ops.add(op); } + + public Operation getOperation(int idx) { + return ops.get(idx); + } + + public void setCurrentOpIdx(int idx) { + this.currentOpIdx = idx; + } + + public int getCurrentOpIdx() { + return currentOpIdx; + } } diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java index 65c9b6fb3..bd58c6d37 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java @@ -29,7 +29,7 @@ /** * Operation to store collection data in a memcached server. */ -public final class CollectionPipedInsertOperationImpl extends PipeOperationImpl +public final class CollectionPipedInsertOperationImpl extends SingleKeyPipeOperationImpl implements CollectionPipedInsertOperation { public CollectionPipedInsertOperationImpl(String key, diff --git a/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java index f982f7334..5ecb8f35a 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java @@ -62,13 +62,13 @@ abstract class PipeOperationImpl extends OperationImpl { protected boolean successAll = true; - private final CollectionPipe collectionPipe; - private final PipedOperationCallback cb; - private final List keys; + protected final CollectionPipe collectionPipe; + protected final PipedOperationCallback cb; + protected final List keys; private final boolean isIdempotent; - private int index = 0; - private boolean readUntilLastLine = false; + protected int index = 0; + protected boolean readUntilLastLine = false; protected PipeOperationImpl(List keys, CollectionPipe collectionPipe, OperationCallback cb) { diff --git a/src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java new file mode 100644 index 000000000..8ea95b8d2 --- /dev/null +++ b/src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java @@ -0,0 +1,99 @@ +package net.spy.memcached.protocol.ascii; + +import java.util.List; + +import net.spy.memcached.collection.CollectionPipe; +import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationState; +import net.spy.memcached.ops.OperationStatus; + +public abstract class SingleKeyPipeOperationImpl extends PipeOperationImpl { + + protected SingleKeyPipeOperationImpl(List keys, + CollectionPipe collectionPipe, + OperationCallback cb) { + super(keys, collectionPipe, cb); + } + + @Override + public void handleLine(String line) { + assert getState() == OperationState.READING + : "Read ``" + line + "'' when in " + getState() + " state"; + + /* ENABLE_REPLICATION if */ + if (isWriteOperation() && hasSwitchedOver(line)) { + collectionPipe.setNextOpIndex(index); + prepareSwitchover(line); + return; + } + /* ENABLE_REPLICATION end */ + + /* ENABLE_MIGRATION if */ + if (hasNotMyKey(line)) { + // Only one NOT_MY_KEY is provided in response of + // single key piped operation when redirection. + addRedirectSingleKeyOperation(line, keys.get(0)); + if (collectionPipe.isNotPiped()) { + transitionState(OperationState.REDIRECT); + } else { + collectionPipe.setNextOpIndex(index); + } + return; + } + /* ENABLE_MIGRATION end */ + + if (collectionPipe.isNotPiped()) { + OperationStatus status = checkStatus(line); + if (!status.isSuccess()) { + successAll = false; + } + cb.gotStatus(index, status); + + cb.receivedStatus((successAll) ? END : FAILED_END); + transitionState(OperationState.COMPLETE); + return; + } + + /* + RESPONSE \r\n + \r\n + [ ... ] + \r\n + END|PIPE_ERROR \r\n + */ + if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { + /* ENABLE_MIGRATION if */ + if (needRedirect()) { + transitionState(OperationState.REDIRECT); + return; + } + /* ENABLE_MIGRATION end */ + cb.receivedStatus((index == collectionPipe.getItemCount() && successAll) ? END : FAILED_END); + transitionState(OperationState.COMPLETE); + } else if (line.startsWith("RESPONSE ")) { + getLogger().debug("Got line %s", line); + + // TODO server should be fixed + line = line.replace(" ", " "); + line = line.replace(" ", " "); + + String[] stuff = line.split(" "); + assert "RESPONSE".equals(stuff[0]); + readUntilLastLine = true; + } else { + OperationStatus status = checkStatus(line); + if (!status.isSuccess()) { + successAll = false; + } + cb.gotStatus(index, status); + + index++; + } + } + + @Override + protected OperationStatus checkStatus(String line) { + return null; + } + +} diff --git a/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java b/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java index 4634854ae..04a40bd57 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.collection.Element; @@ -122,7 +123,7 @@ void testErrorCount() { Map map = future.get(2000L, TimeUnit.MILLISECONDS); - assertEquals(bkeySize, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java b/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java index 2d8a5f8f0..220efecb8 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java @@ -23,8 +23,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.CollectionOperationStatus; import org.junit.jupiter.api.AfterEach; @@ -111,9 +113,7 @@ void testInsertAndGet() { void testErrorCount() { int valueCount = 1200; Object[] valueList = new Object[valueCount]; - for (int i = 0; i < valueList.length; i++) { - valueList[i] = "MyValue"; - } + Arrays.fill(valueList, "MyValue"); try { // SET @@ -123,8 +123,11 @@ void testErrorCount() { Map map = future.get(1000L, TimeUnit.MILLISECONDS); - assertEquals(valueCount, map.size()); - + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); + assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT - 1).getResponse(), + CollectionResponse.NOT_FOUND); + assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT).getResponse(), + CollectionResponse.NOT_EXECUTED); } catch (Exception e) { e.printStackTrace(); fail(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java b/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java index 1902675c1..80675642f 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.ops.CollectionOperationStatus; @@ -116,7 +117,7 @@ void testErrorCount() { Map map = future.get(2000L, TimeUnit.MILLISECONDS); - assertEquals(elementSize, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java b/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java index 7faa9bc69..feae56357 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java @@ -22,6 +22,7 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.collection.Element; @@ -253,7 +254,7 @@ void testMopPipeInsert() { Map map = future.get(5000L, TimeUnit.MILLISECONDS); - assertEquals(1000, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); Map rmap = mc.asyncMopGet(KEY, false, false) .get(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java b/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java index 96a621b55..c1e9b20a1 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java @@ -122,7 +122,7 @@ void testErrorCount() { Map map = future.get(2000L, TimeUnit.MILLISECONDS); - assertEquals(valueCount, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); } catch (Exception e) { e.printStackTrace(); fail();