Skip to content

Commit

Permalink
[fix]Fix github workflow Run ITCases 2.0 failure
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 committed Feb 5, 2024
1 parent 99f2da9 commit b07ad56
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 61 deletions.
15 changes: 11 additions & 4 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ under the License.
<slf4j.version>1.7.25</slf4j.version>
<mockito.version>4.2.0</mockito.version>
<testcontainers.version>1.17.6</testcontainers.version>
<junit-jupiter.version>5.10.1</junit-jupiter.version>
<junit.version>4.11</junit.version>
<junit5.version>5.10.1</junit5.version>
<junit4.version>4.11</junit4.version>
<hamcrest.version>1.3</hamcrest.version>
</properties>

Expand Down Expand Up @@ -314,13 +314,20 @@ under the License.
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<version>${junit4.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit-jupiter.version}</version>
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.doris.flink;

import com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
Expand Down Expand Up @@ -66,23 +66,23 @@ protected static String getFenodes() {
return DORIS_CONTAINER.getHost() + ":8030";
}

@BeforeClass
public static void startContainers() {
@BeforeEach
public void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
given().ignoreExceptions()
.await()
.atMost(300, TimeUnit.SECONDS)
.pollInterval(ONE_SECOND)
.untilAsserted(DorisTestBase::initializeJdbcConnection);
LOG.info("Containers are started.");
LOG.info("Containers doris are started.");
}

