Skip to content

Commit

Permalink
[INLONG-9606][Manager] Fix the problem of incorrect flow status when …
Browse files Browse the repository at this point in the history
…cls sink configuration fails (apache#9607)
  • Loading branch information
fuweng11 authored Jan 23, 2024
1 parent deef029 commit 0709547
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.tencentcloudapi.cls.v20201016.ClsClient;
import com.tencentcloudapi.cls.v20201016.models.DescribeTopicsRequest;
import com.tencentcloudapi.common.Credential;
import com.tencentcloudapi.common.exception.TencentCloudSDKException;
import com.tencentcloudapi.common.profile.ClientProfile;
import com.tencentcloudapi.common.profile.HttpProfile;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -99,7 +98,7 @@ public Boolean testConnection(DataNodeRequest request) {
DescribeTopicsRequest req = new DescribeTopicsRequest();
try {
client.DescribeTopics(req);
} catch (TencentCloudSDKException e) {
} catch (Exception e) {
String errMsg = String.format("connect tencent cloud error endPoint = %s secretId = %s secretKey = %s",
dataNodeRequest.getEndpoint(), dataNodeRequest.getManageSecretId(),
dataNodeRequest.getManageSecretKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import com.tencentcloudapi.cls.v20201016.models.Tag;
import com.tencentcloudapi.cls.v20201016.models.TopicInfo;
import com.tencentcloudapi.common.Credential;
import com.tencentcloudapi.common.exception.TencentCloudSDKException;
import com.tencentcloudapi.common.profile.ClientProfile;
import com.tencentcloudapi.common.profile.HttpProfile;
import org.apache.commons.lang3.ArrayUtils;
Expand All @@ -65,7 +64,7 @@ public class ClsOperator {

public String createTopicReturnTopicId(String topicName, String logSetId, String tag, Integer storageDuration,
String secretId, String secretKey, String region)
throws TencentCloudSDKException {
throws Exception {
ClsClient client = getClsClient(secretId, secretKey, region);
CreateTopicRequest req = getCreateTopicRequest(tag, logSetId, topicName, storageDuration);
CreateTopicResponse resp = client.CreateTopic(req);
Expand All @@ -76,7 +75,7 @@ public String createTopicReturnTopicId(String topicName, String logSetId, String
}

public void updateTopicTag(String topicId, String tag, String secretId, String secretKey, String region)
throws TencentCloudSDKException {
throws Exception {
ClsClient client = getClsClient(secretId, secretKey, region);
ModifyTopicRequest modifyTopicRequest = new ModifyTopicRequest();
modifyTopicRequest.setTags(convertTags(tag.split(InlongConstants.CENTER_LINE)));
Expand Down Expand Up @@ -109,7 +108,7 @@ public void createTopicIndex(String tokenizer, String topicId, String secretId,
CreateIndexResponse createIndexResponse = clsClient.CreateIndex(req);
LOG.debug("create index success for topic = {}, tokenizer = {}, requestId = {}", topicId,
tokenizer, createIndexResponse.getRequestId());
} catch (TencentCloudSDKException e) {
} catch (Exception e) {
String errMsg = "Create cls topic index failed: " + e.getMessage();
LOG.error(errMsg, e);
throw new BusinessException(errMsg);
Expand All @@ -134,7 +133,7 @@ public String describeTopicIDByTopicName(String topicName, String logSetId, Stri
return topics[0].getTopicId();
}
return null;
} catch (TencentCloudSDKException e) {
} catch (Exception e) {
String errMsg = "describe cls topic failed: " + e.getMessage();
LOG.error(errMsg, e);
throw new BusinessException(errMsg);
Expand Down Expand Up @@ -165,7 +164,7 @@ public FullTextInfo getTopicIndexFullText(String secretId, String secretKey, Str
try {
DescribeIndexResponse resp = clsClient.DescribeIndex(req);
return resp.getRule() == null ? null : resp.getRule().getFullText();
} catch (TencentCloudSDKException e) {
} catch (Exception e) {
String errMsg = "describe cls topic index failed: " + e.getMessage();
LOG.error(errMsg, e);
throw new BusinessException(errMsg);
Expand All @@ -186,7 +185,7 @@ public void updateTopicIndex(String tokenizer, String topicId, String secretId,
ModifyIndexResponse modifyIndexResponse = clsClient.ModifyIndex(req);
LOG.debug("update index success for topicId = {}, tokenizer = {}, requestId = {}", topicId, tokenizer,
modifyIndexResponse.getRequestId());
} catch (TencentCloudSDKException e) {
} catch (Exception e) {
String errMsg = "update cls topic index failed: " + e.getMessage();
LOG.error(errMsg, e);
throw new BusinessException(errMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;

import com.tencentcloudapi.common.exception.TencentCloudSDKException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -93,7 +92,7 @@ private void createClsResource(SinkInfo sinkInfo) {
sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
LOG.info("update cls info status success for sinkId= {}, topicName = {}", sinkInfo.getSinkName(),
clsSinkDTO.getTopicName());
} catch (TencentCloudSDKException e) {
} catch (Exception e) {
String errMsg = "Create cls topic failed: " + e.getMessage();
LOG.error(errMsg, e);
sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
Expand All @@ -102,7 +101,7 @@ private void createClsResource(SinkInfo sinkInfo) {
}

private String getTopicID(ClsDataNodeDTO clsDataNode, ClsSinkDTO clsSinkDTO)
throws TencentCloudSDKException {
throws Exception {
String topicId = clsOperator.describeTopicIDByTopicName(clsSinkDTO.getTopicName(), clsDataNode.getLogSetId(),
clsDataNode.getManageSecretId(), clsDataNode.getManageSecretKey(),
clsDataNode.getRegion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.shiro.authz.annotation.Logical;
import org.apache.shiro.authz.annotation.RequiresRoles;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
Expand Down Expand Up @@ -158,7 +159,7 @@ public Response<Boolean> deleteTenantTag(@PathVariable Integer id) {
@PostMapping(value = "/cluster/save")
@ApiOperation(value = "Save cluster")
@OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.CLUSTER)
@RequiresRoles(value = UserRoleCode.TENANT_ADMIN)
@RequiresRoles(logical = Logical.OR, value = {UserRoleCode.INLONG_ADMIN, UserRoleCode.TENANT_ADMIN})
public Response<Integer> save(@Validated(SaveValidation.class) @RequestBody ClusterRequest request) {
String currentUser = LoginUserUtils.getLoginUser().getName();
return Response.success(clusterService.save(request, currentUser));
Expand Down

0 comments on commit 0709547

Please sign in to comment.