Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: make piped insert operations process synchronously #795

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

oliviarla
Copy link
Collaborator

@oliviarla oliviarla commented Aug 19, 2024

πŸ”— Related Issue

⌨️ What I did

  • lop/sop/mop/bop piped insert에 ν•œν•΄ μ μš©ν•˜λŠ” PRμž…λ‹ˆλ‹€.

λ™μž‘ κ³Όμ •

  • 500개 μ•„μ΄ν…œ λ‹¨μœ„λ‘œ Operation 객체λ₯Ό λ‚˜λˆ„μ–΄ λΉ„λ™κΈ°λ‘œ 일제히 μš”μ²­μ„ λ³΄λ‚΄λ˜ 것을 동기 λ°©μ‹μœΌλ‘œ ν•˜λ‚˜μ”© 보내어 Arcus μ„œλ²„μ— κ³Όλ„ν•œ λΆ€ν•˜κ°€ λ“€μ–΄κ°€λŠ” 것을 λ°©μ§€ν•˜κ³ , μ‹€νŒ¨ μ‹œ λ‹€μŒ Operation을 μˆ˜ν–‰ν•˜μ§€ μ•Šλ„λ‘ ν•©λ‹ˆλ‹€.

  • 이전 operation이 μ„±κ³΅ν•΄μ•Όλ§Œ λ‹€μŒ operation이 writeQueue와 Future의 operation list에 μΆ”κ°€λ©λ‹ˆλ‹€.

  • λ§Œμ•½ CLIENT_ERROR, SERVER_ERROR μ‹€νŒ¨κ°€ λ°œμƒν•˜λ©΄ κ·Έ μ¦‰μ‹œ latch.countdown을 ν˜ΈμΆœν•΄ 남은 operation을 μˆ˜ν–‰ν•˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.

  • OVERFLOWED, OUT_OF_RANGE 같은 μ‹€νŒ¨κ°€ λ°œμƒν•˜λ©΄ 이후 operation의 첫 commandκ°€ NOT_EXECUTED μƒνƒœμž„μ„ future의 failedResult에 μΆ”κ°€ν•©λ‹ˆλ‹€.

  • NOT_EXECUTED μ΄ν›„μ˜ λͺ¨λ“  연산은 μ‹€ν–‰λ˜μ§€ μ•Šμ€ κ²ƒμž…λ‹ˆλ‹€. (CANCELEDκ°€ μ•„λ‹Œ NOT_EXECUTEDλ₯Ό μ‚¬μš©ν•˜λŠ” μ΄μœ λŠ” μ§„μ§œ cancel μƒν™©κ³Όμ˜ ν˜Όμš©μ„ λ§‰κ³ μž ν•˜κΈ° μœ„ν•¨μž…λ‹ˆλ‹€.)

  • λ³Έ PR 이전에 pipe κ΄€λ ¨ 클래슀 ꡬ쑰 변경이 μžˆμ—ˆκ³ , 이에 λ§žμΆ”μ–΄ SingleKeyPipedOperationImpl 클래슀λ₯Ό μΆ”κ°€ν•©λ‹ˆλ‹€.

    • PipedOperationImpl 자체λ₯Ό μˆ˜μ •ν•  경우 bulk insertκΉŒμ§€ 영ν–₯이 κ°€κΈ° λ•Œλ¬Έμ—, single key μ „μš© operation 클래슀λ₯Ό μΆ”κ°€ν•œ ν›„ handleLine λ©”μ„œλ“œλ₯Ό μˆ˜μ •ν–ˆμŠ΅λ‹ˆλ‹€.
    • 이 클래슀λ₯Ό CollectionPipedInsertOperationImpl κ°€ 상속받도둝 ν•©λ‹ˆλ‹€. (μΆ”ν›„ CollectionPipedUpdateOperationImpl λ“± ν΄λž˜μŠ€λ“€λ„ 상속받을 μ˜ˆμ •μž…λ‹ˆλ‹€.)
    • PipedOperationImpl ν΄λž˜μŠ€λŠ” νŒŒμ΄ν”„λ₯Ό μ‚¬μš©ν•˜λŠ” λͺ¨λ“  Operation에 λŒ€ν•œ λ‘œμ§μ„ κ΄€λ¦¬ν•˜κ³ , single keyλ‚˜ multi key에 λŒ€ν•œ 처리λ₯Ό 이 ν΄λž˜μŠ€μ™€ 같은 별도 ν΄λž˜μŠ€μ— 두어 λΆ„λ¦¬λœ ν˜•νƒœλ‘œ 관리할 수 μžˆμŠ΅λ‹ˆλ‹€.

