Commit aa08cef

Merge branch '1.10_release_4.0.x' into 1.10_release
2 parents 16e15fc + 25d59f6

16 files changed: +197 -66 lines

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

+2-2
@@ -227,7 +227,7 @@ private static void sqlTranslation(String localSqlPluginPath,
             }
             if (isSide) {
                 //sql-dimensional table contains the dimension table of execution
-                sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, null, null);
+                sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, null, String.valueOf(scope));
             } else {
                 LOG.info("----------exec sql without dimension join-----------");
                 LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
@@ -317,7 +317,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
                 URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), AbstractTargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
                 pluginClassPathSets.add(sinkTablePathUrl);
             } else if (tableInfo instanceof AbstractSideTableInfo) {
-                String sideOperator = ECacheType.ALL.name().equals(((AbstractSideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
+                String sideOperator = ECacheType.ALL.name().equalsIgnoreCase(((AbstractSideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
                 sideTableMap.put(tableInfo.getName(), (AbstractSideTableInfo) tableInfo);

                 URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, AbstractSideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
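Besides threading the scope into sideSqlExec.exec, the second hunk makes the cache-type check case-insensitive. A minimal sketch of the difference (not part of the commit), assuming a table DDL that sets the side-table cache to lowercase "all" while ECacheType.ALL.name() returns the upper-case "ALL":

```java
// Illustration only: how the chosen side operator differs for a lowercase cache setting.
String cacheType = "all";  // hypothetical value read from the table's cache property
String before = ECacheType.ALL.name().equals(cacheType) ? "all" : "async";           // "async" -> wrong plugin path
String after  = ECacheType.ALL.name().equalsIgnoreCase(cacheType) ? "all" : "async"; // "all"
```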

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

+6-5
@@ -69,6 +69,8 @@ public class JoinInfo implements Serializable {

     private String scope = "";

+    private String newTableName = null;
+
     /**
      * Fields to be queried from the left table and the corresponding column names in the output
      */
@@ -96,13 +98,12 @@ public String getNonSideTable(){
     }

     public String getNewTableName(){
-        //handle the case where the left table uses an AS alias
-        String leftStr = leftTableName;
-        leftStr = Strings.isNullOrEmpty(leftStr) ? leftTableAlias : leftStr;
-        String newName = leftStr + "_" + rightTableName;
-        return TableUtils.buildTableNameWithScope(newName, scope);
+        return this.newTableName;
     }

+    public void setNewTableName(String newTableName){
+        this.newTableName = newTableName;
+    }

     public String getNewTableAlias(){
         String newName = leftTableAlias + "_" + rightTableAlias;
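Previously getNewTableName() rederived the name from leftTableName/rightTableName on every call; now the join dealer computes it once and stores it. A trivial sketch of the new contract (values are made up):

```java
// Illustration only: the getter simply echoes whatever the dealer stored.
JoinInfo joinInfo = new JoinInfo();
joinInfo.setNewTableName("kafkaSource_dimSide_0");   // hypothetical derived name
System.out.println(joinInfo.getNewTableName());      // kafkaSource_dimSide_0
```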

core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java

+14-11
@@ -93,7 +93,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
                                  Set<Tuple2<String, String>> joinFieldSet,
                                  Map<String, String> tableRef,
                                  Map<String, String> fieldRef,
-                                 String scope) {
+                                 String scope,
+                                 Set<String> joinTableNames) {

         SqlNode leftNode = joinNode.getLeft();
         SqlNode rightNode = joinNode.getRight();
@@ -110,13 +111,13 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
         if (leftNode.getKind() == JOIN) {
             //handle chained (nested) joins
             dealNestJoin(joinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
-                    parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
+                    parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
             leftNode = joinNode.getLeft();
         }

         if (leftNode.getKind() == AS) {
             AliasInfo aliasInfo = (AliasInfo) sideSQLParser.parseSql(leftNode, sideTableSet, queueInfo,
-                    parentWhere, parentSelectList, parentGroupByList, scope);
+                    parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
             leftTbName = aliasInfo.getName();
             leftTbAlias = aliasInfo.getAlias();
         } else if(leftNode.getKind() == IDENTIFIER){
@@ -128,7 +129,7 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
         Preconditions.checkState(!leftIsSide, "side-table must be at the right of join operator");

         Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
-                parentWhere, parentSelectList, parentGroupByList, scope);
+                parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
         rightTableName = rightTableNameAndAlias.f0;
         rightTableAlias = rightTableNameAndAlias.f1;

@@ -152,7 +153,8 @@ public JoinInfo dealJoinNode(SqlJoin joinNode,
         tableInfo.setJoinType(joinType);
         tableInfo.setCondition(joinNode.getCondition());
         tableInfo.setScope(scope);
-
+        tableInfo.setNewTableName(TableUtils.buildTableNameWithScope(leftTbName, leftTbAlias, rightTableName, scope, joinTableNames));
+        joinTableNames.add(tableInfo.getNewTableName());
         TableUtils.replaceJoinFieldRefTableName(joinNode.getCondition(), fieldRef);

         //extract the fields that need to be queried
@@ -264,20 +266,21 @@ private JoinInfo dealNestJoin(SqlJoin joinNode,
                                   Set<Tuple2<String, String>> joinFieldSet,
                                   Map<String, String> tableRef,
                                   Map<String, String> fieldRef,
-                                  String scope){
+                                  String scope,
+                                  Set<String> joinTableNames){

         SqlJoin leftJoinNode = (SqlJoin) joinNode.getLeft();
         SqlNode parentRightJoinNode = joinNode.getRight();
         SqlNode rightNode = leftJoinNode.getRight();

         Tuple2<String, String> rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo,
-                parentWhere, parentSelectList, parentGroupByList, scope);
+                parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
         Tuple2<String, String> parentRightJoinInfo = parseRightNode(parentRightJoinNode, sideTableSet,
-                queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
+                queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
         boolean parentRightIsSide = checkIsSideTable(parentRightJoinInfo.f0, sideTableSet);

         JoinInfo joinInfo = dealJoinNode(leftJoinNode, sideTableSet, queueInfo, parentWhere, parentSelectList,
-                parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
+                parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);

         String rightTableName = rightTableNameAndAlias.f0;
         boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
@@ -671,12 +674,12 @@ private void extractSelectField(SqlNode selectNode,

     private Tuple2<String, String> parseRightNode(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo,
                                                   SqlNode parentWhere, SqlNodeList selectList, SqlNodeList parentGroupByList,
-                                                  String scope) {
+                                                  String scope, Set<String> joinTableNames) {
         Tuple2<String, String> tabName = new Tuple2<>("", "");
         if(sqlNode.getKind() == IDENTIFIER){
             tabName.f0 = sqlNode.toString();
         }else{
-            AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope);
+            AliasInfo aliasInfo = (AliasInfo)sideSQLParser.parseSql(sqlNode, sideTableSet, queueInfo, parentWhere, selectList, parentGroupByList, scope, joinTableNames);
            tabName.f0 = aliasInfo.getName();
            tabName.f1 = aliasInfo.getAlias();
         }
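The new joinTableNames set is threaded through the whole recursion so that every derived join table within a single statement gets a distinct registered name. The actual naming rule lives in TableUtils.buildTableNameWithScope, whose body is not part of this diff; the sketch below is only a rough illustration of the de-duplication idea, with an assumed naming scheme:

```java
// Hypothetical sketch (not the real implementation in TableUtils.buildTableNameWithScope):
// derive a candidate from the joined tables plus the scope, then bump a suffix until the
// name has not been handed out to an earlier join in this statement.
static String buildUniqueJoinTableName(String leftTable, String rightTable, String scope, Set<String> joinTableNames) {
    String base = leftTable + "_" + rightTable + (scope.isEmpty() ? "" : "_" + scope);
    String candidate = base;
    int suffix = 0;
    while (joinTableNames.contains(candidate)) { // already used by an earlier join in this statement
        candidate = base + "_" + suffix++;
    }
    return candidate;
}
```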

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

+12-11
@@ -73,7 +73,7 @@ public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet, String
         Queue<Object> queueInfo = Queues.newLinkedBlockingQueue();
         SqlNode sqlNode = flinkPlanner.getParser().parse(exeSql);

-        parseSql(sqlNode, sideTableSet, queueInfo, null, null, null, scope);
+        parseSql(sqlNode, sideTableSet, queueInfo, null, null, null, scope, Sets.newHashSet());
         queueInfo.offer(sqlNode);
         return queueInfo;
     }
@@ -94,31 +94,32 @@ public Object parseSql(SqlNode sqlNode,
                            SqlNode parentWhere,
                            SqlNodeList parentSelectList,
                            SqlNodeList parentGroupByList,
-                           String scope){
+                           String scope,
+                           Set<String> joinTableNames){
         SqlKind sqlKind = sqlNode.getKind();
         switch (sqlKind){
             case WITH: {
                 SqlWith sqlWith = (SqlWith) sqlNode;
                 SqlNodeList sqlNodeList = sqlWith.withList;
                 for (SqlNode withAsTable : sqlNodeList) {
                     SqlWithItem sqlWithItem = (SqlWithItem) withAsTable;
-                    parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
+                    parseSql(sqlWithItem.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
                     queueInfo.add(sqlWithItem);
                 }
-                parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
+                parseSql(sqlWith.body, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
                 break;
             }
             case INSERT:
                 SqlNode sqlSource = ((SqlInsert)sqlNode).getSource();
-                return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
+                return parseSql(sqlSource, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
             case SELECT:
                 SqlNode sqlFrom = ((SqlSelect)sqlNode).getFrom();
                 SqlNode sqlWhere = ((SqlSelect)sqlNode).getWhere();
                 SqlNodeList selectList = ((SqlSelect)sqlNode).getSelectList();
                 SqlNodeList groupByList = ((SqlSelect) sqlNode).getGroup();

                 if(sqlFrom.getKind() != IDENTIFIER){
-                    Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList, scope);
+                    Object result = parseSql(sqlFrom, sideTableSet, queueInfo, sqlWhere, selectList, groupByList, scope, joinTableNames);
                     if(result instanceof JoinInfo){
                         return TableUtils.dealSelectResultWithJoinInfo((JoinInfo) result, (SqlSelect) sqlNode, queueInfo);
                     }else if(result instanceof AliasInfo){
@@ -140,7 +141,7 @@ public Object parseSql(SqlNode sqlNode,
                 Map<String, String> tableRef = Maps.newHashMap();
                 Map<String, String> fieldRef = Maps.newHashMap();
                 return joinNodeDealer.dealJoinNode((SqlJoin) sqlNode, sideTableSet, queueInfo,
-                        parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef, scope);
+                        parentWhere, parentSelectList, parentGroupByList, joinFieldSet, tableRef, fieldRef, scope, joinTableNames);
             case AS:
                 SqlNode info = ((SqlBasicCall)sqlNode).getOperands()[0];
                 SqlNode alias = ((SqlBasicCall) sqlNode).getOperands()[1];
@@ -149,7 +150,7 @@ public Object parseSql(SqlNode sqlNode,
                 if(info.getKind() == IDENTIFIER){
                     infoStr = info.toString();
                 } else {
-                    infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope).toString();
+                    infoStr = parseSql(info, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames).toString();
                 }

                 AliasInfo aliasInfo = new AliasInfo();
@@ -162,12 +163,12 @@ public Object parseSql(SqlNode sqlNode,
                 SqlNode unionLeft = ((SqlBasicCall)sqlNode).getOperands()[0];
                 SqlNode unionRight = ((SqlBasicCall)sqlNode).getOperands()[1];

-                parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
-                parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
+                parseSql(unionLeft, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
+                parseSql(unionRight, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);
                 break;
             case ORDER_BY:
                 SqlOrderBy sqlOrderBy = (SqlOrderBy) sqlNode;
-                parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope);
+                parseSql(sqlOrderBy.query, sideTableSet, queueInfo, parentWhere, parentSelectList, parentGroupByList, scope, joinTableNames);

             case LITERAL:
                 return LITERAL.toString();

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

+8-6
@@ -46,7 +46,7 @@ public abstract class AbstractTableParser {
     private static final String CHAR_TYPE_NO_LENGTH = "CHAR";

     private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
-    private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
+    private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+?)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
     private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
     private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");

@@ -84,13 +84,14 @@ public boolean dealKeyPattern(String fieldRow, AbstractTableInfo tableInfo){
         return false;
     }

-    public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
+    public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {

-        List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
-        for(String fieldRow : fieldRows){
+        List<String> fieldRows = DtStringUtil.splitField(fieldsInfo);
+
+        for (String fieldRow : fieldRows) {
             fieldRow = fieldRow.trim();

-            if(StringUtils.isBlank(fieldRow)){
+            if (StringUtils.isBlank(fieldRow)) {
                 throw new RuntimeException(String.format("table [%s],exists field empty.", tableInfo.getName()));
             }

@@ -132,7 +133,7 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
         tableInfo.finish();
     }

-    public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo){
+    public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo) {
         String primaryFields = matcher.group(1).trim();
         String[] splitArry = primaryFields.split(",");
         List<String> primaryKes = Lists.newArrayList(splitArry);
@@ -171,4 +172,5 @@ protected void addParserHandler(String parserName, Pattern pattern, ITableFieldD
         patternMap.put(parserName, pattern);
         handlerMap.put(parserName, handler);
     }
+
 }
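Widening the type group in nestJsonFieldKeyPattern from (\w+) to (.+?) lets nested-field declarations with parameterized types through, which the old word-only group rejected. A small standalone check; the field declaration used here is made up for illustration:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone illustration of the new pattern; not part of the commit.
public class NestJsonPatternCheck {
    public static void main(String[] args) {
        Pattern nestJsonFieldKeyPattern =
                Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+?)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
        Matcher matcher = nestJsonFieldKeyPattern.matcher("data.order.price DECIMAL(10,2) AS price");
        if (matcher.find()) {
            System.out.println(matcher.group(1)); // data.order.price
            System.out.println(matcher.group(3)); // DECIMAL(10,2) -- the old (\w+) type group could not match this
            System.out.println(matcher.group(4)); // price
        }
    }
}
```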

core/src/main/java/com/dtstack/flink/sql/util/ClassUtil.java

+1-1
@@ -37,7 +37,7 @@ public class ClassUtil {
     public static Class<?> stringConvertClass(String str) {

         // This mainly tells the Class-to-TypeInformation conversion that the field is an Array type
-        String lowerStr = str.toLowerCase();
+        String lowerStr = str.toLowerCase().trim();
         if (lowerStr.startsWith("array")) {
             return Array.newInstance(Integer.class, 0).getClass();
         }
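A quick sketch of why the trim() matters, using a hypothetical type string with stray whitespace (not taken from the commit):

```java
// Illustration only: a type written with a leading space previously missed the array branch.
String str = " ARRAY<int>";                                         // hypothetical DDL type fragment
System.out.println(str.toLowerCase().startsWith("array"));          // false -> fell through before this change
System.out.println(str.toLowerCase().trim().startsWith("array"));   // true  -> recognized as an array type now
```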

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

+49-3
@@ -46,14 +46,61 @@ public class DtStringUtil {

     private static ObjectMapper objectMapper = new ObjectMapper();

-
     /**
      * Split the specified string delimiter --- ignored quotes delimiter
      * @param str
      * @param delimiter
      * @return
      */
-    public static List<String> splitIgnoreQuota(String str, char delimiter){
+    public static List<String> splitIgnoreQuota(String str, char delimiter) {
+        List<String> tokensList = new ArrayList<>();
+        boolean inQuotes = false;
+        boolean inSingleQuotes = false;
+        int bracketLeftNum = 0;
+        StringBuilder b = new StringBuilder();
+        char[] chars = str.toCharArray();
+        int idx = 0;
+        for (char c : chars) {
+            char flag = 0;
+            if (idx > 0) {
+                flag = chars[idx - 1];
+            }
+            if (c == delimiter) {
+                if (inQuotes) {
+                    b.append(c);
+                } else if (inSingleQuotes) {
+                    b.append(c);
+                } else if (bracketLeftNum > 0) {
+                    b.append(c);
+                } else {
+                    tokensList.add(b.toString());
+                    b = new StringBuilder();
+                }
+            } else if (c == '\"' && '\\' != flag && !inSingleQuotes) {
+                inQuotes = !inQuotes;
+                b.append(c);
+            } else if (c == '\'' && '\\' != flag && !inQuotes) {
+                inSingleQuotes = !inSingleQuotes;
+                b.append(c);
+            } else if (c == '(' && !inSingleQuotes && !inQuotes) {
+                bracketLeftNum++;
+                b.append(c);
+            } else if (c == ')' && !inSingleQuotes && !inQuotes) {
+                bracketLeftNum--;
+                b.append(c);
+            } else {
+                b.append(c);
+            }
+            idx++;
+        }
+
+        tokensList.add(b.toString());
+
+        return tokensList;
+    }
+
+    public static List<String> splitField(String str) {
+        final char delimiter = ',';
         List<String> tokensList = new ArrayList<>();
         boolean inQuotes = false;
         boolean inSingleQuotes = false;
@@ -106,7 +153,6 @@ public static List<String> splitIgnoreQuota(String str, char delimiter){
         return tokensList;
     }

-
     public static String replaceIgnoreQuota(String str, String oriStr, String replaceStr){
         String splitPatternStr = oriStr + "(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)(?=(?:[^']*'[^']*')*[^']*$)";
         return str.replaceAll(splitPatternStr, replaceStr);
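The rewritten splitIgnoreQuota tracks an open-parenthesis count, so a comma that sits inside parentheses stays attached to its token; splitField, which parseFieldsInfo now calls, is the comma-specialized variant. A quick sketch with a made-up field list (illustration only, not from the commit):

```java
// The comma inside DECIMAL(10,2) no longer starts a new token.
List<String> parts = DtStringUtil.splitIgnoreQuota("id INT, price DECIMAL(10,2), name VARCHAR", ',');
// parts -> ["id INT", " price DECIMAL(10,2)", " name VARCHAR"]
```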
