diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java index 29a07c126..3aafb3123 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java @@ -29,7 +29,6 @@ import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; -import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.ClientJobProxy; @@ -38,6 +37,7 @@ import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder; import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.springframework.stereotype.Service; @@ -51,6 +51,7 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.util.Date; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -72,9 +73,18 @@ public Result jobExecute(Integer userId, Long jobDefineId) { String configFile = writeJobConfigIntoConfFile(jobConfig, jobDefineId); - Long jobInstanceId = - executeJobBySeaTunnel(userId, configFile, executeResource.getJobInstanceId()); - return Result.success(jobInstanceId); + try { + executeJobBySeaTunnel(userId, configFile, executeResource.getJobInstanceId()); + return Result.success(executeResource.getJobInstanceId()); + } catch (RuntimeException e) { + Result failure = + Result.failure(SeatunnelErrorEnum.JUB_EXEC_SUBMISSION_ERROR, e.getMessage()); + // Even though job execution submission failed, we still need to return the + // jobInstanceId to the user + // as the job instance has been created in the database. + failure.setData(executeResource.getJobInstanceId()); + return failure; + } } public String writeJobConfigIntoConfFile(String jobConfig, Long jobDefineId) { @@ -100,35 +110,38 @@ public String writeJobConfigIntoConfFile(String jobConfig, Long jobDefineId) { return filePath; } - public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) { + private void executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) { Common.setDeployMode(DeployMode.CLIENT); JobConfig jobConfig = new JobConfig(); jobConfig.setName(jobInstanceId + "_job"); - SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + SeaTunnelClient seaTunnelClient; + ClientJobProxy clientJobProxy; try { + seaTunnelClient = createSeaTunnelClient(); SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build(); ClientJobExecutionEnvironment jobExecutionEnv = seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig); - final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + clientJobProxy = jobExecutionEnv.execute(); + } catch (Throwable e) { + log.error("Job execution submission failed.", e); JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); - jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId())); + jobInstance.setJobStatus(JobStatus.FAILED.name()); + jobInstance.setEndTime(new Date()); jobInstanceDao.update(jobInstance); - - CompletableFuture.runAsync( - () -> { - waitJobFinish( - clientJobProxy, - userId, - jobInstanceId, - Long.toString(clientJobProxy.getJobId()), - seaTunnelClient); - }); - - } catch (ExecutionException | InterruptedException e) { - ExceptionUtils.getMessage(e); - throw new RuntimeException(e); + throw new RuntimeException(e.getMessage(), e); } - return jobInstanceId; + JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); + jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId())); + jobInstanceDao.update(jobInstance); + CompletableFuture.runAsync( + () -> { + waitJobFinish( + clientJobProxy, + userId, + jobInstanceId, + Long.toString(clientJobProxy.getJobId()), + seaTunnelClient); + }); } public void waitJobFinish( diff --git a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java index b81cec095..5f4b218e6 100644 --- a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java +++ b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java @@ -82,6 +82,7 @@ public enum SeatunnelErrorEnum { "load job state from engine error", "load job statue from engine [%s] error, error msg is [%s]"), UNSUPPORTED_ENGINE(40003, "unsupported engine", "unsupported engine [%s] version [%s]"), + JUB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"), JOB_RUN_GENERATE_UUID_ERROR(50001, "generate uuid error", "generate uuid error"), /* datasource and virtual table */ diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java index effca68d0..4fe17e544 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceControllerWrapper.java @@ -46,6 +46,13 @@ public String createConsoleDatasource(String datasourceName) { return result.getData(); } + public String createMysqlDatasource(String datasourceName) { + DatasourceReq req = getMysqlDatasource(datasourceName); + Result result = createDatasource(req); + assertTrue(result.isSuccess()); + return result.getData(); + } + public DatasourceReq getFakeSourceDatasourceReq(String datasourceName) { DatasourceReq req = new DatasourceReq(); req.setDatasourceName(datasourceName); @@ -104,4 +111,14 @@ public Result> getDatasourceList( return JSONTestUtils.parseObject( response, new TypeReference>>() {}); } + + public DatasourceReq getMysqlDatasource(String datasourceName) { + DatasourceReq req = new DatasourceReq(); + req.setDatasourceName(datasourceName); + req.setPluginName("JDBC-Mysql"); + req.setDescription(datasourceName + " description"); + req.setDatasourceConfig( + "{\"url\":\"jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true\",\"driver\":\"com.mysql.cj.jdbc.Driver\",\"user\":\"someUser\",\"password\":\"somePassword\"}"); + return req; + } } diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java index 4a01df280..1e764dfc1 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java @@ -18,7 +18,11 @@ import org.apache.seatunnel.app.common.Result; import org.apache.seatunnel.app.common.SeaTunnelWebCluster; +import org.apache.seatunnel.app.controller.JobControllerWrapper; import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper; +import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper; +import org.apache.seatunnel.app.domain.request.job.JobCreateReq; +import org.apache.seatunnel.app.domain.request.job.PluginConfig; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; import org.apache.seatunnel.app.utils.JobUtils; @@ -29,17 +33,22 @@ import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class JobExecutorControllerTest { private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster(); private static JobExecutorControllerWrapper jobExecutorControllerWrapper; + private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper; + private static JobControllerWrapper jobControllerWrapper; private static final String uniqueId = "_" + System.currentTimeMillis(); @BeforeAll public static void setUp() { seaTunnelWebCluster.start(); jobExecutorControllerWrapper = new JobExecutorControllerWrapper(); + seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper(); + jobControllerWrapper = new JobControllerWrapper(); } @Test @@ -57,6 +66,28 @@ public void executeJob_shouldReturnSuccess_whenValidRequest() { assertEquals(5, listResult.getData().get(0).getWriteRowCount()); } + @Test + public void executeJob_JobStatusUpdate_WhenSubmissionFailed() { + String jobName = "execJobStatus" + uniqueId; + JobCreateReq jobCreateReq = JobUtils.populateMySQLJobCreateReqFromFile(); + jobCreateReq.getJobConfig().setName(jobName); + jobCreateReq.getJobConfig().setDescription(jobName + " description"); + String datasourceName = "execJobStatus_db_1" + uniqueId; + String mysqlDatasourceId = + seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName); + for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) { + pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId)); + } + Result job = jobControllerWrapper.createJob(jobCreateReq); + assertTrue(job.isSuccess()); + Long jobVersionId = job.getData(); + Result result = jobExecutorControllerWrapper.jobExecutor(jobVersionId); + // Fails because of the wrong database credentials. + assertFalse(result.isSuccess()); + // Even though job failed but job instance is created into the database. + assertTrue(result.getData() > 0); + } + @Test public void restoreJob_shouldReturnSuccess_whenValidRequest() { String jobName = "jobRestore" + uniqueId; diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java index 0a530d3b7..a9214baf0 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/utils/JobUtils.java @@ -24,10 +24,14 @@ import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper; import org.apache.seatunnel.app.domain.request.job.Edge; import org.apache.seatunnel.app.domain.request.job.JobConfig; +import org.apache.seatunnel.app.domain.request.job.JobCreateReq; import org.apache.seatunnel.app.domain.request.job.JobDAG; import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes; import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -138,4 +142,15 @@ public static Long createJob(String jobName) { assertTrue(jobTaskCheckResResult.isSuccess()); return jobVersionId; } + + public static JobCreateReq populateMySQLJobCreateReqFromFile() { + String filePath = "src/test/resources/jobs/mysql_source_mysql_sink.json"; + String jsonContent; + try { + jsonContent = new String(Files.readAllBytes(Paths.get(filePath))); + } catch (IOException e) { + throw new RuntimeException(e); + } + return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class); + } } diff --git a/seatunnel-web-it/src/test/resources/jobs/mysql_source_mysql_sink.json b/seatunnel-web-it/src/test/resources/jobs/mysql_source_mysql_sink.json new file mode 100644 index 000000000..967244f56 --- /dev/null +++ b/seatunnel-web-it/src/test/resources/jobs/mysql_source_mysql_sink.json @@ -0,0 +1,106 @@ +{ + "jobConfig": { + "name": "mysql_source_mysql_sink", + "description": "mysql_source_mysql_sink description", + "engine": "SeaTunnel", + "env": { + "job.mode": "BATCH", + "job.name": "SeaTunnel_Job", + "jars": "", + "checkpoint.interval": "", + "checkpoint.timeout": "", + "read_limit.rows_per_second": "", + "read_limit.bytes_per_second": "", + "custom_parameters": "" + } + }, + "pluginConfigs": [ + { + "pluginId": "1724412762429155", + "name": "mysql_source_1", + "type": "SOURCE", + "connectorType": null, + "tableOption": { + "databases": [ + "test" + ], + "tables": [ + "test_table" + ] + }, + "selectTableFields": { + "tableFields": [ + "name", + "age" + ], + "all": true + }, + "dataSourceId": 14717667385504, + "sceneMode": "SINGLE_TABLE", + "config": "{\"query\":\"\",\"connection_check_timeout_sec\":30,\"fetch_size\":\"\",\"partition_column\":\"\",\"partition_upper_bound\":\"\",\"partition_lower_bound\":\"\",\"partition_num\":\"\",\"compatible_mode\":\"\",\"properties\":\"\",\"table_path\":\"\",\"where_condition\":\"\",\"table_list\":\"\",\"split.size\":8096,\"split.even-distribution.factor.upper-bound\":100,\"split.even-distribution.factor.lower-bound\":0.05,\"split.sample-sharding.threshold\":1000,\"split.inverse-sampling.rate\":1000,\"parallelism\":1}", + "outputSchema": [ + { + "fields": [ + { + "type": "LONGTEXT", + "name": "name", + "comment": "", + "primaryKey": false, + "defaultValue": null, + "nullable": false, + "properties": null, + "unSupport": false, + "outputDataType": "STRING" + }, + { + "type": "INT", + "name": "age", + "comment": "", + "primaryKey": false, + "defaultValue": null, + "nullable": false, + "properties": null, + "unSupport": false, + "outputDataType": "INT" + } + ], + "tableName": "test_table", + "database": "test" + } + ], + "transformOptions": {} + }, + { + "pluginId": "17244128298414uc", + "name": "mysql_sink_1", + "type": "SINK", + "connectorType": null, + "tableOption": { + "databases": [ + "test" + ], + "tables": [ + "test_table" + ] + }, + "selectTableFields": { + "tableFields": [ + "name", + "age" + ], + "all": true + }, + "dataSourceId": 14717667385504, + "config": "{\"query\":\"\",\"schema_save_mode\":\"CREATE_SCHEMA_WHEN_NOT_EXIST\",\"data_save_mode\":\"APPEND_DATA\",\"custom_sql\":\"\",\"connection_check_timeout_sec\":30,\"batch_size\":1000,\"is_exactly_once\":\"false\",\"xa_data_source_class_name\":\"\",\"max_commit_attempts\":3,\"transaction_timeout_sec\":-1,\"max_retries\":\"1\",\"auto_commit\":\"true\",\"support_upsert_by_query_primary_key_exist\":\"false\",\"primary_keys\":\"\",\"compatible_mode\":\"\",\"multi_table_sink_replica\":1}", + "transformOptions": {} + } + ], + "jobDAG": { + "edges": [ + { + "inputPluginId": "mysql_source_1", + "targetPluginId": "mysql_sink_1" + } + ] + } +} \ No newline at end of file