diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml index d5dee4503..a3035bf6b 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml @@ -102,6 +102,11 @@ datasource-jdbc-db2 ${project.version} + + org.apache.seatunnel + datasource-jdbc-snowflake + ${project.version} + diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java index 844a47563..5ee609b88 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.seatunnel.datasource.plugin.db2.jdbc; import org.apache.seatunnel.api.configuration.util.OptionRule; diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/pom.xml new file mode 100644 index 000000000..e22d1cd81 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/pom.xml @@ -0,0 +1,81 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-datasource-plugins + ${revision} + + + datasource-jdbc-snowflake + + + 8.0.28 + + + + + org.apache.seatunnel + datasource-plugins-api + ${project.version} + provided + + + org.apache.commons + commons-lang3 + + + + com.google.auto.service + auto-service + + + org.apache.seatunnel + seatunnel-api + provided + + + + + + net.snowflake + snowflake-jdbc + 3.14.4 + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + ${e2e.dependency.skip} + true + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + + diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeDataSourceConfig.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeDataSourceConfig.java new file mode 100644 index 000000000..cb7fdc451 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeDataSourceConfig.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.datasource.plugin.snowflake.jdbc; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo; +import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum; + +import com.google.common.collect.Sets; + +import java.util.Set; + +public class SnowFlakeDataSourceConfig { + + public static final String PLUGIN_NAME = "JDBC-SnowFlake"; + + public static final DataSourcePluginInfo SNOWFLAKE_DATASOURCE_PLUGIN_INFO = + DataSourcePluginInfo.builder() + .name(PLUGIN_NAME) + .icon(PLUGIN_NAME) + .version("1.0.0") + .type(DatasourcePluginTypeEnum.DATABASE.getCode()) + .build(); + + public static final Set SNOWFLAKE_SYSTEM_DATABASES = + Sets.newHashSet( + "information_schema", "mysql", "performance_schema", "sys", "SNOWFLAKE"); + + public static final OptionRule OPTION_RULE = + OptionRule.builder() + .required(SnowFlakeOptionRule.URL, SnowFlakeOptionRule.DRIVER) + .optional(SnowFlakeOptionRule.USER, SnowFlakeOptionRule.PASSWORD) + .build(); + + public static final OptionRule METADATA_RULE = + OptionRule.builder() + .required(SnowFlakeOptionRule.DATABASE, SnowFlakeOptionRule.TABLE) + .build(); +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceChannel.java new file mode 100644 index 000000000..e54b4389d --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceChannel.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.datasource.plugin.snowflake.jdbc; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; +import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; +import org.apache.seatunnel.datasource.plugin.api.model.TableField; + +import org.apache.commons.lang3.StringUtils; + +import lombok.NonNull; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class SnowFlakeJdbcDataSourceChannel implements DataSourceChannel { + + @Override + public OptionRule getDataSourceOptions(@NonNull String pluginName) { + return SnowFlakeDataSourceConfig.OPTION_RULE; + } + + @Override + public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) { + return SnowFlakeDataSourceConfig.METADATA_RULE; + } + + public List getTables( + String pluginName, + Map requestParams, + String database, + Map options) { + List schemaTableNames = new ArrayList<>(); + try { + Class.forName(requestParams.get(SnowFlakeOptionRule.DRIVER.key())); + try (Connection connection = getConnection(requestParams)) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet resultSet = + metaData.getTables(database, null, null, new String[] {"TABLE"})) { + while (resultSet.next()) { + String schemaName = resultSet.getString("TABLE_SCHEM"); + String tableName = resultSet.getString("TABLE_NAME"); + if (StringUtils.isNotBlank(tableName)) { + schemaTableNames.add(schemaName + "." + tableName); + } + } + } + } + } catch (ClassNotFoundException | SQLException e) { + // Handle exceptions or rethrow as unchecked + throw new RuntimeException("Error accessing database metadata", e); + } + return schemaTableNames; + } + + @Override + public List getDatabases( + @NonNull String pluginName, @NonNull Map requestParams) { + List dbNames = new ArrayList<>(); + try { + Connection connection = getConnection(requestParams); + PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;"); + ResultSet re = statement.executeQuery(); + // filter system databases + while (re.next()) { + String dbName = re.getString("name"); + if (StringUtils.isNotBlank(dbName) + && !SnowFlakeDataSourceConfig.SNOWFLAKE_SYSTEM_DATABASES.contains(dbName)) { + dbNames.add(dbName); + } + } + return dbNames; + } catch (SQLException | ClassNotFoundException e) { + throw new DataSourcePluginException("Get databases failed", e); + } + } + + @Override + public boolean checkDataSourceConnectivity( + @NonNull String pluginName, @NonNull Map requestParams) { + try (Connection ignored = getConnection(requestParams)) { + return true; + } catch (Exception e) { + throw new DataSourcePluginException("check jdbc connectivity failed", e); + } + } + + @Override + public List getTableFields( + @NonNull String pluginName, + @NonNull Map requestParams, + @NonNull String database, + @NonNull String table) { + List tableFields = new ArrayList<>(); + try (Connection connection = getConnection(requestParams, database); ) { + DatabaseMetaData metaData = connection.getMetaData(); + String primaryKey = getPrimaryKey(metaData, database, table); + String[] split = table.split("\\."); + if (split.length != 2) { + // throw new DataSourcePluginException( + // "Postgresql tableName should composed by + // schemaName.tableName"); + } + try (ResultSet resultSet = metaData.getColumns(database, split[0], split[1], null)) { + while (resultSet.next()) { + TableField tableField = new TableField(); + String columnName = resultSet.getString("COLUMN_NAME"); + tableField.setPrimaryKey(false); + if (StringUtils.isNotBlank(primaryKey) && primaryKey.equals(columnName)) { + tableField.setPrimaryKey(true); + } + tableField.setName(columnName); + tableField.setType(resultSet.getString("TYPE_NAME")); + tableField.setComment(resultSet.getString("REMARKS")); + Object nullable = resultSet.getObject("IS_NULLABLE"); + tableField.setNullable(Boolean.TRUE.toString().equals(nullable.toString())); + tableFields.add(tableField); + } + } + } catch (SQLException | ClassNotFoundException e) { + // throw new DataSourcePluginException("get table fields failed", e); + } + return tableFields; + } + + @Override + public Map> getTableFields( + @NonNull String pluginName, + @NonNull Map requestParams, + @NonNull String database, + @NonNull List tables) { + return tables.parallelStream() + .collect( + Collectors.toMap( + Function.identity(), + table -> + getTableFields( + pluginName, requestParams, database, table))); + } + + private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName) + throws SQLException { + ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName); + while (primaryKeysInfo.next()) { + return primaryKeysInfo.getString("COLUMN_NAME"); + } + return null; + } + + private Connection getConnection(Map requestParams) + throws SQLException, ClassNotFoundException { + return getConnection(requestParams, null); + } + + private static Connection getConnection(Map requestParams, String databaseName) + throws SQLException, ClassNotFoundException { + checkNotNull(requestParams.get(SnowFlakeOptionRule.DRIVER.key())); + checkNotNull(requestParams.get(SnowFlakeOptionRule.URL.key()), "Jdbc url cannot be null"); + String url = + replaceDatabase(requestParams.get(SnowFlakeOptionRule.URL.key()), databaseName); + if (requestParams.containsKey(SnowFlakeOptionRule.USER.key())) { + String username = requestParams.get(SnowFlakeOptionRule.USER.key()); + String password = requestParams.get(SnowFlakeOptionRule.PASSWORD.key()); + return DriverManager.getConnection(url, username, password); + } + return DriverManager.getConnection(url); + } + // Placeholder for the JdbcUtils.replaceDatabase method + private static String replaceDatabase(String url, String databaseName) { + // Implement database name replacement logic + return url; // Return the original URL or modified URL + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceFactory.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceFactory.java new file mode 100644 index 000000000..baf2b0c48 --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.datasource.plugin.snowflake.jdbc; + +import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; +import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory; +import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo; + +import com.google.auto.service.AutoService; +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; + +import java.util.Set; + +@Slf4j +@AutoService(DataSourceFactory.class) +public class SnowFlakeJdbcDataSourceFactory implements DataSourceFactory { + + @Override + public String factoryIdentifier() { + return SnowFlakeDataSourceConfig.PLUGIN_NAME; + } + + @Override + public Set supportedDataSources() { + return Sets.newHashSet(SnowFlakeDataSourceConfig.SNOWFLAKE_DATASOURCE_PLUGIN_INFO); + } + + @Override + public DataSourceChannel createChannel() { + return new SnowFlakeJdbcDataSourceChannel(); + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeOptionRule.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeOptionRule.java new file mode 100644 index 000000000..36453111a --- /dev/null +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeOptionRule.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.datasource.plugin.snowflake.jdbc; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class SnowFlakeOptionRule { + + public static final Option URL = + Options.key("url") + .stringType() + .noDefaultValue() + .withDescription( + "jdbc url, eg:" + + " jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8"); + + public static final Option USER = + Options.key("user").stringType().noDefaultValue().withDescription("jdbc user"); + + public static final Option PASSWORD = + Options.key("password").stringType().noDefaultValue().withDescription("jdbc password"); + + public static final Option DATABASE = + Options.key("database").stringType().noDefaultValue().withDescription("jdbc database"); + + public static final Option TABLE = + Options.key("table").stringType().noDefaultValue().withDescription("jdbc table"); + + public static final Option DRIVER = + Options.key("driver") + .enumType(DriverType.class) + .defaultValue(DriverType.SNOWFLAKE) + .withDescription("driver"); + + public enum DriverType { + SNOWFLAKE("net.snowflake.client.jdbc.SnowflakeDriver"), + ; + private final String driverClassName; + + DriverType(String driverClassName) { + this.driverClassName = driverClassName; + } + + public String getDriverClassName() { + return driverClassName; + } + + @Override + public String toString() { + return driverClassName; + } + } +} diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml index 7a4568582..9b79eeecc 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml +++ b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml @@ -46,6 +46,7 @@ datasource-jdbc-tidb datasource-mongodb datasource-jdbc-db2 + datasource-jdbc-snowflake datasource-fakesource datasource-console diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/SnowFlakeDatasourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/SnowFlakeDatasourceConfigSwitcher.java new file mode 100644 index 000000000..bd8192608 --- /dev/null +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/SnowFlakeDatasourceConfigSwitcher.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.app.thirdparty.datasource.impl; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.app.domain.request.connector.BusinessMode; +import org.apache.seatunnel.app.domain.request.job.DataSourceOption; +import org.apache.seatunnel.app.domain.request.job.SelectTableFields; +import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes; +import org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcher; +import org.apache.seatunnel.common.constants.PluginType; + +import com.google.auto.service.AutoService; + +import java.util.List; +import java.util.Optional; + +import static org.apache.seatunnel.app.domain.request.connector.BusinessMode.DATA_INTEGRATION; +import static org.apache.seatunnel.app.domain.request.connector.BusinessMode.DATA_REPLICA; + +@AutoService(DataSourceConfigSwitcher.class) +public class SnowFlakeDatasourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher { + private static final String TABLE_KEY = "table"; + private static final String DATABASE_KEY = "database"; + + private static final String QUERY_KEY = "query"; + + private static final String GENERATE_SINK_SQL = "generate_sink_sql"; + + private static final String URL_KEY = "url"; + + // for catalog + private static final String CATALOG = "catalog"; + private static final String FACTORY = "factory"; + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; + private static final String BASE_URL = "base-url"; + private static final String CATALOG_SCHEMA = "schema"; + + private static final Option DATABASE_SCHEMA = + Options.key("database_schema") + .stringType() + .noDefaultValue() + .withDescription("the default database used during automated table creation."); + private static final String CATALOG_NAME = "SnowFlake"; + + public SnowFlakeDatasourceConfigSwitcher() {} + + protected Optional getCatalogName() { + return Optional.of(CATALOG_NAME); + } + + protected boolean isSupportPrefixOrSuffix() { + return true; + } + + protected boolean isSupportToggleCase() { + return true; + } + + public Config mergeDatasourceConfig( + Config dataSourceInstanceConfig, + VirtualTableDetailRes virtualTableDetail, + DataSourceOption dataSourceOption, + SelectTableFields selectTableFields, + BusinessMode businessMode, + PluginType pluginType, + Config connectorConfig) { + + // 替换url中的database + if (dataSourceOption.getDatabases().size() == 1) { + String databaseName = dataSourceOption.getDatabases().get(0); + String url = dataSourceInstanceConfig.getString(URL_KEY); + String newUrl = replaceDatabaseNameInUrl(url, databaseName); + dataSourceInstanceConfig = + dataSourceInstanceConfig.withValue( + URL_KEY, ConfigValueFactory.fromAnyRef(newUrl)); + } + if (pluginType.equals(PluginType.SINK)) { + connectorConfig = + connectorConfig.withValue( + GENERATE_SINK_SQL, ConfigValueFactory.fromAnyRef(true)); + } + if (businessMode.equals(DATA_INTEGRATION)) { + + String databaseName = dataSourceOption.getDatabases().get(0); + + String tableName = dataSourceOption.getTables().get(0); + + // 将schema转换成sql + if (pluginType.equals(PluginType.SOURCE)) { + + List tableFields = selectTableFields.getTableFields(); + + String sql = tableFieldsToSql(tableFields, databaseName, tableName); + + connectorConfig = + connectorConfig.withValue(QUERY_KEY, ConfigValueFactory.fromAnyRef(sql)); + } else if (pluginType.equals(PluginType.SINK)) { + connectorConfig = + connectorConfig.withValue( + DATABASE_KEY, ConfigValueFactory.fromAnyRef(databaseName)); + connectorConfig = + connectorConfig.withValue( + TABLE_KEY, ConfigValueFactory.fromAnyRef(tableName)); + } else { + throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType); + } + + return super.mergeDatasourceConfig( + dataSourceInstanceConfig, + virtualTableDetail, + dataSourceOption, + selectTableFields, + businessMode, + pluginType, + connectorConfig); + } else if (businessMode.equals(DATA_REPLICA)) { + String databaseName = dataSourceOption.getDatabases().get(0); + if (pluginType.equals(PluginType.SINK)) { + if (getCatalogName().isPresent()) { + Config config = ConfigFactory.empty(); + config = + config.withValue( + FACTORY, ConfigValueFactory.fromAnyRef(getCatalogName().get())); + config = + config.withValue( + USERNAME, + ConfigValueFactory.fromAnyRef( + dataSourceInstanceConfig.getString("user"))); + config = + config.withValue( + PASSWORD, + ConfigValueFactory.fromAnyRef( + dataSourceInstanceConfig.getString(PASSWORD))); + config = + config.withValue( + BASE_URL, + ConfigValueFactory.fromAnyRef( + dataSourceInstanceConfig.getString(URL_KEY))); + if (isSupportDefaultSchema() + && connectorConfig.hasPath(DATABASE_SCHEMA.key())) { + config = + config.withValue( + CATALOG_SCHEMA, + ConfigValueFactory.fromAnyRef( + connectorConfig.getString(DATABASE_SCHEMA.key()))); + } + + connectorConfig = connectorConfig.withValue(CATALOG, config.root()); + } + connectorConfig = + connectorConfig.withValue( + DATABASE_KEY, ConfigValueFactory.fromAnyRef(databaseName)); + return super.mergeDatasourceConfig( + dataSourceInstanceConfig, + virtualTableDetail, + dataSourceOption, + selectTableFields, + businessMode, + pluginType, + connectorConfig); + } else { + throw new UnsupportedOperationException( + "JDBC DATA_REPLICA Unsupported plugin type: " + pluginType); + } + + } else { + throw new UnsupportedOperationException("Unsupported businessMode : " + businessMode); + } + } + + protected String tableFieldsToSql(List tableFields, String database, String table) { + return generateSql(tableFields, database, null, table); + } + + protected String generateSql( + List tableFields, String database, String schema, String table) { + StringBuilder sb = new StringBuilder(); + sb.append("SELECT ") + .append("*") + .append(" FROM ") + .append(database) + .append(".") + .append(table); + + return sb.toString(); + } + + protected String quoteIdentifier(String identifier) { + return "\"" + identifier + "\""; + } + + @Override + public String getDataSourceName() { + return "JDBC-SNOWFLAKE"; + } +} diff --git a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml index 0ba9a780f..2ee429418 100644 --- a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml +++ b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml @@ -26,6 +26,7 @@ connector-datasource-mapper: - JDBC-KingBase - JDBC-Oracle - JDBC-Db2 + - JDBC-SnowFlake MySQL-CDC: dataSources: @@ -148,6 +149,11 @@ connector-datasource-mapper: - DATA_INTEGRATION sceneMode: - SINGLE_TABLE + JDBC-SnowFlake: + businessMode: + - DATA_INTEGRATION + sceneMode: + - SINGLE_TABLE FakeSource: businessMode: - DATA_INTEGRATION @@ -240,6 +246,11 @@ connector-datasource-mapper: - DATA_INTEGRATION sceneMode: - SINGLE_TABLE + JDBC-SnowFlake: + businessMode: + - DATA_INTEGRATION + sceneMode: + - SINGLE_TABLE Console: businessMode: @@ -247,4 +258,4 @@ connector-datasource-mapper: - DATA_REPLICA sceneMode: - SINGLE_TABLE - - MULTIPLE_TABLE \ No newline at end of file + - MULTIPLE_TABLE diff --git a/seatunnel-web-dist/pom.xml b/seatunnel-web-dist/pom.xml index 531e31f0d..0ee2d851b 100644 --- a/seatunnel-web-dist/pom.xml +++ b/seatunnel-web-dist/pom.xml @@ -375,6 +375,18 @@ + + org.apache.seatunnel + datasource-jdbc-snowflake + ${project.version} + provided + + + * + * + + + org.apache.seatunnel datasource-fakesource