Skip to content

Commit 8fb5b4f

Browse files
committed
Merge remote-tracking branch 'origin/1.8_release_3.9.x' into hotfix_3.9.4_23229
# Conflicts: # launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java
2 parents dba0d55 + 3df78d7 commit 8fb5b4f

File tree

194 files changed

+5740
-8310
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

194 files changed

+5740
-8310
lines changed

README.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,18 @@
4848
* Java: JDK8及以上
4949
* Flink集群: 1.4,1.5,1.8(单机模式不需要安装Flink集群)
5050
* 操作系统:理论上不限
51+
* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例:
52+
```
53+
## hadoop配置文件路径
54+
fs.hdfs.hadoopconf: /Users/maqi/tmp/hadoopconf/hadoop_250
55+
security.kerberos.login.use-ticket-cache: true
56+
security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/maqi.keytab
57+
security.kerberos.login.principal: maqi@DTSTACK.COM
58+
security.kerberos.login.contexts: Client,KafkaClient
59+
zookeeper.sasl.service-name: zookeeper
60+
zookeeper.sasl.login-context-name: Client
61+
62+
```
5163

5264
### 1.3 打包
5365

@@ -56,9 +68,16 @@
5668
```
5769
mvn clean package -Dmaven.test.skip
5870
59-
打包结束后,项目根目录下会产生plugins目录,plugins目录下存放编译好的数据同步插件包,在lib目录下存放job提交的包
6071
```
6172

73+
打包完成后的包结构:
74+
75+
> * dt-center-flinkStreamSQL
76+
> > * bin: 任务启动脚本
77+
> > * lib: launcher包存储路径,是任务提交的入口
78+
> > * plugins: 插件包存储路径
79+
> > * ........ : core及插件代码
80+
6281
### 1.4 启动
6382

6483
#### 1.4.1 启动命令

cassandra/cassandra-side/cassandra-all-side/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
<goal>shade</goal>
3737
</goals>
3838
<configuration>
39+
<createDependencyReducedPom>false</createDependencyReducedPom>
3940
<artifactSet>
4041
<excludes>
4142
<exclude>org.slf4j</exclude>

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
268268
LOG.warn("get conn fail, wait for 5 sec and try again, connInfo:" + connInfo);
269269
Thread.sleep(5 * 1000);
270270
} catch (InterruptedException e1) {
271-
e1.printStackTrace();
271+
LOG.error("", e1);
272272
}
273273
}
274274

cassandra/cassandra-side/cassandra-async-side/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
<goal>shade</goal>
5353
</goals>
5454
<configuration>
55+
<createDependencyReducedPom>false</createDependencyReducedPom>
5556
<artifactSet>
5657
<excludes>
5758
<exclude>org.slf4j</exclude>

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,16 +161,16 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
161161

162162
@Override
163163
public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
164-
164+
Row inputRow = Row.copy(input);
165165
JsonArray inputParams = new JsonArray();
166166
StringBuffer stringBuffer = new StringBuffer();
167167
String sqlWhere = " where ";
168168

