Skip to content

Commit 726c356

Browse files
authored
[issue_1045][taier-schedule] self-dependence day tasks fill data lose… (#1046)
… dependencies #1045
1 parent 151b262 commit 726c356

File tree

2 files changed

+46
-44
lines changed

2 files changed

+46
-44
lines changed

taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/AbstractJobBuilder.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
package com.dtstack.taier.scheduler.server.builder;
2020

21-
import com.dtstack.taier.common.enums.EScheduleJobType;
2221
import com.dtstack.taier.common.enums.Deleted;
22+
import com.dtstack.taier.common.enums.EScheduleJobType;
2323
import com.dtstack.taier.common.enums.Restarted;
2424
import com.dtstack.taier.common.env.EnvironmentContext;
2525
import com.dtstack.taier.common.exception.TaierDefineException;
@@ -32,8 +32,8 @@
3232
import com.dtstack.taier.scheduler.server.ScheduleJobDetails;
3333
import com.dtstack.taier.scheduler.server.builder.cron.ScheduleConfManager;
3434
import com.dtstack.taier.scheduler.server.builder.cron.ScheduleCorn;
35-
import com.dtstack.taier.scheduler.server.builder.dependency.JobDependency;
3635
import com.dtstack.taier.scheduler.server.builder.dependency.DependencyManager;
36+
import com.dtstack.taier.scheduler.server.builder.dependency.JobDependency;
3737
import com.dtstack.taier.scheduler.service.ScheduleActionService;
3838
import com.dtstack.taier.scheduler.service.ScheduleJobService;
3939
import com.dtstack.taier.scheduler.service.ScheduleTaskShadeService;
@@ -157,6 +157,9 @@ public List<ScheduleJobDetails> buildJob(ScheduleTaskShade batchTaskShade, Strin
157157
* @return 名称
158158
*/
159159
private String getName(ScheduleTaskShade scheduleTaskShade, String name, String cycTime) {
160+
if (StringUtils.isBlank(name)) {
161+
return getPrefix() + "_" + scheduleTaskShade.getName() + "_" + cycTime;
162+
}
160163
return getPrefix() + "_" + name + "_" + scheduleTaskShade.getName() + "_" + cycTime;
161164
}
162165

taier-scheduler/src/main/java/com/dtstack/taier/scheduler/server/builder/FillDataJobBuilder.java

+41-42
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.List;
4444
import java.util.Locale;
4545
import java.util.Set;
46+
import java.util.concurrent.CompletableFuture;
4647
import java.util.stream.Collectors;
4748

4849
/**
@@ -75,7 +76,6 @@ public class FillDataJobBuilder extends AbstractJobBuilder {
7576
* @param endDay 每天时间范围 结束范围
7677
* @throws Exception
7778
*/
78-
@Transactional(rollbackFor = Exception.class)
7979
public void createFillJob(Set<Long> all, Set<Long> run, Long fillId, String fillName, String beginTime, String endTime,
8080
String startDay, String endDay) throws Exception {
8181
Date startDate = DateUtil.parseDate(startDay, DateUtil.DATE_FORMAT, Locale.CHINA);
@@ -102,54 +102,52 @@ public void createFillJob(Set<Long> all, Set<Long> run, Long fillId, String fill
102102
* @param endTime 每天时间范围 结束范围
103103
* @throws Exception
104104
*/
105-
@Transactional(rollbackFor = Exception.class)
106105
public void buildFillDataJobGraph(String fillName, Long fillId, Set<Long> all, Set<Long> run, String triggerDay,
107-
String beginTime, String endTime) throws Exception {
106+
String beginTime, String endTime) throws Exception {
108107
List<Long> allList = Lists.newArrayList(all);
109108
List<List<Long>> partition = Lists.partition(allList, environmentContext.getJobGraphTaskLimitSize());
110109
AtomicJobSortWorker sortWorker = new AtomicJobSortWorker();
110+
List<ScheduleJobDetails> saveList = Lists.newArrayList();
111+
CompletableFuture.allOf(partition.stream()
112+
.map(taskKey ->
113+
CompletableFuture.runAsync(() ->
114+
fillTaskPartition(fillName, fillId, run, triggerDay, beginTime, endTime, allList, sortWorker, saveList, taskKey),
115+
jobGraphBuildPool))
116+
.toArray(CompletableFuture[]::new)).thenAccept(a -> savaFillJob(saveList)).join();
117+
}
111118

112-
for (List<Long> taskKey : partition) {
113-
jobGraphBuildPool.submit(() -> {
114-
try {
115-
List<ScheduleJobDetails> saveList = Lists.newArrayList();
116-
for (Long taskId : taskKey) {
117-
try {
118-
ScheduleTaskShade scheduleTaskShade = scheduleTaskService
119-
.lambdaQuery()
120-
.eq(ScheduleTaskShade::getTaskId, taskId)
121-
.eq(ScheduleTaskShade::getIsDeleted, Deleted.NORMAL.getStatus())
122-
.one();
123-
124-
if (scheduleTaskShade != null) {
125-
List<ScheduleJobDetails> jobBuilderBeanList = Lists.newArrayList();
126-
// 非工作流任务子任务
127-
if (scheduleTaskShade.getFlowId() == 0) {
128-
// 生成补数据实例
129-
jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, endTime, fillId, sortWorker),
130-
environmentContext.getBuildJobErrorRetry(), 200, false);
131-
} else {
132-
Long flowId = scheduleTaskShade.getFlowId();
133-
if (!allList.contains(flowId)) {
134-
// 生成周期实例
135-
jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, beginTime, fillId, sortWorker),
136-
environmentContext.getBuildJobErrorRetry(), 200, false);
137-
}
138-
}
139-
140-
for (ScheduleJobDetails jobBuilderBean : jobBuilderBeanList) {
141-
addMap(run, saveList, taskId, jobBuilderBean);
142-
}
143-
}
144-
} catch (Exception e) {
145-
LOGGER.error("taskKey : {} error:", taskId, e);
119+
private void fillTaskPartition(String fillName, Long fillId, Set<Long> run, String triggerDay, String beginTime, String endTime, List<Long> allList, AtomicJobSortWorker sortWorker, List<ScheduleJobDetails> saveList, List<Long> taskKey) {
120+
for (Long taskId : taskKey) {
121+
try {
122+
ScheduleTaskShade scheduleTaskShade = scheduleTaskService
123+
.lambdaQuery()
124+
.eq(ScheduleTaskShade::getTaskId, taskId)
125+
.eq(ScheduleTaskShade::getIsDeleted, Deleted.NORMAL.getStatus())
126+
.one();
127+
128+
if (scheduleTaskShade != null) {
129+
List<ScheduleJobDetails> jobBuilderBeanList = Lists.newArrayList();
130+
// 非工作流任务子任务
131+
if (scheduleTaskShade.getFlowId() == 0) {
132+
// 生成补数据实例
133+
jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, endTime, fillId, sortWorker),
134+
environmentContext.getBuildJobErrorRetry(), 200, false);
135+
} else {
136+
Long flowId = scheduleTaskShade.getFlowId();
137+
if (!allList.contains(flowId)) {
138+
// 生成周期实例
139+
jobBuilderBeanList = RetryUtil.executeWithRetry(() -> buildJob(scheduleTaskShade, fillName, triggerDay, beginTime, endTime, fillId, sortWorker),
140+
environmentContext.getBuildJobErrorRetry(), 200, false);
146141
}
147142
}
148-
savaFillJob(saveList);
149-
} catch (Exception e) {
150-
LOGGER.error("fill error:", e);
143+
144+
for (ScheduleJobDetails jobBuilderBean : jobBuilderBeanList) {
145+
addMap(run, saveList, taskId, jobBuilderBean);
146+
}
151147
}
152-
});
148+
} catch (Exception e) {
149+
LOGGER.error("taskKey : {} error:", taskId, e);
150+
}
153151
}
154152
}
155153

@@ -178,7 +176,8 @@ private void addMap(Set<Long> run, List<ScheduleJobDetails> saveList, Long taskI
178176
*
179177
* @param allJobList 所有集合
180178
*/
181-
private void savaFillJob(List<ScheduleJobDetails> allJobList) {
179+
@Transactional(rollbackFor = Exception.class)
180+
public void savaFillJob(List<ScheduleJobDetails> allJobList) {
182181
scheduleJobService.insertJobList(allJobList, EScheduleType.FILL_DATA.getType());
183182
Set<ScheduleJobOperatorRecord> operatorJobIds = allJobList
184183
.stream()

0 commit comments

Comments
 (0)