Skip to content

Commit

Permalink
add buffer-map read-write-lock
Browse files Browse the repository at this point in the history
  • Loading branch information
tangchuanqi committed Feb 12, 2025
1 parent efe53f3 commit 614f926
Showing 1 changed file with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
Expand Down Expand Up @@ -111,6 +113,7 @@ public class DorisBatchStreamLoad implements Serializable {
private final AtomicLong currentCacheBytes = new AtomicLong(0L);
private final Lock lock = new ReentrantLock();
private final Condition block = lock.newCondition();
private final Map<String, ReadWriteLock> bufferMapLock = new ConcurrentHashMap<>();

public DorisBatchStreamLoad(
DorisOptions dorisOptions,
Expand Down Expand Up @@ -181,7 +184,7 @@ public DorisBatchStreamLoad(
public void writeRecord(String database, String table, byte[] record) {
checkFlushException();
String bufferKey = getTableIdentifier(database, table);

getLock(bufferKey).readLock().lock();
BatchRecordBuffer buffer =
bufferMap.computeIfAbsent(
bufferKey,
Expand All @@ -194,6 +197,7 @@ public void writeRecord(String database, String table, byte[] record) {

int bytes = buffer.insert(record);
currentCacheBytes.addAndGet(bytes);
getLock(bufferKey).readLock().unlock();
if (currentCacheBytes.get() > maxBlockedBytes) {
lock.lock();
try {
Expand Down Expand Up @@ -283,11 +287,19 @@ private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
}

private synchronized void flushBuffer(String bufferKey) {
BatchRecordBuffer buffer = bufferMap.get(bufferKey);
BatchRecordBuffer buffer;
try {
getLock(bufferKey).writeLock().lock();
buffer = bufferMap.remove(bufferKey);
} finally {
getLock(bufferKey).writeLock().unlock();
}
if (buffer == null) {
return;
}
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
LOG.debug("flush buffer for key {} with label {}", bufferKey, buffer.getLabelName());
putRecordToFlushQueue(buffer);
bufferMap.remove(bufferKey);
}

private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
Expand Down Expand Up @@ -374,6 +386,10 @@ private boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) {
return true;
}

private ReadWriteLock getLock(String bufferKey) {
return bufferMapLock.computeIfAbsent(bufferKey, k -> new ReentrantReadWriteLock());
}

class LoadAsyncExecutor implements Runnable {

private int flushQueueSize;
Expand Down

0 comments on commit 614f926

Please sign in to comment.