Skip to content

Commit 7257388

Browse files
author
gituser
committed
Merge branch '1.10_test_4.1.x' into 1.10_release_4.1.x
2 parents 7b6f303 + 558dc46 commit 7257388

File tree

29 files changed

+242
-501
lines changed

29 files changed

+242
-501
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,18 @@
1818

1919
package com.dtstack.flink.sql.side.cassandra;
2020

21-
import com.datastax.driver.core.*;
21+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
22+
import org.apache.flink.types.Row;
23+
import org.apache.flink.util.Collector;
24+
25+
import com.datastax.driver.core.Cluster;
26+
import com.datastax.driver.core.ConsistencyLevel;
27+
import com.datastax.driver.core.HostDistance;
28+
import com.datastax.driver.core.PoolingOptions;
29+
import com.datastax.driver.core.QueryOptions;
30+
import com.datastax.driver.core.ResultSet;
31+
import com.datastax.driver.core.Session;
32+
import com.datastax.driver.core.SocketOptions;
2233
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
2334
import com.datastax.driver.core.policies.RetryPolicy;
2435
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
@@ -32,10 +43,7 @@
3243
import org.apache.calcite.sql.JoinType;
3344
import org.apache.commons.collections.CollectionUtils;
3445
import org.apache.commons.lang3.StringUtils;
35-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3646
import org.apache.flink.table.dataformat.BaseRow;
37-
import org.apache.flink.types.Row;
38-
import org.apache.flink.util.Collector;
3947
import org.slf4j.Logger;
4048
import org.slf4j.LoggerFactory;
4149

@@ -72,27 +80,6 @@ public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Field
7280
super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
7381
}
7482

75-
@Override
76-
public Row fillData(Row input, Object sideInput) {
77-
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
78-
Row row = new Row(sideInfo.getOutFieldInfoList().size());
79-
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
80-
Object obj = input.getField(entry.getValue());
81-
obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
82-
row.setField(entry.getKey(), obj);
83-
}
84-
85-
for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
86-
if (cacheInfo == null) {
87-
row.setField(entry.getKey(), null);
88-
} else {
89-
row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
90-
}
91-
}
92-
93-
return row;
94-
}
95-
9683
@Override
9784
protected void initCache() throws SQLException {
9885
Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -72,57 +72,4 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
7272
LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
7373
}
7474

75-
76-
@Override
77-
public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
78-
if (sqlNode.getKind() != SqlKind.EQUALS) {
79-
throw new RuntimeException("not equal operator.");
80-
}
81-
82-
SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0];
83-
SqlIdentifier right = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[1];
84-
85-
String leftTableName = left.getComponent(0).getSimple();
86-
String leftField = left.getComponent(1).getSimple();
87-
88-
String rightTableName = right.getComponent(0).getSimple();
89-
String rightField = right.getComponent(1).getSimple();
90-
91-
if (leftTableName.equalsIgnoreCase(sideTableName)) {
92-
equalFieldList.add(leftField);
93-
int equalFieldIndex = -1;
94-
for (int i = 0; i < getFieldNames().length; i++) {
95-
String fieldName = getFieldNames()[i];
96-
if (fieldName.equalsIgnoreCase(rightField)) {
97-
equalFieldIndex = i;
98-
}
99-
}
100-
if (equalFieldIndex == -1) {
101-
throw new RuntimeException("can't deal equal field: " + sqlNode);
102-
}
103-
104-
equalValIndex.add(equalFieldIndex);
105-
106-
} else if (rightTableName.equalsIgnoreCase(sideTableName)) {
107-
108-
equalFieldList.add(rightField);
109-
int equalFieldIndex = -1;
110-
for (int i = 0; i < getFieldNames().length; i++) {
111-
String fieldName = getFieldNames()[i];
112-
if (fieldName.equalsIgnoreCase(leftField)) {
113-
equalFieldIndex = i;
114-
}
115-
}
116-
if (equalFieldIndex == -1) {
117-
throw new RuntimeException("can't deal equal field: " + sqlNode.toString());
118-
}
119-
120-
equalValIndex.add(equalFieldIndex);
121-
122-
} else {
123-
throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
124-
}
125-
126-
}
127-
12875
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.util.RowDataComplete;
2424
import org.apache.calcite.sql.JoinType;
2525
import org.apache.flink.api.common.functions.RichFlatMapFunction;
26+
import org.apache.flink.api.common.typeinfo.TypeInformation;
2627
import org.apache.flink.configuration.Configuration;
2728
import org.apache.flink.table.dataformat.BaseRow;
2829
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -34,6 +35,8 @@
3435
import java.sql.SQLException;
3536
import java.sql.Timestamp;
3637
import java.time.LocalDateTime;
38+
import java.util.Map;
39+
import java.util.TimeZone;
3740
import java.util.concurrent.ScheduledExecutorService;
3841
import java.util.concurrent.ScheduledThreadPoolExecutor;
3942
import java.util.concurrent.TimeUnit;
@@ -42,7 +45,6 @@
4245
* Reason:
4346
* Date: 2018/9/18
4447
* Company: www.dtstack.com
45-
*
4648
* @author xuchao
4749
*/
4850

