From 6b57f22c2b6a067f76c3847d738f253af8930209 Mon Sep 17 00:00:00 2001 From: haneeshmv <164304366+haneeshmv@users.noreply.github.com> Date: Tue, 9 Apr 2024 08:10:02 +0530 Subject: [PATCH] add connector db2 (#159) --- .../classloader/DatasourceLoadConfig.java | 7 +- .../datasource-all/pom.xml | 5 + .../datasource-jdbc-db2/pom.xml | 92 +++++++ .../plugin/db2/jdbc/Db2DataSourceConfig.java | 51 ++++ .../db2/jdbc/Db2JdbcDataSourceChannel.java | 175 +++++++++++++ .../db2/jdbc/Db2JdbcDataSourceFactory.java | 48 ++++ .../plugin/db2/jdbc/Db2OptionRule.java | 69 +++++ .../seatunnel-datasource-plugins/pom.xml | 1 + .../impl/Db2DataSourceConfigSwitcher.java | 242 ++++++++++++++++++ .../connector-datasource-mapper.yaml | 14 +- seatunnel-web-dist/pom.xml | 13 + 11 files changed, 714 insertions(+), 3 deletions(-) create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/pom.xml create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceFactory.java create mode 100644 seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java create mode 100644 seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java diff --git a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java index a8b480786..b2869691a 100644 --- a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java +++ b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/classloader/DatasourceLoadConfig.java @@ -94,6 +94,9 @@ public class DatasourceLoadConfig { "org.apache.seatunnel.datasource.plugin.starrocks.jdbc.StarRocksJdbcDataSourceFactory"); classLoaderFactoryName.put( "MONGODB", "com.apache.seatunnel.datasource.plugin.mongodb.MongoDataSourceFactory"); + classLoaderFactoryName.put( + "JDBC-DB2", + "org.apache.seatunnel.datasource.plugin.db2.jdbc.Db2JdbcDataSourceFactory"); classLoaderJarName.put("JDBC-ORACLE", "datasource-jdbc-oracle-"); classLoaderJarName.put("JDBC-CLICKHOUSE", "datasource-jdbc-clickhouse-"); @@ -114,6 +117,7 @@ public class DatasourceLoadConfig { classLoaderJarName.put("S3-REDSHIFT", "datasource-s3redshift-"); classLoaderJarName.put("JDBC-STARROCKS", "datasource-jdbc-starrocks-"); classLoaderJarName.put("MONGODB", "datasource-mongodb-"); + classLoaderJarName.put("JDBC-DB2", "datasource-jdbc-db2-"); } public static final Set pluginSet = @@ -131,7 +135,8 @@ public class DatasourceLoadConfig { "S3", "SqlServer-CDC", "StarRocks", - "MongoDB"); + "MongoDB", + "JDBC-Db2"); public static Map datasourceClassLoaders = new HashMap<>(); diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml index dce3bcb97..d5dee4503 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml @@ -97,6 +97,11 @@ datasource-sqlserver-cdc ${project.version} + + org.apache.seatunnel + datasource-jdbc-db2 + ${project.version} + diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/pom.xml new file mode 100644 index 000000000..a13423735 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-datasource-plugins + ${revision} + + + datasource-jdbc-db2 + + + 8.0.28 + + + + + org.apache.seatunnel + datasource-plugins-api + ${project.version} + provided + + + org.apache.commons + commons-lang3 + + + + com.google.auto.service + auto-service + + + org.apache.seatunnel + seatunnel-api + provided + + + + + mysql + mysql-connector-java + ${mysql-connector.version} + provided + + + + com.ibm.db2.jcc + db2jcc + db2jcc4 + + + org.apache.seatunnel + seatunnel-api + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + ${e2e.dependency.skip} + true + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + + + diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java new file mode 100644 index 000000000..90673587c --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.datasource.plugin.db2.jdbc; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo; +import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum; + +import com.google.common.collect.Sets; + +import java.util.Set; + +public class Db2DataSourceConfig { + + public static final String PLUGIN_NAME = "JDBC-Db2"; + + public static final DataSourcePluginInfo DB2_DATASOURCE_PLUGIN_INFO = + DataSourcePluginInfo.builder() + .name(PLUGIN_NAME) + .icon(PLUGIN_NAME) + .version("1.0.0") + .type(DatasourcePluginTypeEnum.DATABASE.getCode()) + .build(); + + public static final Set DB2_SYSTEM_DATABASES = + Sets.newHashSet("information_schema", "mysql", "performance_schema", "sys"); + + public static final OptionRule OPTION_RULE = + OptionRule.builder() + .required(Db2OptionRule.URL, Db2OptionRule.DRIVER) + .optional(Db2OptionRule.USER, Db2OptionRule.PASSWORD) + .build(); + + public static final OptionRule METADATA_RULE = + OptionRule.builder().required(Db2OptionRule.DATABASE, Db2OptionRule.TABLE).build(); +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java new file mode 100644 index 000000000..15c21229d --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java @@ -0,0 +1,175 @@ +package org.apache.seatunnel.datasource.plugin.db2.jdbc; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; +import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; +import org.apache.seatunnel.datasource.plugin.api.model.TableField; + +import org.apache.commons.lang3.StringUtils; + +import lombok.NonNull; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class Db2JdbcDataSourceChannel implements DataSourceChannel { + + @Override + public OptionRule getDataSourceOptions(@NonNull String pluginName) { + return Db2DataSourceConfig.OPTION_RULE; + } + + @Override + public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) { + return Db2DataSourceConfig.METADATA_RULE; + } + + @Override + public List getTables( + @NonNull String pluginName, + Map requestParams, + String database, + Map options) { + List tableNames = new ArrayList<>(); + String filterName = options.get("filterName"); + String size = options.get("size"); + boolean isSize = StringUtils.isNotEmpty(size); + if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) { + filterName = "%" + filterName + "%"; + } else if (StringUtils.equals(filterName, "")) { + filterName = null; + } + try (Connection connection = getConnection(requestParams); + ResultSet resultSet = + connection + .getMetaData() + .getTables(null, null, "%", new String[] {"TABLE"})) { + while (resultSet.next()) { + String tableName = resultSet.getString("TABLE_NAME"); + if (StringUtils.isNotBlank(tableName)) { + tableNames.add(tableName); + if (isSize && tableNames.size() >= Integer.parseInt(size)) { + break; + } + } + } + return tableNames; + } catch (ClassNotFoundException | SQLException e) { + throw new DataSourcePluginException("get table names failed", e); + } + } + + @Override + public List getDatabases( + @NonNull String pluginName, @NonNull Map requestParams) { + // Hardcoded list of example database names + List dbNames = Arrays.asList("default"); + return dbNames; + } + + @Override + public boolean checkDataSourceConnectivity( + @NonNull String pluginName, @NonNull Map requestParams) { + try (Connection ignored = getConnection(requestParams)) { + return true; + } catch (Exception e) { + throw new DataSourcePluginException("check jdbc connectivity failed", e); + } + } + + @Override + public List getTableFields( + @NonNull String pluginName, + @NonNull Map requestParams, + @NonNull String database, + @NonNull String table) { + List tableFields = new ArrayList<>(); + try (Connection connection = getConnection(requestParams)) { + DatabaseMetaData metaData = connection.getMetaData(); + + // Retrieve primary key information + String primaryKey = getPrimaryKey(metaData, database, table); + + // Retrieve column information + try (ResultSet resultSet = metaData.getColumns(null, null, table, null)) { + + while (resultSet.next()) { + TableField tableField = new TableField(); + String columnName = resultSet.getString("COLUMN_NAME"); + + // Set primary key flag + tableField.setPrimaryKey(primaryKey != null && primaryKey.equals(columnName)); + + // Set other field attributes + tableField.setName(columnName); + tableField.setType(resultSet.getString("TYPE_NAME")); + tableField.setComment(resultSet.getString("REMARKS")); + + // Set nullable flag + String isNullable = resultSet.getString("IS_NULLABLE"); + tableField.setNullable("YES".equalsIgnoreCase(isNullable)); + + tableFields.add(tableField); + } + } + } catch (SQLException e) { + // Log the exception and rethrow as DataSourcePluginException + System.out.println("Error while retrieving table fields: " + e); + throw new DataSourcePluginException("Failed to get table fields", e); + } catch (ClassNotFoundException e) { + // Log the exception and rethrow as DataSourcePluginException + System.out.println("JDBC driver class not found" + e); + throw new DataSourcePluginException("JDBC driver class not found", e); + } + return tableFields; + } + + @Override + public Map> getTableFields( + @NonNull String pluginName, + @NonNull Map requestParams, + @NonNull String database, + @NonNull List tables) { + return tables.parallelStream() + .collect( + Collectors.toMap( + Function.identity(), + table -> + getTableFields( + pluginName, requestParams, database, table))); + } + + private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName) + throws SQLException { + ResultSet primaryKeysInfo = metaData.getPrimaryKeys(null, dbName, tableName); + while (primaryKeysInfo.next()) { + return primaryKeysInfo.getString("COLUMN_NAME"); + } + return null; + } + + private Connection getConnection(Map requestParams) + throws SQLException, ClassNotFoundException { + // Ensure the DB2 JDBC driver is loaded + Class.forName("com.ibm.db2.jcc.DB2Driver"); + checkNotNull(requestParams.get(Db2OptionRule.URL.key()), "Jdbc url cannot be null"); + String url = requestParams.get(Db2OptionRule.URL.key()); + if (requestParams.containsKey(Db2OptionRule.USER.key())) { + String username = requestParams.get(Db2OptionRule.USER.key()); + String password = requestParams.get(Db2OptionRule.PASSWORD.key()); + return DriverManager.getConnection(url, username, password); + } + return DriverManager.getConnection(url); + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceFactory.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceFactory.java new file mode 100644 index 000000000..afdd75c95 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.datasource.plugin.db2.jdbc; + +import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; +import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory; +import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo; + +import com.google.auto.service.AutoService; +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; + +import java.util.Set; + +@Slf4j +@AutoService(DataSourceFactory.class) +public class Db2JdbcDataSourceFactory implements DataSourceFactory { + + @Override + public String factoryIdentifier() { + return Db2DataSourceConfig.PLUGIN_NAME; + } + + @Override + public Set supportedDataSources() { + return Sets.newHashSet(Db2DataSourceConfig.DB2_DATASOURCE_PLUGIN_INFO); + } + + @Override + public DataSourceChannel createChannel() { + return new Db2JdbcDataSourceChannel(); + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java new file mode 100644 index 000000000..e5e2b9b1b --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.datasource.plugin.db2.jdbc; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class Db2OptionRule { + + public static final Option URL = + Options.key("url") + .stringType() + .noDefaultValue() + .withDescription( + "jdbc url, eg:" + + " jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8"); + + public static final Option USER = + Options.key("user").stringType().noDefaultValue().withDescription("jdbc user"); + + public static final Option PASSWORD = + Options.key("password").stringType().noDefaultValue().withDescription("jdbc password"); + + public static final Option DATABASE = + Options.key("database").stringType().noDefaultValue().withDescription("jdbc database"); + + public static final Option TABLE = + Options.key("table").stringType().noDefaultValue().withDescription("jdbc table"); + + public static final Option DRIVER = + Options.key("driver") + .enumType(DriverType.class) + .defaultValue(DriverType.DB2) + .withDescription("driver"); + + public enum DriverType { + DB2("com.ibm.db2.jcc.DB2Driver"), + ; + private final String driverClassName; + + DriverType(String driverClassName) { + this.driverClassName = driverClassName; + } + + public String getDriverClassName() { + return driverClassName; + } + + @Override + public String toString() { + return driverClassName; + } + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml index 44c6ac6cb..13af3009f 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml +++ b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml @@ -45,6 +45,7 @@ datasource-sqlserver-cdc datasource-jdbc-tidb datasource-mongodb + datasource-jdbc-db2 diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java new file mode 100644 index 000000000..327f98290 --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.thirdparty.datasource.impl; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.app.domain.request.connector.BusinessMode; +import org.apache.seatunnel.app.domain.request.job.DataSourceOption; +import org.apache.seatunnel.app.domain.request.job.SelectTableFields; +import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes; +import org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcher; +import org.apache.seatunnel.common.constants.PluginType; + +import com.google.auto.service.AutoService; + +import java.util.List; +import java.util.Optional; + +import static org.apache.seatunnel.app.domain.request.connector.BusinessMode.DATA_INTEGRATION; +import static org.apache.seatunnel.app.domain.request.connector.BusinessMode.DATA_REPLICA; + +@AutoService(DataSourceConfigSwitcher.class) +public class Db2DataSourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher { + + private static final String QUERY_KEY = "query"; + + private static final String GENERATE_SINK_SQL = "generate_sink_sql"; + + private static final String URL_KEY = "url"; + + // for catalog + private static final String CATALOG = "catalog"; + private static final String FACTORY = "factory"; + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; + private static final String BASE_URL = "base-url"; + private static final String CATALOG_SCHEMA = "schema"; + + private static final Option DATABASE_SCHEMA = + Options.key("database_schema") + .stringType() + .noDefaultValue() + .withDescription("the default database used during automated table creation."); + private static final String CATALOG_NAME = "Db2"; + + public Db2DataSourceConfigSwitcher() {} + + protected Optional getCatalogName() { + return Optional.of(CATALOG_NAME); + } + + protected boolean isSupportPrefixOrSuffix() { + return true; + } + + protected boolean isSupportToggleCase() { + return true; + } + + public Config mergeDatasourceConfig( + Config dataSourceInstanceConfig, + VirtualTableDetailRes virtualTableDetail, + DataSourceOption dataSourceOption, + SelectTableFields selectTableFields, + BusinessMode businessMode, + PluginType pluginType, + Config connectorConfig) { + + // 替换url中的database + if (dataSourceOption.getDatabases().size() == 1) { + String databaseName = dataSourceOption.getDatabases().get(0); + String url = dataSourceInstanceConfig.getString(URL_KEY); + String newUrl = replaceDatabaseNameInUrl(url, databaseName); + dataSourceInstanceConfig = + dataSourceInstanceConfig.withValue( + URL_KEY, ConfigValueFactory.fromAnyRef(newUrl)); + } + if (businessMode.equals(DATA_INTEGRATION)) { + + String databaseName = dataSourceOption.getDatabases().get(0); + + String tableName = dataSourceOption.getTables().get(0); + + // 将schema转换成sql + if (pluginType.equals(PluginType.SOURCE)) { + + List tableFields = selectTableFields.getTableFields(); + + String sql = tableFieldsToSql(tableFields, databaseName, tableName); + + connectorConfig = + connectorConfig.withValue(QUERY_KEY, ConfigValueFactory.fromAnyRef(sql)); + } else if (pluginType.equals(PluginType.SINK)) { + + List tableFields = selectTableFields.getTableFields(); + + String sql = generateDb2(tableFields, databaseName, tableName); + + connectorConfig = + connectorConfig.withValue(QUERY_KEY, ConfigValueFactory.fromAnyRef(sql)); + } else { + throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType); + } + + return super.mergeDatasourceConfig( + dataSourceInstanceConfig, + virtualTableDetail, + dataSourceOption, + selectTableFields, + businessMode, + pluginType, + connectorConfig); + } else if (businessMode.equals(DATA_REPLICA)) { + String databaseName = dataSourceOption.getDatabases().get(0); + if (pluginType.equals(PluginType.SINK)) { + if (getCatalogName().isPresent()) { + Config config = ConfigFactory.empty(); + config = + config.withValue( + FACTORY, ConfigValueFactory.fromAnyRef(getCatalogName().get())); + config = + config.withValue( + USERNAME, + ConfigValueFactory.fromAnyRef( + dataSourceInstanceConfig.getString("user"))); + config = + config.withValue( + PASSWORD, + ConfigValueFactory.fromAnyRef( + dataSourceInstanceConfig.getString(PASSWORD))); + config = + config.withValue( + BASE_URL, + ConfigValueFactory.fromAnyRef( + dataSourceInstanceConfig.getString(URL_KEY))); + if (isSupportDefaultSchema() + && connectorConfig.hasPath(DATABASE_SCHEMA.key())) { + config = + config.withValue( + CATALOG_SCHEMA, + ConfigValueFactory.fromAnyRef( + connectorConfig.getString(DATABASE_SCHEMA.key()))); + } + + connectorConfig = connectorConfig.withValue(CATALOG, config.root()); + } + return super.mergeDatasourceConfig( + dataSourceInstanceConfig, + virtualTableDetail, + dataSourceOption, + selectTableFields, + businessMode, + pluginType, + connectorConfig); + } else { + throw new UnsupportedOperationException( + "JDBC DATA_REPLICA Unsupported plugin type: " + pluginType); + } + + } else { + throw new UnsupportedOperationException("Unsupported businessMode : " + businessMode); + } + } + + protected String tableFieldsToSql(List tableFields, String database, String table) { + return generateSql(tableFields, database, null, table); + } + + protected String generateDb2(List tableFields, String database, String table) { + return generateSinkSql(tableFields, database, null, table); + } + + protected String generateSql( + List tableFields, String database, String schema, String table) { + StringBuilder sb = new StringBuilder(); + sb.append("SELECT "); + for (int i = 0; i < tableFields.size(); i++) { + sb.append(quoteIdentifier(tableFields.get(i))); + if (i < tableFields.size() - 1) { + sb.append(", "); + } + } + sb.append(" FROM ").append(quoteIdentifier(table)); + + return sb.toString(); + } + + protected String quoteIdentifier(String identifier) { + return "\"" + identifier + "\""; + } + + protected String generateSinkSql( + List tableFields, String database, String schema, String table) { + StringBuilder sb = new StringBuilder(); + sb.append("INSERT INTO ").append(quoteIdentifier(table)).append(" ("); + + // Append column names + for (int i = 0; i < tableFields.size(); i++) { + sb.append(quoteIdentifier(tableFields.get(i))); + if (i < tableFields.size() - 1) { + sb.append(", "); + } + } + + sb.append(") VALUES ("); + + // Append placeholders + for (int i = 0; i < tableFields.size(); i++) { + sb.append("?"); + if (i < tableFields.size() - 1) { + sb.append(", "); + } + } + + sb.append(");"); + return sb.toString(); + } + + @Override + public String getDataSourceName() { + return "JDBC-DB2"; + } +} diff --git a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml index d322d2e7b..4bcd6fd0f 100644 --- a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml +++ b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml @@ -25,6 +25,7 @@ connector-datasource-mapper: - JDBC-SQLServer - JDBC-KingBase - JDBC-Oracle + - JDBC-Db2 MySQL-CDC: dataSources: @@ -134,7 +135,11 @@ connector-datasource-mapper: - SINGLE_TABLE jobMode: - BATCH - + JDBC-Db2: + businessMode: + - DATA_INTEGRATION + sceneMode: + - SINGLE_TABLE sinkDatasourceFeatures: JDBC-Mysql: @@ -214,4 +219,9 @@ connector-datasource-mapper: - DATA_REPLICA sceneMode: - SINGLE_TABLE - - MULTIPLE_TABLE \ No newline at end of file + - MULTIPLE_TABLE + JDBC-Db2: + businessMode: + - DATA_INTEGRATION + sceneMode: + - SINGLE_TABLE \ No newline at end of file diff --git a/seatunnel-web-dist/pom.xml b/seatunnel-web-dist/pom.xml index f19e379a7..0623bd2f6 100644 --- a/seatunnel-web-dist/pom.xml +++ b/seatunnel-web-dist/pom.xml @@ -363,6 +363,19 @@ + + org.apache.seatunnel + datasource-jdbc-db2 + ${project.version} + provided + + + * + * + + + + com.oracle.database.jdbc