@AfterClass
public static void stopContainers() {
LOG.info("Stopping containers...");
@AfterEach
public void stopContainers() {
LOG.info("Stopping doris containers...");
DORIS_CONTAINER.stop();
LOG.info("Containers are stopped.");
LOG.info("Containers doris are stopped.");
}

public static GenericContainer createDorisContainer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import java.sql.ResultSet;
import java.sql.Statement;
Expand All @@ -44,12 +48,23 @@
import java.util.stream.Stream;

/** DorisSink ITCase with csv and arrow format. */
@Execution(ExecutionMode.SAME_THREAD)
public class DorisSinkITCase extends DorisTestBase {
static final String DATABASE = "test";
static final String TABLE_CSV = "tbl_csv";
static final String TABLE_JSON = "tbl_json";
static final String TABLE_JSON_TBL = "tbl_json_tbl";

@BeforeEach
public void startContainers() {
super.startContainers();
}

@AfterEach
public void stopContainers() {
super.stopContainers();
}

@Test
public void testSinkCsvFormat() throws Exception {
initializeTable(TABLE_CSV);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,35 @@
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** DorisSource ITCase. */
@Execution(ExecutionMode.SAME_THREAD)
public class DorisSourceITCase extends DorisTestBase {
static final String DATABASE = "test";
static final String TABLE_READ = "tbl_read";
static final String TABLE_READ_TBL = "tbl_read_tbl";

@BeforeEach
public void startContainers() {
super.startContainers();
}

@AfterEach
public void stopContainers() {
super.stopContainers();
}

@Test
public void testSource() throws Exception {
initializeTable(TABLE_READ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@

import org.apache.doris.flink.DorisTestBase;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
Expand Down Expand Up @@ -61,6 +65,7 @@
* MySQLDorisE2ECase 1. Automatically create tables 2. Schema change event synchronization
* 3.Synchronization of addition, deletion and modification events 4. CDC multi-table writing.
*/
@Execution(ExecutionMode.SAME_THREAD)
public class MySQLDorisE2ECase extends DorisTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(MySQLDorisE2ECase.class);
private static final String DATABASE = "test";
Expand All @@ -71,44 +76,57 @@ public class MySQLDorisE2ECase extends DorisTestBase {
private static final String TABLE_3 = "tbl3";
private static final String TABLE_4 = "tbl4";

private static final String MYSQL_IMAGE_NAME = "mysql:8.0";
private static final MySQLContainer MYSQL_CONTAINER =
new MySQLContainer("mysql:8.0")
new MySQLContainer(MYSQL_IMAGE_NAME)
.withDatabaseName(DATABASE)
.withUsername(MYSQL_USER)
.withPassword(MYSQL_PASSWD);

@BeforeClass
@BeforeEach
public void startContainers() {
super.startContainers();
}

@AfterEach
public void stopContainers() {
super.stopContainers();
}

@BeforeAll
public static void startMySQLContainers() {
MYSQL_CONTAINER.setCommand("--default-time-zone=Asia/Shanghai");
LOG.info("Starting MySQL containers...");
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
LOG.info("MySQL Containers are started.");
}

@AfterClass
@AfterAll
public static void stopMySQLContainers() {
LOG.info("Stopping MySQL containers...");
MYSQL_CONTAINER.stop();
LOG.info("MySQL Containers are stopped.");

// Cleanup the mysql image, because it's too large and will cause the next test to fail.
LOG.info("Cleaning MySQL containers...");
MYSQL_CONTAINER.getDockerClient().removeImageCmd(MYSQL_IMAGE_NAME).exec();
MYSQL_CONTAINER
.getDockerClient()
.listImagesCmd()
.withImageNameFilter(MYSQL_IMAGE_NAME)
.exec()
.forEach(
image ->
MYSQL_CONTAINER
.getDockerClient()
.removeImageCmd(image.getId())
.exec());
LOG.info("MySQL containers cleaned");
}

@Test
public void testMySQL2Doris() throws Exception {
printClusterStatus();
initializeMySQLTable();
JobClient jobClient = submitJob();
// wait 2 times checkpoint
Thread.sleep(20000);
Set<List<Object>> expected =
Stream.<List<Object>>of(
Arrays.asList("doris_1", 1),
Arrays.asList("doris_2", 2),
Arrays.asList("doris_3", 3))
.collect(Collectors.toSet());
String sql =
"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
checkResult(expected, query1, 2);
JobClient jobClient = initHistoricalData();

// add incremental data
try (Connection connection =
Expand Down Expand Up @@ -138,7 +156,7 @@ public void testMySQL2Doris() throws Exception {
Arrays.asList("doris_3", 3),
Arrays.asList("doris_3_1", 12))
.collect(Collectors.toSet());
sql =
String sql =
"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
String query2 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
checkResult(expected2, query2, 2);
Expand Down Expand Up @@ -174,22 +192,7 @@ public void testMySQL2Doris() throws Exception {

@Test
public void testAutoAddTable() throws Exception {
printClusterStatus();
initializeMySQLTable();
initializeDorisTable();
JobClient jobClient = submitJob();
// wait 2 times checkpoint
Thread.sleep(20000);
Set<List<Object>> expected =
Stream.<List<Object>>of(
Arrays.asList("doris_1", 1),
Arrays.asList("doris_2", 2),
Arrays.asList("doris_3", 3))
.collect(Collectors.toSet());
String sql =
"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
checkResult(expected, query1, 2);
JobClient jobClient = initHistoricalData();

// auto create table4
addTableTable_4();
Expand All @@ -198,7 +201,7 @@ public void testAutoAddTable() throws Exception {
Stream.<List<Object>>of(
Arrays.asList("doris_4_1", 4), Arrays.asList("doris_4_2", 4))
.collect(Collectors.toSet());
sql = "select * from %s.%s order by 1";
String sql = "select * from %s.%s order by 1";
String query2 = String.format(sql, DATABASE, TABLE_4);
checkResult(expected2, query2, 2);

Expand Down Expand Up @@ -275,13 +278,24 @@ public void testAutoAddTable() throws Exception {
jobClient.cancel().get();
}

private void initializeDorisTable() throws Exception {
try (Statement statement = connection.createStatement()) {
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_1));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_2));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_3));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, TABLE_4));
}
private JobClient initHistoricalData() throws Exception {
printClusterStatus();
initializeMySQLTable();
JobClient jobClient = submitJob();

// wait 2 times checkpoint
Thread.sleep(20000);
Set<List<Object>> expected =
Stream.<List<Object>>of(
Arrays.asList("doris_1", 1),
Arrays.asList("doris_2", 2),
Arrays.asList("doris_3", 3))
.collect(Collectors.toSet());
String sql =
"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s ) res order by 1";
String query1 = String.format(sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3);
checkResult(expected, query1, 2);
return jobClient;
}

public void checkResult(Set<List<Object>> expected, String query, int columnSize)
Expand Down

0 comments on commit b07ad56

Please sign in to comment.