Skip to content

Commit 1c71d38

Browse files
libailinlihongwei
authored and
lihongwei
committed
[Feature-#1897][gbase8s] Added gbase8s connector and gbasehk connector
1 parent 3a3f015 commit 1c71d38

File tree

15 files changed

+912
-0
lines changed

15 files changed

+912
-0
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one
4+
~ or more contributor license agreements. See the NOTICE file
5+
~ distributed with this work for additional information
6+
~ regarding copyright ownership. The ASF licenses this file
7+
~ to you under the Apache License, Version 2.0 (the
8+
~ "License"); you may not use this file except in compliance
9+
~ with the License. You may obtain a copy of the License at
10+
~
11+
~ http://www.apache.org/licenses/LICENSE-2.0
12+
~
13+
~ Unless required by applicable law or agreed to in writing,
14+
~ software distributed under the License is distributed on an
15+
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
~ KIND, either express or implied. See the License for the
17+
~ specific language governing permissions and limitations
18+
~ under the License.
19+
-->
20+
21+
<project xmlns="http://maven.apache.org/POM/4.0.0"
22+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
23+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
24+
<parent>
25+
<artifactId>chunjun-connectors</artifactId>
26+
<groupId>com.dtstack.chunjun</groupId>
27+
<version>${revision}</version>
28+
</parent>
29+
<modelVersion>4.0.0</modelVersion>
30+
31+
<artifactId>chunjun-connector-gbase8s</artifactId>
32+
<name>ChunJun : Connectors : GBase8s</name>
33+
34+
<properties>
35+
<connector.dir>gbase8s</connector.dir>
36+
</properties>
37+
38+
<dependencies>
39+
<dependency>
40+
<groupId>com.dtstack.chunjun</groupId>
41+
<artifactId>chunjun-connector-jdbc-base</artifactId>
42+
<version>${project.version}</version>
43+
</dependency>
44+
<dependency>
45+
<groupId>com.gbasedbt.jdbc.Driver</groupId>
46+
<artifactId>gbasedbt</artifactId>
47+
<version>3.5.1_1_d0c87a</version>
48+
</dependency>
49+
</dependencies>
50+
51+
<build>
52+
<plugins>
53+
<plugin>
54+
<groupId>org.apache.maven.plugins</groupId>
55+
<artifactId>maven-shade-plugin</artifactId>
56+
</plugin>
57+
<plugin>
58+
<groupId>org.apache.maven.plugins</groupId>
59+
<artifactId>maven-antrun-plugin</artifactId>
60+
</plugin>
61+
</plugins>
62+
</build>
63+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.gbase8s.converter;
20+
21+
import com.dtstack.chunjun.config.TypeConfig;
22+
import com.dtstack.chunjun.throwable.UnsupportedTypeException;
23+
24+
import org.apache.flink.table.api.DataTypes;
25+
import org.apache.flink.table.types.DataType;
26+
27+
public class Gbase8sRawTypeConverter {
28+
29+
public static DataType apply(TypeConfig type) {
30+
switch (type.getType()) {
31+
case "BIT":
32+
return DataTypes.BOOLEAN();
33+
case "TINYINT":
34+
return DataTypes.TINYINT();
35+
case "SMALLINT":
36+
case "MEDIUMINT":
37+
case "INT":
38+
case "INTEGER":
39+
case "INT24":
40+
case "SERIAL":
41+
return DataTypes.INT();
42+
case "BIGINT":
43+
case "INT8":
44+
case "BIGSERIAL":
45+
case "SERIAL8":
46+
return DataTypes.BIGINT();
47+
case "REAL":
48+
case "FLOAT":
49+
case "SMALLFLOAT":
50+
return DataTypes.FLOAT();
51+
case "DECIMAL":
52+
case "DEC":
53+
case "NUMERIC":
54+
case "MONEY":
55+
// TODO 精度应该可以动态传进来?
56+
return DataTypes.DECIMAL(38, 18);
57+
case "DOUBLE":
58+
case "PRECISION":
59+
return DataTypes.DOUBLE();
60+
case "CHAR":
61+
case "VARCHAR":
62+
case "TINYTEXT":
63+
case "TEXT":
64+
case "MEDIUMTEXT":
65+
case "LVARCHAR":
66+
case "LONGTEXT":
67+
case "JSON":
68+
case "ENUM":
69+
case "CHARACTER":
70+
case "VARYING":
71+
case "NCHAR":
72+
case "SET":
73+
return DataTypes.STRING();
74+
case "DATE":
75+
return DataTypes.DATE();
76+
case "YEAR":
77+
return DataTypes.INTERVAL(DataTypes.YEAR());
78+
case "TIME":
79+
return DataTypes.TIME();
80+
case "TIMESTAMP":
81+
return DataTypes.TIMESTAMP();
82+
case "DATETIME":
83+
return DataTypes.TIMESTAMP(5);
84+
case "TINYBLOB":
85+
case "BLOB":
86+
case "MEDIUMBLOB":
87+
case "LONGBLOB":
88+
case "BINARY":
89+
case "VARBINARY":
90+
case "GEOMETRY":
91+
// BYTES 底层调用的是VARBINARY最大长度
92+
return DataTypes.BYTES();
93+
94+
default:
95+
throw new UnsupportedTypeException(type);
96+
}
97+
}
98+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.gbase8s.dialect;
20+
21+
import com.dtstack.chunjun.connector.gbase8s.converter.Gbase8sRawTypeConverter;
22+
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
23+
import com.dtstack.chunjun.converter.RawTypeMapper;
24+
25+
import org.apache.commons.lang3.StringUtils;
26+
27+
import java.util.Arrays;
28+
import java.util.List;
29+
import java.util.Optional;
30+
import java.util.stream.Collectors;
31+
32+
public class Gbase8sDialect implements JdbcDialect {
33+
34+
private static final String GBASE_QUOTATION_MASK = "";
35+
36+
@Override
37+
public String dialectName() {
38+
return "GBase8s";
39+
}
40+
41+
@Override
42+
public boolean canHandle(String url) {
43+
return url.startsWith("jdbc:gbasedbt-sqli:");
44+
}
45+
46+
@Override
47+
public RawTypeMapper getRawTypeConverter() {
48+
return Gbase8sRawTypeConverter::apply;
49+
}
50+
51+
@Override
52+
public Optional<String> defaultDriverName() {
53+
return Optional.of("com.gbasedbt.jdbc.Driver");
54+
}
55+
56+
/** build select sql , such as (SELECT :A "A",? "B" FROM DUAL) */
57+
public String buildDualQueryStatement(String[] column) {
58+
StringBuilder sb = new StringBuilder("SELECT count(1),");
59+
String placeholders =
60+
Arrays.stream(column)
61+
.map(f -> ":" + f + " as " + quoteIdentifier(f))
62+
.collect(Collectors.joining(", "));
63+
sb.append(placeholders);
64+
65+
return sb.toString();
66+
}
67+
68+
@Override
69+
public Optional<String> getUpsertStatement(
70+
String schema,
71+
String tableName,
72+
String[] fieldNames,
73+
String[] uniqueKeyFields,
74+
boolean allReplace) {
75+
tableName = buildTableInfoWithSchema(schema, tableName);
76+
StringBuilder mergeIntoSql = new StringBuilder(64);
77+
mergeIntoSql
78+
.append("MERGE INTO ")
79+
.append(tableName)
80+
.append(" T1 USING (")
81+
.append(buildDualQueryStatement(fieldNames))
82+
.append(" FROM ")
83+
.append(tableName)
84+
.append(" limit 1 ")
85+
.append(") T2 ON (")
86+
.append(buildEqualConditions(uniqueKeyFields))
87+
.append(") ");
88+
89+
String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, allReplace);
90+
91+
if (StringUtils.isNotEmpty(updateSql)) {
92+
mergeIntoSql.append(" WHEN MATCHED THEN UPDATE SET ");
93+
mergeIntoSql.append(updateSql);
94+
}
95+
96+
mergeIntoSql
97+
.append(" WHEN NOT MATCHED THEN ")
98+
.append("INSERT (")
99+
.append(
100+
Arrays.stream(fieldNames)
101+
.map(this::quoteIdentifier)
102+
.collect(Collectors.joining(", ")))
103+
.append(") VALUES (")
104+
.append(
105+
Arrays.stream(fieldNames)
106+
.map(col -> "T2." + quoteIdentifier(col))
107+
.collect(Collectors.joining(", ")))
108+
.append(")");
109+
110+
return Optional.of(mergeIntoSql.toString());
111+
}
112+
113+
/** build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A") */
114+
private String buildUpdateConnection(
115+
String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
116+
List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
117+
return Arrays.stream(fieldNames)
118+
.filter(col -> !uniqueKeyList.contains(col))
119+
.map(col -> buildConnectString(allReplace, col))
120+
.collect(Collectors.joining(","));
121+
}
122+
123+
/**
124+
* Depending on parameter [allReplace] build different sql part. e.g T1."A"=T2."A" or
125+
* T1."A"=nvl(T2."A",T1."A")
126+
*/
127+
private String buildConnectString(boolean allReplace, String col) {
128+
return allReplace
129+
? quoteIdentifier("T1")
130+
+ "."
131+
+ quoteIdentifier(col)
132+
+ " = "
133+
+ quoteIdentifier("T2")
134+
+ "."
135+
+ quoteIdentifier(col)
136+
: quoteIdentifier("T1")
137+
+ "."
138+
+ quoteIdentifier(col)
139+
+ " =NVL("
140+
+ quoteIdentifier("T2")
141+
+ "."
142+
+ quoteIdentifier(col)
143+
+ ","
144+
+ quoteIdentifier("T1")
145+
+ "."
146+
+ quoteIdentifier(col)
147+
+ ")";
148+
}
149+
150+
/** build sql part e.g: T1.`A` = T2.`A`, T1.`B` = T2.`B` */
151+
private String buildEqualConditions(String[] uniqueKeyFields) {
152+
return Arrays.stream(uniqueKeyFields)
153+
.map(col -> "T1." + quoteIdentifier(col) + " = T2." + quoteIdentifier(col))
154+
.collect(Collectors.joining(", "));
155+
}
156+
157+
@Override
158+
public String quoteIdentifier(String identifier) {
159+
if (identifier.startsWith(GBASE_QUOTATION_MASK)
160+
&& identifier.endsWith(GBASE_QUOTATION_MASK)) {
161+
return identifier;
162+
}
163+
return GBASE_QUOTATION_MASK + identifier + GBASE_QUOTATION_MASK;
164+
}
165+
166+
@Override
167+
public String getRowNumColumn(String orderBy) {
168+
return "ROWID as " + getRowNumColumnAlias();
169+
}
170+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.gbase8s.sink;
20+
21+
import com.dtstack.chunjun.config.SyncConfig;
22+
import com.dtstack.chunjun.connector.gbase8s.dialect.Gbase8sDialect;
23+
import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory;
24+
25+
public class Gbase8sSinkFactory extends JdbcSinkFactory {
26+
27+
public Gbase8sSinkFactory(SyncConfig syncConfig) {
28+
super(syncConfig, new Gbase8sDialect());
29+
}
30+
}

0 commit comments

Comments
 (0)