Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Feb 18, 2025
1 parent 118bea9 commit 7fd6c31
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public DorisBatchStreamLoad(
public void writeRecord(String database, String table, byte[] record) {
checkFlushException();
String bufferKey = getTableIdentifier(database, table);

getLock(bufferKey).readLock().lock();
BatchRecordBuffer buffer =
bufferMap.computeIfAbsent(
Expand All @@ -198,6 +199,7 @@ public void writeRecord(String database, String table, byte[] record) {
int bytes = buffer.insert(record);
currentCacheBytes.addAndGet(bytes);
getLock(bufferKey).readLock().unlock();

if (currentCacheBytes.get() > maxBlockedBytes) {
lock.lock();
try {
Expand Down Expand Up @@ -258,8 +260,9 @@ private synchronized boolean doFlush(
}

private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
if (bufferMap.isEmpty()) {
if (!waitUtilDone && bufferMap.isEmpty()) {
// bufferMap may have been flushed by other threads
LOG.info("bufferMap is empty, no need to flush {}", bufferKey);
return false;
}
if (null == bufferKey) {
Expand Down Expand Up @@ -295,6 +298,7 @@ private synchronized void flushBuffer(String bufferKey) {
getLock(bufferKey).writeLock().unlock();
}
if (buffer == null) {
LOG.info("buffer key is not exist {}, skipped", bufferKey);
return;
}
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
Expand All @@ -312,6 +316,9 @@ private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
} catch (InterruptedException e) {
throw new RuntimeException("Failed to put record buffer to flush queue");
}
// When the load thread reports an error, the flushQueue will be cleared,
// and need to force a check for the exception.
checkFlushException();
}

private void checkFlushException() {
Expand All @@ -321,7 +328,9 @@ private void checkFlushException() {
}

private void waitAsyncLoadFinish() {
for (int i = 0; i < executionOptions.getFlushQueueSize() + 1; i++) {
// Because the queue will have a drainTo operation, it needs to be multiplied by 2
for (int i = 0; i < executionOptions.getFlushQueueSize() * 2 + 1; i++) {
// eof buffer
BatchRecordBuffer empty = new BatchRecordBuffer();
putRecordToFlushQueue(empty);
}
Expand All @@ -335,8 +344,6 @@ public void close() {
// close async executor
this.loadExecutorService.shutdown();
this.started.set(false);
// clear buffer
this.flushQueue.clear();
}

@VisibleForTesting
Expand Down Expand Up @@ -407,10 +414,14 @@ public void run() {
recordList.clear();
try {
BatchRecordBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
if (buffer == null || buffer.getLabelName() == null) {
// label is empty and does not need to load. It is the flag of waitUtilDone
if (buffer == null) {
continue;
}
if (buffer.getLabelName() == null) {
// When the label is empty, it is the eof buffer for checkpoint flush.
continue;
}

recordList.add(buffer);
boolean merge = false;
if (!flushQueue.isEmpty()) {
Expand All @@ -424,6 +435,7 @@ public void run() {
if (!merge) {
for (BatchRecordBuffer bf : recordList) {
if (bf == null || bf.getLabelName() == null) {
// When the label is empty, it's eof buffer for checkpointFlush.
continue;
}
load(bf.getLabelName(), bf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.batch;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
Expand Down Expand Up @@ -67,6 +68,12 @@ public DorisBatchWriter(
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions) {

long restoreCheckpointId =
initContext
.getRestoredCheckpointId()
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
LOG.info("restore from checkpointId {}", restoreCheckpointId);
if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
Preconditions.checkState(
Expand All @@ -75,6 +82,7 @@ public DorisBatchWriter(
this.database = tableInfo[0];
this.table = tableInfo[1];
}

LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
this.subtaskId = initContext.getSubtaskId();
this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
Expand Down Expand Up @@ -130,12 +138,13 @@ public void flush(boolean flush) throws IOException, InterruptedException {

@Override
public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
// nothing to commit
checkFlushException();
return Collections.emptyList();
}

@Override
public List<DorisWriterState> snapshotState(long checkpointId) throws IOException {
checkFlushException();
return new ArrayList<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -124,17 +123,10 @@ public void testLoadFail() throws Exception {
when(httpClientBuilder.build()).thenReturn(httpClient);
when(httpClient.execute(any())).thenReturn(response);
loader.writeRecord("db", "tbl", "1,data".getBytes());
loader.checkpointFlush();

TestUtil.waitUntilCondition(
() -> !loader.isLoadThreadAlive(),
Deadline.fromNow(Duration.ofSeconds(20)),
100L,
"testLoadFail wait loader exit failed." + loader.isLoadThreadAlive());
AtomicReference<Throwable> exception = loader.getException();
Assert.assertTrue(exception.get() instanceof Exception);
Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
LOG.info("testLoadFail end");
thrown.expect(Exception.class);
thrown.expectMessage("stream load error");
loader.checkpointFlush();
}

@Test
Expand Down Expand Up @@ -175,17 +167,10 @@ public void testLoadError() throws Exception {
when(httpClientBuilder.build()).thenReturn(httpClient);
when(httpClient.execute(any())).thenReturn(response);
loader.writeRecord("db", "tbl", "1,data".getBytes());
loader.checkpointFlush();

TestUtil.waitUntilCondition(
() -> !loader.isLoadThreadAlive(),
Deadline.fromNow(Duration.ofSeconds(20)),
100L,
"testLoadError wait loader exit failed." + loader.isLoadThreadAlive());
AtomicReference<Throwable> exception = loader.getException();
Assert.assertTrue(exception.get() instanceof Exception);
Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
LOG.info("testLoadError end");
thrown.expect(Exception.class);
thrown.expectMessage("stream load error");
loader.checkpointFlush();
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void testInit() {
.build();
thrown.expect(IllegalStateException.class);
thrown.expectMessage("tableIdentifier input error");
DorisBatchWriter batchWriter = new DorisBatchWriter(null, null, options, null, null);
Sink.InitContext initContext = mock(Sink.InitContext.class);
DorisBatchWriter batchWriter = new DorisBatchWriter(initContext, null, options, null, null);
}

@Test
Expand Down

0 comments on commit 7fd6c31

Please sign in to comment.