From 134622918f816412c35d74f26b63c772788229ac Mon Sep 17 00:00:00 2001 From: oliviarla Date: Fri, 14 Feb 2025 16:15:19 +0900 Subject: [PATCH] INTERNAL: make piped insert operations process synchronously --- .../java/net/spy/memcached/ArcusClient.java | 98 ++++++++++++++- .../collection/CollectionResponse.java | 1 + .../internal/PipedCollectionFuture.java | 76 ++++++------ .../ops/SyncPipedOperationCallback.java | 7 ++ .../CollectionPipedInsertOperationImpl.java | 2 +- .../protocol/ascii/PipeOperationImpl.java | 10 +- .../ascii/SingleKeyPipeOperationImpl.java | 114 ++++++++++++++++++ .../memcached/protocol/ascii/BaseOpTest.java | 8 +- .../net/spy/memcached/MultibyteKeyTest.java | 9 +- .../BopInsertBulkMultipleTest.java | 3 +- .../LopInsertBulkMultipleValueTest.java | 13 +- .../MopInsertBulkMultipleTest.java | 3 +- .../bulkoperation/PipeInsertTest.java | 13 +- .../SopInsertBulkMultipleValueTest.java | 2 +- 14 files changed, 287 insertions(+), 72 deletions(-) create mode 100644 src/main/java/net/spy/memcached/ops/SyncPipedOperationCallback.java create mode 100644 src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index dc974953e..1629254ef 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.jar.JarFile; import java.util.jar.Manifest; @@ -146,6 +147,7 @@ import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.PipedOperationCallback; import net.spy.memcached.ops.StoreType; +import net.spy.memcached.ops.SyncPipedOperationCallback; import net.spy.memcached.plugin.FrontCacheMemcachedClient; import net.spy.memcached.transcoders.CollectionTranscoder; import net.spy.memcached.transcoders.Transcoder; @@ -1782,7 +1784,7 @@ public CollectionFuture> asyncBopPip insertList.add(new BTreePipedInsert<>(key, elementMap, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1805,7 +1807,7 @@ public CollectionFuture> asyncBopPip insertList.add(new ByteArraysBTreePipedInsert<>(key, elementList, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1832,7 +1834,7 @@ public CollectionFuture> asyncMopPip insertList.add(new MapPipedInsert<>(key, elementMap, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1853,9 +1855,12 @@ public CollectionFuture> asyncLopPip PartitionedList list = new PartitionedList<>(valueList, MAX_PIPED_ITEM_COUNT); for (List elementList : list) { insertList.add(new ListPipedInsert<>(key, index, elementList, attributesForCreate, tc)); + if (index >= 0) { + index += elementList.size(); + } } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -1878,7 +1883,7 @@ public CollectionFuture> asyncSopPip insertList.add(new SetPipedInsert<>(key, elementList, attributesForCreate, tc)); } } - return asyncCollectionPipedInsert(key, insertList); + return syncCollectionPipedInsert(key, insertList); } @Override @@ -3115,6 +3120,89 @@ public void gotStatus(Integer index, OperationStatus status) { return rv; } + /** + * insert items into collection synchronously. + * + * @param key arcus cache key + * @param insertList must not be empty. + * @return future holding the map of element index and the reason why insert operation failed + */ + private CollectionFuture> syncCollectionPipedInsert( + final String key, final List> insertList) { + final CountDownLatch latch = new CountDownLatch(1); + final PipedCollectionFuture rv = + new PipedCollectionFuture<>(latch, operationTimeout); + + final List ops = new ArrayList<>(insertList.size()); + BiFunction makeCallback = (opIdx, itemCount) -> new SyncPipedOperationCallback() { + + private int notExecutedIndex; + + public void receivedStatus(OperationStatus status) { + CollectionOperationStatus cstatus; + + if (status instanceof CollectionOperationStatus) { + cstatus = (CollectionOperationStatus) status; + } else { + getLogger().warn("Unhandled state: " + status); + cstatus = new CollectionOperationStatus(status); + } + rv.setOperationStatus(cstatus); + } + + public void complete() { + if (!rv.getOperationStatus().isSuccess()) { + // If this operation has been errored or cancelled, + // add the command not executed in a operation as cancelled state. + // The remaining operations will be also cancelled state but not added into failed result. + if (notExecutedIndex < itemCount) { + rv.addEachResult(notExecutedIndex + (opIdx * MAX_PIPED_ITEM_COUNT), + new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED)); + } else if (opIdx + 1 < ops.size()) { + rv.addEachResult((opIdx + 1) * MAX_PIPED_ITEM_COUNT, + new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED)); + } + latch.countDown(); + } else { + if (opIdx + 1 < ops.size()) { + // If operations are succeed and next operation exists, then add it. + Operation nextOp = ops.get(opIdx + 1); + rv.addOperation(nextOp); + addOp(key, nextOp); + } else { + latch.countDown(); + } + } + } + + public void gotStatus(Integer index, OperationStatus status) { + if (!status.isSuccess()) { + if (status instanceof CollectionOperationStatus) { + rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT), + (CollectionOperationStatus) status); + } else { + rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT), + new CollectionOperationStatus(status)); + } + } + } + + @Override + public void setNotExecutedIndex(Integer index) { + this.notExecutedIndex = index; + } + }; + + for (int i = 0; i < insertList.size(); i++) { + final CollectionPipedInsert insert = insertList.get(i); + Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i, insert.getItemCount())); + ops.add(op); + } + rv.addOperation(ops.get(0)); + addOp(key, ops.get(0)); + return rv; + } + @Override public Future> asyncBopInsertBulk( List keyList, long bkey, byte[] eFlag, Object value, diff --git a/src/main/java/net/spy/memcached/collection/CollectionResponse.java b/src/main/java/net/spy/memcached/collection/CollectionResponse.java index cce6bab0e..5ec72f9cd 100644 --- a/src/main/java/net/spy/memcached/collection/CollectionResponse.java +++ b/src/main/java/net/spy/memcached/collection/CollectionResponse.java @@ -44,6 +44,7 @@ public enum CollectionResponse { UNDEFINED, CANCELED, + NOT_EXECUTED, INTERRUPT_EXCEPTION, EXECUTION_EXCEPTION, diff --git a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java index 4cfc67c9c..243626100 100644 --- a/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java +++ b/src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java @@ -1,7 +1,7 @@ package net.spy.memcached.internal; import java.util.ArrayList; -import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -11,13 +11,15 @@ import java.util.concurrent.atomic.AtomicReference; import net.spy.memcached.MemcachedConnection; +import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; public class PipedCollectionFuture extends CollectionFuture> { - private final Collection ops = new ArrayList<>(); + // operations that are completed or in progress + private final List ops = new ArrayList<>(); private final AtomicReference operationStatus = new AtomicReference<>(null); @@ -30,31 +32,33 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) { @Override public boolean cancel(boolean ign) { - boolean rv = false; - for (Operation op : ops) { - rv |= op.cancel("by application."); - } - return rv; + return ops.get(ops.size() - 1).cancel("by application."); } + /** + * if previous op is cancelled, then next ops are not added to the opQueue. + * So we only need to check current op. + * + * @return true if operation is cancelled. + */ @Override public boolean isCancelled() { - for (Operation op : ops) { - if (op.isCancelled()) { - return true; - } - } - return false; + return operationStatus.get().getResponse() == CollectionResponse.CANCELED; + } + + /** + * if previous op threw exception, then next ops are not added to the opQueue. + * So we only need to check current op. + * + * @return true if operation has errored by exception. + */ + public boolean hasErrored() { + return ops.get(ops.size() - 1).hasErrored(); } @Override public boolean isDone() { - for (Operation op : ops) { - if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { - return false; - } - } - return true; + return latch.getCount() == 0; } @Override @@ -62,35 +66,29 @@ public Map get(long duration, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { long beforeAwait = System.currentTimeMillis(); + Operation lastExecutedOp; if (!latch.await(duration, unit)) { - Collection timedOutOps = new ArrayList<>(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - timedOutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (!timedOutOps.isEmpty()) { - // set timeout only once for piped ops requested to single node. - MemcachedConnection.opTimedOut(timedOutOps.iterator().next()); + lastExecutedOp = ops.get(ops.size() - 1); + if (lastExecutedOp.getState() != OperationState.COMPLETE) { + MemcachedConnection.opTimedOut(lastExecutedOp); long elapsed = System.currentTimeMillis() - beforeAwait; - throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps); + throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastExecutedOp); } } else { // continuous timeout counter will be reset only once in pipe - MemcachedConnection.opSucceeded(ops.iterator().next()); + lastExecutedOp = ops.get(ops.size() - 1); + MemcachedConnection.opSucceeded(lastExecutedOp); } - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } + // If previous op has errored or cancelled, operations are not executed anymore. + // Therefore, we only need to check last executed op. + if (lastExecutedOp != null && lastExecutedOp.hasErrored()) { + throw new ExecutionException(lastExecutedOp.getException()); + } - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } + if (lastExecutedOp != null && lastExecutedOp.isCancelled()) { + throw new ExecutionException(new RuntimeException(lastExecutedOp.getCancelCause())); } return failedResult; diff --git a/src/main/java/net/spy/memcached/ops/SyncPipedOperationCallback.java b/src/main/java/net/spy/memcached/ops/SyncPipedOperationCallback.java new file mode 100644 index 000000000..a64f6cc5b --- /dev/null +++ b/src/main/java/net/spy/memcached/ops/SyncPipedOperationCallback.java @@ -0,0 +1,7 @@ +package net.spy.memcached.ops; + +public interface SyncPipedOperationCallback extends PipedOperationCallback { + + void setNotExecutedIndex(Integer index); + +} diff --git a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java index 65c9b6fb3..bd58c6d37 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java @@ -29,7 +29,7 @@ /** * Operation to store collection data in a memcached server. */ -public final class CollectionPipedInsertOperationImpl extends PipeOperationImpl +public final class CollectionPipedInsertOperationImpl extends SingleKeyPipeOperationImpl implements CollectionPipedInsertOperation { public CollectionPipedInsertOperationImpl(String key, diff --git a/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java index f982f7334..5ecb8f35a 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/PipeOperationImpl.java @@ -62,13 +62,13 @@ abstract class PipeOperationImpl extends OperationImpl { protected boolean successAll = true; - private final CollectionPipe collectionPipe; - private final PipedOperationCallback cb; - private final List keys; + protected final CollectionPipe collectionPipe; + protected final PipedOperationCallback cb; + protected final List keys; private final boolean isIdempotent; - private int index = 0; - private boolean readUntilLastLine = false; + protected int index = 0; + protected boolean readUntilLastLine = false; protected PipeOperationImpl(List keys, CollectionPipe collectionPipe, OperationCallback cb) { diff --git a/src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java new file mode 100644 index 000000000..31fc293a7 --- /dev/null +++ b/src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java @@ -0,0 +1,114 @@ +package net.spy.memcached.protocol.ascii; + +import java.util.List; + +import net.spy.memcached.collection.CollectionPipe; +import net.spy.memcached.ops.OperationCallback; +import net.spy.memcached.ops.OperationState; +import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.ops.SyncPipedOperationCallback; + +public abstract class SingleKeyPipeOperationImpl extends PipeOperationImpl { + + private final SyncPipedOperationCallback cb; + + protected SingleKeyPipeOperationImpl(List keys, + CollectionPipe collectionPipe, + OperationCallback cb) { + super(keys, collectionPipe, cb); + this.cb = (SyncPipedOperationCallback) cb; + } + + @Override + public void handleLine(String line) { + assert getState() == OperationState.READING + : "Read ``" + line + "'' when in " + getState() + " state"; + + /* ENABLE_REPLICATION if */ + if (isWriteOperation() && hasSwitchedOver(line)) { + collectionPipe.setNextOpIndex(index); + prepareSwitchover(line); + return; + } + /* ENABLE_REPLICATION end */ + + /* ENABLE_MIGRATION if */ + if (hasNotMyKey(line)) { + if (isBulkOperation()) { + addRedirectMultiKeyOperation(line, keys.get(index)); + if (collectionPipe.isNotPiped()) { + transitionState(OperationState.REDIRECT); + } else { + index++; + } + } else { + // Only one NOT_MY_KEY is provided in response of + // single key piped operation when redirection. + addRedirectSingleKeyOperation(line, keys.get(0)); + if (collectionPipe.isNotPiped()) { + transitionState(OperationState.REDIRECT); + } else { + collectionPipe.setNextOpIndex(index); + } + } + return; + } + /* ENABLE_MIGRATION end */ + + if (collectionPipe.isNotPiped()) { + OperationStatus status = checkStatus(line); + if (!status.isSuccess()) { + successAll = false; + } + cb.gotStatus(index, status); + + cb.receivedStatus((successAll) ? END : FAILED_END); + cb.setNotExecutedIndex(index); + transitionState(OperationState.COMPLETE); + return; + } + + /* + RESPONSE \r\n + \r\n + [ ... ] + \r\n + END|PIPE_ERROR \r\n + */ + if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { + /* ENABLE_MIGRATION if */ + if (needRedirect()) { + transitionState(OperationState.REDIRECT); + return; + } + /* ENABLE_MIGRATION end */ + cb.receivedStatus((index == collectionPipe.getItemCount() && successAll) ? END : FAILED_END); + cb.setNotExecutedIndex(index); + transitionState(OperationState.COMPLETE); + } else if (line.startsWith("RESPONSE ")) { + getLogger().debug("Got line %s", line); + + // TODO server should be fixed + line = line.replace(" ", " "); + line = line.replace(" ", " "); + + String[] stuff = line.split(" "); + assert "RESPONSE".equals(stuff[0]); + readUntilLastLine = true; + } else { + OperationStatus status = checkStatus(line); + if (!status.isSuccess()) { + successAll = false; + } + cb.gotStatus(index, status); + + index++; + } + } + + @Override + protected OperationStatus checkStatus(String line) { + return null; + } + +} diff --git a/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java b/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java index eb3fd0aa0..34c627f75 100644 --- a/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java +++ b/src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java @@ -32,7 +32,7 @@ import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.OperationException; import net.spy.memcached.ops.OperationStatus; -import net.spy.memcached.ops.PipedOperationCallback; +import net.spy.memcached.ops.SyncPipedOperationCallback; import org.junit.jupiter.api.Test; @@ -115,7 +115,7 @@ void throwExceptionAfterReadingEndOrPipeError() throws Exception { CollectionPipedInsert.ListPipedInsert insert = new CollectionPipedInsert.ListPipedInsert<>(key, 0, Arrays.asList("a", "b"), null, null); - OperationCallback cb = new PipedOperationCallback() { + OperationCallback cb = new SyncPipedOperationCallback() { @Override public void receivedStatus(OperationStatus status) { } @@ -127,6 +127,10 @@ public void complete() { @Override public void gotStatus(Integer index, OperationStatus status) { } + + @Override + public void setNotExecutedIndex(Integer index) { + } }; CollectionPipedInsertOperationImpl op = new CollectionPipedInsertOperationImpl("test", insert, cb); diff --git a/src/test/manual/net/spy/memcached/MultibyteKeyTest.java b/src/test/manual/net/spy/memcached/MultibyteKeyTest.java index c9068dff3..c51850dfb 100644 --- a/src/test/manual/net/spy/memcached/MultibyteKeyTest.java +++ b/src/test/manual/net/spy/memcached/MultibyteKeyTest.java @@ -68,6 +68,7 @@ import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.PipedOperationCallback; import net.spy.memcached.ops.StoreType; +import net.spy.memcached.ops.SyncPipedOperationCallback; import net.spy.memcached.protocol.ascii.AsciiOperationFactory; import net.spy.memcached.transcoders.CollectionTranscoder; import net.spy.memcached.transcoders.IntegerTranscoder; @@ -339,8 +340,8 @@ public void complete() { @Test void CollectionPipedInsertOperationImplTest() { - PipedOperationCallback cpsCallback = - new PipedOperationCallback() { + SyncPipedOperationCallback cpsCallback = + new SyncPipedOperationCallback() { @Override public void gotStatus(Integer index, OperationStatus status) { } @@ -352,6 +353,10 @@ public void receivedStatus(OperationStatus status) { @Override public void complete() { } + + @Override + public void setNotExecutedIndex(Integer index) { + } }; List> elements = new ArrayList<>(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java b/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java index 4634854ae..04a40bd57 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/BopInsertBulkMultipleTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.collection.Element; @@ -122,7 +123,7 @@ void testErrorCount() { Map map = future.get(2000L, TimeUnit.MILLISECONDS); - assertEquals(bkeySize, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java b/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java index 2d8a5f8f0..220efecb8 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/LopInsertBulkMultipleValueTest.java @@ -23,8 +23,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; +import net.spy.memcached.collection.CollectionResponse; import net.spy.memcached.ops.CollectionOperationStatus; import org.junit.jupiter.api.AfterEach; @@ -111,9 +113,7 @@ void testInsertAndGet() { void testErrorCount() { int valueCount = 1200; Object[] valueList = new Object[valueCount]; - for (int i = 0; i < valueList.length; i++) { - valueList[i] = "MyValue"; - } + Arrays.fill(valueList, "MyValue"); try { // SET @@ -123,8 +123,11 @@ void testErrorCount() { Map map = future.get(1000L, TimeUnit.MILLISECONDS); - assertEquals(valueCount, map.size()); - + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); + assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT - 1).getResponse(), + CollectionResponse.NOT_FOUND); + assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT).getResponse(), + CollectionResponse.NOT_EXECUTED); } catch (Exception e) { e.printStackTrace(); fail(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java b/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java index 1902675c1..80675642f 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/MopInsertBulkMultipleTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.ops.CollectionOperationStatus; @@ -116,7 +117,7 @@ void testErrorCount() { Map map = future.get(2000L, TimeUnit.MILLISECONDS); - assertEquals(elementSize, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java b/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java index c6fc1e8d7..4f68e24be 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/PipeInsertTest.java @@ -22,6 +22,7 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import net.spy.memcached.ArcusClient; import net.spy.memcached.collection.BaseIntegrationTest; import net.spy.memcached.collection.CollectionAttributes; import net.spy.memcached.collection.Element; @@ -99,8 +100,6 @@ void testBopPipeInsert2() { } try { - long start = System.currentTimeMillis(); - CollectionAttributes attr = new CollectionAttributes(); CollectionFuture> future = mc @@ -109,8 +108,6 @@ void testBopPipeInsert2() { Map map = future.get(5000L, TimeUnit.MILLISECONDS); - // System.out.println(System.currentTimeMillis() - start + "ms"); - assertTrue(map.isEmpty()); Map> map3 = mc.asyncBopGet(KEY, 0, 9999, @@ -135,8 +132,6 @@ void testLopPipeInsert() { } try { - long start = System.currentTimeMillis(); - CollectionAttributes attr = new CollectionAttributes(); CollectionFuture> future = mc @@ -145,8 +140,6 @@ void testLopPipeInsert() { Map map = future.get(5000L, TimeUnit.MILLISECONDS); - // System.out.println(System.currentTimeMillis() - start + "ms"); - assertTrue(map.isEmpty()); List list = mc.asyncLopGet(KEY, 0, 9999, false, false) @@ -190,7 +183,7 @@ void testLopPipeInsertIndex() { = future2.get(5000L, TimeUnit.MILLISECONDS); Map map3 = future3.get(5000L, TimeUnit.MILLISECONDS); - assertEquals(map1.size() + map2.size() + map3.size(), 0); + assertEquals(0, map1.size() + map2.size() + map3.size()); List list = mc.asyncLopGet(KEY, 0, 9999, false, false).get(); assertEquals((elementCount * 3) + 2, list.size()); @@ -226,7 +219,7 @@ void testMopPipeInsert() { Map map = future.get(5000L, TimeUnit.MILLISECONDS); - assertEquals(1000, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); Map rmap = mc.asyncMopGet(KEY, false, false) .get(); diff --git a/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java b/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java index 96a621b55..c1e9b20a1 100644 --- a/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java +++ b/src/test/manual/net/spy/memcached/bulkoperation/SopInsertBulkMultipleValueTest.java @@ -122,7 +122,7 @@ void testErrorCount() { Map map = future.get(2000L, TimeUnit.MILLISECONDS); - assertEquals(valueCount, map.size()); + assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size()); } catch (Exception e) { e.printStackTrace(); fail();