Skip to content

Commit

Permalink
INTERNAL: make piped insert operations process synchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviarla committed Feb 24, 2025
1 parent 707c894 commit dfb07d8
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 59 deletions.
92 changes: 87 additions & 5 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

Expand Down Expand Up @@ -1783,7 +1784,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
insertList.add(new BTreePipedInsert<>(key, elementMap, attributesForCreate, tc));
}
}
return asyncCollectionPipedInsert(key, insertList);
return syncCollectionPipedInsert(key, insertList);
}

@Override
Expand All @@ -1806,7 +1807,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
insertList.add(new ByteArraysBTreePipedInsert<>(key, elementList, attributesForCreate, tc));
}
}
return asyncCollectionPipedInsert(key, insertList);
return syncCollectionPipedInsert(key, insertList);
}

@Override
Expand All @@ -1833,7 +1834,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncMopPip
insertList.add(new MapPipedInsert<>(key, elementMap, attributesForCreate, tc));
}
}
return asyncCollectionPipedInsert(key, insertList);
return syncCollectionPipedInsert(key, insertList);
}

@Override
Expand All @@ -1859,7 +1860,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
}
}
}
return asyncCollectionPipedInsert(key, insertList);
return syncCollectionPipedInsert(key, insertList);
}

@Override
Expand All @@ -1882,7 +1883,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncSopPip
insertList.add(new SetPipedInsert<>(key, elementList, attributesForCreate, tc));
}
}
return asyncCollectionPipedInsert(key, insertList);
return syncCollectionPipedInsert(key, insertList);
}

@Override
Expand Down Expand Up @@ -3106,6 +3107,87 @@ public void gotStatus(Integer index, OperationStatus status) {
return rv;
}

/**
* insert items into collection synchronously.
*
* @param key arcus cache key
* @param insertList must not be empty.
* @return future holding the map of element index and the reason why insert operation failed
*/
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
final String key, final List<CollectionPipedInsert<T>> insertList) {
final CountDownLatch latch = new CountDownLatch(1);
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<>(latch, operationTimeout);

final List<Operation> ops = new ArrayList<>(insertList.size());
BiFunction<Integer, Integer, OperationCallback> makeCallback = (opIdx, itemCount) -> new PipedOperationCallback() {

private int lastExecutedIndex;

public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

public void complete() {
if (!rv.getOperationStatus().isSuccess()) {
// If this operation has been errored or cancelled,
// add the command not executed in a operation as not executed state.
// The remaining operations will be also not executed state but not added into failed result.
if (lastExecutedIndex < itemCount - 2) {
// command remained in the same operation object.
rv.addEachResult(lastExecutedIndex + 1 + (opIdx * MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
} else if (opIdx + 1 < ops.size()) {
// command remained in the next operation object.
rv.addEachResult((opIdx + 1) * MAX_PIPED_ITEM_COUNT,
new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
}
latch.countDown();
} else {
if (opIdx + 1 < ops.size()) {
// If operations are succeed and next operation exists, then add it.
Operation nextOp = ops.get(opIdx + 1);
rv.addOperation(nextOp);
addOp(key, nextOp);
} else {
latch.countDown();
}
}
}

public void gotStatus(Integer index, OperationStatus status) {
if (!status.isSuccess()) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
this.lastExecutedIndex = index;
}
};

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i, insert.getItemCount()));
ops.add(op);
}
rv.addOperation(ops.get(0));
addOp(key, ops.get(0));
return rv;
}