@oliviarla oliviarla requested review from jhpark816 and uhm0311 August 19, 2024 06:37
oliviarla referenced this pull request in oliviarla/arcus-java-client Aug 19, 2024
@jhpark816 jhpark816 removed their request for review August 19, 2024 06:42
@oliviarla oliviarla requested review from jhpark816 and removed request for jhpark816 August 19, 2024 06:43
@oliviarla oliviarla self-assigned this Aug 19, 2024
@oliviarla oliviarla force-pushed the pipe2 branch 3 times, most recently from 9eb5496 to 076d02b Compare August 23, 2024 11:35
uhm0311
uhm0311 previously approved these changes Aug 27, 2024
@oliviarla oliviarla requested a review from jhpark816 August 28, 2024 02:43
Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

일뢀 리뷰

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 μ™„λ£Œ

@oliviarla oliviarla force-pushed the pipe2 branch 4 times, most recently from 600f2b2 to d232a01 Compare September 6, 2024 09:15
@oliviarla
Copy link
Collaborator Author

@jhpark816 리뷰 λ°˜μ˜ν–ˆμŠ΅λ‹ˆλ‹€.

@jhpark816
Copy link
Collaborator

@uhm0311 리뷰 λ°”λžλ‹ˆλ‹€.

@uhm0311

This comment was marked as resolved.

Copy link
Collaborator

@uhm0311 uhm0311 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ν•˜λ‚˜ 더 μ§ˆλ¬Έμž…λ‹ˆλ‹€.

Copy link
Collaborator

@uhm0311 uhm0311 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

μ„Έ 번째 질문 λ‚¨κΉλ‹ˆλ‹€.

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

일뢀 리뷰

@oliviarla oliviarla force-pushed the pipe2 branch 3 times, most recently from dfb07d8 to 7267b1a Compare February 24, 2025 03:22
Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

일뢀 리뷰

@oliviarla

This comment was marked as outdated.

@uhm0311

This comment was marked as outdated.

@oliviarla

This comment was marked as outdated.

@uhm0311

This comment was marked as outdated.

@oliviarla

This comment was marked as outdated.

@oliviarla
Copy link
Collaborator Author

oliviarla commented Feb 27, 2025

@uhm0311 @jhpark816
이전 μ½”λ©˜νŠΈκ°€ λ„ˆλ¬΄ λ³΅μž‘ν•œ λ°©ν–₯으둜 μ§„ν–‰λ˜μ–΄ @jhpark816 λ‹˜μ˜ ν”Όλ“œλ°±μ„ λ°›κ³  μ•„λž˜μ™€ 같이 ꡬ상을 λ³€κ²½ν•˜μ˜€μŠ΅λ‹ˆλ‹€.

future.cancel()

  • op.cancel이 μ‹€νŒ¨ν•˜λŠ” κ²½μš°λŠ” 이미 opκ°€ complete 처리되고 μžˆλŠ” 경우 뿐이닀. λ”°λΌμ„œ complete 처리된 opλŠ” μŠ€ν‚΅ν•˜κ³  λ‹€μŒ opλ₯Ό cancel μ‹œλ„ν•œλ‹€.
  • ν˜„μž¬ μˆ˜ν–‰μ€‘μΈ opλΆ€ν„° μˆœνšŒν•˜λ©° cancel이 μ„±κ³΅ν•˜μ˜€λ‹€λ©΄ μ¦‰μ‹œ 성곡 응닡을 λ°˜ν™˜ν•˜λ„λ‘ ν•œλ‹€.
  • cancel된 opκ°€ ν•˜λ‚˜λΌλ„ μžˆλ‹€λ©΄ futureλŠ” cancel된 μƒνƒœμ΄λ‹€.
public boolean cancel(boolean ign) {
  for (int i = currentOpIdx; i < ops.size(); i++) {
    if (ops.get(i).cancel("by application.")) {
      return true;
    }
  }
  return false;
}

public boolean isCancelled() {
  for (int i=0;i<ops.size();i++) {
    if (ops.get(i).isCancelled()) {
      return true;
    }
  }
  return false;
}

