diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index cb0a3d218..03a26dbfe 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -91,6 +91,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat { private String keysType; private List backends; private long pos = 0L; + private int subtaskId = 0; private transient volatile boolean closed = false; private transient ScheduledExecutorService scheduler; private transient ScheduledFuture scheduledFuture; @@ -191,16 +192,17 @@ public void configure(Configuration configuration) { @Override public void open(int taskNumber, int numTasks) throws IOException { + this.subtaskId = taskNumber; this.backends = settingBackends(); + String backend = getAvailableBackend(); dorisStreamLoad = new DorisStreamLoad( - backends.get(0).toBackendString(), + backend, options.getTableIdentifier().split("\\.")[0], options.getTableIdentifier().split("\\.")[1], options.getUsername(), options.getPassword(), executionOptions.getStreamLoadProp(), readOptions); - LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr()); if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) { this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output" + @@ -321,6 +323,10 @@ public synchronized void flush() throws IOException { } else { result = String.join(this.lineDelimiter, batch); } + + // refresh backend + dorisStreamLoad.setHostPort(getAvailableBackend()); + for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { try { dorisStreamLoad.load(result); @@ -334,7 +340,7 @@ public synchronized void flush() throws IOException { } try { dorisStreamLoad.setHostPort(getAvailableBackend()); - LOG.warn("streamload error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e); + LOG.warn("stream load error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e); Thread.sleep(1000L * ( i + 1 )); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); @@ -342,8 +348,10 @@ public synchronized void flush() throws IOException { } } } + } + @Deprecated private String getBackend() throws IOException { try { //get be url from fe @@ -371,7 +379,7 @@ public String getAvailableBackend() { long tmp = pos + backends.size(); while (pos < tmp) { BackendV2.BackendRowV2 backend = - backends.get((int) (pos % backends.size())); + backends.get((int) ((pos + subtaskId) % backends.size())); pos++; return backend.toBackendString(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java index 1ef8e20bf..72cf8ffc3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java @@ -108,7 +108,7 @@ public void setHostPort(String hostPort) { public void load(String value) throws StreamLoadException { LoadResponse loadResponse = loadBatch(value); - LOG.info("Streamload Response:{}", loadResponse); + LOG.info("stream load response:{}", loadResponse); if (loadResponse.status != 200) { throw new StreamLoadException("stream load error: " + loadResponse.respContent); } else { @@ -133,6 +133,7 @@ private LoadResponse loadBatch(String value) { UUID.randomUUID().toString().replaceAll("-", "")); } + LOG.info("stream load started for {} on host {}", label, hostPort); try { HttpPut put = new HttpPut(loadUrlStr); put.setHeader(HttpHeaders.EXPECT, "100-continue");