diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml
index d5dee4503..a3035bf6b 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-all/pom.xml
@@ -102,6 +102,11 @@
             <artifactId>datasource-jdbc-db2</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>datasource-jdbc-snowflake</artifactId>
+            <version>${project.version}</version>
+        </dependency>
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java
index 844a47563..5ee609b88 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.seatunnel.datasource.plugin.db2.jdbc;
import org.apache.seatunnel.api.configuration.util.OptionRule;
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/pom.xml
new file mode 100644
index 000000000..e22d1cd81
--- /dev/null
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License. You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-datasource-plugins</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>datasource-jdbc-snowflake</artifactId>
+
+    <properties>
+        <mysql.version>8.0.28</mysql.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>datasource-plugins-api</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>net.snowflake</groupId>
+            <artifactId>snowflake-jdbc</artifactId>
+            <version>3.14.4</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <skip>${e2e.dependency.skip}</skip>
+                    <failOnWarning>true</failOnWarning>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeDataSourceConfig.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeDataSourceConfig.java
new file mode 100644
index 000000000..cb7fdc451
--- /dev/null
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeDataSourceConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.datasource.plugin.snowflake.jdbc;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
+import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
+
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+
+public class SnowFlakeDataSourceConfig {
+
+ public static final String PLUGIN_NAME = "JDBC-SnowFlake";
+
+ public static final DataSourcePluginInfo SNOWFLAKE_DATASOURCE_PLUGIN_INFO =
+ DataSourcePluginInfo.builder()
+ .name(PLUGIN_NAME)
+ .icon(PLUGIN_NAME)
+ .version("1.0.0")
+ .type(DatasourcePluginTypeEnum.DATABASE.getCode())
+ .build();
+
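+ // Databases filtered out of the database listing; on Snowflake only the SNOWFLAKE entry is
+ // expected to match, the remaining names simply act as harmless extra filters.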
+ public static final Set<String> SNOWFLAKE_SYSTEM_DATABASES =
+ Sets.newHashSet(
+ "information_schema", "mysql", "performance_schema", "sys", "SNOWFLAKE");
+
+ public static final OptionRule OPTION_RULE =
+ OptionRule.builder()
+ .required(SnowFlakeOptionRule.URL, SnowFlakeOptionRule.DRIVER)
+ .optional(SnowFlakeOptionRule.USER, SnowFlakeOptionRule.PASSWORD)
+ .build();
+
+ public static final OptionRule METADATA_RULE =
+ OptionRule.builder()
+ .required(SnowFlakeOptionRule.DATABASE, SnowFlakeOptionRule.TABLE)
+ .build();
+}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceChannel.java
new file mode 100644
index 000000000..e54b4389d
--- /dev/null
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceChannel.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.datasource.plugin.snowflake.jdbc;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
+import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
+import org.apache.seatunnel.datasource.plugin.api.model.TableField;
+
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.NonNull;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class SnowFlakeJdbcDataSourceChannel implements DataSourceChannel {
+
+ @Override
+ public OptionRule getDataSourceOptions(@NonNull String pluginName) {
+ return SnowFlakeDataSourceConfig.OPTION_RULE;
+ }
+
+ @Override
+ public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
+ return SnowFlakeDataSourceConfig.METADATA_RULE;
+ }
+
+ public List<String> getTables(
+ String pluginName,
+ Map<String, String> requestParams,
+ String database,
+ Map<String, String> options) {
+ List<String> schemaTableNames = new ArrayList<>();
+ try {
+ Class.forName(requestParams.get(SnowFlakeOptionRule.DRIVER.key()));
+ try (Connection connection = getConnection(requestParams)) {
+ DatabaseMetaData metaData = connection.getMetaData();
+ try (ResultSet resultSet =
+ metaData.getTables(database, null, null, new String[] {"TABLE"})) {
+ while (resultSet.next()) {
+ String schemaName = resultSet.getString("TABLE_SCHEM");
+ String tableName = resultSet.getString("TABLE_NAME");
+ if (StringUtils.isNotBlank(tableName)) {
+ schemaTableNames.add(schemaName + "." + tableName);
+ }
+ }
+ }
+ }
+ } catch (ClassNotFoundException | SQLException e) {
+ throw new DataSourcePluginException("Get tables failed", e);
+ }
+ return schemaTableNames;
+ }
+
+ @Override
+ public List<String> getDatabases(
+ @NonNull String pluginName, @NonNull Map<String, String> requestParams) {
+ List<String> dbNames = new ArrayList<>();
+ try (Connection connection = getConnection(requestParams);
+ PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;");
+ ResultSet re = statement.executeQuery()) {
+ // filter system databases
+ while (re.next()) {
+ String dbName = re.getString("name");
+ if (StringUtils.isNotBlank(dbName)
+ && !SnowFlakeDataSourceConfig.SNOWFLAKE_SYSTEM_DATABASES.contains(dbName)) {
+ dbNames.add(dbName);
+ }
+ }
+ return dbNames;
+ } catch (SQLException | ClassNotFoundException e) {
+ throw new DataSourcePluginException("Get databases failed", e);
+ }
+ }
+
+ @Override
+ public boolean checkDataSourceConnectivity(
+ @NonNull String pluginName, @NonNull Map<String, String> requestParams) {
+ try (Connection ignored = getConnection(requestParams)) {
+ return true;
+ } catch (Exception e) {
+ throw new DataSourcePluginException("check jdbc connectivity failed", e);
+ }
+ }
+
+ @Override
+ public List<TableField> getTableFields(
+ @NonNull String pluginName,
+ @NonNull Map<String, String> requestParams,
+ @NonNull String database,
+ @NonNull String table) {
+ List<TableField> tableFields = new ArrayList<>();
+ try (Connection connection = getConnection(requestParams, database)) {
+ DatabaseMetaData metaData = connection.getMetaData();
+ String primaryKey = getPrimaryKey(metaData, database, table);
+ String[] split = table.split("\\.");
+ if (split.length != 2) {
+ throw new DataSourcePluginException(
+ "Snowflake table name should be composed of schemaName.tableName");
+ }
+ try (ResultSet resultSet = metaData.getColumns(database, split[0], split[1], null)) {
+ while (resultSet.next()) {
+ TableField tableField = new TableField();
+ String columnName = resultSet.getString("COLUMN_NAME");
+ tableField.setPrimaryKey(false);
+ if (StringUtils.isNotBlank(primaryKey) && primaryKey.equals(columnName)) {
+ tableField.setPrimaryKey(true);
+ }
+ tableField.setName(columnName);
+ tableField.setType(resultSet.getString("TYPE_NAME"));
+ tableField.setComment(resultSet.getString("REMARKS"));
+ Object nullable = resultSet.getObject("IS_NULLABLE");
+ tableField.setNullable(Boolean.TRUE.toString().equals(nullable.toString()));
+ tableFields.add(tableField);
+ }
+ }
+ } catch (SQLException | ClassNotFoundException e) {
+ throw new DataSourcePluginException("get table fields failed", e);
+ }
+ return tableFields;
+ }
+
+ @Override
+ public Map<String, List<TableField>> getTableFields(
+ @NonNull String pluginName,
+ @NonNull Map<String, String> requestParams,
+ @NonNull String database,
+ @NonNull List<String> tables) {
+ return tables.parallelStream()
+ .collect(
+ Collectors.toMap(
+ Function.identity(),
+ table ->
+ getTableFields(
+ pluginName, requestParams, database, table)));
+ }
+
+ private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
+ throws SQLException {
+ try (ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName)) {
+ while (primaryKeysInfo.next()) {
+ return primaryKeysInfo.getString("COLUMN_NAME");
+ }
+ }
+ return null;
+ }
+
+ private Connection getConnection(Map<String, String> requestParams)
+ throws SQLException, ClassNotFoundException {
+ return getConnection(requestParams, null);
+ }
+
+ private static Connection getConnection(Map<String, String> requestParams, String databaseName)
+ throws SQLException, ClassNotFoundException {
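+ // requestParams carries the flattened datasource form values keyed by the option names
+ // declared in SnowFlakeOptionRule: "driver", "url" and optionally "user"/"password".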
+ checkNotNull(requestParams.get(SnowFlakeOptionRule.DRIVER.key()));
+ checkNotNull(requestParams.get(SnowFlakeOptionRule.URL.key()), "Jdbc url cannot be null");
+ String url =
+ replaceDatabase(requestParams.get(SnowFlakeOptionRule.URL.key()), databaseName);
+ if (requestParams.containsKey(SnowFlakeOptionRule.USER.key())) {
+ String username = requestParams.get(SnowFlakeOptionRule.USER.key());
+ String password = requestParams.get(SnowFlakeOptionRule.PASSWORD.key());
+ return DriverManager.getConnection(url, username, password);
+ }
+ return DriverManager.getConnection(url);
+ }
+ // Placeholder for the JdbcUtils.replaceDatabase method
+ private static String replaceDatabase(String url, String databaseName) {
+ // Implement database name replacement logic
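+ // NOTE: Snowflake JDBC URLs typically carry the target database as a query parameter,
+ // e.g. jdbc:snowflake://<account>.snowflakecomputing.com/?db=MY_DB, so a complete
+ // implementation would rewrite or append that "db" parameter; until then the original
+ // URL is returned unchanged.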
+ return url; // Return the original URL or modified URL
+ }
+}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceFactory.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceFactory.java
new file mode 100644
index 000000000..baf2b0c48
--- /dev/null
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeJdbcDataSourceFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.datasource.plugin.snowflake.jdbc;
+
+import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
+import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
+import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Set;
+
+@Slf4j
+@AutoService(DataSourceFactory.class)
+public class SnowFlakeJdbcDataSourceFactory implements DataSourceFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return SnowFlakeDataSourceConfig.PLUGIN_NAME;
+ }
+
+ @Override
+ public Set<DataSourcePluginInfo> supportedDataSources() {
+ return Sets.newHashSet(SnowFlakeDataSourceConfig.SNOWFLAKE_DATASOURCE_PLUGIN_INFO);
+ }
+
+ @Override
+ public DataSourceChannel createChannel() {
+ return new SnowFlakeJdbcDataSourceChannel();
+ }
+}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeOptionRule.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeOptionRule.java
new file mode 100644
index 000000000..36453111a
--- /dev/null
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-snowflake/src/main/java/org/apache/seatunnel/datasource/plugin/snowflake/jdbc/SnowFlakeOptionRule.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.datasource.plugin.snowflake.jdbc;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class SnowFlakeOptionRule {
+
+ public static final Option<String> URL =
+ Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "jdbc url, eg:"
+ + " jdbc:snowflake://<account_identifier>.snowflakecomputing.com/?db=MY_DATABASE&schema=PUBLIC");
+
+ public static final Option<String> USER =
+ Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
+
+ public static final Option<String> PASSWORD =
+ Options.key("password").stringType().noDefaultValue().withDescription("jdbc password");
+
+ public static final Option<String> DATABASE =
+ Options.key("database").stringType().noDefaultValue().withDescription("jdbc database");
+
+ public static final Option<String> TABLE =
+ Options.key("table").stringType().noDefaultValue().withDescription("jdbc table");
+
+ public static final Option<DriverType> DRIVER =
+ Options.key("driver")
+ .enumType(DriverType.class)
+ .defaultValue(DriverType.SNOWFLAKE)
+ .withDescription("driver");
+
+ public enum DriverType {
+ SNOWFLAKE("net.snowflake.client.jdbc.SnowflakeDriver"),
+ ;
+ private final String driverClassName;
+
+ DriverType(String driverClassName) {
+ this.driverClassName = driverClassName;
+ }
+
+ public String getDriverClassName() {
+ return driverClassName;
+ }
+
+ @Override
+ public String toString() {
+ return driverClassName;
+ }
+ }
+}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
index 7a4568582..9b79eeecc 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
@@ -46,6 +46,7 @@
         <module>datasource-jdbc-tidb</module>
         <module>datasource-mongodb</module>
         <module>datasource-jdbc-db2</module>
+        <module>datasource-jdbc-snowflake</module>
         <module>datasource-fakesource</module>
         <module>datasource-console</module>
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/SnowFlakeDatasourceConfigSwitcher.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/SnowFlakeDatasourceConfigSwitcher.java
new file mode 100644
index 000000000..bd8192608
--- /dev/null
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/SnowFlakeDatasourceConfigSwitcher.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thirdparty.datasource.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.app.domain.request.connector.BusinessMode;
+import org.apache.seatunnel.app.domain.request.job.DataSourceOption;
+import org.apache.seatunnel.app.domain.request.job.SelectTableFields;
+import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
+import org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcher;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import com.google.auto.service.AutoService;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.seatunnel.app.domain.request.connector.BusinessMode.DATA_INTEGRATION;
+import static org.apache.seatunnel.app.domain.request.connector.BusinessMode.DATA_REPLICA;
+
+@AutoService(DataSourceConfigSwitcher.class)
+public class SnowFlakeDatasourceConfigSwitcher extends BaseJdbcDataSourceConfigSwitcher {
+ private static final String TABLE_KEY = "table";
+ private static final String DATABASE_KEY = "database";
+
+ private static final String QUERY_KEY = "query";
+
+ private static final String GENERATE_SINK_SQL = "generate_sink_sql";
+
+ private static final String URL_KEY = "url";
+
+ // for catalog
+ private static final String CATALOG = "catalog";
+ private static final String FACTORY = "factory";
+ private static final String USERNAME = "username";
+ private static final String PASSWORD = "password";
+ private static final String BASE_URL = "base-url";
+ private static final String CATALOG_SCHEMA = "schema";
+
+ private static final Option<String> DATABASE_SCHEMA =
+ Options.key("database_schema")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the default database used during automated table creation.");
+ private static final String CATALOG_NAME = "SnowFlake";
+
+ public SnowFlakeDatasourceConfigSwitcher() {}
+
+ protected Optional getCatalogName() {
+ return Optional.of(CATALOG_NAME);
+ }
+
+ protected boolean isSupportPrefixOrSuffix() {
+ return true;
+ }
+
+ protected boolean isSupportToggleCase() {
+ return true;
+ }
+
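+ // Rewrites the datasource and connector configs before job submission: the selected
+ // database is injected into the JDBC URL, sinks get generate_sink_sql=true, and
+ // DATA_INTEGRATION builds either a SELECT query (source) or database/table (sink),
+ // while DATA_REPLICA additionally wires a catalog block for automatic table creation.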
+ public Config mergeDatasourceConfig(
+ Config dataSourceInstanceConfig,
+ VirtualTableDetailRes virtualTableDetail,
+ DataSourceOption dataSourceOption,
+ SelectTableFields selectTableFields,
+ BusinessMode businessMode,
+ PluginType pluginType,
+ Config connectorConfig) {
+
+ // replace the database in the url
+ if (dataSourceOption.getDatabases().size() == 1) {
+ String databaseName = dataSourceOption.getDatabases().get(0);
+ String url = dataSourceInstanceConfig.getString(URL_KEY);
+ String newUrl = replaceDatabaseNameInUrl(url, databaseName);
+ dataSourceInstanceConfig =
+ dataSourceInstanceConfig.withValue(
+ URL_KEY, ConfigValueFactory.fromAnyRef(newUrl));
+ }
+ if (pluginType.equals(PluginType.SINK)) {
+ connectorConfig =
+ connectorConfig.withValue(
+ GENERATE_SINK_SQL, ConfigValueFactory.fromAnyRef(true));
+ }
+ if (businessMode.equals(DATA_INTEGRATION)) {
+
+ String databaseName = dataSourceOption.getDatabases().get(0);
+
+ String tableName = dataSourceOption.getTables().get(0);
+
+ // convert the selected schema into a SQL query
+ if (pluginType.equals(PluginType.SOURCE)) {
+
+ List<String> tableFields = selectTableFields.getTableFields();
+
+ String sql = tableFieldsToSql(tableFields, databaseName, tableName);
+
+ connectorConfig =
+ connectorConfig.withValue(QUERY_KEY, ConfigValueFactory.fromAnyRef(sql));
+ } else if (pluginType.equals(PluginType.SINK)) {
+ connectorConfig =
+ connectorConfig.withValue(
+ DATABASE_KEY, ConfigValueFactory.fromAnyRef(databaseName));
+ connectorConfig =
+ connectorConfig.withValue(
+ TABLE_KEY, ConfigValueFactory.fromAnyRef(tableName));
+ } else {
+ throw new UnsupportedOperationException("Unsupported plugin type: " + pluginType);
+ }
+
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ } else if (businessMode.equals(DATA_REPLICA)) {
+ String databaseName = dataSourceOption.getDatabases().get(0);
+ if (pluginType.equals(PluginType.SINK)) {
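+ // Assemble the catalog block used by the sink for automatic table creation:
+ // factory/username/password/base-url are copied from the datasource instance config.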
+ if (getCatalogName().isPresent()) {
+ Config config = ConfigFactory.empty();
+ config =
+ config.withValue(
+ FACTORY, ConfigValueFactory.fromAnyRef(getCatalogName().get()));
+ config =
+ config.withValue(
+ USERNAME,
+ ConfigValueFactory.fromAnyRef(
+ dataSourceInstanceConfig.getString("user")));
+ config =
+ config.withValue(
+ PASSWORD,
+ ConfigValueFactory.fromAnyRef(
+ dataSourceInstanceConfig.getString(PASSWORD)));
+ config =
+ config.withValue(
+ BASE_URL,
+ ConfigValueFactory.fromAnyRef(
+ dataSourceInstanceConfig.getString(URL_KEY)));
+ if (isSupportDefaultSchema()
+ && connectorConfig.hasPath(DATABASE_SCHEMA.key())) {
+ config =
+ config.withValue(
+ CATALOG_SCHEMA,
+ ConfigValueFactory.fromAnyRef(
+ connectorConfig.getString(DATABASE_SCHEMA.key())));
+ }
+
+ connectorConfig = connectorConfig.withValue(CATALOG, config.root());
+ }
+ connectorConfig =
+ connectorConfig.withValue(
+ DATABASE_KEY, ConfigValueFactory.fromAnyRef(databaseName));
+ return super.mergeDatasourceConfig(
+ dataSourceInstanceConfig,
+ virtualTableDetail,
+ dataSourceOption,
+ selectTableFields,
+ businessMode,
+ pluginType,
+ connectorConfig);
+ } else {
+ throw new UnsupportedOperationException(
+ "JDBC DATA_REPLICA Unsupported plugin type: " + pluginType);
+ }
+
+ } else {
+ throw new UnsupportedOperationException("Unsupported businessMode : " + businessMode);
+ }
+ }
+
+ protected String tableFieldsToSql(List<String> tableFields, String database, String table) {
+ return generateSql(tableFields, database, null, table);
+ }
+
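+ // The table argument already contains the schema ("schema.table", as produced by
+ // getTables in the datasource channel), so the generated statement ends up as
+ // SELECT * FROM <database>.<schema>.<table>.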
+ protected String generateSql(
+ List<String> tableFields, String database, String schema, String table) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ")
+ .append("*")
+ .append(" FROM ")
+ .append(database)
+ .append(".")
+ .append(table);
+
+ return sb.toString();
+ }
+
+ protected String quoteIdentifier(String identifier) {
+ return "\"" + identifier + "\"";
+ }
+
+ @Override
+ public String getDataSourceName() {
+ return "JDBC-SNOWFLAKE";
+ }
+}
diff --git a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml
index 0ba9a780f..2ee429418 100644
--- a/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml
+++ b/seatunnel-server/seatunnel-app/src/main/resources/connector-datasource-mapper.yaml
@@ -26,6 +26,7 @@ connector-datasource-mapper:
- JDBC-KingBase
- JDBC-Oracle
- JDBC-Db2
+ - JDBC-SnowFlake
MySQL-CDC:
dataSources:
@@ -148,6 +149,11 @@ connector-datasource-mapper:
- DATA_INTEGRATION
sceneMode:
- SINGLE_TABLE
+ JDBC-SnowFlake:
+ businessMode:
+ - DATA_INTEGRATION
+ sceneMode:
+ - SINGLE_TABLE
FakeSource:
businessMode:
- DATA_INTEGRATION
@@ -240,6 +246,11 @@ connector-datasource-mapper:
- DATA_INTEGRATION
sceneMode:
- SINGLE_TABLE
+ JDBC-SnowFlake:
+ businessMode:
+ - DATA_INTEGRATION
+ sceneMode:
+ - SINGLE_TABLE
Console:
businessMode:
@@ -247,4 +258,4 @@ connector-datasource-mapper:
- DATA_REPLICA
sceneMode:
- SINGLE_TABLE
- - MULTIPLE_TABLE
\ No newline at end of file
+ - MULTIPLE_TABLE
diff --git a/seatunnel-web-dist/pom.xml b/seatunnel-web-dist/pom.xml
index 531e31f0d..0ee2d851b 100644
--- a/seatunnel-web-dist/pom.xml
+++ b/seatunnel-web-dist/pom.xml
@@ -375,6 +375,18 @@
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>datasource-jdbc-snowflake</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>datasource-fakesource</artifactId>