@Override
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
List<String> keyList, long bkey, byte[] eFlag, Object value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public enum CollectionResponse {

UNDEFINED,
CANCELED,
NOT_EXECUTED,

INTERRUPT_EXCEPTION,
EXECUTION_EXCEPTION,
Expand Down
76 changes: 37 additions & 39 deletions src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package net.spy.memcached.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -11,13 +11,15 @@
import java.util.concurrent.atomic.AtomicReference;

import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.CollectionOperationStatus;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

public class PipedCollectionFuture<K, V>
extends CollectionFuture<Map<K, V>> {
private final Collection<Operation> ops = new ArrayList<>();
// operations that are completed or in progress
private final List<Operation> ops = new ArrayList<>();
private final AtomicReference<CollectionOperationStatus> operationStatus
= new AtomicReference<>(null);

Expand All @@ -30,67 +32,63 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
rv |= op.cancel("by application.");
}
return rv;
return ops.get(ops.size() - 1).cancel("by application.");
}

/**
* if previous op is cancelled, then next ops are not added to the opQueue.
* So we only need to check current op.
*
* @return true if operation is cancelled.
*/
@Override
public boolean isCancelled() {
for (Operation op : ops) {
if (op.isCancelled()) {
return true;
}
}
return false;
return operationStatus.get().getResponse() == CollectionResponse.CANCELED;
}

/**
* if previous op threw exception, then next ops are not added to the opQueue.
* So we only need to check current op.
*
* @return true if operation has errored by exception.
*/
public boolean hasErrored() {
return ops.get(ops.size() - 1).hasErrored();
}

@Override
public boolean isDone() {
for (Operation op : ops) {
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
}
return true;
return latch.getCount() == 0;
}

@Override
public Map<K, V> get(long duration, TimeUnit unit)
throws InterruptedException, TimeoutException, ExecutionException {

long beforeAwait = System.currentTimeMillis();
Operation lastExecutedOp;
if (!latch.await(duration, unit)) {
Collection<Operation> timedOutOps = new ArrayList<>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
timedOutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
}
if (!timedOutOps.isEmpty()) {
// set timeout only once for piped ops requested to single node.
MemcachedConnection.opTimedOut(timedOutOps.iterator().next());
lastExecutedOp = ops.get(ops.size() - 1);
if (lastExecutedOp.getState() != OperationState.COMPLETE) {
MemcachedConnection.opTimedOut(lastExecutedOp);

long elapsed = System.currentTimeMillis() - beforeAwait;
throw new CheckedOperationTimeoutException(duration, unit, elapsed, timedOutOps);
throw new CheckedOperationTimeoutException(duration, unit, elapsed, lastExecutedOp);
}
} else {
// continuous timeout counter will be reset only once in pipe
MemcachedConnection.opSucceeded(ops.iterator().next());
lastExecutedOp = ops.get(ops.size() - 1);
MemcachedConnection.opSucceeded(lastExecutedOp);
}

for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}
// If previous op has errored or cancelled, operations are not executed anymore.
// Therefore, we only need to check last executed op.
if (lastExecutedOp != null && lastExecutedOp.hasErrored()) {
throw new ExecutionException(lastExecutedOp.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
}
if (lastExecutedOp != null && lastExecutedOp.isCancelled()) {
throw new ExecutionException(new RuntimeException(lastExecutedOp.getCancelCause()));
}

return failedResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Operation to store collection data in a memcached server.
*/
public final class CollectionPipedInsertOperationImpl extends PipeOperationImpl
public final class CollectionPipedInsertOperationImpl extends SingleKeyPipeOperationImpl
implements CollectionPipedInsertOperation {

public CollectionPipedInsertOperationImpl(String key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ abstract class PipeOperationImpl extends OperationImpl {

protected boolean successAll = true;

private final CollectionPipe collectionPipe;
private final PipedOperationCallback cb;
private final List<String> keys;
protected final CollectionPipe collectionPipe;
protected final PipedOperationCallback cb;
protected final List<String> keys;
private final boolean isIdempotent;

private int index = 0;
private boolean readUntilLastLine = false;
protected int index = 0;
protected boolean readUntilLastLine = false;

protected PipeOperationImpl(List<String> keys, CollectionPipe collectionPipe,
OperationCallback cb) {
Expand Down
Loading

0 comments on commit dfb07d8

Please sign in to comment.