Skip to content

Commit

Permalink
[Feature][Seatunnel-web] Add support to create job with single API. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
arshadmohammad authored Aug 15, 2024
1 parent c5514da commit f45dacd
Show file tree
Hide file tree
Showing 13 changed files with 582 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.app.common;

import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;

public class Result<T> {

Expand Down Expand Up @@ -45,6 +46,12 @@ private Result(SeatunnelErrorEnum errorEnum, String... messages) {
this.data = null;
}

private Result(SeatunnelException e) {
this.code = e.getErrorEnum().getCode();
this.msg = e.getMessage();
this.data = null;
}

public static <T> Result<T> success() {
return new Result<>();
}
Expand All @@ -65,6 +72,11 @@ public static <T> Result<T> failure(SeatunnelErrorEnum errorEnum, String... mess
return result;
}

public static <T> Result<T> getFailure(SeatunnelException e) {
Result<T> result = new Result<>(e);
return result;
}

public boolean isSuccess() {
return OK.getCode() == this.code;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.controller;

import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.service.IJobService;

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;

import javax.annotation.Resource;

@RestController
@RequestMapping("/seatunnel/api/v1/job")
public class JobController {

@Resource private IJobService jobCRUDService;

@PostMapping("/create")
@ApiOperation(
value =
"Create a job, In jobDAG for inputPluginId and targetPluginId use the plugin names instead of ids.",
httpMethod = "POST")
public Result<Long> createJob(
@ApiParam(value = "userId", required = true) @RequestAttribute("userId") Integer userId,
@RequestBody JobCreateReq jobCreateRequest)
throws JsonProcessingException {
return Result.success(jobCRUDService.createJob(userId, jobCreateRequest));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.app.domain.request.connector;

public enum JobMode {
BATCH,

STREAM;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.domain.request.job;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class JobCreateReq {
private JobConfig jobConfig;
private List<PluginConfig> pluginConfigs;
private JobDAG jobDAG;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.service;

import org.apache.seatunnel.app.domain.request.job.JobCreateReq;

import com.fasterxml.jackson.core.JsonProcessingException;

public interface IJobService {

long createJob(int userId, JobCreateReq jobCreateRequest) throws JsonProcessingException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.service.impl;

import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
import org.apache.seatunnel.app.domain.request.connector.JobMode;
import org.apache.seatunnel.app.domain.request.job.Edge;
import org.apache.seatunnel.app.domain.request.job.JobConfig;
import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.domain.request.job.JobDAG;
import org.apache.seatunnel.app.domain.request.job.JobReq;
import org.apache.seatunnel.app.domain.request.job.PluginConfig;
import org.apache.seatunnel.app.service.IJobConfigService;
import org.apache.seatunnel.app.service.IJobDefinitionService;
import org.apache.seatunnel.app.service.IJobService;
import org.apache.seatunnel.app.service.IJobTaskService;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
import org.apache.seatunnel.server.common.ParamValidationException;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;

import org.apache.commons.lang3.StringUtils;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.core.JsonProcessingException;

import javax.annotation.Resource;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
public class JobServiceImpl implements IJobService {

@Resource private IJobDefinitionService jobService;
@Resource private IJobTaskService jobTaskService;
@Resource private IJobConfigService jobConfigService;

@Override
@Transactional
public long createJob(int userId, JobCreateReq jobCreateRequest)
throws JsonProcessingException {
JobReq jobDefinition = getJobDefinition(jobCreateRequest.getJobConfig());
long jobId = jobService.createJob(userId, jobDefinition);
List<PluginConfig> pluginConfig = jobCreateRequest.getPluginConfigs();
Map<String, String> pluginNameVsPluginId = new HashMap<>();
if (pluginConfig != null) {
for (PluginConfig config : pluginConfig) {
String pluginId = String.valueOf(CodeGenerateUtils.getInstance().genCode());
config.setPluginId(pluginId);
jobTaskService.saveSingleTask(jobId, config);
pluginNameVsPluginId.put(config.getName(), pluginId);
}
}
jobConfigService.updateJobConfig(userId, jobId, jobCreateRequest.getJobConfig());
JobDAG jobDAG = jobCreateRequest.getJobDAG();
// Replace the plugin name with plugin id
List<Edge> edges = jobDAG.getEdges();
for (Edge edge : edges) {
edge.setInputPluginId(pluginNameVsPluginId.get(edge.getInputPluginId()));
edge.setTargetPluginId(pluginNameVsPluginId.get(edge.getTargetPluginId()));
}
jobTaskService.saveJobDAG(jobId, jobDAG);
return jobId;
}

private JobReq getJobDefinition(JobConfig jobConfig) {
JobReq jobReq = new JobReq();
if (StringUtils.isEmpty(jobConfig.getName())) {
throw new ParamValidationException(SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "name");
}
jobReq.setName(jobConfig.getName());
if (StringUtils.isEmpty(jobConfig.getDescription())) {
throw new ParamValidationException(
SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "description");
}
jobReq.setDescription(jobConfig.getDescription());
String jobMode = (String) jobConfig.getEnv().get("job.mode");
if (StringUtils.isEmpty(jobMode)) {
throw new ParamValidationException(
SeatunnelErrorEnum.PARAM_CAN_NOT_BE_NULL, "job.mode");
}
if (JobMode.BATCH.name().equals(jobMode)) {
jobReq.setJobType(BusinessMode.DATA_INTEGRATION);
} else if (JobMode.STREAM.name().equals(jobMode)) {
jobReq.setJobType(BusinessMode.DATA_REPLICA);
} else {
throw new ParamValidationException(
SeatunnelErrorEnum.INVALID_PARAM,
"job.mode",
"job.mode should be either BATCH or STREAM");
}
return jobReq;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -406,13 +406,18 @@ public void saveSingleTask(long jobVersionId, PluginConfig pluginConfig) {
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.SINGLE_TASK_CREATE, 0);
JobTask jobTask;
JobTask old = jobTaskDao.getTask(jobVersionId, pluginConfig.getPluginId());
String pluginId = pluginConfig.getPluginId();
try {
checkConfigFormat(pluginConfig.getConfig());
long id;
if (old != null) {
id = old.getId();
} else {
id = CodeGenerateUtils.getInstance().genCode();
pluginId =
pluginId == null
? String.valueOf(CodeGenerateUtils.getInstance().genCode())
: pluginId;
}
String connectorType;
String transformOptionsStr = null;
Expand All @@ -429,7 +434,7 @@ public void saveSingleTask(long jobVersionId, PluginConfig pluginConfig) {
jobTask =
JobTask.builder()
.id(id)
.pluginId(pluginConfig.getPluginId())
.pluginId(pluginId)
.name(pluginConfig.getName())
.type(pluginConfig.getType().name().toUpperCase())
.dataSourceId(pluginConfig.getDataSourceId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.server.common.ParamValidationException;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;

Expand Down Expand Up @@ -82,4 +83,9 @@ private Result<String> exceptionHandler(Exception e) {
private void logError(Throwable throwable) {
log.error(throwable.getMessage(), throwable);
}

@ExceptionHandler(value = ParamValidationException.class)
private Result<String> paramValidationHandler(SeatunnelException e) {
return Result.getFailure(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.server.common;

public class ParamValidationException extends SeatunnelException {
public ParamValidationException(SeatunnelErrorEnum e, Object... msg) {
super(e, msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public enum SeatunnelErrorEnum {
"datasource can not be delete because it used by task"),
INVALID_DATASOURCE(-70001, "Datasource [{0}] invalid", "datasource [{0}] invalid"),
MISSING_PARAM(1777000, "param miss [{0}]", "param miss [{0}]"),
PARAM_CAN_NOT_BE_NULL(60018, "", "param [%s] can not be null or empty"),
INVALID_PARAM(60019, "", "param [%s] is invalid. %s"),
;

private final int code;
Expand Down
Loading

0 comments on commit f45dacd

Please sign in to comment.