@@ -52,6 +54,8 @@ public abstract class BaseAllReqRow extends RichFlatMapFunction<Row, BaseRow> im
5254

5355
public static final long LOAD_DATA_ERROR_SLEEP_TIME = 5_000L;
5456

57+
public static final TimeZone LOCAL_TZ = TimeZone.getDefault();
58+
5559
protected BaseSideInfo sideInfo;
5660

5761
private ScheduledExecutorService es;
@@ -95,6 +99,45 @@ protected void sendOutputRow(Row value, Object sideInput, Collector<BaseRow> out
9599
RowDataComplete.collectRow(out, row);
96100
}
97101

102+
@Override
103+
public Row fillData(Row input, Object sideInput) {
104+
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
105+
Row row = new Row(sideInfo.getOutFieldInfoList().size());
106+
107+
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
108+
// origin value
109+
Object obj = input.getField(entry.getValue());
110+
obj = dealTimeAttributeType(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass(), obj);
111+
row.setField(entry.getKey(), obj);
112+
}
113+
114+
for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
115+
if (cacheInfo == null) {
116+
row.setField(entry.getKey(), null);
117+
} else {
118+
row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
119+
}
120+
}
121+
return row;
122+
}
123+
124+
/**
125+
* covert flink time attribute.Type information for indicating event or processing time.
126+
* However, it behaves like a regular SQL timestamp but is serialized as Long.
127+
*
128+
* @param entry
129+
* @param obj
130+
* @return
131+
*/
132+
protected Object dealTimeAttributeType(Class<? extends TypeInformation> entry, Object obj) {
133+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(entry);
134+
if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
135+
//去除上一层OutputRowtimeProcessFunction 调用时区导致的影响
136+
obj = ((Timestamp) obj).getTime() + (long)LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
137+
}
138+
return obj;
139+
}
140+
98141
@Override
99142
public void close() throws Exception {
100143
if (null != es && !es.isShutdown()) {

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 88 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,16 @@
2121
package com.dtstack.flink.sql.side;
2222

2323
import com.dtstack.flink.sql.side.cache.AbstractSideCache;
24+
import com.google.common.base.Preconditions;
2425
import org.apache.calcite.sql.JoinType;
2526
import org.apache.calcite.sql.SqlBasicCall;
2627
import org.apache.calcite.sql.SqlIdentifier;
2728
import org.apache.calcite.sql.SqlKind;
29+
import org.apache.calcite.sql.SqlLiteral;
2830
import org.apache.calcite.sql.SqlNode;
2931
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3032
import com.google.common.collect.Lists;
3133
import com.google.common.collect.Maps;
32-
import org.apache.flink.table.dataformat.BaseRow;
3334
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
3435

3536
import java.io.Serializable;
@@ -74,12 +75,15 @@ public abstract class BaseSideInfo implements Serializable{
7475

7576
protected AbstractSideCache sideCache;
7677

78+
protected JoinInfo joinInfo;
79+
7780
public BaseSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
7881
AbstractSideTableInfo sideTableInfo){
7982
this.rowTypeInfo = rowTypeInfo;
8083
this.outFieldInfoList = outFieldInfoList;
8184
this.joinType = joinInfo.getJoinType();
8285
this.sideTableInfo = sideTableInfo;
86+
this.joinInfo = joinInfo;
8387
parseSelectFields(joinInfo);
8488
buildEqualInfo(joinInfo, sideTableInfo);
8589
}
@@ -123,55 +127,102 @@ public String getTargetFieldType(String fieldName){
123127
return sideTableInfo.getFieldTypes()[fieldIndex];
124128
}
125129

126-
127-
public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
128-
if(!SqlKind.COMPARISON.contains(sqlNode.getKind())){
130+
public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
131+
if (!SqlKind.COMPARISON.contains(sqlNode.getKind())) {
129132
throw new RuntimeException("not compare operator.");
130133
}
131134

132-
SqlIdentifier left = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[0];
133-
SqlIdentifier right = (SqlIdentifier)((SqlBasicCall)sqlNode).getOperands()[1];
135+
SqlNode leftNode = ((SqlBasicCall) sqlNode).getOperands()[0];
136+
SqlNode rightNode = ((SqlBasicCall) sqlNode).getOperands()[1];
137+
if (leftNode.getKind() == SqlKind.LITERAL) {
138+
evalConstantEquation(
139+
(SqlLiteral) leftNode,
140+
(SqlIdentifier) rightNode
141+
);
142+
} else if (rightNode.getKind() == SqlKind.LITERAL) {
143+
evalConstantEquation(
144+
(SqlLiteral) rightNode,
145+
(SqlIdentifier) leftNode
146+
);
147+
} else {
148+
SqlIdentifier left = (SqlIdentifier) leftNode;
149+
SqlIdentifier right = (SqlIdentifier) rightNode;
150+
evalEquation(left, right, sideTableName, sqlNode);
151+
}
152+
}
134153

154+
/**
155+
* deal normal equation etc. foo.id = bar.id
156+
* @param left
157+
* @param right
158+
* @param sideTableName
159+
* @param sqlNode
160+
*/
161+
private void evalEquation(SqlIdentifier left, SqlIdentifier right, String sideTableName, SqlNode sqlNode) {
135162
String leftTableName = left.getComponent(0).getSimple();
136163
String leftField = left.getComponent(1).getSimple();
137164

138165
String rightTableName = right.getComponent(0).getSimple();
139166
String rightField = right.getComponent(1).getSimple();
140167

141-
if(leftTableName.equalsIgnoreCase(sideTableName)){
142-
equalFieldList.add(leftField);
143-
int equalFieldIndex = -1;
144-
for(int i=0; i<getFieldNames().length; i++){
145-
String fieldName = getFieldNames()[i];
146-
if(fieldName.equalsIgnoreCase(rightField)){
147-
equalFieldIndex = i;
148-
}
149-
}
150-
if(equalFieldIndex == -1){
151-
throw new RuntimeException("can't find equal field " + rightField);
152-
}
153-
154-
equalValIndex.add(equalFieldIndex);
155-
156-
}else if(rightTableName.equalsIgnoreCase(sideTableName)){
168+
if (leftTableName.equalsIgnoreCase(sideTableName)) {
169+
associateField(rightField, leftField, sqlNode);
170+
} else if (rightTableName.equalsIgnoreCase(sideTableName)) {
171+
associateField(leftField, rightField, sqlNode);
172+
} else {
173+
throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
174+
}
175+
}
157176

158-
equalFieldList.add(rightField);
159-
int equalFieldIndex = -1;
160-
for(int i=0; i<getFieldNames().length; i++){
161-
String fieldName = getFieldNames()[i];
162-
if(fieldName.equalsIgnoreCase(leftField)){
163-
equalFieldIndex = i;
164-
}
165-
}
166-
if(equalFieldIndex == -1){
167-
throw new RuntimeException("can't find equal field " + leftField);
177+
/**
178+
* deal with equation with constant etc. foo.id = 1
179+
* @param literal
180+
* @param identifier
181+
*/
182+
private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier) {
183+
String tableName = identifier.getComponent(0).getSimple();
184+
checkSupport(identifier);
185+
String fieldName = identifier.getComponent(1).getSimple();
186+
Object constant = literal.getValue();
187+
List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
188+
PredicateInfo predicate = PredicateInfo.builder()
189+
.setOperatorName("=")
190+
.setOperatorKind("EQUALS")
191+
.setOwnerTable(tableName)
192+
.setFieldName(fieldName)
193+
.setCondition(constant.toString())
194+
.build();
195+
predicateInfos.add(predicate);
196+
}
197+
198+
private void checkSupport(SqlIdentifier identifier) {
199+
String tableName = identifier.getComponent(0).getSimple();
200+
String sideTableName;
201+
String sideTableAlias;
202+
if (joinInfo.isLeftIsSideTable()) {
203+
sideTableName = joinInfo.getLeftTableName();
204+
sideTableAlias = joinInfo.getLeftTableAlias();
205+
} else {
206+
sideTableName = joinInfo.getRightTableName();
207+
sideTableAlias = joinInfo.getRightTableAlias();
208+
}
209+
boolean isSide = tableName.equals(sideTableName) || tableName.equals(sideTableAlias);
210+
String errorMsg = "only support set side table constant field, error field " + identifier;
211+
Preconditions.checkState(isSide, errorMsg);
212+
}
213+
214+
private void associateField(String sourceTableField, String sideTableField, SqlNode sqlNode) {
215+
String errorMsg = "can't deal equal field: " + sqlNode;
216+
equalFieldList.add(sideTableField);
217+
int equalFieldIndex = -1;
218+
for (int i = 0; i < rowTypeInfo.getFieldNames().length; i++) {
219+
String fieldName = rowTypeInfo.getFieldNames()[i];
220+
if (fieldName.equalsIgnoreCase(sourceTableField)) {
221+
equalFieldIndex = i;
168222
}
169-
170-
equalValIndex.add(equalFieldIndex);
171-
172-
}else{
173-
throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
174223
}
224+
Preconditions.checkState(equalFieldIndex != -1, errorMsg);
225+
equalValIndex.add(equalFieldIndex);
175226
}
176227

177228
public abstract void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo);

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,15 @@ public class JoinInfo implements Serializable {
5555

5656
private String rightTableAlias;
5757

58-
private SqlNode leftNode;
58+
private transient SqlNode leftNode;
5959

60-
private SqlNode rightNode;
60+
private transient SqlNode rightNode;
6161

62-
private SqlNode condition;
62+
private transient SqlNode condition;
6363

64-
private SqlNode selectFields;
64+
private transient SqlNode selectFields;
6565

66-
private SqlNode selectNode;
66+
private transient SqlNode selectNode;
6767

6868
private JoinType joinType;
6969

0 commit comments

Comments
 (0)