Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโ€™ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: make piped insert operations process synchronously #795

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 91 additions & 5 deletions src/main/java/net/spy/memcached/ArcusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

Expand Down Expand Up @@ -1783,7 +1784,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
insertList.add(new BTreePipedInsert<>(key, elementMap, attributesForCreate, tc));
}
}
return asyncCollectionPipedInsert(key, insertList);
return syncCollectionPipedInsert(key, insertList);
}

@Override
Expand All @@ -1806,7 +1807,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
insertList.add(new ByteArraysBTreePipedInsert<>(key, elementList, attributesForCreate, tc));
}
}
return asyncCollectionPipedInsert(key, insertList);
return syncCollectionPipedInsert(key, insertList);
}

@Override
Expand All @@ -1833,7 +1834,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncMopPip
insertList.add(new MapPipedInsert<>(key, elementMap, attributesForCreate, tc));
}
}
return asyncCollectionPipedInsert(key, insertList);
return syncCollectionPipedInsert(key, insertList);
}

@Override
Expand All @@ -1859,7 +1860,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
}
}
}
return asyncCollectionPipedInsert(key, insertList);
return syncCollectionPipedInsert(key, insertList);
}

@Override
Expand All @@ -1882,7 +1883,7 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncSopPip
insertList.add(new SetPipedInsert<>(key, elementList, attributesForCreate, tc));
}
}
return asyncCollectionPipedInsert(key, insertList);
return syncCollectionPipedInsert(key, insertList);
}

@Override
Expand Down Expand Up @@ -3106,6 +3107,91 @@ public void gotStatus(Integer index, OperationStatus status) {
return rv;
}

