[Improve][feature] Add JobInstance Delete
Jetiaime committed Apr 10, 2024
1 parent 6b57f22 commit 6c4db33
Showing 13 changed files with 176 additions and 45 deletions.
@@ -84,4 +84,11 @@ public Result<Void> jobRestore(
@ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) {
return jobExecutorService.jobStore(userId, jobInstanceId);
}

@GetMapping("/delete")
public Result<Void> jobDelete(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@ApiParam(value = "jobInstanceId", required = true) @RequestParam Long jobInstanceId) {
return jobExecutorService.jobDelete(userId, jobInstanceId);
}
}
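
For context, here is a minimal sketch of how the new endpoint could be exercised over HTTP. Only the /job/executor/delete path and the jobInstanceId query parameter come from this change; the base URL and the token header are assumptions that depend on the deployment, and the userId attribute is normally injected by the server's auth filter rather than passed by the caller:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobInstanceDeleteCall {
    public static void main(String[] args) throws Exception {
        // Hypothetical base URL and auth header; adjust to the actual deployment.
        String base = "http://localhost:8801/seatunnel/api/v1";
        long jobInstanceId = 123L;

        HttpRequest request =
                HttpRequest.newBuilder()
                        .uri(URI.create(base + "/job/executor/delete?jobInstanceId=" + jobInstanceId))
                        .header("token", "<your-token>")
                        // The controller maps delete as a GET, mirroring the existing pause/restore endpoints.
                        .GET()
                        .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}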
@@ -37,6 +37,8 @@ public interface IJobInstanceDao {

void insert(@NonNull JobInstance jobInstance);

void delete(@NonNull JobInstance jobInstance);

JobInstanceMapper getJobInstanceMapper();

IPage<SeaTunnelJobInstanceDto> queryJobInstanceListPaging(
@@ -60,6 +60,11 @@ public void insert(@NonNull JobInstance jobInstance) {
jobInstanceMapper.insert(jobInstance);
}

@Override
public void delete(@NonNull JobInstance jobInstance) {
jobInstanceMapper.deleteById(jobInstance.getId());
}

@Override
public JobInstanceMapper getJobInstanceMapper() {
return jobInstanceMapper;
@@ -56,6 +56,7 @@ public class SeatunnelFuncPermissionKeyConstant {
public static final String JOB_EXECUTOR_RESOURCE = "project:seatunnel-task:job-exec-resource";
public static final String JOB_EXECUTOR_INSTANCE = "project:seatunnel-task:job-exec-instance";
public static final String JOB_EXECUTOR_COMPLETE = "project:seatunnel-task:job-exec-complete";
public static final String JOB_EXECUTOR_DELETE = "project:seatunnel-task:job-exec-delete";

/** sync task instance */
public static final String JOB_METRICS_SUMMARY = "project:seatunnel-task-instance:summary";
@@ -26,4 +26,6 @@ public interface IJobExecutorService {
Result<Void> jobPause(Integer userId, Long jobInstanceId);

Result<Void> jobStore(Integer userId, Long jobInstanceId);

Result<Void> jobDelete(Integer userId, Long jobInstanceId);
}
@@ -34,4 +34,6 @@ public interface IJobInstanceService {

void complete(
@NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId);

void delete(@NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId);
}
@@ -47,6 +47,9 @@ JobDAG getJobDAG(@NonNull Integer userId, @NonNull Long jobInstanceId)
void syncJobDataToDb(
@NonNull JobInstance jobInstance, @NonNull Integer userId, @NonNull String jobEngineId);

void deleteJobDataToDb(
@NonNull JobInstance jobInstance, @NonNull Integer userId, @NonNull String jobEngineId);

JobSummaryMetricsRes getJobSummaryMetrics(
@NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId);

@@ -29,14 +29,16 @@
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
- import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;

import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;

import org.springframework.stereotype.Service;

import com.hazelcast.client.config.ClientConfig;
@@ -49,11 +49,13 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

@Slf4j
@Service
@@ -76,9 +80,7 @@ public Result<Long> jobExecute(Integer userId, Long jobDefineId) {
}

public String writeJobConfigIntoConfFile(String jobConfig, Long jobDefineId) {
- String projectRoot = System.getProperty("user.dir");
- String filePath =
- projectRoot + File.separator + "profile" + File.separator + jobDefineId + ".conf";
+ String filePath = jobStorePath(jobDefineId);
try {
File file = new File(filePath);
if (!file.exists()) {
@@ -122,7 +124,6 @@ public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInsta
});

} catch (ExecutionException | InterruptedException e) {
- ExceptionUtils.getMessage(e);
throw new RuntimeException(e);
}
return jobInstanceId;
@@ -137,19 +138,21 @@ public void waitJobFinish(
ExecutorService executor = Executors.newFixedThreadPool(1);
CompletableFuture<JobStatus> future =
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete, executor);
+ JobStatus jobStatus = null;
try {
log.info("future.get before");
- JobStatus jobStatus = future.get();
+ jobStatus = future.get();

- executor.shutdown();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
+ } catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
+ executor.shutdown();
seaTunnelClient.close();
- log.info("and jobInstanceService.complete begin");
- jobInstanceService.complete(userId, jobInstanceId, jobEngineId);
+ boolean isComplete = Objects.nonNull(jobStatus) && jobStatus.isEndState();
+ if (Objects.nonNull(jobInstanceId) && isComplete) {
+ jobInstanceService.complete(userId, jobInstanceId, jobEngineId);
+ } else {
+ log.warn("Job instance for engine id {} has been deleted or an exception occurred", jobEngineId);
+ }
}
}

@@ -163,11 +166,31 @@ public static String getClusterName(String testClassName) {
return testClassName;
}

public static boolean isComplete(String jobStatus) {
return EnumUtils.getEnumList(JobStatus.class).stream()
.filter(JobStatus::isEndState)
.map(Enum::name)
.map(String::toUpperCase)
.collect(Collectors.toSet())
.contains(jobStatus.toUpperCase())
|| "UNKNOWABLE".equalsIgnoreCase(jobStatus);
}

public static String jobStorePath(Long jobDefinitionId) {
String projectRoot = System.getProperty("user.dir");
return StringUtils.join(
projectRoot,
File.separator,
"profile",
File.separator,
String.valueOf(jobDefinitionId),
".conf");
}

@Override
public Result<Void> jobPause(Integer userId, Long jobInstanceId) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
- if (Objects.equals(
- getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId()), "RUNNING")) {
+ if (!isComplete(getJobStatusFromEngine(jobInstance, jobInstance.getJobEngineId()))) {
pauseJobInEngine(jobInstance.getJobEngineId());
}
return Result.success();
@@ -190,18 +213,39 @@ private void pauseJobInEngine(@NonNull String jobEngineId) {
@Override
public Result<Void> jobStore(Integer userId, Long jobInstanceId) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);

- String projectRoot = System.getProperty("user.dir");
- String filePath =
- projectRoot
- + File.separator
- + "profile"
- + File.separator
- + jobInstance.getJobDefineId()
- + ".conf";
- log.info("jobStore filePath:{}", filePath);
+ String filePath = jobStorePath(jobInstance.getJobDefineId());
SeaTunnelEngineProxy.getInstance()
.restoreJob(filePath, jobInstanceId, Long.valueOf(jobInstance.getJobEngineId()));
return Result.success();
}

/**
* Delete a job instance. If the job has not completed yet, cancel it in the engine first,
* then delete its restored conf file if it exists, and finally remove it from the database.
*
* @param userId user's ID
* @param jobInstanceId ID of the job instance to delete
* @return an empty success Result
*/
@Override
public Result<Void> jobDelete(Integer userId, Long jobInstanceId) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
String jobEngineId = jobInstance.getJobEngineId();
if (!isComplete(getJobStatusFromEngine(jobInstance, jobEngineId))) {
deleteJobInEngine(jobEngineId);
log.info("Canceling {}.", jobInstanceId);
}
File jobConf = new File(jobStorePath(jobInstance.getJobDefineId()));
try {
Files.deleteIfExists(jobConf.toPath());
} catch (IOException e) {
throw new RuntimeException(e);
}
jobInstanceService.delete(userId, jobInstanceId, jobEngineId);
return Result.success();
}

private void deleteJobInEngine(@NonNull String jobEngineId) {
SeaTunnelEngineProxy.getInstance().deleteJob(jobEngineId);
}
}
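
To make the new status handling easier to follow, here is a small standalone sketch of the end-state check that isComplete performs. The concrete status names below are assumptions for illustration; the authoritative set is whatever org.apache.seatunnel.engine.core.job.JobStatus.isEndState() reports:

import java.util.Set;

public class EndStateCheckSketch {
    // Assumed end states for illustration; the real helper derives this set from JobStatus.isEndState().
    private static final Set<String> END_STATES = Set.of("FINISHED", "FAILED", "CANCELED");

    static boolean isComplete(String jobStatus) {
        return END_STATES.contains(jobStatus.toUpperCase()) || "UNKNOWABLE".equalsIgnoreCase(jobStatus);
    }

    public static void main(String[] args) {
        System.out.println(isComplete("FINISHED"));   // true  -> jobDelete skips the engine cancel
        System.out.println(isComplete("RUNNING"));    // false -> jobPause and jobDelete still act on the engine
        System.out.println(isComplete("UNKNOWABLE")); // true  -> handled by the explicit special case
    }
}

The same helper replaces the previous string comparison against "RUNNING" in jobPause, so any non-terminal status now triggers the engine call rather than only RUNNING.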
@@ -135,6 +135,7 @@ public JobExecutorRes createExecuteResource(
JobVersion latestVersion = jobVersionDao.getLatestVersion(job.getId());
JobInstance jobInstance = new JobInstance();
String jobConfig = createJobConfig(latestVersion);
log.debug("job config: {}", jobConfig);

try {
jobInstance.setId(CodeGenerateUtils.getInstance().genCode());
@@ -272,6 +273,7 @@ public String generateJobConfig(
targetLines.get(pluginId),
config);
if (!sinkMap.containsKey(task.getConnectorType())) {
// For ClickHouse, the JDBC connector type needs to be replaced with "Clickhouse" here
sinkMap.put(task.getConnectorType(), new ArrayList<>());
}
Config mergeConfig =
@@ -283,7 +285,14 @@
config,
optionRule);

- sinkMap.get(task.getConnectorType()).add(filterEmptyValue(mergeConfig));
+ if ("true".equals(mergeConfig.getString("Clickhouse"))) {
+ sinkMap.remove(task.getConnectorType());
+ sinkMap.put("Clickhouse", new ArrayList<>());
+ sinkMap.get("Clickhouse").add(filterEmptyValue(mergeConfig));
+ } else {
+ sinkMap.get(task.getConnectorType())
+ .add(filterEmptyValue(mergeConfig));
+ }
}
break;
default:
@@ -345,8 +354,6 @@ public void complete(
@NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId) {
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE, userId);
JobInstance jobInstance = jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId);
- jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId);

List<JobPipelineSummaryMetricsRes> status =
jobMetricsService.getJobPipelineSummaryMetrics(userId, jobInstanceId);

@@ -364,13 +371,41 @@
jobStatus = JobStatus.CANCELED.name();
} else if (statusList.contains("CANCELLING")) {
jobStatus = JobStatus.CANCELLING.name();
- } else {
+ } else if (!statusList.isEmpty()) {
jobStatus = JobStatus.RUNNING.name();
+ } else {
+ jobStatus = "UNKNOWABLE";
}
jobInstance.setJobStatus(jobStatus);
jobInstance.setJobEngineId(jobEngineId);
jobInstance.setUpdateUserId(userId);
jobInstanceDao.update(jobInstance);

+ jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId);
}

@Override
public void delete(
@NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId) {
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_DELETE, userId);
JobInstance jobInstance = jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId);
List<JobPipelineSummaryMetricsRes> status =
jobMetricsService.getJobPipelineSummaryMetrics(userId, jobInstanceId);

List<String> statusList =
status.stream()
.map(JobPipelineSummaryMetricsRes::getStatus)
.map(String::toUpperCase)
.collect(Collectors.toList());

String jobStatus =
statusList.size() == 1 && JobExecutorServiceImpl.isComplete(statusList.get(0))
? statusList.get(0)
: JobStatus.CANCELED.name();

jobInstance.setJobStatus(jobStatus);
jobInstance.setJobEngineId(jobEngineId);
jobInstance.setUpdateUserId(userId);
jobMetricsService.deleteJobDataToDb(jobInstance, userId, jobEngineId);
}

private Config buildTransformConfig(
@@ -573,6 +573,17 @@ public void syncJobDataToDb(
syncCompleteJobInfoToDb(jobInstance);
}

@Override
public void deleteJobDataToDb(
@NonNull JobInstance jobInstance,
@NonNull Integer userId,
@NonNull String jobEngineId) {
relationJobInstanceAndJobEngineId(jobInstance, userId, jobEngineId);
deleteMetricsToDb(jobInstance, userId, jobEngineId);
syncHistoryJobInfoToDb(jobInstance, jobEngineId);
deleteJobInfoToDb(jobInstance);
}

private void syncMetricsToDb(
@NonNull JobInstance jobInstance,
@NonNull Integer userId,
@@ -616,6 +627,16 @@ private void syncMetricsToDb(
}
}

private void deleteMetricsToDb(
@NonNull JobInstance jobInstance,
@NonNull Integer userId,
@NonNull String jobEngineId) {
getJobMetricsFromDb(jobInstance, userId, jobEngineId)
.forEach(
jobMetrics ->
jobMetricsDao.getJobMetricsMapper().deleteById(jobMetrics.getId()));
}

private void syncHistoryJobInfoToDb(
@NonNull JobInstance jobInstance, @NonNull String jobEngineId) {
JobInstanceHistory jobHistoryFromEngine = getJobHistoryFromEngine(jobInstance, jobEngineId);
@@ -636,6 +657,10 @@ private void syncCompleteJobInfoToDb(@NonNull JobInstance jobInstance) {
jobInstanceDao.update(jobInstance);
}

private void deleteJobInfoToDb(@NonNull JobInstance jobInstance) {
jobInstanceDao.delete(jobInstance);
}

private void relationJobInstanceAndJobEngineId(
@NonNull JobInstance jobInstance,
@NonNull Integer userId,
@@ -122,4 +122,10 @@ public void restoreJob(
throw new RuntimeException(e);
}
}

public void deleteJob(String jobEngineId) {
// At the engine level, "deleting" a job means cancelling it; the web layer removes the DB records.
SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
try {
JobClient jobClient = seaTunnelClient.getJobClient();
jobClient.cancelJob(Long.valueOf(jobEngineId));
} finally {
seaTunnelClient.close();
}
}
}
8 changes: 4 additions & 4 deletions seatunnel-ui/src/service/sync-task-instance/index.ts
@@ -69,23 +69,23 @@ export function forcedSuccessByIds(taskInstanceIds: Array<any>) {
})
}

- export function hanldlePauseJob(id: number): any {
+ export function handlePauseJob(id: number): any {
return axios({
url: `/job/executor/pause?jobInstanceId=${id}`,
method: 'get'
})
}

- export function hanldleRecoverJob(id: number): any {
+ export function handleRecoverJob(id: number): any {
return axios({
url: `/job/executor/restore?jobInstanceId=${id}`,
method: 'get'
})
}

- export function hanldleDelJob(id: number): any {
+ export function handleDeleteJob(id: number): any {
return axios({
- url: `/job/executor/del?jobInstanceId=${id}`,
+ url: `/job/executor/delete?jobInstanceId=${id}`,
method: 'get'
})
}