Skip to content

Commit

Permalink
[INLONG-9727][Manager] Support configuring the timezone for agent col…
Browse files Browse the repository at this point in the history
…lection addresses to streamSource (apache#9728)
  • Loading branch information
fuweng11 authored Feb 27, 2024
1 parent d66ff34 commit 1dc2d67
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class DataConfig {
private String syncPartitionKey;
private Integer state;
private String predefinedFields;
private String timeZone;
private String extParams;
/**
* The task version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class StreamSourceEntity implements Serializable {
private String serializationType;
private String snapshot;
private Date reportTime;
private String dataTimeZone;

// extParams saved filePath, fileRollingType, dbName, tableName, etc.
private String extParams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<result column="serialization_type" jdbcType="VARCHAR" property="serializationType"/>
<result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/>
<result column="report_time" jdbcType="TIMESTAMP" property="reportTime"/>
<result column="data_time_zone" jdbcType="VARCHAR" property="dataTimeZone"/>
<result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
<result column="version" jdbcType="INTEGER" property="version"/>
<result column="status" jdbcType="INTEGER" property="status"/>
Expand All @@ -48,23 +49,23 @@
<sql id="Base_Column_List">
id, inlong_group_id, inlong_stream_id, source_type, source_name, template_id, agent_ip, uuid,
data_node_name, inlong_cluster_name, inlong_cluster_node_group, serialization_type, snapshot, report_time,
ext_params, version, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
data_time_zone, ext_params, version, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
</sql>

<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
insert into stream_source (inlong_group_id, inlong_stream_id,
source_type, source_name, template_id, agent_ip,
uuid, data_node_name, inlong_cluster_name, inlong_cluster_node_group,
serialization_type, snapshot,
report_time, ext_params, status,
serialization_type, snapshot, report_time,
data_time_zone, ext_params, status,
previous_status, creator, modifier)
values (#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER},
#{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR},
#{inlongClusterName,jdbcType=VARCHAR}, #{inlongClusterNodeGroup,jdbcType=VARCHAR},
#{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},
#{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
#{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},#{modifyTime,jdbcType=TIMESTAMP},
#{dataTimeZone,jdbcType=VARCHAR}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER},
#{previousStatus,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
</insert>

Expand Down Expand Up @@ -400,6 +401,9 @@
<if test="reportTime != null">
report_time = #{reportTime,jdbcType=TIMESTAMP},
</if>
<if test="dataTimeZone != null">
data_time_zone = #{dataTimeZone,jdbcType=VARCHAR},
</if>
<if test="extParams != null">
ext_params = #{extParams,jdbcType=LONGVARCHAR},
</if>
Expand Down Expand Up @@ -437,6 +441,7 @@
serialization_type = #{serializationType,jdbcType=VARCHAR},
snapshot = #{snapshot,jdbcType=LONGVARCHAR},
report_time = #{reportTime,jdbcType=TIMESTAMP},
data_time_zone = #{dataTimeZone,jdbcType=VARCHAR},
ext_params = #{extParams,jdbcType=LONGVARCHAR},
version = #{version,jdbcType=INTEGER} + 1,
status = #{status,jdbcType=INTEGER},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public class SourceRequest {
@Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
private String snapshot;

@ApiModelProperty("Data Time zone")
private String dataTimeZone;

@ApiModelProperty(value = "Whether to sync schema from source after saving or updating. Default is false")
private Boolean enableSyncSchema = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public abstract class StreamSource extends StreamNode {
@ApiModelProperty("Snapshot of this source task")
private String snapshot;

@ApiModelProperty("Data Time zone")
private String dataTimeZone;

@ApiModelProperty("Version")
private Integer version;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
dataConfig.setTaskType(getTaskType(entity));
dataConfig.setTaskName(entity.getSourceName());
dataConfig.setSnapshot(entity.getSnapshot());
dataConfig.setTimeZone(entity.getDataTimeZone());
dataConfig.setVersion(entity.getVersion());

String groupId = entity.getInlongGroupId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`serialization_type` varchar(20) DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
`snapshot` mediumtext DEFAULT NULL COMMENT 'Snapshot of this source task',
`report_time` timestamp NULL COMMENT 'Snapshot time',
`data_time_zone` varchar(256) DEFAULT NULL COMMENT 'Data time zone',
`ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string, such as filePath, dbName, tableName, etc',
`version` int(11) DEFAULT '1' COMMENT 'Stream source version',
`status` int(4) DEFAULT '110' COMMENT 'Stream source status',
Expand Down
1 change: 1 addition & 0 deletions inlong-manager/manager-web/sql/apache_inlong_manager.sql
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`serialization_type` varchar(20) DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
`snapshot` mediumtext DEFAULT NULL COMMENT 'Snapshot of this source task',
`report_time` timestamp NULL COMMENT 'Snapshot time',
`data_time_zone` varchar(256) DEFAULT NULL COMMENT 'Data time zone',
`ext_params` mediumtext DEFAULT NULL COMMENT 'Another fields will be saved as JSON string, such as filePath, dbName, tableName, etc',
`version` int(11) DEFAULT '1' COMMENT 'Stream source version',
`status` int(4) DEFAULT '110' COMMENT 'Stream source status',
Expand Down
28 changes: 28 additions & 0 deletions inlong-manager/manager-web/sql/changes-1.12.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- This is the SQL change file from version 1.9.0 to the current version 1.10.0.
-- When upgrading to version 1.10.0, please execute those SQLs in the DB (such as MySQL) used by the Manager module.

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

USE `apache_inlong_manager`;

ALTER TABLE `stream_source` ADD COLUMN `data_time_zone` varchar(256) DEFAULT NULL COMMENT 'Data time zone';


0 comments on commit 1dc2d67

Please sign in to comment.