/**
* insert items into collection synchronously.
*
* @param key arcus cache key
* @param insertList must not be empty.
* @return future holding the map of element index and the reason why insert operation failed
*/
private <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> syncCollectionPipedInsert(
final String key, final List<CollectionPipedInsert<T>> insertList) {
final CountDownLatch latch = new CountDownLatch(1);
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
new PipedCollectionFuture<>(latch, operationTimeout);

BiFunction<Integer, Integer, OperationCallback> makeCallback = (opIdx, itemCount) -> new PipedOperationCallback() {

private int currentCommandIdx = -1;

public void receivedStatus(OperationStatus status) {
CollectionOperationStatus cstatus;

if (status instanceof CollectionOperationStatus) {
cstatus = (CollectionOperationStatus) status;
} else {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
rv.setOperationStatus(cstatus);
}

public void complete() {
if (rv.getOperationStatus().isSuccess()) {
int nextOpIdx = rv.getCurrentOpIdx() + 1;
if (nextOpIdx < insertList.size()) {
Operation nextOp = rv.getOperation(nextOpIdx);
if (!nextOp.isCancelled()) {
addOp(key, nextOp);
rv.setCurrentOpIdx(nextOpIdx);
}
} else {
latch.countDown();
}
} else {
Copy link
Collaborator

@jhpark816 jhpark816 Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complete()๋Š” ์•„๋ž˜ ์ •๋„์˜ ๋กœ์ง์ด ๋‚˜์„ ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค.

  • getNextOperation()๋Š” cancel ๋˜์ง€ ์•Š์€ next op๋ฅผ ๋ฆฌํ„ดํ•˜๋Š” ํ•จ์ˆ˜๋ฅผ ๋‘๋„๋ก ํ•ฉ์‹œ๋‹ค.
  • currentCommandIdx => currItemIdx (op ๋‚ด์˜ n๊ฐœ items ์ค‘์—์„œ ํ•˜๋‚˜๋ฅผ ๊ฐ€๋ฆฌํ‚ค๋Š” index)
  • this.gotStatus() ํ˜ธ์ถœํ•˜๋ฉด ์ฝ”๋“œ๊ฐ€ ๊ฐ„๋‹จํ•ด ์ง.
        if (rv.getOperationStatus().isSuccess()) {
          Operation nextOp = rv.getNextOperation();
          if (nextOp != null) {
            addOp(key, nextOp);
            return;
          }
        } else {
          if ((currItemIdx + 1) < itemCount || (opIdx + 1) < insertList.size()) {
            this.gotStatus(currItemIdx+1, 
                           new CollectionOperationStatus(false, "NOT_EXECUTED", 
                               CollectionResponse.NOT_EXECUTED));
          }
        }
        latch.countDown();

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOT_EXECUTED => STOPPED ๋กœ ํ• ๊นŒ์š”?

// If this operation has been errored or cancelled,
// add the command not executed in a operation as not executed state.
// The remaining operations will be also not executed state but not added into failed result.
int nextCommandIdx = 0;
if (currentCommandIdx < itemCount - 1) {
// command remained in the same operation object.
nextCommandIdx = currentCommandIdx + 1 + (opIdx * MAX_PIPED_ITEM_COUNT);
} else if (opIdx + 1 < insertList.size()) {
Comment on lines +3156 to +3159
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currentCommandIdx ๊ฐ’์ด -1์ธ ๊ฒฝ์šฐ์— ๋Œ€ํ•ด์„œ๋Š” ๋”ฐ๋กœ ์ฒ˜๋ฆฌ๋ฅผ ํ•˜์ง€ ์•Š์•„๋„ ๋˜๋‚˜์š”?

// command remained in the next operation object.
nextCommandIdx = (opIdx + 1) * MAX_PIPED_ITEM_COUNT;
}
if (nextCommandIdx > 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nextCommandIdx ๊ฐ’์ด 0์ธ ๊ฒฝ์šฐ์—๋Š” ์™œ ์ฒ˜๋ฆฌํ•˜์ง€ ์•Š์•„๋„ ๋˜๋Š”์ง€๋ฅผ ์„ค๋ช…ํ•˜๋Š” ์ฃผ์„์„ ์ถ”๊ฐ€ํ•ด์ฃผ์„ธ์š”.

rv.addEachResult(nextCommandIdx,
new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
}
latch.countDown();
}
}

public void gotStatus(Integer index, OperationStatus status) {
if (!status.isSuccess()) {
if (status instanceof CollectionOperationStatus) {
rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
(CollectionOperationStatus) status);
} else {
rv.addEachResult(index + (opIdx * MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(status));
}
}
currentCommandIdx = index;
}
};

for (int i = 0; i < insertList.size(); i++) {
final CollectionPipedInsert<T> insert = insertList.get(i);
Operation op = opFact.collectionPipedInsert(key, insert, makeCallback.apply(i, insert.getItemCount()));
rv.addOperation(op);
}
addOp(key, rv.getOperation(0));
rv.setCurrentOpIdx(0);
return rv;
}

@Override
public Future<Map<String, CollectionOperationStatus>> asyncBopInsertBulk(
List<String> keyList, long bkey, byte[] eFlag, Object value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public enum CollectionResponse {

UNDEFINED,
CANCELED,
NOT_EXECUTED,

INTERRUPT_EXCEPTION,
EXECUTION_EXCEPTION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -17,9 +18,10 @@

public class PipedCollectionFuture<K, V>
extends CollectionFuture<Map<K, V>> {
private final Collection<Operation> ops = new ArrayList<>();
private final List<Operation> ops = new ArrayList<>();
private final AtomicReference<CollectionOperationStatus> operationStatus
= new AtomicReference<>(null);
private int currentOpIdx = 0;

private final Map<K, V> failedResult =
new ConcurrentHashMap<>();
Expand All @@ -30,13 +32,17 @@ public PipedCollectionFuture(CountDownLatch l, long opTimeout) {

@Override
public boolean cancel(boolean ign) {
boolean rv = false;
for (Operation op : ops) {
rv |= op.cancel("by application.");
for (int i = currentOpIdx; i < ops.size(); i++) {
if (ops.get(i).cancel("by application.")) {
return true;
}
}
return rv;
return false;
}

/**
* @return true if any operation is cancelled.
*/
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isCancelled()์—์„œ๋„ ์•„๋ž˜ for ๋ฌธ์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ๋‚˜์€ ์ง€๋ฅผ ๊ฒ€ํ†  ๋ฐ”๋ž๋‹ˆ๋‹ค.

for (int i = currentOpIdx; i < ops.size(); i++)

@Override
public boolean isCancelled() {
for (Operation op : ops) {
Expand Down Expand Up @@ -119,4 +125,16 @@ public void addEachResult(K index, V status) {
public void addOperation(Operation op) {
ops.add(op);
}

public Operation getOperation(int idx) {
return ops.get(idx);
}

public void setCurrentOpIdx(int idx) {
this.currentOpIdx = idx;
}

public int getCurrentOpIdx() {
return currentOpIdx;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
/**
* Operation to store collection data in a memcached server.
*/
public final class CollectionPipedInsertOperationImpl extends PipeOperationImpl
public final class CollectionPipedInsertOperationImpl extends SingleKeyPipeOperationImpl
implements CollectionPipedInsertOperation {

public CollectionPipedInsertOperationImpl(String key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ abstract class PipeOperationImpl extends OperationImpl {

protected boolean successAll = true;

private final CollectionPipe collectionPipe;
private final PipedOperationCallback cb;
private final List<String> keys;
protected final CollectionPipe collectionPipe;
protected final PipedOperationCallback cb;
protected final List<String> keys;
private final boolean isIdempotent;

private int index = 0;
private boolean readUntilLastLine = false;
protected int index = 0;
protected boolean readUntilLastLine = false;

protected PipeOperationImpl(List<String> keys, CollectionPipe collectionPipe,
OperationCallback cb) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package net.spy.memcached.protocol.ascii;

import java.util.List;

import net.spy.memcached.collection.CollectionPipe;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;

public abstract class SingleKeyPipeOperationImpl extends PipeOperationImpl {

protected SingleKeyPipeOperationImpl(List<String> keys,
CollectionPipe collectionPipe,
OperationCallback cb) {
super(keys, collectionPipe, cb);
}

@Override
public void handleLine(String line) {
assert getState() == OperationState.READING
: "Read ``" + line + "'' when in " + getState() + " state";

/* ENABLE_REPLICATION if */
if (isWriteOperation() && hasSwitchedOver(line)) {
collectionPipe.setNextOpIndex(index);
prepareSwitchover(line);
return;
}
/* ENABLE_REPLICATION end */

/* ENABLE_MIGRATION if */
if (hasNotMyKey(line)) {
// Only one NOT_MY_KEY is provided in response of
// single key piped operation when redirection.
addRedirectSingleKeyOperation(line, keys.get(0));
if (collectionPipe.isNotPiped()) {
transitionState(OperationState.REDIRECT);
} else {
collectionPipe.setNextOpIndex(index);
}
return;
}
/* ENABLE_MIGRATION end */

if (collectionPipe.isNotPiped()) {
OperationStatus status = checkStatus(line);
if (!status.isSuccess()) {
successAll = false;
}
cb.gotStatus(index, status);

cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
return;
}

/*
RESPONSE <count>\r\n
<status of the 1st pipelined command>\r\n
[ ... ]
<status of the last pipelined command>\r\n
END|PIPE_ERROR <error_string>\r\n
*/
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
return;
}
/* ENABLE_MIGRATION end */
cb.receivedStatus((index == collectionPipe.getItemCount() && successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
getLogger().debug("Got line %s", line);

// TODO server should be fixed
line = line.replace(" ", " ");
line = line.replace(" ", " ");

String[] stuff = line.split(" ");
assert "RESPONSE".equals(stuff[0]);
readUntilLastLine = true;
} else {
OperationStatus status = checkStatus(line);
if (!status.isSuccess()) {
successAll = false;
}
cb.gotStatus(index, status);

index++;
}
}

@Override
protected OperationStatus checkStatus(String line) {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.spy.memcached.ArcusClient;
import net.spy.memcached.collection.BaseIntegrationTest;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.Element;
Expand Down Expand Up @@ -122,7 +123,7 @@ void testErrorCount() {

Map<Integer, CollectionOperationStatus> map = future.get(2000L,
TimeUnit.MILLISECONDS);
assertEquals(bkeySize, map.size());
assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size());

} catch (Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.spy.memcached.ArcusClient;
import net.spy.memcached.collection.BaseIntegrationTest;
import net.spy.memcached.collection.CollectionAttributes;
import net.spy.memcached.collection.CollectionResponse;
import net.spy.memcached.ops.CollectionOperationStatus;

import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -111,9 +113,7 @@ void testInsertAndGet() {
void testErrorCount() {
int valueCount = 1200;
Object[] valueList = new Object[valueCount];
for (int i = 0; i < valueList.length; i++) {
valueList[i] = "MyValue";
}
Arrays.fill(valueList, "MyValue");

try {
// SET
Expand All @@ -123,8 +123,11 @@ void testErrorCount() {

Map<Integer, CollectionOperationStatus> map = future.get(1000L,
TimeUnit.MILLISECONDS);
assertEquals(valueCount, map.size());

assertEquals(ArcusClient.MAX_PIPED_ITEM_COUNT + 1, map.size());
assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT - 1).getResponse(),
CollectionResponse.NOT_FOUND);
assertEquals(map.get(ArcusClient.MAX_PIPED_ITEM_COUNT).getResponse(),
CollectionResponse.NOT_EXECUTED);
} catch (Exception e) {
e.printStackTrace();
fail();
Expand Down
Loading
Loading