Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] fix connection close and add dorise2ecase #329

Merged
merged 2 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
# under the License.
#
---
name: Build Extensions
name: Build Connector
on:
pull_request:
push:

jobs:
build-extension:
name: "Build Extensions"
name: "Build Connector"
runs-on: ubuntu-latest
defaults:
run:
Expand Down
44 changes: 0 additions & 44 deletions .github/workflows/run-e2ecase-12.yml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
# under the License.
#
---
name: Run E2ECases 2.0
name: Run E2ECases
on:
pull_request:
push:

jobs:
build-extension:
name: "Run E2ECases 2.0"
name: "Run E2ECases"
runs-on: ubuntu-latest
defaults:
run:
Expand Down
44 changes: 0 additions & 44 deletions .github/workflows/run-itcase-12.yml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
# under the License.
#
---
name: Run ITCases 2.0
name: Run ITCases
on:
pull_request:
push:

jobs:
build-extension:
name: "Run ITCases 2.0"
name: "Run ITCases"
runs-on: ubuntu-latest
defaults:
run:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLClassLoader;
Expand All @@ -51,15 +54,18 @@

public abstract class DorisTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class);
protected static final String DORIS_DOCKER_IMAGE = System.getProperty("image");
private static final String DEFAULT_DOCKER_IMAGE = "adamlee489/doris:2.0.3";
protected static final String DORIS_DOCKER_IMAGE =
System.getProperty("image") == null
? DEFAULT_DOCKER_IMAGE
: System.getProperty("image");
private static final String DRIVER_JAR =
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
protected static final String URL = "jdbc:mysql://%s:9030";
protected static final String USERNAME = "root";
protected static final String PASSWORD = "";
protected static final GenericContainer DORIS_CONTAINER = createDorisContainer();
protected static Connection connection;
protected static final int DEFAULT_PARALLELISM = 4;

protected static String getFenodes() {
Expand All @@ -68,39 +74,33 @@ protected static String getFenodes() {

@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
LOG.info("Starting doris containers...");
Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
given().ignoreExceptions()
.await()
.atMost(300, TimeUnit.SECONDS)
.pollInterval(ONE_SECOND)
.untilAsserted(DorisTestBase::initializeJdbcConnection);
LOG.info("Containers are started.");
LOG.info("Containers doris are started.");
}

@AfterClass
public static void stopContainers() {
LOG.info("Stopping containers...");
LOG.info("Stopping doris containers...");
DORIS_CONTAINER.stop();
LOG.info("Containers are stopped.");
LOG.info("Containers doris are stopped.");
}

public static GenericContainer createDorisContainer() {
GenericContainer container =
new GenericContainer<>(DORIS_DOCKER_IMAGE)
.withNetwork(Network.newNetwork())
.withNetworkAliases("DorisContainer")
.withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
.withEnv("FE_ID", "1")
.withEnv("CURRENT_BE_IP", "127.0.0.1")
.withEnv("CURRENT_BE_PORT", "9050")
.withCommand("ulimit -n 65536")
.withCreateContainerCmdModifier(
cmd -> cmd.getHostConfig().withMemorySwap(0L))
.withPrivilegedMode(true)
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)));
DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
.withReuse(true);

container.setPortBindings(
Lists.newArrayList(
Expand All @@ -118,10 +118,10 @@ protected static void initializeJdbcConnection() throws Exception {
new URL[] {new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
LOG.info("Try to connect to Doris...");
Thread.currentThread().setContextClassLoader(urlClassLoader);
connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
try (Statement statement = connection.createStatement()) {
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
ResultSet resultSet;
do {
LOG.info("Wait for the Backend to start successfully...");
Expand All @@ -144,14 +144,37 @@ private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLExce

protected static void printClusterStatus() throws Exception {
LOG.info("Current machine IP: {}", InetAddress.getLocalHost());
try (Statement statement = connection.createStatement()) {
echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq");
echo("sh", "-c", "free -h");
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
ResultSet showFrontends = statement.executeQuery("show frontends");
LOG.info("Frontends status: {}", convertList(showFrontends));
ResultSet showBackends = statement.executeQuery("show backends");
LOG.info("Backends status: {}", convertList(showBackends));
}
}

static void echo(String... cmd) {
try {
Process p = Runtime.getRuntime().exec(cmd);
InputStream is = p.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
p.waitFor();
is.close();
reader.close();
p.destroy();
} catch (Exception e) {
e.printStackTrace();
}
}

private static List<Map> convertList(ResultSet rs) throws SQLException {
List<Map> list = new ArrayList<>();
ResultSetMetaData metaData = rs.getMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
Expand All @@ -45,7 +47,7 @@

/** DorisSink ITCase with csv and arrow format. */
public class DorisSinkITCase extends DorisTestBase {
static final String DATABASE = "test";
static final String DATABASE = "test_sink";
static final String TABLE_CSV = "tbl_csv";
static final String TABLE_JSON = "tbl_json";
static final String TABLE_JSON_TBL = "tbl_json_tbl";
Expand All @@ -61,9 +63,13 @@ public void testSinkCsvFormat() throws Exception {

Thread.sleep(10000);
Set<List<Object>> actual = new HashSet<>();
try (Statement sinkStatement = connection.createStatement()) {

try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
ResultSet sinkResultSet =
sinkStatement.executeQuery(
statement.executeQuery(
String.format(
"select name,age from %s.%s order by 1", DATABASE, TABLE_CSV));
while (sinkResultSet.next()) {
Expand Down Expand Up @@ -102,9 +108,12 @@ public void testSinkJsonFormat() throws Exception {

Thread.sleep(10000);
Set<List<Object>> actual = new HashSet<>();
try (Statement sinkStatement = connection.createStatement()) {
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
ResultSet sinkResultSet =
sinkStatement.executeQuery(
statement.executeQuery(
String.format(
"select name,age from %s.%s order by 1", DATABASE, TABLE_JSON));
while (sinkResultSet.next()) {
Expand Down Expand Up @@ -172,9 +181,12 @@ public void testTableSinkJsonFormat() throws Exception {

Thread.sleep(10000);
Set<List<Object>> actual = new HashSet<>();
try (Statement sinkStatement = connection.createStatement()) {
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
ResultSet sinkResultSet =
sinkStatement.executeQuery(
statement.executeQuery(
String.format(
"select name,age from %s.%s order by 1",
DATABASE, TABLE_JSON_TBL));
Expand All @@ -191,7 +203,10 @@ public void testTableSinkJsonFormat() throws Exception {
}

private void initializeTable(String table) throws Exception {
try (Statement statement = connection.createStatement()) {
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
statement.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** DorisSource ITCase. */
public class DorisSourceITCase extends DorisTestBase {
static final String DATABASE = "test";
static final String DATABASE = "test_source";
static final String TABLE_READ = "tbl_read";
static final String TABLE_READ_TBL = "tbl_read_tbl";

Expand Down Expand Up @@ -111,7 +113,10 @@ public void testTableSource() throws Exception {
}

private void initializeTable(String table) throws Exception {
try (Statement statement = connection.createStatement()) {
try (Connection connection =
DriverManager.getConnection(
String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
Statement statement = connection.createStatement()) {
statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE));
statement.execute(String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table));
statement.execute(
Expand Down
Loading
Loading