Skip to content

Commit 3a7659d

Browse files
committed
修改支持 流 join 流 join 维表类型
1 parent fbdd7f5 commit 3a7659d

File tree

7 files changed

+171
-139
lines changed

7 files changed

+171
-139
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,12 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
181181
//sql-dimensional table contains the dimension table of execution
182182
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig);
183183
}else{
184+
System.out.println("----------exec sql without dimension join-----------" );
185+
System.out.println("----------real sql exec is--------------------------");
186+
System.out.println(result.getExecSql());
184187
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
185188
if(LOG.isInfoEnabled()){
189+
System.out.println();
186190
LOG.info("exec sql: " + result.getExecSql());
187191
}
188192
}

core/src/main/java/com/dtstack/flink/sql/side/ParserJoinField.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public static List<FieldInfo> getRowTypeInfo(SqlNode sqlNode, JoinScope scope, b
6565
for(SqlNode fieldNode : sqlNodeList.getList()){
6666
SqlIdentifier identifier = (SqlIdentifier)fieldNode;
6767
if(!identifier.isStar()) {
68-
System.out.println(identifier);
6968
String tableName = identifier.getComponent(0).getSimple();
7069
String fieldName = identifier.getComponent(1).getSimple();
7170
TypeInformation<?> type = scope.getFieldType(tableName, fieldName);
@@ -76,7 +75,6 @@ public static List<FieldInfo> getRowTypeInfo(SqlNode sqlNode, JoinScope scope, b
7675
fieldInfoList.add(fieldInfo);
7776
} else {
7877
//处理
79-
System.out.println("----------");
8078
int identifierSize = identifier.names.size();
8179

8280
switch(identifierSize) {

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 73 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.config.CalciteConfig;
24-
import com.google.common.base.Strings;
24+
import com.dtstack.flink.sql.util.TableUtils;
25+
import com.google.common.base.Preconditions;
2526
import com.google.common.collect.Lists;
2627
import com.google.common.collect.Maps;
2728
import com.google.common.collect.Queues;
@@ -73,15 +74,14 @@ public class SideSQLParser {
7374
private static final Logger LOG = LoggerFactory.getLogger(SideSQLParser.class);
7475

7576
private Map<String, Table> localTableCache = Maps.newHashMap();
76-
private final char SPLIT = '_';
7777

78-
//regular joins(不带时间窗口) 不允许查询出rowtime或者proctime
79-
private final String SELECT_TEMP_SQL = "select %s from %s %s";
78+
//用来构建临时的中间查询
79+
private static final String SELECT_TEMP_SQL = "select %s from %s %s";
8080

8181
public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws SqlParseException {
82-
System.out.println("---exeSql---");
82+
System.out.println("----------exec original Sql----------");
8383
System.out.println(exeSql);
84-
LOG.info("---exeSql---");
84+
LOG.info("----------exec original Sql----------");
8585
LOG.info(exeSql);
8686

8787
Queue<Object> queueInfo = Queues.newLinkedBlockingQueue();
@@ -160,7 +160,7 @@ private Object parseSql(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object>
160160
if(sqlFrom.getKind() != IDENTIFIER){
161161
Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList);
162162
if(result instanceof JoinInfo){
163-
return dealSelectResultWithJoinInfo((JoinInfo) result, (SqlSelect) sqlNode, queueInfo);
163+
return TableUtils.dealSelectResultWithJoinInfo((JoinInfo) result, (SqlSelect) sqlNode, queueInfo);
164164
}else if(result instanceof AliasInfo){
165165
String tableName = ((AliasInfo) result).getName();
166166
if(sideTableSet.contains(tableName)){
@@ -226,11 +226,6 @@ private AliasInfo getSqlNodeAliasInfo(SqlNode sqlNode) {
226226
private void convertSideJoinToNewQuery(SqlJoin sqlNode, Set<String> sideTableSet) {
227227
checkAndReplaceMultiJoin(sqlNode.getLeft(), sideTableSet);
228228
checkAndReplaceMultiJoin(sqlNode.getRight(), sideTableSet);
229-
230-
AliasInfo rightTableAliasInfo = getSqlNodeAliasInfo(sqlNode.getRight());
231-
if(sideTableSet.contains(rightTableAliasInfo.getName())){
232-
//构建新的查询
233-
}
234229
}
235230

236231
private SqlBasicCall buildAsSqlNode(String internalTableName, SqlNode newSource) {
@@ -243,6 +238,15 @@ private SqlBasicCall buildAsSqlNode(String internalTableName, SqlNode newSource)
243238
return new SqlBasicCall(operator, sqlNodes, sqlParserPos);
244239
}
245240

241+
/**
242+
* 解析 join 操作
243+
* @param joinNode
244+
* @param sideTableSet
245+
* @param queueInfo
246+
* @param parentWhere
247+
* @param parentSelectList
248+
* @return
249+
*/
246250
private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<Object> queueInfo,
247251
SqlNode parentWhere, SqlNodeList parentSelectList) {
248252
SqlNode leftNode = joinNode.getLeft();
@@ -255,35 +259,36 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
255259
String rightTableAlias = "";
256260
boolean leftTbisTmp = false;
257261

258-
Tuple2<String, String> rightTableNameAndAlias = null;
262+
//如果是连续join 判断是否已经处理过添加到执行队列
263+
Boolean alreadyOffer = false;
264+
259265
if(leftNode.getKind() == IDENTIFIER){
260266
leftTbName = leftNode.toString();
261267
} else if (leftNode.getKind() == JOIN) {
262268
//处理连续join
263-
SqlBasicCall sqlBasicCall = dealNestJoin((SqlJoin) leftNode, sideTableSet, queueInfo, parentWhere, parentSelectList);
264-
leftTbName = sqlBasicCall.getOperands()[0].toString();
265-
leftTbAlias = sqlBasicCall.getOperands()[1].toString();
269+
Tuple2<Boolean, SqlBasicCall> nestJoinResult = dealNestJoin((SqlJoin) leftNode, sideTableSet, queueInfo, parentWhere, parentSelectList);
270+
alreadyOffer = nestJoinResult.f0;
271+
leftTbName = nestJoinResult.f1.getOperands()[0].toString();
272+
leftTbAlias = nestJoinResult.f1.getOperands()[1].toString();
266273
leftTbisTmp = true;
267274
} else if (leftNode.getKind() == AS) {
268275
AliasInfo aliasInfo = (AliasInfo) parseSql(leftNode, sideTableSet, queueInfo, parentWhere, parentSelectList);
269276
leftTbName = aliasInfo.getName();
270277
leftTbAlias = aliasInfo.getAlias();
271278

272279
} else {
273-
throw new RuntimeException("---not deal---");
280+
throw new RuntimeException(String.format("---not deal node with type %s", leftNode.getKind().toString()));
274281
}
275282

276283
boolean leftIsSide = checkIsSideTable(leftTbName, sideTableSet);
277-
if(leftIsSide){
278-
throw new RuntimeException("side-table must be at the right of join operator");
279-
}
284+
Preconditions.checkState(!leftIsSide, "side-table must be at the right of join operator");
280285

281-
rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo, parentWhere, parentSelectList);
286+
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo, parentWhere, parentSelectList);
282287
rightTableName = rightTableNameAndAlias.f0;
283288
rightTableAlias = rightTableNameAndAlias.f1;
284289

285290
boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
286-
if(joinType == JoinType.RIGHT){
291+
if(rightIsSide && joinType == JoinType.RIGHT){
287292
throw new RuntimeException("side join not support join type of right[current support inner join and left join]");
288293
}
289294

@@ -303,51 +308,22 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
303308
}
304309

305310
tableInfo.setLeftIsTmpTable(leftTbisTmp);
306-
307311
tableInfo.setLeftIsSideTable(leftIsSide);
308312
tableInfo.setRightIsSideTable(rightIsSide);
309313
tableInfo.setLeftNode(leftNode);
310314
tableInfo.setRightNode(rightNode);
311315
tableInfo.setJoinType(joinType);
312316
tableInfo.setCondition(joinNode.getCondition());
313317

314-
//TODO 抽取
315-
if(tableInfo.getLeftNode().getKind() != AS){
316-
//build 临时中间查询
317-
try{
318-
//父一级的where 条件中如果只和临时查询相关的条件都截取进来
319-
Set<String> fromTableNameSet = Sets.newHashSet();
320-
List<SqlBasicCall> extractCondition = Lists.newArrayList();
321-
322-
getFromTableInfo(tableInfo.getLeftNode(), fromTableNameSet);
323-
checkAndRemoveCondition(fromTableNameSet, (SqlBasicCall) parentWhere, extractCondition);
324-
325-
//TODO 查询的字段需要根据最上层的字段中获取,而不是直接设置为*,当然如果上一层就是*另说
326-
327-
List<String> extractSelectField = extractSelectList(parentSelectList, fromTableNameSet);
328-
String extractSelectFieldStr = buildSelectNode(extractSelectField);
329-
String extractConditionStr = buildCondition(extractCondition);
330-
331-
String tmpSelectSql = String.format(SELECT_TEMP_SQL,
332-
extractSelectFieldStr,
333-
tableInfo.getLeftNode().toString(),
334-
extractConditionStr);
335-
336-
SqlParser sqlParser = SqlParser.create(tmpSelectSql, CalciteConfig.MYSQL_LEX_CONFIG);
337-
SqlNode sqlNode = sqlParser.parseStmt();
338-
SqlBasicCall sqlBasicCall = buildAsSqlNode(tableInfo.getLeftTableAlias(), sqlNode);
339-
queueInfo.offer(sqlBasicCall);
340-
341-
//TODO 打印合适的提示
342-
System.out.println(tmpSelectSql);
343-
}catch (Exception e){
344-
e.printStackTrace();
345-
throw new RuntimeException(e);
346-
}
318+
if(!rightIsSide || alreadyOffer){
319+
return tableInfo;
320+
}
347321

322+
if(tableInfo.getLeftNode().getKind() != AS){
323+
extractTemporaryQuery(tableInfo.getLeftNode(), tableInfo.getLeftTableAlias(), (SqlBasicCall) parentWhere, parentSelectList, queueInfo);
348324
}else {
349-
SqlKind asFirstKind = ((SqlBasicCall)tableInfo.getLeftNode()).operands[0].getKind();
350-
if(asFirstKind == SELECT){
325+
SqlKind asNodeFirstKind = ((SqlBasicCall)tableInfo.getLeftNode()).operands[0].getKind();
326+
if(asNodeFirstKind == SELECT){
351327
queueInfo.offer(tableInfo.getLeftNode());
352328
tableInfo.setLeftNode(((SqlBasicCall)tableInfo.getLeftNode()).operands[1]);
353329
}
@@ -356,23 +332,24 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
356332
}
357333

358334
//构建新的查询
359-
private SqlBasicCall dealNestJoin(SqlJoin joinNode, Set<String> sideTableSet, Queue<Object> queueInfo, SqlNode parentWhere, SqlNodeList selectList){
335+
private Tuple2<Boolean, SqlBasicCall> dealNestJoin(SqlJoin joinNode, Set<String> sideTableSet, Queue<Object> queueInfo, SqlNode parentWhere, SqlNodeList selectList){
360336
SqlNode rightNode = joinNode.getRight();
361-
362337
Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo, parentWhere, selectList);
363-
364338
JoinInfo joinInfo = dealJoinNode(joinNode, sideTableSet, queueInfo, parentWhere, selectList);
365339

366340
String rightTableName = rightTableNameAndAlias.f0;
367341
boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
342+
boolean alreadyOffer = false;
343+
368344
if(!rightIsSide){
369345
//右表不是维表的情况
370346
}else{
371347
//右边表是维表需要重新构建左表的临时查询
372348
queueInfo.offer(joinInfo);
349+
alreadyOffer = true;
373350
}
374351

375-
return buildAsNodeByJoinInfo(joinInfo, null, null);
352+
return Tuple2.of(alreadyOffer, TableUtils.buildAsNodeByJoinInfo(joinInfo, null, null));
376353
}
377354

378355
public boolean checkAndRemoveCondition(Set<String> fromTableNameSet, SqlBasicCall parentWhere, List<SqlBasicCall> extractContition){
@@ -402,6 +379,40 @@ public boolean checkAndRemoveCondition(Set<String> fromTableNameSet, SqlBasicCal
402379
}
403380
}
404381

382+
private void extractTemporaryQuery(SqlNode node, String tableAlias, SqlBasicCall parentWhere,
383+
SqlNodeList parentSelectList, Queue<Object> queueInfo){
384+
try{
385+
//父一级的where 条件中如果只和临时查询相关的条件都截取进来
386+
Set<String> fromTableNameSet = Sets.newHashSet();
387+
List<SqlBasicCall> extractCondition = Lists.newArrayList();
388+
389+
getFromTableInfo(node, fromTableNameSet);
390+
checkAndRemoveCondition(fromTableNameSet, parentWhere, extractCondition);
391+
392+
List<String> extractSelectField = extractSelectList(parentSelectList, fromTableNameSet);
393+
String extractSelectFieldStr = buildSelectNode(extractSelectField);
394+
String extractConditionStr = buildCondition(extractCondition);
395+
396+
String tmpSelectSql = String.format(SELECT_TEMP_SQL,
397+
extractSelectFieldStr,
398+
node.toString(),
399+
extractConditionStr);
400+
401+
SqlParser sqlParser = SqlParser.create(tmpSelectSql, CalciteConfig.MYSQL_LEX_CONFIG);
402+
SqlNode sqlNode = sqlParser.parseStmt();
403+
SqlBasicCall sqlBasicCall = buildAsSqlNode(tableAlias, sqlNode);
404+
queueInfo.offer(sqlBasicCall);
405+
406+
//TODO 打印合适的提示
407+
System.out.println("-------build temporary query-----------");
408+
System.out.println(tmpSelectSql);
409+
System.out.println("---------------------------------------");
410+
}catch (Exception e){
411+
e.printStackTrace();
412+
throw new RuntimeException(e);
413+
}
414+
}
415+
405416
/**
406417
* 抽取上层需用使用到的字段
407418
* 由于where字段已经抽取到上一层了所以不用查询出来
@@ -568,71 +579,6 @@ public SqlBasicCall buildDefaultCondition(){
568579
}
569580

570581

571-
/**
572-
*
573-
* @param joinInfo
574-
* @param sqlNode
575-
* @param queueInfo
576-
* @return 两个边关联后的新表表名
577-
*/
578-
private String dealSelectResultWithJoinInfo(JoinInfo joinInfo, SqlSelect sqlNode, Queue<Object> queueInfo) {
579-
//SideJoinInfo rename
580-
if (joinInfo.checkIsSide()) {
581-
joinInfo.setSelectFields(sqlNode.getSelectList());
582-
joinInfo.setSelectNode(sqlNode);
583-
if (joinInfo.isRightIsSideTable()) {
584-
//Analyzing left is not a simple table
585-
if (joinInfo.getLeftNode().getKind() == SELECT) {
586-
queueInfo.offer(joinInfo.getLeftNode());
587-
}
588-
589-
queueInfo.offer(joinInfo);
590-
} else {
591-
//Determining right is not a simple table
592-
if (joinInfo.getRightNode().getKind() == SELECT) {
593-
queueInfo.offer(joinInfo.getLeftNode());
594-
}
595-
596-
queueInfo.offer(joinInfo);
597-
}
598-
replaceFromNodeForJoin(joinInfo, sqlNode);
599-
return joinInfo.getNewTableName();
600-
}
601-
return "";
602-
}
603-
604-
private void replaceFromNodeForJoin(JoinInfo joinInfo, SqlSelect sqlNode) {
605-
//Update from node
606-
SqlBasicCall sqlBasicCall = buildAsNodeByJoinInfo(joinInfo, null, null);
607-
sqlNode.setFrom(sqlBasicCall);
608-
}
609-
610-
private SqlBasicCall buildAsNodeByJoinInfo(JoinInfo joinInfo, SqlNode sqlNode0, String tableAlias) {
611-
SqlOperator operator = new SqlAsOperator();
612-
613-
SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
614-
String joinLeftTableName = joinInfo.getLeftTableName();
615-
String joinLeftTableAlias = joinInfo.getLeftTableAlias();
616-
joinLeftTableName = Strings.isNullOrEmpty(joinLeftTableName) ? joinLeftTableAlias : joinLeftTableName;
617-
String newTableName = buildInternalTableName(joinLeftTableName, SPLIT, joinInfo.getRightTableName());
618-
String newTableAlias = !StringUtils.isEmpty(tableAlias) ? tableAlias : buildInternalTableName(joinInfo.getLeftTableAlias(), SPLIT, joinInfo.getRightTableAlias());
619-
620-
if (null == sqlNode0) {
621-
sqlNode0 = new SqlIdentifier(newTableName, null, sqlParserPos);
622-
}
623-
624-
SqlIdentifier sqlIdentifierAlias = new SqlIdentifier(newTableAlias, null, sqlParserPos);
625-
SqlNode[] sqlNodes = new SqlNode[2];
626-
sqlNodes[0] = sqlNode0;
627-
sqlNodes[1] = sqlIdentifierAlias;
628-
return new SqlBasicCall(operator, sqlNodes, sqlParserPos);
629-
}
630-
631-
private String buildInternalTableName(String left, char split, String right) {
632-
StringBuilder sb = new StringBuilder();
633-
return sb.append(left).append(split).append(right).toString();
634-
}
635-
636582
private boolean checkIsSideTable(String tableName, Set<String> sideTableList){
637583
if(sideTableList.contains(tableName)){
638584
return true;

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
140140
tableEnv.registerTable(aliasInfo.getAlias(), table);
141141
localTableCache.put(aliasInfo.getAlias(), table);
142142

143-
//TODO 解析出as查询的表和字段的关系
144143
FieldReplaceInfo fieldReplaceInfo = parseAsQuery((SqlBasicCall) pollSqlNode, tableCache);
145144
if(fieldReplaceInfo != null){
146145
replaceInfoList.add(fieldReplaceInfo);
@@ -160,8 +159,12 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
160159

161160
}
162161

163-
//TODO
164-
//FIXME 如果和create view 的名称命名相同
162+
/**
163+
* 解析出as查询的表和字段的关系
164+
* @param asSqlNode
165+
* @param tableCache
166+
* @return
167+
*/
165168
private FieldReplaceInfo parseAsQuery(SqlBasicCall asSqlNode, Map<String, Table> tableCache){
166169
SqlNode info = asSqlNode.getOperands()[0];
167170
SqlNode alias = asSqlNode.getOperands()[1];
@@ -172,7 +175,6 @@ private FieldReplaceInfo parseAsQuery(SqlBasicCall asSqlNode, Map<String, Table>
172175
}
173176

174177
List<FieldInfo> extractFieldList = TableUtils.parserSelectField((SqlSelect) info, tableCache);
175-
System.out.println(extractFieldList);
176178

177179
HashBasedTable<String, String, String> mappingTable = HashBasedTable.create();
178180
for (FieldInfo fieldInfo : extractFieldList) {
@@ -190,7 +192,12 @@ private FieldReplaceInfo parseAsQuery(SqlBasicCall asSqlNode, Map<String, Table>
190192
}
191193

192194

193-
//TODO
195+
/**
196+
* 添加字段别名
197+
* @param pollSqlNode
198+
* @param fieldList
199+
* @param mappingTable
200+
*/
194201
private void addAliasForFieldNode(SqlNode pollSqlNode, List<String> fieldList, HashBasedTable<String, String, String> mappingTable) {
195202
SqlKind sqlKind = pollSqlNode.getKind();
196203
switch (sqlKind) {

0 commit comments

Comments
 (0)