From 614f926b5a136563e9ac573d9aae1c03420209b5 Mon Sep 17 00:00:00 2001 From: tangchuanqi Date: Wed, 12 Feb 2025 19:22:11 +0800 Subject: [PATCH] add buffer-map read-write-lock --- .../sink/batch/DorisBatchStreamLoad.java | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 479fab642..267a121cb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -63,7 +63,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT; import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; @@ -111,6 +113,7 @@ public class DorisBatchStreamLoad implements Serializable { private final AtomicLong currentCacheBytes = new AtomicLong(0L); private final Lock lock = new ReentrantLock(); private final Condition block = lock.newCondition(); + private final Map bufferMapLock = new ConcurrentHashMap<>(); public DorisBatchStreamLoad( DorisOptions dorisOptions, @@ -181,7 +184,7 @@ public DorisBatchStreamLoad( public void writeRecord(String database, String table, byte[] record) { checkFlushException(); String bufferKey = getTableIdentifier(database, table); - + getLock(bufferKey).readLock().lock(); BatchRecordBuffer buffer = bufferMap.computeIfAbsent( bufferKey, @@ -194,6 +197,7 @@ public void writeRecord(String database, String table, byte[] record) { int bytes = buffer.insert(record); currentCacheBytes.addAndGet(bytes); + getLock(bufferKey).readLock().unlock(); if (currentCacheBytes.get() > maxBlockedBytes) { lock.lock(); try { @@ -283,11 +287,19 @@ private synchronized boolean flush(String bufferKey, boolean waitUtilDone) { } private synchronized void flushBuffer(String bufferKey) { - BatchRecordBuffer buffer = bufferMap.get(bufferKey); + BatchRecordBuffer buffer; + try { + getLock(bufferKey).writeLock().lock(); + buffer = bufferMap.remove(bufferKey); + } finally { + getLock(bufferKey).writeLock().unlock(); + } + if (buffer == null) { + return; + } buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable())); LOG.debug("flush buffer for key {} with label {}", bufferKey, buffer.getLabelName()); putRecordToFlushQueue(buffer); - bufferMap.remove(bufferKey); } private void putRecordToFlushQueue(BatchRecordBuffer buffer) { @@ -374,6 +386,10 @@ private boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) { return true; } + private ReadWriteLock getLock(String bufferKey) { + return bufferMapLock.computeIfAbsent(bufferKey, k -> new ReentrantReadWriteLock()); + } + class LoadAsyncExecutor implements Runnable { private int flushQueueSize;