callback.complete()

  • nextOpλ₯Ό cancel μ‹œν‚€λŠ” λ™μ‹œμ— addOpν•˜λŠ” 경우 cancel μƒνƒœμ΄λ©΄μ„œ input queue에 좔가될 수 μžˆμœΌλ‚˜, write queueμ—μ„œ μ œμ™Έλ˜λ―€λ‘œ λ¬Έμ œκ°€ λ°œμƒν•˜μ§€ μ•ŠλŠ”λ‹€.
public void complete() {
  if (rv.getOperationStatus().isSuccess()) {
    Operation nextOp = rv.getNextOp(); // next opκ°€ μžˆλ‹€λ©΄ op 객체λ₯Ό, μ—†λ‹€λ©΄ Null λ°˜ν™˜
    if (nextOp != null && !nextOp.isCancelled()) {
      addOp(key, nextOp);
      rv.setCurrentOpIdx(nextOpIdx);
    } else {
      latch.countDown();
    }
  } else {
    // ...
    if (nextIndex > 0) {
      rv.addEachResult(nextIndex, new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
    }
    latch.countDown();
  }
}

@oliviarla
Copy link
Collaborator Author

@uhm0311 @jhpark816
μ΄μ œλΆ€ν„° λ‹€μ‹œ λ¦¬λ·°ν•΄μ£Όμ‹œλ©΄ λ©λ‹ˆλ‹€.

// command remained in the next operation object.
nextCommandIdx = (opIdx + 1) * MAX_PIPED_ITEM_COUNT;
}
if (nextCommandIdx > 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nextCommandIdx 값이 0인 κ²½μš°μ—λŠ” μ™œ μ²˜λ¦¬ν•˜μ§€ μ•Šμ•„λ„ λ˜λŠ”μ§€λ₯Ό μ„€λͺ…ν•˜λŠ” 주석을 μΆ”κ°€ν•΄μ£Όμ„Έμš”.

Comment on lines +3156 to +3159
if (currentCommandIdx < itemCount - 1) {
// command remained in the same operation object.
nextCommandIdx = currentCommandIdx + 1 + (opIdx * MAX_PIPED_ITEM_COUNT);
} else if (opIdx + 1 < insertList.size()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currentCommandIdx 값이 -1인 κ²½μš°μ— λŒ€ν•΄μ„œλŠ” λ”°λ‘œ 처리λ₯Ό ν•˜μ§€ μ•Šμ•„λ„ λ˜λ‚˜μš”?

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 μ™„λ£Œ

} else {
latch.countDown();
}
} else {
Copy link
Collaborator

@jhpark816 jhpark816 Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complete()λŠ” μ•„λž˜ μ •λ„μ˜ 둜직이 λ‚˜μ„ 것 κ°™μŠ΅λ‹ˆλ‹€.

  • getNextOperation()λŠ” cancel λ˜μ§€ μ•Šμ€ next opλ₯Ό λ¦¬ν„΄ν•˜λŠ” ν•¨μˆ˜λ₯Ό 두도둝 ν•©μ‹œλ‹€.
  • currentCommandIdx => currItemIdx (op λ‚΄μ˜ n개 items μ€‘μ—μ„œ ν•˜λ‚˜λ₯Ό κ°€λ¦¬ν‚€λŠ” index)
  • this.gotStatus() ν˜ΈμΆœν•˜λ©΄ μ½”λ“œκ°€ 간단해 짐.
        if (rv.getOperationStatus().isSuccess()) {
          Operation nextOp = rv.getNextOperation();
          if (nextOp != null) {
            addOp(key, nextOp);
            return;
          }
        } else {
          if ((currItemIdx + 1) < itemCount || (opIdx + 1) < insertList.size()) {
            this.gotStatus(currItemIdx+1, 
                           new CollectionOperationStatus(false, "NOT_EXECUTED", 
                               CollectionResponse.NOT_EXECUTED));
          }
        }
        latch.countDown();

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOT_EXECUTED => STOPPED 둜 ν• κΉŒμš”?

}

/**
* @return true if any operation is cancelled.
*/
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isCancelled()μ—μ„œλ„ μ•„λž˜ for 문을 μ‚¬μš©ν•˜λŠ” 것이 λ‚˜μ€ 지λ₯Ό κ²€ν†  λ°”λžλ‹ˆλ‹€.

for (int i = currentOpIdx; i < ops.size(); i++)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants