From 90b0b2f4e46e2a8da9b52be063ae434d602b6bc1 Mon Sep 17 00:00:00 2001
From: 铁柱
Date: Fri, 20 Nov 2020 15:08:44 +0800
Subject: [PATCH 1/3] Resolve the conflicts caused by the merge
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../flink/sql/side/rdb/all/AbstractRdbAllReqRow.java | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
index 28b6ca3b3..9cd1018d0 100644
--- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
+++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
@@ -29,20 +29,19 @@
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.*;
-import java.time.LocalDateTime;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;

From b42a13a249c17414663a7f177ea8b5d7c44504b3 Mon Sep 17 00:00:00 2001
From: jianlan519588 <52684564+jianlan519588@users.noreply.github.com>
Date: Fri, 11 Dec 2020 19:45:35 +0800
Subject: [PATCH 2/3] Changes on 2020-12-11
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The source document was missing the double quotes.
---
 docs/quickStart.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/quickStart.md b/docs/quickStart.md
index a5bf1fa9a..5ad03d884 100644
--- a/docs/quickStart.md
+++ b/docs/quickStart.md
@@ -65,7 +65,7 @@ sh submit.sh
  -yarnconf F:\dtstack\flinkStreamSql\localhost\hadoop
  -flinkJarPath F:\Java\flink-1.8.2-bin-scala_2.12\flink-1.8.2\lib
  -pluginLoadMode shipfile