169169
for (int i = 0; i < sideInfo.getEqualFieldList().size(); i++) {
170170
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
171-
Object equalObj = input.getField(conValIndex);
171+
Object equalObj = inputRow.getField(conValIndex);
172172
if (equalObj == null) {
173-
dealMissKey(input, resultFuture);
173+
dealMissKey(inputRow, resultFuture);
174174
return;
175175
}
176176
inputParams.add(equalObj);
@@ -194,12 +194,12 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
194194
if (val != null) {
195195

196196
if (ECacheContentType.MissVal == val.getType()) {
197-
dealMissKey(input, resultFuture);
197+
dealMissKey(inputRow, resultFuture);
198198
return;
199199
} else if (ECacheContentType.MultiLine == val.getType()) {
200200
List<Row> rowList = Lists.newArrayList();
201201
for (Object jsonArray : (List) val.getContent()) {
202-
Row row = fillData(input, jsonArray);
202+
Row row = fillData(inputRow, jsonArray);
203203
rowList.add(row);
204204
}
205205
resultFuture.complete(rowList);
@@ -240,7 +240,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
240240
List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
241241
List<Row> rowList = Lists.newArrayList();
242242
for (com.datastax.driver.core.Row line : rows) {
243-
Row row = fillData(input, line);
243+
Row row = fillData(inputRow, line);
244244
if (openCache()) {
245245
cacheContent.add(line);
246246
}
@@ -251,7 +251,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
251251
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
252252
}
253253
} else {
254-
dealMissKey(input, resultFuture);
254+
dealMissKey(inputRow, resultFuture);
255255
if (openCache()) {
256256
putCache(key, CacheMissVal.getMissKeyObj());
257257
}

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,8 @@ public class CassandraSideParser extends AbsSideTableParser {
6868

6969
public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis";
7070

71-
static {
72-
keyPatternMap.put(SIDE_SIGN_KEY, SIDE_TABLE_SIGN);
73-
keyHandlerMap.put(SIDE_SIGN_KEY, CassandraSideParser::dealSideSign);
71+
public CassandraSideParser() {
72+
addParserHandler(SIDE_SIGN_KEY, SIDE_TABLE_SIGN, this::dealSideSign);
7473
}
7574

7675
@Override
@@ -97,7 +96,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
9796
return cassandraSideTableInfo;
9897
}
9998

100-
private static void dealSideSign(Matcher matcher, TableInfo tableInfo) {
99+
private void dealSideSign(Matcher matcher, TableInfo tableInfo) {
101100
}
102101

103102
public Class dbTypeConvertToJavaType(String fieldType) {

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import com.datastax.driver.core.SocketOptions;
4949
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
5050
import com.datastax.driver.core.policies.RetryPolicy;
51-
import com.dtstack.flink.sql.sink.MetricOutputFormat;
51+
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
5252
import org.apache.flink.api.common.typeinfo.TypeInformation;
5353
import org.apache.flink.api.java.tuple.Tuple;
5454
import org.apache.flink.api.java.tuple.Tuple2;
@@ -69,7 +69,7 @@
6969
* @see Tuple
7070
* @see DriverManager
7171
*/
72-
public class CassandraOutputFormat extends MetricOutputFormat {
72+
public class CassandraOutputFormat extends DtRichOutputFormat {
7373
private static final long serialVersionUID = -7994311331389155692L;
7474

7575
private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
@@ -193,7 +193,6 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
193193
try {
194194
if (retract) {
195195
insertWrite(row);
196-
outRecords.inc();
197196
} else {
198197
//do nothing
199198
}
@@ -204,14 +203,24 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
204203

205204
private void insertWrite(Row row) {
206205
try {
206+
207+
if(outRecords.getCount() % ROW_PRINT_FREQUENCY == 0){
208+
LOG.info("Receive data : {}", row);
209+
}
210+
207211
String cql = buildSql(row);
208212
if (cql != null) {
209213
ResultSet resultSet = session.execute(cql);
210214
resultSet.wasApplied();
215+
outRecords.inc();
211216
}
212217
} catch (Exception e) {
218+
if(outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0){
219+
LOG.error("record insert failed, total dirty num:{}, current record:{}", outDirtyRecords.getCount(), row.toString());
220+
LOG.error("", e);
221+
}
222+
213223
outDirtyRecords.inc();
214-
LOG.error("[upsert] is error:" + e.getMessage());
215224
}
216225
}
217226

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package com.dtstack.flink.sql.sink.console;
2020

21-
import com.dtstack.flink.sql.sink.MetricOutputFormat;
21+
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
2222
import com.dtstack.flink.sql.sink.console.table.TablePrintUtil;
2323
import org.apache.flink.api.common.typeinfo.TypeInformation;
2424
import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,7 +37,7 @@
3737
*
3838
* @author xuqianjin
3939
*/
40-
public class ConsoleOutputFormat extends MetricOutputFormat {
40+
public class ConsoleOutputFormat extends DtRichOutputFormat {
4141

4242
private static final Logger LOG = LoggerFactory.getLogger(ConsoleOutputFormat.class);
4343

@@ -69,7 +69,11 @@ public void writeRecord(Tuple2 tuple2) throws IOException {
6969

7070
List<String[]> data = new ArrayList<>();
7171
data.add(fieldNames);
72-
data.add(record.toString().split(","));
72+
String[] recordStr = new String[record.getArity()];
73+
for (int i=0; i < record.getArity(); i++) {
74+
recordStr[i] = (String.valueOf(record.getField(i)));
75+
}
76+
data.add(recordStr);
7377
TablePrintUtil.build(data).print();
7478

7579
outRecords.inc();

0 commit comments

Comments (0)