From b07ad5662b6caddfa071f62daaaf533d70a3d04a Mon Sep 17 00:00:00 2001
From: DongLiang-0 <1747644936@qq.com>
Date: Mon, 5 Feb 2024 14:58:50 +0800
Subject: [PATCH] [fix]Fix github workflow Run ITCases 2.0 failure
---
flink-doris-connector/pom.xml | 15 ++-
.../org/apache/doris/flink/DorisTestBase.java | 18 +--
.../doris/flink/sink/DorisSinkITCase.java | 17 ++-
.../doris/flink/source/DorisSourceITCase.java | 17 ++-
.../flink/tools/cdc/MySQLDorisE2ECase.java | 106 ++++++++++--------
5 files changed, 112 insertions(+), 61 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index a8014d0bd..83cef38ba 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -90,8 +90,8 @@ under the License.
1.7.25
4.2.0
1.17.6
- 5.10.1
- 4.11
+ 5.10.1
+ 4.11
1.3
@@ -314,13 +314,20 @@ under the License.
junit
junit
- ${junit.version}
+ ${junit4.version}
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${junit5.version}
test
org.junit.jupiter
junit-jupiter-api
- ${junit-jupiter.version}
+ ${junit5.version}
test
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
index 278be8ca8..55a25bbc3 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisTestBase.java
@@ -18,8 +18,8 @@
package org.apache.doris.flink;
import com.google.common.collect.Lists;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
@@ -66,8 +66,8 @@ protected static String getFenodes() {
return DORIS_CONTAINER.getHost() + ":8030";
}
- @BeforeClass
- public static void startContainers() {
+ @BeforeEach
+ public void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
given().ignoreExceptions()
@@ -75,14 +75,14 @@ public static void startContainers() {
.atMost(300, TimeUnit.SECONDS)
.pollInterval(ONE_SECOND)
.untilAsserted(DorisTestBase::initializeJdbcConnection);
- LOG.info("Containers are started.");
+ LOG.info("Containers doris are started.");
}
- @AfterClass
- public static void stopContainers() {
- LOG.info("Stopping containers...");
+ @AfterEach
+ public void stopContainers() {
+ LOG.info("Stopping doris containers...");
DORIS_CONTAINER.stop();
- LOG.info("Containers are stopped.");
+ LOG.info("Containers doris are stopped.");
}
public static GenericContainer createDorisContainer() {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 26cbc2c4a..9bdc88ab1 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -27,8 +27,12 @@
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
import java.sql.ResultSet;
import java.sql.Statement;
@@ -44,12 +48,23 @@
import java.util.stream.Stream;
/** DorisSink ITCase with csv and arrow format. */
+@Execution(ExecutionMode.SAME_THREAD)
public class DorisSinkITCase extends DorisTestBase {
static final String DATABASE = "test";
static final String TABLE_CSV = "tbl_csv";
static final String TABLE_JSON = "tbl_json";
static final String TABLE_JSON_TBL = "tbl_json_tbl";
+ @BeforeEach
+ public void startContainers() {
+ super.startContainers();
+ }
+
+ @AfterEach
+ public void stopContainers() {
+ super.stopContainers();
+ }
+
@Test
public void testSinkCsvFormat() throws Exception {
initializeTable(TABLE_CSV);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index a5a3b534a..ee1515639 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -29,8 +29,12 @@
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
import java.sql.Statement;
import java.util.ArrayList;
@@ -38,11 +42,22 @@
import java.util.List;
/** DorisSource ITCase. */
+@Execution(ExecutionMode.SAME_THREAD)
public class DorisSourceITCase extends DorisTestBase {
static final String DATABASE = "test";
static final String TABLE_READ = "tbl_read";
static final String TABLE_READ_TBL = "tbl_read_tbl";
+ @BeforeEach
+ public void startContainers() {
+ super.startContainers();
+ }
+
+ @AfterEach
+ public void stopContainers() {
+ super.stopContainers();
+ }
+
@Test
public void testSource() throws Exception {
initializeTable(TABLE_READ);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
index 3390f7553..19d367076 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java
@@ -27,10 +27,14 @@
import org.apache.doris.flink.DorisTestBase;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
@@ -61,6 +65,7 @@
* MySQLDorisE2ECase 1. Automatically create tables 2. Schema change event synchronization
* 3.Synchronization of addition, deletion and modification events 4. CDC multi-table writing.
*/
+@Execution(ExecutionMode.SAME_THREAD)
public class MySQLDorisE2ECase extends DorisTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(MySQLDorisE2ECase.class);
private static final String DATABASE = "test";
@@ -71,13 +76,24 @@ public class MySQLDorisE2ECase extends DorisTestBase {
private static final String TABLE_3 = "tbl3";
private static final String TABLE_4 = "tbl4";
+ private static final String MYSQL_IMAGE_NAME = "mysql:8.0";
private static final MySQLContainer MYSQL_CONTAINER =
- new MySQLContainer("mysql:8.0")
+ new MySQLContainer(MYSQL_IMAGE_NAME)
.withDatabaseName(DATABASE)
.withUsername(MYSQL_USER)
.withPassword(MYSQL_PASSWD);
- @BeforeClass
+ @BeforeEach
+ public void startContainers() {
+ super.startContainers();
+ }
+
+ @AfterEach
+ public void stopContainers() {
+ super.stopContainers();
+ }
+
+ @BeforeAll
public static void startMySQLContainers() {
MYSQL_CONTAINER.setCommand("--default-time-zone=Asia/Shanghai");
LOG.info("Starting MySQL containers...");
@@ -85,30 +101,32 @@ public static void startMySQLContainers() {
LOG.info("MySQL Containers are started.");
}
- @AfterClass
+ @AfterAll
public static void stopMySQLContainers() {
LOG.info("Stopping MySQL containers...");
MYSQL_CONTAINER.stop();
LOG.info("MySQL Containers are stopped.");
+
+ // Cleanup the mysql image, because it's too large and will cause the next test to fail.
+ LOG.info("Cleaning MySQL containers...");
+ MYSQL_CONTAINER.getDockerClient().removeImageCmd(MYSQL_IMAGE_NAME).exec();
+ MYSQL_CONTAINER
+ .getDockerClient()
+ .listImagesCmd()
+ .withImageNameFilter(MYSQL_IMAGE_NAME)
+ .exec()
+ .forEach(
+ image ->
+ MYSQL_CONTAINER
+ .getDockerClient()
+ .removeImageCmd(image.getId())
+ .exec());
+ LOG.info("MySQL containers cleaned");
}
@Test
public void testMySQL2Doris() throws Exception {
- printClusterStatus();
- initializeMySQLTable();
- JobClient jobClient = submitJob();
- // wait 2 times checkpoint
- Thread.sleep(20000);
- Set> expected =
- Stream.>of(
- Arrays.asList("doris_1", 1),
- Arrays.asList("doris_2", 2),
- Arrays.asList("doris_3", 3))
- .collect(Collectors.toSet());
- String sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
- checkResult(expected, query1, 2);
+ JobClient jobClient = initHistoricalData();
// add incremental data
try (Connection connection =
@@ -138,7 +156,7 @@ public void testMySQL2Doris() throws Exception {
Arrays.asList("doris_3", 3),
Arrays.asList("doris_3_1", 12))
.collect(Collectors.toSet());
- sql =
+ String sql =
"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
checkResult(expected2, query2, 2);
@@ -174,22 +192,7 @@ public void testMySQL2Doris() throws Exception {
@Test
public void testAutoAddTable() throws Exception {
- printClusterStatus();
- initializeMySQLTable();
- initializeDorisTable();
- JobClient jobClient = submitJob();
- // wait 2 times checkpoint
- Thread.sleep(20000);
- Set> expected =
- Stream.>of(
- Arrays.asList("doris_1", 1),
- Arrays.asList("doris_2", 2),
- Arrays.asList("doris_3", 3))
- .collect(Collectors.toSet());
- String sql =
- "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
- String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
- checkResult(expected, query1, 2);
+ JobClient jobClient = initHistoricalData();
// auto create table4
addTableTable_4();
@@ -198,7 +201,7 @@ public void testAutoAddTable() throws Exception {
Stream.>of(
Arrays.asList("doris_4_1", 4), Arrays.asList("doris_4_2", 4))
.collect(Collectors.toSet());
- sql = "select * from %s.%s order by 1";
+ String sql = "select * from %s.%s order by 1";
String query2 = String.format(sql, DATABASE, TABLE_4);
checkResult(expected2, query2, 2);
@@ -275,13 +278,24 @@ public void testAutoAddTable() throws Exception {
jobClient.cancel().get();
}
- private void initializeDorisTable() throws Exception {
- try (Statement statement = connection.createStatement()) {
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
- }
+ private JobClient initHistoricalData() throws Exception {
+ printClusterStatus();
+ initializeMySQLTable();
+ JobClient jobClient = submitJob();
+
+ // wait 2 times checkpoint
+ Thread.sleep(20000);
+ Set> expected =
+ Stream.>of(
+ Arrays.asList("doris_1", 1),
+ Arrays.asList("doris_2", 2),
+ Arrays.asList("doris_3", 3))
+ .collect(Collectors.toSet());
+ String sql =
+ "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
+ String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
+ checkResult(expected, query1, 2);
+ return jobClient;
}
public void checkResult(Set> expected, String query, int columnSize)