-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
INTERNAL: make piped insert operations process synchronously #795
base: develop
Are you sure you want to change the base?
Conversation
src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Outdated
Show resolved
Hide resolved
src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Outdated
Show resolved
Hide resolved
9eb5496
to
076d02b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
μΌλΆ 리뷰
90a7ee5
to
df233d9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
리뷰 μλ£
600f2b2
to
d232a01
Compare
@jhpark816 리뷰 λ°μνμ΅λλ€. |
@uhm0311 리뷰 λ°λλλ€. |
This comment was marked as resolved.
This comment was marked as resolved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
νλ λ μ§λ¬Έμ λλ€.
src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
μΈ λ²μ§Έ μ§λ¬Έ λ¨κΉλλ€.
src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
μΌλΆ 리뷰
src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java
Outdated
Show resolved
Hide resolved
dfb07d8
to
7267b1a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
μΌλΆ 리뷰
src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Outdated
Show resolved
Hide resolved
src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Outdated
Show resolved
Hide resolved
src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Outdated
Show resolved
Hide resolved
src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java
Outdated
Show resolved
Hide resolved
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
@uhm0311 @jhpark816 future.cancel()
public boolean cancel(boolean ign) {
for (int i = currentOpIdx; i < ops.size(); i++) {
if (ops.get(i).cancel("by application.")) {
return true;
}
}
return false;
}
public boolean isCancelled() {
for (int i=0;i<ops.size();i++) {
if (ops.get(i).isCancelled()) {
return true;
}
}
return false;
} callback.complete()
public void complete() {
if (rv.getOperationStatus().isSuccess()) {
Operation nextOp = rv.getNextOp(); // next opκ° μλ€λ©΄ op κ°μ²΄λ₯Ό, μλ€λ©΄ Null λ°ν
if (nextOp != null && !nextOp.isCancelled()) {
addOp(key, nextOp);
rv.setCurrentOpIdx(nextOpIdx);
} else {
latch.countDown();
}
} else {
// ...
if (nextIndex > 0) {
rv.addEachResult(nextIndex, new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
}
latch.countDown();
}
} |
@uhm0311 @jhpark816 |
// command remained in the next operation object. | ||
nextCommandIdx = (opIdx + 1) * MAX_PIPED_ITEM_COUNT; | ||
} | ||
if (nextCommandIdx > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nextCommandIdx κ°μ΄ 0μΈ κ²½μ°μλ μ μ²λ¦¬νμ§ μμλ λλμ§λ₯Ό μ€λͺ νλ μ£Όμμ μΆκ°ν΄μ£ΌμΈμ.
if (currentCommandIdx < itemCount - 1) { | ||
// command remained in the same operation object. | ||
nextCommandIdx = currentCommandIdx + 1 + (opIdx * MAX_PIPED_ITEM_COUNT); | ||
} else if (opIdx + 1 < insertList.size()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currentCommandIdx κ°μ΄ -1μΈ κ²½μ°μ λν΄μλ λ°λ‘ μ²λ¦¬λ₯Ό νμ§ μμλ λλμ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
리뷰 μλ£
} else { | ||
latch.countDown(); | ||
} | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
complete()λ μλ μ λμ λ‘μ§μ΄ λμ κ² κ°μ΅λλ€.
- getNextOperation()λ cancel λμ§ μμ next opλ₯Ό 리ν΄νλ ν¨μλ₯Ό λλλ‘ ν©μλ€.
- currentCommandIdx => currItemIdx (op λ΄μ nκ° items μ€μμ νλλ₯Ό κ°λ¦¬ν€λ index)
- this.gotStatus() νΈμΆνλ©΄ μ½λκ° κ°λ¨ν΄ μ§.
if (rv.getOperationStatus().isSuccess()) {
Operation nextOp = rv.getNextOperation();
if (nextOp != null) {
addOp(key, nextOp);
return;
}
} else {
if ((currItemIdx + 1) < itemCount || (opIdx + 1) < insertList.size()) {
this.gotStatus(currItemIdx+1,
new CollectionOperationStatus(false, "NOT_EXECUTED",
CollectionResponse.NOT_EXECUTED));
}
}
latch.countDown();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NOT_EXECUTED => STOPPED λ‘ ν κΉμ?
} | ||
|
||
/** | ||
* @return true if any operation is cancelled. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isCancelled()μμλ μλ for λ¬Έμ μ¬μ©νλ κ²μ΄ λμ μ§λ₯Ό κ²ν λ°λλλ€.
for (int i = currentOpIdx; i < ops.size(); i++)
π Related Issue
β¨οΈ What I did
lop/sop/mop/bop piped insert
μ νν΄ μ μ©νλ PRμ λλ€.λμ κ³Όμ
500κ° μμ΄ν λ¨μλ‘ Operation κ°μ²΄λ₯Ό λλμ΄ λΉλκΈ°λ‘ μΌμ ν μμ²μ 보λ΄λ κ²μ λκΈ° λ°©μμΌλ‘ νλμ© λ³΄λ΄μ΄ Arcus μλ²μ κ³Όλν λΆνκ° λ€μ΄κ°λ κ²μ λ°©μ§νκ³ , μ€ν¨ μ λ€μ Operationμ μννμ§ μλλ‘ ν©λλ€.
μ΄μ operationμ΄ μ±κ³΅ν΄μΌλ§ λ€μ operationμ΄ writeQueueμ Futureμ operation listμ μΆκ°λ©λλ€.
λ§μ½ CLIENT_ERROR, SERVER_ERROR μ€ν¨κ° λ°μνλ©΄ κ·Έ μ¦μ latch.countdownμ νΈμΆν΄ λ¨μ operationμ μννμ§ μμ΅λλ€.
OVERFLOWED, OUT_OF_RANGE κ°μ μ€ν¨κ° λ°μνλ©΄ μ΄ν operationμ 첫 commandκ° NOT_EXECUTED μνμμ futureμ failedResultμ μΆκ°ν©λλ€.
NOT_EXECUTED μ΄νμ λͺ¨λ μ°μ°μ μ€νλμ§ μμ κ²μ λλ€. (CANCELEDκ° μλ NOT_EXECUTEDλ₯Ό μ¬μ©νλ μ΄μ λ μ§μ§ cancel μν©κ³Όμ νΌμ©μ λ§κ³ μ νκΈ° μν¨μ λλ€.)
λ³Έ PR μ΄μ μ pipe κ΄λ ¨ ν΄λμ€ κ΅¬μ‘° λ³κ²½μ΄ μμκ³ , μ΄μ λ§μΆμ΄ SingleKeyPipedOperationImpl ν΄λμ€λ₯Ό μΆκ°ν©λλ€.