diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java index 1a41e15b6..f97bc3221 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/JobExecutorController.java @@ -84,4 +84,11 @@ public Result jobRestore( @ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) { return jobExecutorService.jobStore(userId, jobInstanceId); } + + @GetMapping("/delete") + public Result jobCancel( + @ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId, + @ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) { + return jobExecutorService.jobDelete(userId, jobInstanceId); + } } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java index f4c5b8f14..5b629672a 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/IJobInstanceDao.java @@ -37,6 +37,8 @@ public interface IJobInstanceDao { void insert(@NonNull JobInstance jobInstance); + void delete(@NonNull JobInstance jobInstance); + JobInstanceMapper getJobInstanceMapper(); IPage queryJobInstanceListPaging( diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java index 4eff1e18d..25320f472 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/dao/impl/JobInstanceDaoImpl.java @@ -60,6 +60,11 @@ public void insert(@NonNull JobInstance jobInstance) { jobInstanceMapper.insert(jobInstance); } + @Override + public void delete(@NonNull JobInstance jobInstance) { + jobInstanceMapper.deleteById(jobInstance); + } + @Override public JobInstanceMapper getJobInstanceMapper() { return jobInstanceMapper; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/permission/constants/SeatunnelFuncPermissionKeyConstant.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/permission/constants/SeatunnelFuncPermissionKeyConstant.java index 2ea22a3d5..ae0b12f05 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/permission/constants/SeatunnelFuncPermissionKeyConstant.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/permission/constants/SeatunnelFuncPermissionKeyConstant.java @@ -56,6 +56,7 @@ public class SeatunnelFuncPermissionKeyConstant { public static final String JOB_EXECUTOR_RESOURCE = "project:seatunnel-task:job-exec-resource"; public static final String JOB_EXECUTOR_INSTANCE = "project:seatunnel-task:job-exec-instance"; public static final String JOB_EXECUTOR_COMPLETE = "project:seatunnel-task:job-exec-complete"; + public static final String JOB_EXECUTOR_DELETE = "project:seatunnel-task:job-exec-delete"; /** sync task instance */ public static final String JOB_METRICS_SUMMARY = "project:seatunnel-task-instance:summary"; diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java index 01ce17e59..452ae6d02 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobExecutorService.java @@ -26,4 +26,6 @@ public interface IJobExecutorService { Result jobPause(Integer userId, Long jobInstanceId); Result jobStore(Integer userId, Long jobInstanceId); + + Result jobDelete(Integer userId, Long jobInstanceId); } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java index 1b9857777..bbd2ec6a9 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java @@ -34,4 +34,6 @@ public interface IJobInstanceService { void complete( @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId); + + void delete(@NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId); } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java index 95a12157a..44bf7c1dd 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobMetricsService.java @@ -47,6 +47,9 @@ JobDAG getJobDAG(@NonNull Integer userId, @NonNull Long jobInstanceId) void syncJobDataToDb( @NonNull JobInstance jobInstance, @NonNull Integer userId, @NonNull String jobEngineId); + void deleteJobDataToDb( + @NonNull JobInstance jobInstance, @NonNull Integer userId, @NonNull String jobEngineId); + JobSummaryMetricsRes getJobSummaryMetrics( @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId); diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java index 947519f1f..66ac7bacb 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java @@ -29,7 +29,6 @@ import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; -import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobProxy; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; @@ -37,6 +36,9 @@ import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.commons.lang3.EnumUtils; +import org.apache.commons.lang3.StringUtils; + import org.springframework.stereotype.Service; import com.hazelcast.client.config.ClientConfig; @@ -49,11 +51,13 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.nio.file.Files; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; @Slf4j @Service @@ -76,9 +80,7 @@ public Result jobExecute(Integer userId, Long jobDefineId) { } public String writeJobConfigIntoConfFile(String jobConfig, Long jobDefineId) { - String projectRoot = System.getProperty("user.dir"); - String filePath = - projectRoot + File.separator + "profile" + File.separator + jobDefineId + ".conf"; + String filePath = jobStorePath(jobDefineId); try { File file = new File(filePath); if (!file.exists()) { @@ -122,7 +124,6 @@ public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInsta }); } catch (ExecutionException | InterruptedException e) { - ExceptionUtils.getMessage(e); throw new RuntimeException(e); } return jobInstanceId; @@ -137,19 +138,21 @@ public void waitJobFinish( ExecutorService executor = Executors.newFixedThreadPool(1); CompletableFuture future = CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete, executor); + JobStatus jobStatus = null; try { - log.info("future.get before"); - JobStatus jobStatus = future.get(); + jobStatus = future.get(); - executor.shutdown(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } finally { + executor.shutdown(); seaTunnelClient.close(); - log.info("and jobInstanceService.complete begin"); - jobInstanceService.complete(userId, jobInstanceId, jobEngineId); + boolean isComplete = Objects.nonNull(jobStatus) && jobStatus.isEndState(); + if (Objects.nonNull(jobInstanceId) && isComplete) { + jobInstanceService.complete(userId, jobInstanceId, jobEngineId); + } else { + log.warn("JobInstance {} has been deleted Or Got Exception", jobEngineId); + } } } @@ -163,11 +166,31 @@ public static String getClusterName(String testClassName) { return testClassName; } + public static boolean isComplete(String JobStatus) { + return EnumUtils.getEnumList(JobStatus.class).stream() + .filter(org.apache.seatunnel.engine.core.job.JobStatus::isEndState) + .map(Enum::name) + .map(String::toUpperCase) + .collect(Collectors.toSet()) + .contains(JobStatus.toUpperCase()) + || "UNKNOWABLE".equalsIgnoreCase(JobStatus); + } + + public static String jobStorePath(Long jobDefinitionId) { + String projectRoot = System.getProperty("user.dir"); + return StringUtils.join( + projectRoot, + File.separator, + "profile", + File.separator, + String.valueOf(jobDefinitionId), + ".conf"); + } + @Override public Result jobPause(Integer userId, Long jobInstanceId) { JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); - if (Objects.equals( - getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId()), "RUNNING")) { + if (!isComplete(getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId()))) { pauseJobInEngine(jobInstance.getJobEngineId()); } return Result.success(); @@ -190,18 +213,39 @@ private void pauseJobInEngine(@NonNull String jobEngineId) { @Override public Result jobStore(Integer userId, Long jobInstanceId) { JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); - - String projectRoot = System.getProperty("user.dir"); - String filePath = - projectRoot - + File.separator - + "profile" - + File.separator - + jobInstance.getJobDefineId() - + ".conf"; - log.info("jobStore filePath:{}", filePath); + String filePath = jobStorePath(jobInstance.getJobDefineId()); SeaTunnelEngineProxy.getInstance() .restoreJob(filePath, jobInstanceId, Long.valueOf(jobInstance.getJobEngineId())); return Result.success(); } + + /** + * Delete Job. If the Job haven't been completed, cancel it in the engine, and then delete the + * restored conf file if it existed. Then delete it in the db. + * + * @param userId user's ID + * @param jobInstanceId job instance that should be deleted. + * @return void + */ + @Override + public Result jobDelete(Integer userId, Long jobInstanceId) { + JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); + String jobEngineId = jobInstance.getJobEngineId(); + if (!isComplete(getJobStatusFromEngine(jobInstance, jobEngineId))) { + deleteJobInEngine(jobEngineId); + log.info("Canceling {}.", jobInstanceId); + } + File jobConf = new File(jobStorePath(jobInstance.getJobDefineId())); + try { + Files.deleteIfExists(jobConf.toPath()); + } catch (IOException e) { + throw new RuntimeException(e); + } + jobInstanceService.delete(userId, jobInstanceId, jobEngineId); + return Result.success(); + } + + private void deleteJobInEngine(@NonNull String jobEngineId) { + SeaTunnelEngineProxy.getInstance().deleteJob(jobEngineId); + } } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java index 92726a660..538f89190 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java @@ -135,6 +135,7 @@ public JobExecutorRes createExecuteResource( JobVersion latestVersion = jobVersionDao.getLatestVersion(job.getId()); JobInstance jobInstance = new JobInstance(); String jobConfig = createJobConfig(latestVersion); + log.debug("job config: {}", jobConfig); try { jobInstance.setId(CodeGenerateUtils.getInstance().genCode()); @@ -272,6 +273,7 @@ public String generateJobConfig( targetLines.get(pluginId), config); if (!sinkMap.containsKey(task.getConnectorType())) { + // 这里对于clichouse需要替换JDBC到 Clickhouse sinkMap.put(task.getConnectorType(), new ArrayList<>()); } Config mergeConfig = @@ -283,7 +285,14 @@ public String generateJobConfig( config, optionRule); - sinkMap.get(task.getConnectorType()).add(filterEmptyValue(mergeConfig)); + if ("true".equals(mergeConfig.getString("Clickhouse"))) { + sinkMap.remove(task.getConnectorType()); + sinkMap.put("Clickhouse", new ArrayList<>()); + sinkMap.get("Clickhouse").add(filterEmptyValue(mergeConfig)); + } else { + sinkMap.get(task.getConnectorType()) + .add(filterEmptyValue(mergeConfig)); + } } break; default: @@ -345,8 +354,6 @@ public void complete( @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId) { funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE, userId); JobInstance jobInstance = jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId); - jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId); - List status = jobMetricsService.getJobPipelineSummaryMetrics(userId, jobInstanceId); @@ -364,13 +371,41 @@ public void complete( jobStatus = JobStatus.CANCELED.name(); } else if (statusList.contains("CANCELLING")) { jobStatus = JobStatus.CANCELLING.name(); - } else { + } else if (!statusList.isEmpty()) { jobStatus = JobStatus.RUNNING.name(); + } else { + jobStatus = "UNKNOWABLE"; } jobInstance.setJobStatus(jobStatus); jobInstance.setJobEngineId(jobEngineId); jobInstance.setUpdateUserId(userId); - jobInstanceDao.update(jobInstance); + + jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId); + } + + @Override + public void delete( + @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId) { + funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_DELETE, userId); + JobInstance jobInstance = jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId); + List status = + jobMetricsService.getJobPipelineSummaryMetrics(userId, jobInstanceId); + + List statusList = + status.stream() + .map(JobPipelineSummaryMetricsRes::getStatus) + .map(String::toUpperCase) + .collect(Collectors.toList()); + + String jobStatus = + statusList.size() == 1 && JobExecutorServiceImpl.isComplete(statusList.get(0)) + ? statusList.get(0) + : JobStatus.CANCELED.name(); + + jobInstance.setJobStatus(jobStatus); + jobInstance.setJobEngineId(jobEngineId); + jobInstance.setUpdateUserId(userId); + jobMetricsService.deleteJobDataToDb(jobInstance, userId, jobEngineId); } private Config buildTransformConfig( diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java index 837de0061..d2bacf71b 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobMetricsServiceImpl.java @@ -573,6 +573,17 @@ public void syncJobDataToDb( syncCompleteJobInfoToDb(jobInstance); } + @Override + public void deleteJobDataToDb( + @NonNull JobInstance jobInstance, + @NonNull Integer userId, + @NonNull String jobEngineId) { + relationJobInstanceAndJobEngineId(jobInstance, userId, jobEngineId); + deleteMetricsToDb(jobInstance, userId, jobEngineId); + syncHistoryJobInfoToDb(jobInstance, jobEngineId); + deleteJobInfoToDb(jobInstance); + } + private void syncMetricsToDb( @NonNull JobInstance jobInstance, @NonNull Integer userId, @@ -616,6 +627,16 @@ private void syncMetricsToDb( } } + private void deleteMetricsToDb( + @NonNull JobInstance jobInstance, + @NonNull Integer userId, + @NonNull String jobEngineId) { + getJobMetricsFromDb(jobInstance, userId, jobEngineId) + .forEach( + jobMetrics -> + jobMetricsDao.getJobMetricsMapper().deleteById(jobMetrics.getId())); + } + private void syncHistoryJobInfoToDb( @NonNull JobInstance jobInstance, @NonNull String jobEngineId) { JobInstanceHistory jobHistoryFromEngine = getJobHistoryFromEngine(jobInstance, jobEngineId); @@ -636,6 +657,10 @@ private void syncCompleteJobInfoToDb(@NonNull JobInstance jobInstance) { jobInstanceDao.update(jobInstance); } + private void deleteJobInfoToDb(@NonNull JobInstance jobInstance) { + jobInstanceDao.delete(jobInstance); + } + private void relationJobInstanceAndJobEngineId( @NonNull JobInstance jobInstance, @NonNull Integer userId, diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java index 20d39e828..d95b421a0 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/engine/SeaTunnelEngineProxy.java @@ -122,4 +122,10 @@ public void restoreJob( throw new RuntimeException(e); } } + + public void deleteJob(String jobEngineId) { + SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig); + JobClient jobClient = seaTunnelClient.getJobClient(); + jobClient.cancelJob(Long.valueOf(jobEngineId)); + } } diff --git a/seatunnel-ui/src/service/sync-task-instance/index.ts b/seatunnel-ui/src/service/sync-task-instance/index.ts index aa140673a..ec20cac0e 100644 --- a/seatunnel-ui/src/service/sync-task-instance/index.ts +++ b/seatunnel-ui/src/service/sync-task-instance/index.ts @@ -69,23 +69,23 @@ export function forcedSuccessByIds(taskInstanceIds: Array) { }) } -export function hanldlePauseJob(id: number): any { +export function handlePauseJob(id: number): any { return axios({ url: `/job/executor/pause?jobInstanceId=${id}`, method: 'get' }) } -export function hanldleRecoverJob(id: number): any { +export function handleRecoverJob(id: number): any { return axios({ url: `/job/executor/restore?jobInstanceId=${id}`, method: 'get' }) } -export function hanldleDelJob(id: number): any { +export function handleDeleteJob(id: number): any { return axios({ - url: `/job/executor/del?jobInstanceId=${id}`, + url: `/job/executor/delete?jobInstanceId=${id}`, method: 'get' }) } diff --git a/seatunnel-ui/src/views/task/synchronization-instance/use-sync-task.ts b/seatunnel-ui/src/views/task/synchronization-instance/use-sync-task.ts index b6e60020f..dde38f9e6 100644 --- a/seatunnel-ui/src/views/task/synchronization-instance/use-sync-task.ts +++ b/seatunnel-ui/src/views/task/synchronization-instance/use-sync-task.ts @@ -42,9 +42,9 @@ import { NIcon, NSpin, NTooltip } from 'naive-ui' import { useMessage } from 'naive-ui' import { querySyncTaskInstancePaging, - hanldlePauseJob, - hanldleRecoverJob, - hanldleDelJob + handlePauseJob, + handleRecoverJob, + handleDeleteJob } from '@/service/sync-task-instance' import type { RowKey } from 'naive-ui/lib/data-table/src/interface' import type { Router } from 'vue-router' @@ -180,10 +180,8 @@ export function useSyncTask(syncTaskType = 'BATCH') { isDelete: true, text: t('project.synchronization_instance.delete'), icon: h(DeleteOutlined), - onClick: (row) => void handleDel(row.id), - onPositiveClick: () => { - console.log('123') - }, + onClick: (row) => void handleDelete(row.id), + onPositiveClick: (row) => void handleDelete(row.id), positiveText: t('project.synchronization_instance.confirm'), popTips: t('project.synchronization_instance.delete_confirm') } @@ -214,18 +212,19 @@ export function useSyncTask(syncTaskType = 'BATCH') { }) } const handleRecover = (id: number) => { - hanldleRecoverJob(id).then(() => { + handleRecoverJob(id).then(() => { message.success(t('common.success_tips')) }) } const handlePause = (id: number) => { - hanldlePauseJob(id).then(() => { + handlePauseJob(id).then(() => { message.success(t('common.success_tips')) }) } - const handleDel = (id: number) => { - hanldleDelJob(id).then(() => { + const handleDelete = (id: number) => { + handleDeleteJob(id).then(() => { message.success(t('common.success_tips')) + getList() }) }