Commit 5d2d320
Merge branch 'github_temp_1.10' into 1.10_release
2 parents: aa08cef + a1b7bf2

130 files changed (+3972, -2049 lines)


.gitignore (+3, -1)

@@ -14,4 +14,6 @@ lib/
 .DS_Store
 bin/nohup.out
 .DS_Store
-bin/sideSql.txt
+bin/sideSql.txt
+*.keytab
+krb5.conf
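Note: *.keytab and krb5.conf are Kerberos credential and client-configuration files; ignoring them keeps per-environment security material out of version control, matching the Kerberos parameter constants (principal, keytab, krb5conf) introduced later in this commit.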

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java (+4, -27)

@@ -26,14 +26,12 @@
 import com.dtstack.flink.sql.side.FieldInfo;
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
-import com.dtstack.flink.sql.util.RowDataComplete;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
@@ -72,27 +70,6 @@ public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Field
         super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
     }
 
-    @Override
-    public Row fillData(Row input, Object sideInput) {
-        Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
-        Row row = new Row(sideInfo.getOutFieldInfoList().size());
-        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
-            Object obj = input.getField(entry.getValue());
-            obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
-            row.setField(entry.getKey(), obj);
-        }
-
-        for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
-            if (cacheInfo == null) {
-                row.setField(entry.getKey(), null);
-            } else {
-                row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
-            }
-        }
-
-        return row;
-    }
-
     @Override
     protected void initCache() throws SQLException {
         Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
@@ -116,14 +93,14 @@ protected void reloadCache() {
 
 
     @Override
-    public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
+    public void flatMap(Row input, Collector<Row> out) throws Exception {
         List<Object> inputParams = Lists.newArrayList();
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
             Object equalObj = input.getField(conValIndex);
             if (equalObj == null) {
                 if (sideInfo.getJoinType() == JoinType.LEFT) {
                     Row row = fillData(input, null);
-                    RowDataComplete.collectRow(out, row);
+                    out.collect(row);
                 }
                 return;
             }
@@ -136,7 +113,7 @@ public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
         if (CollectionUtils.isEmpty(cacheList)) {
             if (sideInfo.getJoinType() == JoinType.LEFT) {
                 Row row = fillData(input, null);
-                RowDataComplete.collectRow(out, row);
+                out.collect(row);
             } else {
                 return;
             }
@@ -146,7 +123,7 @@ public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
 
         for (Map<String, Object> one : cacheList) {
             Row row = fillData(input, one);
-            RowDataComplete.collectRow(out, row);
+            out.collect(row);
         }
 
     }
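Note: this reverts the all-cache side join from the Blink planner's BaseRow back to org.apache.flink.types.Row: the RowDataComplete adapter is dropped in favor of plain out.collect(row), and the local fillData override is removed, presumably deferring to an equivalent implementation inherited from the side-table base class. The async variant below gets the same Row/BaseRow treatment.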

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java (+2, -4)

@@ -26,7 +26,6 @@
 import com.dtstack.flink.sql.side.*;
 import com.dtstack.flink.sql.side.cache.CacheObj;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
-import com.dtstack.flink.sql.util.RowDataComplete;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AsyncFunction;
@@ -37,7 +36,6 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
-import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,7 +147,7 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
     }
 
     @Override
-    public void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) throws Exception {
+    public void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<Row> resultFuture) throws Exception {
 
         String key = buildCacheKey(inputParams);
         //connect Cassandra
@@ -188,7 +186,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
                     }
                     rowList.add(row);
                 }
-                RowDataComplete.completeRow(resultFuture, rowList);
+                resultFuture.complete(rowList);
                 if (openCache()) {
                     putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
                 }

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java (-53)

@@ -72,57 +72,4 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
         LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
     }
 
-
-    @Override
-    public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
-        if (sqlNode.getKind() != SqlKind.EQUALS) {
-            throw new RuntimeException("not equal operator.");
-        }
-
-        SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0];
-        SqlIdentifier right = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[1];
-
-        String leftTableName = left.getComponent(0).getSimple();
-        String leftField = left.getComponent(1).getSimple();
-
-        String rightTableName = right.getComponent(0).getSimple();
-        String rightField = right.getComponent(1).getSimple();
-
-        if (leftTableName.equalsIgnoreCase(sideTableName)) {
-            equalFieldList.add(leftField);
-            int equalFieldIndex = -1;
-            for (int i = 0; i < getFieldNames().length; i++) {
-                String fieldName = getFieldNames()[i];
-                if (fieldName.equalsIgnoreCase(rightField)) {
-                    equalFieldIndex = i;
-                }
-            }
-            if (equalFieldIndex == -1) {
-                throw new RuntimeException("can't deal equal field: " + sqlNode);
-            }
-
-            equalValIndex.add(equalFieldIndex);
-
-        } else if (rightTableName.equalsIgnoreCase(sideTableName)) {
-
-            equalFieldList.add(rightField);
-            int equalFieldIndex = -1;
-            for (int i = 0; i < getFieldNames().length; i++) {
-                String fieldName = getFieldNames()[i];
-                if (fieldName.equalsIgnoreCase(leftField)) {
-                    equalFieldIndex = i;
-                }
-            }
-            if (equalFieldIndex == -1) {
-                throw new RuntimeException("can't deal equal field: " + sqlNode.toString());
-            }
-
-            equalValIndex.add(equalFieldIndex);
-
-        } else {
-            throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
-        }
-
-    }
-
 }
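Note: the removed dealOneEqualCon override contained generic equal-condition parsing with no Cassandra-specific behavior; deleting it presumably defers to the inherited implementation in the side-info base class, which is not shown in this excerpt.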

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java (+7, -1)

@@ -58,6 +58,8 @@ public class CassandraSink implements RetractStreamTableSink<Row>, IStreamSinkGe
     protected Integer readTimeoutMillis;
     protected Integer connectTimeoutMillis;
    protected Integer poolTimeoutMillis;
+    protected Integer parallelism = 1;
+    protected String registerTableName;
 
     public CassandraSink() {
         // TO DO NOTHING
@@ -78,6 +80,8 @@ public CassandraSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
         this.readTimeoutMillis = cassandraTableInfo.getReadTimeoutMillis();
         this.connectTimeoutMillis = cassandraTableInfo.getConnectTimeoutMillis();
         this.poolTimeoutMillis = cassandraTableInfo.getPoolTimeoutMillis();
+        this.parallelism = cassandraTableInfo.getParallelism();
+        this.registerTableName = cassandraTableInfo.getTableName();
         return this;
     }
 
@@ -106,7 +110,9 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
 
         CassandraOutputFormat outputFormat = builder.finish();
         RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
-        DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
+        DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction)
+                .setParallelism(parallelism)
+                .name(registerTableName);
         return dataStreamSink;
     }
 
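Note: the sink now applies the table-level parallelism from the DDL and names the sink operator after the registered table, so it is identifiable in the Flink web UI instead of inheriting the environment parallelism with an anonymous operator name.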

…/com/dtstack/flink/sql/constant/PluginParamConsts.java (new file, +30; module prefix elided in the capture, remainder inferred from the package declaration)

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.constant;
+
+/**
+ * @program: flinkStreamSQL
+ * @author: wuren
+ * @create: 2020/09/15
+ **/
+public class PluginParamConsts {
+    public static final String PRINCIPAL = "principal";
+    public static final String KEYTAB = "keytab";
+    public static final String KRB5_CONF = "krb5conf";
+}
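A minimal usage sketch for the new constants, assuming a plugin receives its Kerberos settings as a String-to-String property map; KerberosParamDemo and the property values are illustrative, not part of this commit:

    import com.dtstack.flink.sql.constant.PluginParamConsts;

    import java.util.HashMap;
    import java.util.Map;

    public class KerberosParamDemo {
        public static void main(String[] args) {
            // A plugin's raw table properties, keyed by the shared constants
            // rather than by scattered string literals.
            Map<String, String> props = new HashMap<>();
            props.put(PluginParamConsts.PRINCIPAL, "flink/worker@EXAMPLE.COM");
            props.put(PluginParamConsts.KEYTAB, "/etc/security/keytabs/flink.keytab");
            props.put(PluginParamConsts.KRB5_CONF, "/etc/krb5.conf");

            // Read the three settings back with the same keys.
            String principal = props.get(PluginParamConsts.PRINCIPAL);
            String keytab = props.get(PluginParamConsts.KEYTAB);
            String krb5conf = props.get(PluginParamConsts.KRB5_CONF);
            System.out.printf("principal=%s keytab=%s krb5conf=%s%n", principal, keytab, krb5conf);
        }
    }

Centralizing the three keys avoids each connector hard-coding its own spelling of principal/keytab/krb5conf.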

core/src/main/java/com/dtstack/flink/sql/environment/StreamEnvConfigManager.java (+2, -2)

@@ -87,8 +87,8 @@ public static void streamExecutionEnvironmentConfig(StreamExecutionEnvironment s
         ExecutionConfig exeConfig = streamEnv.getConfig();
         if (exeConfig.getGlobalJobParameters() == null) {
             exeConfig.setGlobalJobParameters(globalJobParameters);
-        } else if (exeConfig.getGlobalJobParameters() instanceof Configuration) {
-            ((Configuration) exeConfig.getGlobalJobParameters()).addAll(globalJobParameters);
+        } else if (exeConfig.getGlobalJobParameters() instanceof ExecutionConfig.GlobalJobParameters) {
+            exeConfig.setGlobalJobParameters(globalJobParameters);
        }
 
         getEnvParallelism(confProperties).ifPresent(streamEnv::setParallelism);
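Note: Configuration extends ExecutionConfig.GlobalJobParameters, so the old branch merged new settings into an existing Configuration via addAll, while the new branch matches any non-null GlobalJobParameters and replaces them wholesale. A minimal sketch of the new replace-not-merge behavior, assuming Flink 1.10 APIs (GlobalParamsDemo and the keys are illustrative):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.configuration.Configuration;

    public class GlobalParamsDemo {
        public static void main(String[] args) {
            ExecutionConfig exeConfig = new ExecutionConfig();

            Configuration first = new Configuration();
            first.setString("a", "1");
            exeConfig.setGlobalJobParameters(first);

            Configuration second = new Configuration();
            second.setString("b", "2");
            // New code path: existing parameters are replaced, not merged.
            exeConfig.setGlobalJobParameters(second);

            // Prints {b=2}; key "a" from the first Configuration is gone.
            System.out.println(exeConfig.getGlobalJobParameters().toMap());
        }
    }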

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java (+1)

@@ -289,6 +289,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
 
         RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
         DataStream adaptStream = tableEnv.toAppendStream(adaptTable, typeInfo);
+        adaptStream.getTransformation().setOutputType(typeInfo);
 
         String fields = String.join(",", typeInfo.getFieldNames());
 
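Note: setting the output type on the underlying transformation pins the freshly built RowTypeInfo (the adapted field names and types) on the converted stream, so the table registered from adaptStream reports the adapted schema rather than whatever type the toAppendStream conversion inferred.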

core/src/main/java/com/dtstack/flink/sql/exec/FlinkSQLExec.java (+2, -1)

@@ -18,6 +18,7 @@
 
 package com.dtstack.flink.sql.exec;
 
+import com.dtstack.flink.sql.util.SqlCheckUtils;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.flink.sql.parser.dml.RichSqlInsert;
@@ -58,7 +59,7 @@ public static void sqlUpdate(StreamTableEnvironment tableEnv, String stmt) throw
         StreamTableEnvironmentImpl tableEnvImpl = ((StreamTableEnvironmentImpl) tableEnv);
         StreamPlanner streamPlanner = (StreamPlanner)tableEnvImpl.getPlanner();
         FlinkPlannerImpl flinkPlanner = streamPlanner.createFlinkPlanner();
-
+        SqlCheckUtils.check(stmt);
         RichSqlInsert insert = (RichSqlInsert) flinkPlanner.validate(flinkPlanner.parser().parse(stmt));
         TableImpl queryResult = extractQueryTableFromInsertCaluse(tableEnvImpl, flinkPlanner, insert);
 
6465

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java (+33, -9)

@@ -42,6 +42,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 /**
  * source data parse to json format
@@ -64,6 +65,9 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
     private final List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
     private final String charsetName;
 
+    private static final Pattern TIMESTAMP_PATTERN = Pattern.compile("^\\d+$");
+    private static final Pattern TIME_FORMAT_PATTERN = Pattern.compile("\\w+\\d+:\\d+:\\d+");
+
     public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping,
                                           List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos,
                                           String charsetName) {
@@ -108,11 +112,7 @@ private void parseTree(JsonNode jsonNode, String prefix) {
             String nodeKey = getNodeKey(prefix, next);
 
             nodeAndJsonNodeMapping.put(nodeKey, child);
-            if (child.isArray()) {
-                parseTree(child, nodeKey);
-            } else {
-                parseTree(child, nodeKey);
-            }
+            parseTree(child, nodeKey);
         }
     }
 
@@ -150,11 +150,11 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
             return Date.valueOf(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
             // local zone
-            return Time.valueOf(node.asText());
+            return convertToTime(node.asText());
         } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
             // local zone
-            return Timestamp.valueOf(node.asText());
-        } else if (info instanceof RowTypeInfo) {
+            return convertToTimestamp(node.asText());
+        } else if (info instanceof RowTypeInfo) {
             return convertRow(node, (RowTypeInfo) info);
         } else if (info instanceof ObjectArrayTypeInfo) {
             return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo());
@@ -169,6 +169,29 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
         }
     }
 
+    /**
+     * Converts both supported formats, e.g. 2020-09-07 14:49:10.0 and 1598446699685, to Timestamp
+     */
+    private Timestamp convertToTimestamp(String timestamp) {
+        if (TIMESTAMP_PATTERN.matcher(timestamp).find()) {
+            return new Timestamp(Long.parseLong(timestamp));
+        }
+        if (TIME_FORMAT_PATTERN.matcher(timestamp).find()) {
+            return Timestamp.valueOf(timestamp);
+        }
+        throw new IllegalArgumentException("Incorrect time format of timestamp");
+    }
+
+    private Time convertToTime(String timestamp) {
+        if (TIMESTAMP_PATTERN.matcher(timestamp).find()) {
+            return new Time(Long.parseLong(timestamp));
+        }
+        if (TIME_FORMAT_PATTERN.matcher(timestamp).find()) {
+            return Time.valueOf(timestamp);
+        }
+        throw new IllegalArgumentException("Incorrect time format of time");
+    }
+
     private Row convertTopRow() {
         Row row = new Row(fieldNames.length);
         try {
@@ -179,7 +202,7 @@ private Row convertTopRow() {
                 if (node == null) {
                     if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
                         throw new IllegalStateException("Failed to find field with name '"
-                            + fieldNames[i] + "'.");
+                                + fieldNames[i] + "'.");
                     } else {
                         row.setField(i, null);
                     }
@@ -220,6 +243,7 @@ private Object convertObjectArray(JsonNode node, TypeInformation<?> elementType)
         }
         return array;
     }
+
     @Override
     public TypeInformation<Row> getProducedType() {
         return typeInfo;
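The new convertToTimestamp/convertToTime helpers accept both epoch-millisecond strings and JDBC-style datetime strings, where the old code only handled Timestamp.valueOf/Time.valueOf input. A standalone sketch of the timestamp half, mirroring the patterns added above (TimestampConvertDemo itself is illustrative):

    import java.sql.Timestamp;
    import java.util.regex.Pattern;

    public class TimestampConvertDemo {
        // Same patterns as the ones added to DtNestRowDeserializationSchema.
        private static final Pattern TIMESTAMP_PATTERN = Pattern.compile("^\\d+$");
        private static final Pattern TIME_FORMAT_PATTERN = Pattern.compile("\\w+\\d+:\\d+:\\d+");

        static Timestamp convertToTimestamp(String value) {
            if (TIMESTAMP_PATTERN.matcher(value).find()) {
                // Pure digits: treat as epoch milliseconds, e.g. "1598446699685".
                return new Timestamp(Long.parseLong(value));
            }
            if (TIME_FORMAT_PATTERN.matcher(value).find()) {
                // JDBC escape format, e.g. "2020-09-07 14:49:10.0".
                return Timestamp.valueOf(value);
            }
            throw new IllegalArgumentException("Incorrect time format of timestamp");
        }

        public static void main(String[] args) {
            System.out.println(convertToTimestamp("1598446699685"));
            System.out.println(convertToTimestamp("2020-09-07 14:49:10.0"));
        }
    }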