- -confProp {\"time.characteristic\":\"eventTime\",\"logLevel\":\"info\"}
+ -confProp "{\"time.characteristic\":\"eventTime\",\"logLevel\":\"info\"}"
 ```
 
 #### yarn模式命令
@@ -95,7 +95,7 @@ sh submit.sh
  -yarnconf /home/wen/Desktop/flink_stream_sql_conf/yarnConf_node1
  -flinkJarPath /home/wen/Desktop/dtstack/flink-1.8.1/lib
  -pluginLoadMode shipfile
- -confProp {\"time.characteristic\":\"eventTime\",\"logLevel\":\"info\"}
+ -confProp "{\"time.characteristic\":\"eventTime\",\"logLevel\":\"info\"}"
  -queue c
 ```
 参数具体细节请看[命令参数说明](./config.md)

From d11e8f282430ab8f11e6d15b9f8f633d7181719a Mon Sep 17 00:00:00 2001
From: Ada Wong
Date: Wed, 10 Nov 2021 17:04:55 +0800
Subject: [PATCH 3/3] Delete AbstractRdbAllReqRow.java

---
 .../side/rdb/all/AbstractRdbAllReqRow.java | 208 ------------------
 1 file changed, 208 deletions(-)
 delete mode 100644 rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
deleted file mode 100644
index 9cd1018d0..000000000
--- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.dtstack.flink.sql.side.rdb.all;
-
-import com.dtstack.flink.sql.side.BaseAllReqRow;
-import com.dtstack.flink.sql.side.BaseSideInfo;
-import com.dtstack.flink.sql.side.rdb.table.RdbSideTableInfo;
-import com.dtstack.flink.sql.side.rdb.util.SwitchUtil;
-import com.dtstack.flink.sql.util.RowDataComplete;
-import com.dtstack.flink.sql.util.RowDataConvert;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.calcite.sql.JoinType;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-/**
- * side operator with cache for all(period reload)
- * Date: 2018/11/26
- * Company: www.dtstack.com
- *
- * @author maqi
- */
-
-public abstract class AbstractRdbAllReqRow extends BaseAllReqRow {
-
-    private static final long serialVersionUID = 2098635140857937718L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractRdbAllReqRow.class);
-
-    private static final int CONN_RETRY_NUM = 3;
-
-    private static final int DEFAULT_FETCH_SIZE = 1000;
-
-    private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
-
-    public AbstractRdbAllReqRow(BaseSideInfo sideInfo) {
-        super(sideInfo);
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-        RdbSideTableInfo tableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo();
-        LOG.info("rdb dim table config info: {} ", tableInfo.toString());
-    }
-
-    @Override
-    protected void initCache() throws SQLException {
-        Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
-        cacheRef.set(newCache);
-        loadData(newCache);
-    }
-
-    @Override
-    protected void reloadCache() {
-        //reload cacheRef and replace to old cacheRef
-        Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
-        try {
-            loadData(newCache);
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
-        cacheRef.set(newCache);
- LOG.info("----- rdb all cacheRef reload end:{}", Calendar.getInstance()); - } - - @Override - public void flatMap(Row value, Collector out) throws Exception { - List equalValIndex = sideInfo.getEqualValIndex(); - ArrayList inputParams = equalValIndex.stream() - .map(value::getField) - .filter(Objects::nonNull) - .collect(Collectors.toCollection(ArrayList::new)); - - if (inputParams.size() != equalValIndex.size() && sideInfo.getJoinType() == JoinType.LEFT) { - Row row = fillData(value, null); - RowDataComplete.collectRow(out, row); - return; - } - - String cacheKey = inputParams.stream() - .map(Object::toString) - .collect(Collectors.joining("_")); - - List> cacheList = cacheRef.get().get(cacheKey); - if (CollectionUtils.isEmpty(cacheList) && sideInfo.getJoinType() == JoinType.LEFT) { - Row row = fillData(value, null); - RowDataComplete.collectRow(out, row); - } else if (!CollectionUtils.isEmpty(cacheList)) { - cacheList.forEach(one -> out.collect(RowDataConvert.convertToBaseRow(fillData(value, one)))); - } - } - - private void loadData(Map>> tmpCache) throws SQLException { - RdbSideTableInfo tableInfo = (RdbSideTableInfo) sideInfo.getSideTableInfo(); - Connection connection = null; - - try { - for (int i = 0; i < CONN_RETRY_NUM; i++) { - try { - connection = getConn(tableInfo.getUrl(), tableInfo.getUserName(), tableInfo.getPassword()); - break; - } catch (Exception e) { - if (i == CONN_RETRY_NUM - 1) { - throw new RuntimeException("", e); - } - try { - String connInfo = "url:" + tableInfo.getUrl() + ";userName:" + tableInfo.getUserName() + ",pwd:" + tableInfo.getPassword(); - LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo); - Thread.sleep(5 * 1000); - } catch (InterruptedException e1) { - LOG.error("", e1); - } - } - } - queryAndFillData(tmpCache, connection); - } catch (Exception e) { - LOG.error("", e); - throw new SQLException(e); - } finally { - if (connection != null) { - connection.close(); - } - } - } - - private void queryAndFillData(Map>> tmpCache, Connection connection) throws SQLException { - //load data from table - String sql = sideInfo.getSqlCondition(); - Statement statement = connection.createStatement(); - statement.setFetchSize(getFetchSize()); - ResultSet resultSet = statement.executeQuery(sql); - - String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields(), ","); - String[] sideFieldTypes = sideInfo.getSideTableInfo().getFieldTypes(); - Map sideFieldNamesAndTypes = Maps.newHashMap(); - for (int i = 0; i < sideFieldNames.length; i++) { - sideFieldNamesAndTypes.put(sideFieldNames[i], sideFieldTypes[i]); - } - - while (resultSet.next()) { - Map oneRow = Maps.newHashMap(); - for (String fieldName : sideFieldNames) { - Object object = resultSet.getObject(fieldName.trim()); - object = SwitchUtil.getTarget(object, sideFieldNamesAndTypes.get(fieldName)); - oneRow.put(fieldName.trim(), object); - } - - String cacheKey = sideInfo.getEqualFieldList().stream() - .map(oneRow::get) - .map(Object::toString) - .collect(Collectors.joining("_")); - - tmpCache.computeIfAbsent(cacheKey, key -> Lists.newArrayList()) - .add(oneRow); - } - } - - public int getFetchSize() { - return DEFAULT_FETCH_SIZE; - } - - /** - * get jdbc connection - * - * @param dbURL - * @param userName - * @param password - * @return - */ - public abstract Connection getConn(String dbURL, String userName, String password); - -}